commonware_storage/journal/fixed.rs

//! An append-only log for storing fixed-length items on disk.
//!
//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
//! where position is defined as the item's order of insertion starting from 0, unaffected by
//! pruning.
//!
//! _See [crate::journal::variable] for a journal that supports variable-length items._
//!
//! # Format
//!
//! Data stored in a `fixed::Journal` is persisted in one of many Blobs within a caller-provided
//! `partition`. Each `Blob` contains at most `items_per_blob` items, with each item followed by
//! its checksum (CRC32):
//!
//! ```text
//! +--------+-----------+--------+-----------+--------+----------+-------------+
//! | item_0 | C(item_0) | item_1 | C(item_1) |   ...  | item_n-1 | C(item_n-1) |
//! +--------+-----------+--------+-----------+--------+----------+-------------+
//!
//! n = config.items_per_blob, C = CRC32
//! ```
//!
//! The most recent blob may not be full, in which case it contains fewer than the maximum number
//! of items.
//!
//! A fetched or replayed item's checksum is always computed and checked against the stored value
//! before the item is returned. If the checksums do not match, an error is returned instead.
//!
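//! Because chunks are fixed-size, an item's location is computed directly from its position: the
//! item at position `pos` lives in the blob with index `pos / items_per_blob`, at byte offset
//! `(pos % items_per_blob) * (item_size + 4)` (4 bytes for the CRC32). For example, with
//! `items_per_blob = 2` and 32-byte items, position 5 resolves to:
//!
//! ```text
//! blob   = 5 / 2              = 2
//! offset = (5 % 2) * (32 + 4) = 36
//! ```
//!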
//! # Open Blobs
//!
//! All `Blobs` in a given `partition` are kept open during the lifetime of `Journal`. The number
//! of open blobs can be limited by configuring a larger `items_per_blob` or by pruning old items.
//!
//! # Consistency
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are closed.
//!
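//! A minimal sketch of the write path (the runtime `context`, the `cfg`, and the item type
//! `MyItem` are caller-provided placeholders):
//!
//! ```ignore
//! let mut journal = Journal::<_, MyItem>::init(context, cfg).await?;
//! let pos = journal.append(my_item).await?; // buffered; not yet guaranteed durable
//! journal.sync().await?;                    // force pending data to `Storage`
//! journal.close().await?;                   // sync and close all open blobs
//! ```
//!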
//! # Pruning
//!
//! The `prune` method allows the `Journal` to prune blobs consisting entirely of items prior to a
//! given point in history.
//!
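//! Because only whole blobs are removed, pruning rounds down to a blob boundary. For example,
//! with `items_per_blob = 2`, `prune(3)` removes only the blob holding items 0 and 1:
//!
//! ```ignore
//! journal.prune(3).await?;
//! assert_eq!(journal.oldest_retained_pos().await?, Some(2));
//! ```
//!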
//! # State Sync
//!
//! `Journal::init_sync` initializes a journal for use in state sync. When opened in this mode, we
//! attempt to populate the journal within the given range with persisted data. If the journal is
//! empty, we create a fresh journal at the specified position. If the journal is not empty, we
//! prune it to the specified lower bound and rewind it to the specified upper bound.
//!
//! # Replay
//!
//! The `replay` method supports efficiently streaming all unpruned items, starting from a given
//! position.
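//!
//! A minimal sketch of replaying items (using `futures::pin_mut` and `StreamExt`, as in the tests
//! below):
//!
//! ```ignore
//! let stream = journal.replay(buffer_size, 0).await?;
//! pin_mut!(stream);
//! while let Some(result) = stream.next().await {
//!     let (pos, item) = result?;
//!     // ... process the item at position `pos` ...
//! }
//! ```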

use super::Error;
use bytes::BufMut;
use commonware_codec::{CodecFixed, DecodeExt, FixedSize};
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::{
    future::try_join_all,
    stream::{self, Stream},
    StreamExt,
};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, trace, warn};

/// Configuration for `Journal` storage.
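///
/// # Example
///
/// A hypothetical configuration (the partition name and sizes are illustrative; `NZU64!`,
/// `NZUsize!`, and `PoolRef::new` are the helpers used in the tests below):
///
/// ```ignore
/// let cfg = Config {
///     partition: "my_journal".into(),
///     items_per_blob: NZU64!(4096),
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(16)),
///     write_buffer: NZUsize!(4096),
/// };
/// ```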
#[derive(Clone)]
pub struct Config {
    /// The `commonware-runtime::Storage` partition to use for storing journal blobs.
    pub partition: String,

    /// The maximum number of journal items to store in each blob.
    ///
    /// Any unpruned historical blobs will contain exactly this number of items.
    /// Only the newest blob may contain fewer items.
    pub items_per_blob: NonZeroU64,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// Implementation of `Journal` storage.
pub struct Journal<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> {
    pub(crate) context: E,
    pub(crate) cfg: Config,

    /// Stores the historical blobs. A BTreeMap allows iterating over them from oldest to newest.
    ///
    /// # Invariants
    ///
    /// - Indices are consecutive and without gaps.
    /// - Contains only full blobs.
    /// - Never contains the most recent blob.
    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The most recent blob.
    ///
    /// # Invariant
    ///
    /// Always has room for at least one more item (and may be empty).
    pub(crate) tail: Append<E::Blob>,

    /// The index of the most recent blob.
    pub(crate) tail_index: u64,

    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _array: PhantomData<A>,
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
    pub(crate) const CHUNK_SIZE: usize = u32::SIZE + A::SIZE;
    pub(crate) const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. The `replay` method can be
    /// used to iterate over all items in the `Journal`.
    ///
    /// # Repair
    ///
    /// Like [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
    /// and [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
    /// the first invalid data read will be considered the new end of the journal (and the underlying [Blob] will be truncated to the last
    /// valid item).
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        // Iterate over blobs in partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context
                .open(&cfg.partition, &name)
                .await
                .map_err(Error::Runtime)?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => return Err(Error::InvalidBlobName(hex(&nm))),
            };
            debug!(blob = index, size, "loaded blob");
            blobs.insert(index, (blob, size));
        }

        // Check that there are no gaps in the historical blobs and that they are all full.
        let full_size = cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64;
        if !blobs.is_empty() {
            let mut it = blobs.keys().rev();
            let mut prev_index = *it.next().unwrap();
            for index in it {
                let (_, size) = blobs.get(index).unwrap();
                if *index != prev_index - 1 {
                    return Err(Error::MissingBlob(prev_index - 1));
                }
                prev_index = *index;
                if *size != full_size {
                    // Non-final blobs that have invalid sizes are not recoverable.
                    return Err(Error::InvalidBlobSize(*index, *size));
                }
            }
        } else {
            debug!("no blobs found");
            let (blob, size) = context.open(&cfg.partition, &0u64.to_be_bytes()).await?;
            assert_eq!(size, 0);
            blobs.insert(0, (blob, size));
        }

        // Initialize metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        // Initialize the tail blob.
        let (mut tail_index, (mut tail, mut tail_size)) = blobs.pop_last().unwrap();

        // Trim invalid items from the tail blob.
        tail_size = Self::trim_tail(&tail, tail_size, tail_index).await?;
        if tail_size > full_size {
            return Err(Error::InvalidBlobSize(tail_index, tail_size));
        }

        // If the tail blob is full we need to start a new one to maintain its invariant that there
        // is always room for another item.
        if tail_size == full_size {
            warn!(
                blob = tail_index,
                "tail blob is full, creating a new empty one"
            );
            blobs.insert(tail_index, (tail, tail_size));
            tail_index += 1;
            (tail, tail_size) = context
                .open(&cfg.partition, &tail_index.to_be_bytes())
                .await?;
            assert_eq!(tail_size, 0);
            tracked.inc();
        }

        // Wrap all blobs with Append wrappers.
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1219): Consider creating an
        // Immutable wrapper which doesn't allocate a write buffer for these.
        let blobs = try_join_all(blobs.into_iter().map(|(index, (blob, size))| {
            let pool = cfg.buffer_pool.clone();
            async move {
                let blob = Append::new(blob, size, cfg.write_buffer, pool).await?;
                Ok::<_, Error>((index, (blob, size)))
            }
        }))
        .await?;
        let tail = Append::new(tail, tail_size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;

        Ok(Self {
            context,
            cfg,
            blobs: blobs
                .into_iter()
                .map(|(index, (blob, _))| (index, blob))
                .collect(),
            tail,
            tail_index,
            tracked,
            synced,
            pruned,
            _array: PhantomData,
        })
    }

    /// Trim any invalid data found at the end of the tail blob and return the new size. The new
    /// size will be less than or equal to the originally provided size, and a multiple of the
    /// chunk size (item size plus checksum size).
    async fn trim_tail(
        tail: &<E as Storage>::Blob,
        mut tail_size: u64,
        tail_index: u64,
    ) -> Result<u64, Error> {
        let mut truncated = false;
        if !tail_size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            warn!(
                blob = tail_index,
                invalid_size = tail_size,
                "last blob size is not a multiple of chunk size, truncating"
            );
            tail_size -= tail_size % Self::CHUNK_SIZE_U64;
            tail.resize(tail_size).await?;
            truncated = true;
        }

        // Truncate any records with failing checksums. This can happen if the file system allocated
        // extra space for a blob but there was a crash before any data was written to that space.
        while tail_size > 0 {
            let offset = tail_size - Self::CHUNK_SIZE_U64;
            let read = tail.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
            match Self::verify_integrity(read.as_ref()) {
                Ok(_) => break, // Valid item found, we can stop truncating.
                Err(Error::ChecksumMismatch(_, _)) => {
                    warn!(blob = tail_index, offset, "checksum mismatch: truncating");
                    tail_size -= Self::CHUNK_SIZE_U64;
                    tail.resize(tail_size).await?;
                    truncated = true;
                }
                Err(err) => return Err(err),
            }
        }

        // If we truncated the blob, make sure to sync it.
        if truncated {
            tail.sync().await?;
        }

        Ok(tail_size)
    }

    /// Sync any pending updates to disk.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.synced.inc();
        debug!(blob = self.tail_index, "syncing blob");
        self.tail.sync().await.map_err(Error::Runtime)
    }

    /// Return the total number of items in the journal, irrespective of pruning. The next value
    /// appended to the journal will be at this position.
    pub async fn size(&self) -> Result<u64, Error> {
        let size = self.tail.size().await;
        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
        let items_in_blob = size / Self::CHUNK_SIZE_U64;
        Ok(items_in_blob + self.cfg.items_per_blob.get() * self.tail_index)
    }

    /// Append a new item to the journal. Returns the item's position in the journal, or an error
    /// if the operation fails.
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        // There should always be room to append an item in the newest blob
        let mut size = self.tail.size().await;
        assert!(size < self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64);
        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
        let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE);
        let item = item.encode();
        let checksum = crc32fast::hash(&item);
        buf.extend_from_slice(&item);
        buf.put_u32(checksum);

        // Write the item to the blob
        let item_pos =
            (size / Self::CHUNK_SIZE_U64) + self.cfg.items_per_blob.get() * self.tail_index;
        self.tail.append(buf).await?;
        trace!(blob = self.tail_index, pos = item_pos, "appended item");
        size += Self::CHUNK_SIZE_U64;

        // If the tail blob is now full we need to create a new empty one to fulfill the invariant
        // that the tail blob always has room for a new element.
        if size == self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64 {
            // Sync the tail blob before creating a new one so if we crash we don't end up with a
            // non-full historical blob.
            self.tail.sync().await?;

            // Create a new empty blob.
            let next_blob_index = self.tail_index + 1;
            debug!(blob = next_blob_index, "creating next blob");
            let (next_blob, size) = self
                .context
                .open(&self.cfg.partition, &next_blob_index.to_be_bytes())
                .await?;
            assert_eq!(size, 0);
            let next_blob = Append::new(
                next_blob,
                size,
                self.cfg.write_buffer,
                self.cfg.buffer_pool.clone(),
            )
            .await?;
            self.tracked.inc();

            // Move the old tail blob to the historical blobs map and set the new blob as the tail.
            let old_tail = std::mem::replace(&mut self.tail, next_blob);
            assert!(self.blobs.insert(self.tail_index, old_tail).is_none());
            self.tail_index = next_blob_index;
        }

        Ok(item_pos)
    }

    /// Rewind the journal to the given `size`. Returns [Error::InvalidRewind] if `size` exceeds
    /// the current journal size or if the rewind point precedes the oldest retained item. The
    /// journal is not synced after rewinding.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed from newest to oldest.
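    ///
    /// A minimal sketch (sizes are illustrative):
    ///
    /// ```ignore
    /// assert_eq!(journal.size().await?, 7);
    /// journal.rewind(4).await?; // discard items at positions 4..7
    /// journal.sync().await?;    // make the rewind durable
    /// ```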
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size().await?) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }
        let rewind_to_blob_index = size / self.cfg.items_per_blob;
        if rewind_to_blob_index < self.oldest_blob_index() {
            return Err(Error::InvalidRewind(size));
        }
        let rewind_to_offset = (size % self.cfg.items_per_blob) * Self::CHUNK_SIZE_U64;

        // Remove blobs until we reach the rewind point. Blobs must be removed in reverse order to
        // preserve consistency in the event of failures.
        while rewind_to_blob_index < self.tail_index {
            let (blob_index, mut new_tail) = self.blobs.pop_last().unwrap();
            assert_eq!(blob_index, self.tail_index - 1);
            std::mem::swap(&mut self.tail, &mut new_tail);
            self.remove_blob(self.tail_index, new_tail).await?;
            self.tail_index -= 1;
        }

        // Truncate the tail blob to the correct offset.
        self.tail.resize(rewind_to_offset).await?;

        Ok(())
    }

    /// Return the position of the oldest item in the journal that remains readable.
    ///
    /// Note that this value could be older than the `min_item_pos` last passed to prune.
    pub async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error> {
        let oldest_blob_index = self.oldest_blob_index();
        if oldest_blob_index == self.tail_index && self.tail.size().await == 0 {
            return Ok(None);
        }

        // The oldest retained item is the first item in the oldest blob.
        Ok(Some(oldest_blob_index * self.cfg.items_per_blob.get()))
    }

    /// Read the item at position `pos` in the journal.
    ///
    /// # Errors
    ///
    /// - [Error::ItemPruned] if the item at position `pos` is pruned.
    /// - [Error::ItemOutOfRange] if the item at position `pos` does not exist.
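    ///
    /// A minimal sketch of distinguishing the error cases:
    ///
    /// ```ignore
    /// match journal.read(pos).await {
    ///     Ok(item) => { /* item at `pos` is intact */ }
    ///     Err(Error::ItemPruned(_)) => { /* `pos` precedes the oldest retained item */ }
    ///     Err(Error::ItemOutOfRange(_)) => { /* `pos` has not been written yet */ }
    ///     Err(err) => return Err(err),
    /// }
    /// ```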
    pub async fn read(&self, pos: u64) -> Result<A, Error> {
        let blob_index = pos / self.cfg.items_per_blob.get();
        if blob_index > self.tail_index {
            return Err(Error::ItemOutOfRange(pos));
        }

        let offset = (pos % self.cfg.items_per_blob.get()) * Self::CHUNK_SIZE_U64;

        let blob = if blob_index == self.tail_index {
            if offset >= self.tail.size().await {
                return Err(Error::ItemOutOfRange(pos));
            }
            &self.tail
        } else {
            self.blobs.get(&blob_index).ok_or(Error::ItemPruned(pos))?
        };

        let read = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        Self::verify_integrity(read.as_ref())
    }

    /// Verify the integrity of the array + checksum in `buf`, returning:
    /// - the array if it is valid,
    /// - [Error::ChecksumMismatch] if the checksum is invalid, or
    /// - [Error::Codec] if the array could not be decoded after passing the checksum check.
    ///
    /// [Error::Codec] likely indicates a logic error rather than a corruption issue.
    fn verify_integrity(buf: &[u8]) -> Result<A, Error> {
        let stored_checksum = u32::from_be_bytes(buf[A::SIZE..].try_into().unwrap());
        let checksum = crc32fast::hash(&buf[..A::SIZE]);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        A::decode(&buf[..A::SIZE]).map_err(Error::Codec)
    }

    /// Returns an ordered stream of all items in the journal with position >= `start_pos`.
    ///
    /// # Panics
    ///
    /// Panics if `start_pos` exceeds the log size.
    ///
    /// # Integrity
    ///
    /// If any corrupted data is found, the stream will return an error.
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        assert!(start_pos <= self.size().await?);

        // Collect all blobs to replay paired with their index.
        let items_per_blob = self.cfg.items_per_blob.get();
        let start_blob = start_pos / items_per_blob;
        assert!(start_blob <= self.tail_index);
        let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
        let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
        let mut blob_plus = blobs
            .into_iter()
            .map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
            .collect::<Vec<_>>();

        // Include the tail blob.
        self.tail.sync().await?; // make sure no data is buffered
        let tail_size = self.tail.size().await;
        blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
        let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;

        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
        // memory with buffered data).
        let stream = stream::iter(blob_plus).flat_map(move |(blob_index, blob, size)| {
            // Create a new reader and buffer for each blob. Preallocating the buffer here to avoid
            // a per-iteration allocation improves performance by ~20%.
            let mut reader = Read::new(blob, size, buffer);
            let buf = vec![0u8; Self::CHUNK_SIZE];
            let initial_offset = if blob_index == start_blob {
                // If this is the very first blob then we need to seek to the starting position.
                reader.seek_to(start_offset).expect("invalid start_pos");
                start_offset
            } else {
                0
            };

            stream::unfold(
                (buf, reader, initial_offset),
                move |(mut buf, mut reader, offset)| async move {
                    if offset >= reader.blob_size() {
                        return None;
                    }

                    // Even though we are reusing the buffer, `read_exact` will overwrite any
                    // previous data, so there's no need to explicitly clear it.
                    let item_pos = items_per_blob * blob_index + offset / Self::CHUNK_SIZE_U64;
                    match reader.read_exact(&mut buf, Self::CHUNK_SIZE).await {
                        Ok(()) => {
                            let next_offset = offset + Self::CHUNK_SIZE_U64;
                            let result = Self::verify_integrity(&buf).map(|item| (item_pos, item));
                            if result.is_err() {
                                warn!("corrupted item at {item_pos}");
                            }
                            Some((result, (buf, reader, next_offset)))
                        }
                        Err(err) => {
                            warn!(
                                item_pos,
                                err = err.to_string(),
                                "error reading item during replay"
                            );
                            Some((Err(Error::Runtime(err)), (buf, reader, size)))
                        }
                    }
                },
            )
        });

        Ok(stream)
    }

    /// Return the index of the blob containing the oldest retained items.
    fn oldest_blob_index(&self) -> u64 {
        if self.blobs.is_empty() {
            self.tail_index
        } else {
            *self.blobs.first_key_value().unwrap().0
        }
    }

    /// Allow the journal to prune items older than `min_item_pos`. The journal may not prune all
    /// such items in order to preserve blob boundaries, but the number of retained items older
    /// than `min_item_pos` will always be less than the configured number of items per blob.
    /// Returns true if any items were pruned.
    ///
    /// Note that this operation may NOT be atomic; however, it is guaranteed not to leave gaps in
    /// the event of failure since items are always pruned in order from oldest to newest.
    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
        let oldest_blob_index = self.oldest_blob_index();
        let new_oldest_blob =
            std::cmp::min(min_item_pos / self.cfg.items_per_blob, self.tail_index);

        let mut pruned = false;
        for index in oldest_blob_index..new_oldest_blob {
            pruned = true;
            let blob = self.blobs.remove(&index).unwrap();
            self.remove_blob(index, blob).await?;
            self.pruned.inc();
        }

        Ok(pruned)
    }

    /// Safely removes any previously tracked blob from underlying storage.
    async fn remove_blob(&mut self, index: u64, blob: Append<E::Blob>) -> Result<(), Error> {
        drop(blob);
        self.context
            .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
            .await?;
        debug!(blob = index, "removed blob");
        self.tracked.dec();

        Ok(())
    }

    /// Syncs and closes all open blobs.
    pub async fn close(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            blob.sync().await?;
            debug!(blob = i, "synced blob");
        }
        self.tail.sync().await?;
        debug!(blob = self.tail_index, "synced tail");

        Ok(())
    }

    /// Remove any underlying blobs created by the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            debug!(blob = i, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }

        drop(self.tail);
        debug!(blob = self.tail_index, "destroyed blob");
        self.context
            .remove(&self.cfg.partition, Some(&self.tail_index.to_be_bytes()))
            .await?;

        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self, Context},
        Blob, Runner, Storage,
    };
    use commonware_utils::{NZUsize, NZU64};
    use futures::{pin_mut, StreamExt};

    const PAGE_SIZE: NonZeroUsize = NZUsize!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    /// Generate a SHA-256 digest for the given value.
    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test_partition".into(),
            items_per_blob,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Append an item to the journal
            let mut pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Append two more items to the journal to trigger a new blob creation
            pos = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Read the items back
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            // Sync the journal
            journal.sync().await.expect("failed to sync journal");
            let buffer = context.encode();
            assert!(buffer.contains("synced_total 1"));

            // Pruning to 1 should be a no-op because there's no blob with only older items.
            journal.prune(1).await.expect("failed to prune journal 1");
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Pruning to 2 should allow the first blob to be pruned.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(2));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 1"));

            // Reading from the first blob should fail since it's now pruned
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // Third item should still be readable
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Should be able to continue to append items
            for i in 3..10 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Check no-op pruning
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.oldest_blob_index(), 1);
            assert_eq!(journal.tail_index, 5);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(2));

            // Prune first 3 blobs (6 items)
            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(6));
            let buffer = context.encode();
            assert_eq!(journal.oldest_blob_index(), 3);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("pruned_total 3"));

            // Try pruning (more than) everything in the journal.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let buffer = context.encode();
            let size = journal.size().await.unwrap();
            assert_eq!(size, 10);
            assert_eq!(journal.oldest_blob_index(), 5);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 5"));
            // Since the size of the journal is currently a multiple of items_per_blob, the newest blob
            // will be empty, and there will be no retained items.
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);

            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }

    /// Append a lot of data to make sure we exercise buffer pool paging boundaries.
    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // Append just under 2 blobs' worth of items.
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            // Close, reopen, then read back.
            journal.close().await.expect("failed to close journal");
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Read them back the usual way.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay should return all items
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }
            journal.close().await.expect("Failed to close journal");

            // Corrupt one of the checksums and make sure it's detected.
            let checksum_offset = Digest::SIZE as u64
                + (ITEMS_PER_BLOB.get() / 2) * (Digest::SIZE + u32::SIZE) as u64;
            let (blob, _) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Write incorrect checksum
            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset)
                .await
                .expect("Failed to write incorrect checksum");
            let corrupted_item_pos = 40 * ITEMS_PER_BLOB.get() + ITEMS_PER_BLOB.get() / 2;
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Make sure reading the corrupted item fails with appropriate error.
            let err = journal.read(corrupted_item_pos).await.unwrap_err();
            assert!(matches!(err, Error::ChecksumMismatch(x, _) if x == bad_checksum));

            // Replay all items, making sure the checksum mismatch error is handled correctly.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                // Result will be missing only the one corrupted value.
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2 - 1
                );
            }
            journal.close().await.expect("Failed to close journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.close().await.expect("Failed to close journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Manually truncate a non-tail blob to make sure it's detected during initialization.
            let (blob, size) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(
                result.err().unwrap(),
                Error::InvalidBlobSize(_, _)
            ));

            // Delete a blob and make sure the gap is detected during initialization.
            context
                .remove(&cfg.partition, Some(&40u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result.err().unwrap(), Error::MissingBlob(n) if n == 40));
        });
    }

    #[test_traced]
    fn test_fixed_journal_trim_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill one blob and put 3 items in the second.
            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await.unwrap(), item_count);
            journal.close().await.expect("Failed to close journal");

            // Truncate the tail blob by one byte, which should result in the 3rd item being
            // trimmed.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");

            // Write an incorrect checksum into the second item in the blob, which should result in
            // the second item being trimmed as well.
            let checksum_offset = Digest::SIZE + u32::SIZE + Digest::SIZE;

            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset as u64)
                .await
                .expect("Failed to write incorrect checksum");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Confirm 2 items were trimmed.
            assert_eq!(journal.size().await.unwrap(), item_count - 2);

            // Corrupt the last item, ensuring the last blob is trimmed to an empty state.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Confirm the last item in the blob was trimmed.
            assert_eq!(journal.size().await.unwrap(), item_count - 3);

            // Cleanup.
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // 53 % 7 = 4, which will trigger a non-trivial seek in the starting blob to reach the
        // starting position.
        const START_POS: u64 = 53;

        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Replay should return all items except the first `START_POS`.
            {
                let stream = journal
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert!(pos >= START_POS, "pos={pos}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 3 items per blob.
            let cfg = test_cfg(NZU64!(3));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await.unwrap(), 5);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Manually truncate the most recent blob to simulate a partial write.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Truncate the most recent blob by 1 byte, which corrupts the most recent item.
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            // The last (corrupted) item should be discarded.
            assert_eq!(journal.size().await.unwrap(), 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Delete the tail blob to simulate a sync() that wrote the last blob at the point it
            // was entirely full, but a crash happened before the next empty blob could be created.
            context
                .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), 3);
            let buffer = context.encode();
            // Even though it was deleted, the tail blob should be re-created and left empty by the
            // recovery code. This means we have 2 blobs total, with 3 items in the first, and none
            // in the tail.
            assert!(buffer.contains("tracked 2"));
            assert_eq!(journal.size().await.unwrap(), 3);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_to_empty_from_partial_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 10 items per blob.
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // Add only a single item
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await.unwrap(), 1);
            journal.close().await.expect("Failed to close journal");

            // Manually truncate the most recent blob to simulate a partial write.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Truncate the most recent blob by 1 byte, which corrupts the one appended item.
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Since there was only a single item appended which we then corrupted, recovery should
            // leave us in the state of an empty journal.
            assert_eq!(journal.size().await.unwrap(), 0);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);
            // Make sure the journal still works for appending.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await.unwrap(), 1);

            journal.destroy().await.unwrap();
        });
    }

1244    #[test_traced]
1245    fn test_fixed_journal_recover_from_unwritten_data() {
1246        let executor = deterministic::Runner::default();
1247        executor.start(|context| async move {
1248            // Initialize the journal, allowing a max of 10 items per blob.
1249            let cfg = test_cfg(NZU64!(10));
1250            let mut journal = Journal::init(context.clone(), cfg.clone())
1251                .await
1252                .expect("failed to initialize journal");
1253
1254            // Add only a single item
1255            journal
1256                .append(test_digest(0))
1257                .await
1258                .expect("failed to append data");
1259            assert_eq!(journal.size().await.unwrap(), 1);
1260            journal.close().await.expect("Failed to close journal");
1261
1262            // Manually extend the blob by an amount at least some multiple of the chunk size to
1263            // simulate a failure where the file was extended, but no bytes were written due to
1264            // failure.
1265            let (blob, size) = context
1266                .open(&cfg.partition, &0u64.to_be_bytes())
1267                .await
1268                .expect("Failed to open blob");
1269            blob.write_at(vec![0u8; Digest::SIZE * 3 - 1], size)
1270                .await
1271                .expect("Failed to extend blob");
1272            blob.sync().await.expect("Failed to sync blob");
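
            // Why recovery should trim this (annotation): the zeroed region cannot
            // contain valid items, since a stored all-zero checksum will not match
            // the CRC32 of an all-zero item, and the trailing bytes do not even fill
            // a whole chunk, so re-initialization is expected to truncate the blob
            // back to the last valid item.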

            // Re-initialize the journal to simulate a restart.
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Ensure we've recovered to the state of a single item.
            assert_eq!(journal.size().await.unwrap(), 1);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(0));

            // Make sure the journal still works for appending.
            journal
                .append(test_digest(1))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await.unwrap(), 2);

            // Get the value of the first item.
            let item = journal.read(0).await.unwrap();
            assert_eq!(item, test_digest(0));

            // Get the value of the new item.
            let item = journal.read(1).await.unwrap();
            assert_eq!(item, test_digest(1));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
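            // rewind(n) truncates the journal back to a size of n items: rewinding
            // an empty journal to 0 is a no-op, while rewinding past the current
            // size should fail with InvalidRewind.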
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            // Append an item to the journal.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size().await.unwrap(), 1);
            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be a no-op
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 0);

            // Append 7 items.
            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            let buffer = context.encode();
            assert!(buffer.contains("tracked 4"));
            assert_eq!(journal.size().await.unwrap(), 7);

            // Rewind back to a size of 4, which should prune 2 blobs.
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));

            // Rewind back to empty and ensure all blobs are rewound over.
            assert!(matches!(journal.rewind(0).await, Ok(())));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert_eq!(journal.size().await.unwrap(), 0);

            // Stress test: add 100 items, rewind 49, repeat x10.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal
                    .rewind(journal.size().await.unwrap() - 49)
                    .await
                    .unwrap();
            }
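            // Each round appends 100 items and rewinds 49 of them away, a net gain
            // of 51 items per round, so 510 items should remain after 10 rounds.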
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Repeat with a different blob size (3 items per blob).
            let mut cfg = test_cfg(NZU64!(3));
            cfg.partition = "test_partition_2".into();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal
                    .rewind(journal.size().await.unwrap() - 49)
                    .await
                    .unwrap();
            }
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Make sure the re-opened journal is as expected.
            let mut journal: Journal<_, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            // Make sure rewinding works after pruning.
            journal.prune(300).await.expect("pruning failed");
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);
            // Rewinding prior to our prune point should fail.
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            // Rewinding to the prune point itself should work, leaving no retained
            // items.
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 300);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);

            journal.destroy().await.unwrap();
        });
    }
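
    /// A minimal supplementary sketch (my addition, using only APIs exercised
    /// above): a rewind performed before `close` should still be observed after
    /// re-initialization.
    #[test_traced]
    fn test_fixed_journal_rewind_persists_across_restart() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 5 items per blob.
            let cfg = test_cfg(NZU64!(5));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append 12 items (two full blobs and a partial tail).
            for i in 0..12 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }

            // Rewind to 7 items, then close to sync all pending data.
            journal.rewind(7).await.expect("failed to rewind journal");
            journal.close().await.expect("Failed to close journal");

            // The rewound size should survive a restart.
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), 7);

            journal.destroy().await.unwrap();
        });
    }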

    /// Protect against accidental changes to the journal disk format.
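    /// With `items_per_blob` set to 60 and 100 items appended, blob 0 should be
    /// full (60 items, each followed by its CRC32) and blob 1 partial (40 items);
    /// the expected hashes below cover both blobs.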
    #[test_traced]
    fn test_journal_conformance() {
        // Initialize the deterministic context.
        let executor = deterministic::Runner::default();

        // Start the test within the executor.
        executor.start(|context| async move {
            // Create a journal configuration.
            let cfg = test_cfg(NZU64!(60));

            // Initialize the journal.
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append 100 items to the journal.
            for i in 0..100 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("Failed to append data");
            }

            // Close the journal.
            journal.close().await.expect("Failed to close journal");

            // Hash the contents of the first (full) blob.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ed2ea67208cde2ee8c16cca5aa4f369f55b1402258c6b7760e5baf134e38944a",
            );
            blob.sync().await.expect("Failed to sync blob");

            // Hash the contents of the second (partial) blob.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "cc7efd4fc999aff36b9fd4213ba8da5810dc1849f92ae2ddf7c6dc40545f9aff",
            );
            blob.sync().await.expect("Failed to sync blob");

            // Re-open the journal and destroy it to clean up.
            let journal = Journal::<Context, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.destroy().await.unwrap();
        });
    }
}