commonware_storage/journal/contiguous/fixed.rs

//! An append-only log for storing fixed-length _items_ on disk.
//!
//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
//! where position is defined as the item's order of insertion starting from 0, unaffected by
//! pruning.
//!
//! _See [super::variable] for a journal that supports variable-length items._
//!
//! # Format
//!
//! Data stored in a `fixed::Journal` is persisted in one of many Blobs within a caller-provided
//! `partition`. Each `Blob` contains a configurable maximum of `items_per_blob`, with each item
//! followed by its checksum (CRC32):
//!
//! ```text
//! +--------+-----------+--------+-----------+--------+----------+-------------+
//! | item_0 | C(Item_0) | item_1 | C(Item_1) |   ...  | item_n-1 | C(Item_n-1) |
//! +--------+-----------+--------+-----------+--------+----------+-------------+
//!
//! n = config.items_per_blob, C = CRC32
//! ```
//!
//! The most recent blob may not necessarily be full, in which case it will contain fewer than the
//! maximum number of items.
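//!
//! For example (illustrative numbers), with 32-byte items and `items_per_blob = 2`, each chunk
//! occupies 32 + 4 = 36 bytes, so the item at `position` lives in blob `position / 2` at byte
//! offset `(position % 2) * 36` within that blob.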
//!
//! A fetched or replayed item's checksum is always computed and checked against the stored value
//! before it is returned. If the checksums do not match, an error is returned instead.
//!
//! # Open Blobs
//!
//! All `Blobs` in a given `partition` are kept open for the lifetime of the `Journal`. You can
//! limit the number of open blobs by using a larger `items_per_blob` or by pruning old items.
//!
//! # Consistency
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are closed.
//!
//! # Pruning
//!
//! The `prune` method allows the `Journal` to prune blobs consisting entirely of items prior to a
//! given point in history.
//!
//! # State Sync
//!
//! `Journal::init_sync` allows for initializing a journal for use in state sync. When opened in
//! this mode, we attempt to populate the journal within the given range with persisted data. If
//! the journal is empty, we create a fresh journal at the specified position. If the journal is
//! not empty, we prune it to the specified lower bound and rewind it to the specified upper bound.
//!
//! # Replay
//!
//! The `replay` method supports fast reading of all unpruned items into memory.
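//!
//! # Example
//!
//! A minimal usage sketch (runtime setup and error handling elided; see the tests in this module
//! for complete, runnable examples):
//!
//! ```rust,ignore
//! let mut journal = Journal::init(context, cfg).await?;
//! let pos = journal.append(item).await?;
//! journal.sync().await?;
//! let read_back = journal.read(pos).await?;
//! journal.close().await?;
//! ```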

use crate::journal::{
    contiguous::{MutableContiguous, PersistableContiguous},
    Error,
};
use bytes::BufMut;
use commonware_codec::{CodecFixed, DecodeExt as _, FixedSize};
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    telemetry::metrics::status::GaugeExt,
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::{
    future::try_join_all,
    stream::{self, Stream},
    StreamExt,
};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, trace, warn};

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config {
    /// The `commonware-runtime::Storage` partition to use for storing journal blobs.
    pub partition: String,

    /// The maximum number of journal items to store in each blob.
    ///
    /// Any unpruned historical blobs will contain exactly this number of items.
    /// Only the newest blob may contain fewer items.
    pub items_per_blob: NonZeroU64,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// Implementation of `Journal` storage.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    pub(crate) context: E,
    pub(crate) cfg: Config,

    /// Stores the historical blobs. A BTreeMap allows iterating over them from oldest to newest.
    ///
    /// # Invariants
    ///
    /// - Indices are consecutive and without gaps.
    /// - Contains only full blobs.
    /// - Never contains the most recent blob.
    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The most recent blob.
    ///
    /// # Invariant
    ///
    /// Always has room for at least one more item (and may be empty).
    pub(crate) tail: Append<E::Blob>,

    /// The index of the most recent blob.
    pub(crate) tail_index: u64,

    /// Cached size of the journal.
    pub(crate) size: u64,

    /// Cached pruning boundary.
    pub(crate) pruning_boundary: u64,

    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _array: PhantomData<A>,
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
    pub(crate) const CHUNK_SIZE: usize = u32::SIZE + A::SIZE;
    pub(crate) const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. The `replay` method can be
    /// used to iterate over all items in the `Journal`.
    ///
    /// # Repair
    ///
    /// Like [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
    /// and [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
    /// the first invalid data read will be considered the new end of the journal (and the underlying [Blob] will be truncated to the last
    /// valid item).
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        // Iterate over blobs in partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context
                .open(&cfg.partition, &name)
                .await
                .map_err(Error::Runtime)?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => return Err(Error::InvalidBlobName(hex(&nm))),
            };
            debug!(blob = index, size, "loaded blob");
            blobs.insert(index, (blob, size));
        }

        // Check that there are no gaps in the historical blobs and that they are all full.
        let full_size = cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64;
        if !blobs.is_empty() {
            let mut it = blobs.keys().rev();
            let mut prev_index = *it.next().unwrap();
            for index in it {
                let (_, size) = blobs.get(index).unwrap();
                if *index != prev_index - 1 {
                    return Err(Error::MissingBlob(prev_index - 1));
                }
                prev_index = *index;
                if *size != full_size {
                    // Non-final blobs that have invalid sizes are not recoverable.
                    return Err(Error::InvalidBlobSize(*index, *size));
                }
            }
        } else {
            debug!("no blobs found");
            let (blob, size) = context.open(&cfg.partition, &0u64.to_be_bytes()).await?;
            assert_eq!(size, 0);
            blobs.insert(0, (blob, size));
        }

        // Initialize metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        let _ = tracked.try_set(blobs.len());

        // Initialize the tail blob.
        let (mut tail_index, (mut tail, mut tail_size)) = blobs.pop_last().unwrap();

        // Trim invalid items from the tail blob.
        tail_size = Self::trim_tail(&tail, tail_size, tail_index).await?;
        if tail_size > full_size {
            return Err(Error::InvalidBlobSize(tail_index, tail_size));
        }

        // If the tail blob is full we need to start a new one to maintain its invariant that there
        // is always room for another item.
        if tail_size == full_size {
            warn!(
                blob = tail_index,
                "tail blob is full, creating a new empty one"
            );
            blobs.insert(tail_index, (tail, tail_size));
            tail_index += 1;
            (tail, tail_size) = context
                .open(&cfg.partition, &tail_index.to_be_bytes())
                .await?;
            assert_eq!(tail_size, 0);
            tracked.inc();
        }

        // Wrap all blobs with Append wrappers.
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1219): Consider creating an
        // Immutable wrapper which doesn't allocate a write buffer for these.
        let blobs = try_join_all(blobs.into_iter().map(|(index, (blob, size))| {
            let pool = cfg.buffer_pool.clone();
            async move {
                let blob = Append::new(blob, size, cfg.write_buffer, pool).await?;
                Ok::<_, Error>((index, (blob, size)))
            }
        }))
        .await?;
        let tail = Append::new(tail, tail_size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
        let size = tail_index * cfg.items_per_blob.get() + (tail_size / Self::CHUNK_SIZE_U64);
        let pruning_boundary = if blobs.is_empty() {
            tail_index * cfg.items_per_blob.get()
        } else {
            blobs[0].0 * cfg.items_per_blob.get()
        };
        assert!(size >= pruning_boundary);

        Ok(Self {
            context,
            cfg,
            blobs: blobs
                .into_iter()
                .map(|(index, (blob, _))| (index, blob))
                .collect(),
            tail,
            tail_index,
            size,
            pruning_boundary,
            tracked,
            synced,
            pruned,
            _array: PhantomData,
        })
    }

    /// Trim any invalid data found at the end of the tail blob and return the new size. The new
    /// size will be less than or equal to the originally provided size, and a multiple of the item
    /// size.
    async fn trim_tail(
        tail: &<E as Storage>::Blob,
        mut tail_size: u64,
        tail_index: u64,
    ) -> Result<u64, Error> {
        let mut truncated = false;
        if !tail_size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            warn!(
                blob = tail_index,
                invalid_size = tail_size,
                "last blob size is not a multiple of item size, truncating"
            );
            tail_size -= tail_size % Self::CHUNK_SIZE_U64;
            tail.resize(tail_size).await?;
            truncated = true;
        }

        // Truncate any records with failing checksums. This can happen if the file system allocated
        // extra space for a blob but there was a crash before any data was written to that space.
        while tail_size > 0 {
            let offset = tail_size - Self::CHUNK_SIZE_U64;
            let read = tail.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
            match Self::verify_integrity(read.as_ref()) {
                Ok(_) => break, // Valid item found, we can stop truncating.
                Err(Error::ChecksumMismatch(_, _)) => {
                    warn!(blob = tail_index, offset, "checksum mismatch: truncating");
                    tail_size -= Self::CHUNK_SIZE_U64;
                    tail.resize(tail_size).await?;
                    truncated = true;
                }
                Err(err) => return Err(err),
            }
        }

        // If we truncated the blob, make sure to sync it.
        if truncated {
            tail.sync().await?;
        }

        Ok(tail_size)
    }

    /// Sync any pending updates to disk.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.synced.inc();
        debug!(blob = self.tail_index, "syncing blob");
        self.tail.sync().await.map_err(Error::Runtime)
    }

    /// Return the total number of items in the journal, irrespective of pruning. The next value
    /// appended to the journal will be at this position.
    pub const fn size(&self) -> u64 {
        self.size
    }

    /// Append a new item to the journal. Return the item's position in the journal, or an error
    /// if the operation fails.
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        // There should always be room to append an item in the newest blob
        let mut size = self.tail.size().await;
        assert!(size < self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64);
        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
        let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE);
        let item = item.encode();
        let checksum = crc32fast::hash(&item);
        buf.extend_from_slice(&item);
        buf.put_u32(checksum);
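        // `buf` now holds `item || CRC32(item)`, matching the chunk layout described in the
        // module docs.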

        // Write the item to the blob
        let item_pos =
            (size / Self::CHUNK_SIZE_U64) + self.cfg.items_per_blob.get() * self.tail_index;
        self.tail.append(buf).await?;
        trace!(blob = self.tail_index, pos = item_pos, "appended item");
        size += Self::CHUNK_SIZE_U64;

        // If the tail blob is now full we need to create a new empty one to fulfill the invariant
        // that the tail blob always has room for a new element.
        if size == self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64 {
            // Sync the tail blob before creating a new one so if we crash we don't end up with a
            // non-full historical blob.
            self.tail.sync().await?;

            // Create a new empty blob.
            let next_blob_index = self.tail_index + 1;
            debug!(blob = next_blob_index, "creating next blob");
            let (next_blob, size) = self
                .context
                .open(&self.cfg.partition, &next_blob_index.to_be_bytes())
                .await?;
            assert_eq!(size, 0);
            let next_blob = Append::new(
                next_blob,
                size,
                self.cfg.write_buffer,
                self.cfg.buffer_pool.clone(),
            )
            .await?;
            self.tracked.inc();

            // Move the old tail blob to the historical blobs map and set the new blob as the tail.
            let old_tail = std::mem::replace(&mut self.tail, next_blob);
            assert!(self.blobs.insert(self.tail_index, old_tail).is_none());
            self.tail_index = next_blob_index;
        }
        self.size += 1;

        Ok(item_pos)
    }

    /// Rewind the journal to the given `size`. Returns [Error::InvalidRewind] if `size` exceeds
    /// the current size or if the rewind point precedes the oldest retained blob. The journal is
    /// not synced after rewinding.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed from newest to oldest.
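    ///
    /// For example, with `items_per_blob = 2`, rewinding a journal of size 5 to size 2 removes
    /// the tail blob (blob 2) and empties blob 1, which becomes the new tail; blob 0 (items 0
    /// and 1) is untouched.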
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size()) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }
        let rewind_to_blob_index = size / self.cfg.items_per_blob;
        if rewind_to_blob_index < self.oldest_blob_index() {
            return Err(Error::InvalidRewind(size));
        }
        let rewind_to_offset = (size % self.cfg.items_per_blob) * Self::CHUNK_SIZE_U64;

        // Remove blobs until we reach the rewind point. Blobs must be removed in reverse order to
        // preserve consistency in the event of failures.
        while rewind_to_blob_index < self.tail_index {
            let (blob_index, mut new_tail) = self.blobs.pop_last().unwrap();
            assert_eq!(blob_index, self.tail_index - 1);
            std::mem::swap(&mut self.tail, &mut new_tail);
            self.remove_blob(self.tail_index, new_tail).await?;
            self.tail_index -= 1;
        }

        // Truncate the tail blob to the correct offset.
        self.tail.resize(rewind_to_offset).await?;

        self.size = size;
        assert!(size >= self.pruning_boundary);

        Ok(())
    }

    /// Return the position of the oldest item in the journal that remains readable.
    ///
    /// Note that this value could be older than the `min_item_pos` last passed to prune.
    pub const fn oldest_retained_pos(&self) -> Option<u64> {
        if self.pruning_boundary == self.size {
            return None;
        }

        Some(self.pruning_boundary)
    }

    /// Return the location before which all items have been pruned.
    pub const fn pruning_boundary(&self) -> u64 {
        self.pruning_boundary
    }

    /// Read the item at position `pos` in the journal.
    ///
    /// # Errors
    ///
    /// - [Error::ItemPruned] if the item at position `pos` is pruned.
    /// - [Error::ItemOutOfRange] if the item at position `pos` does not exist.
    pub async fn read(&self, pos: u64) -> Result<A, Error> {
        let blob_index = pos / self.cfg.items_per_blob.get();
        if blob_index > self.tail_index {
            return Err(Error::ItemOutOfRange(pos));
        }

        let offset = (pos % self.cfg.items_per_blob.get()) * Self::CHUNK_SIZE_U64;

        let blob = if blob_index == self.tail_index {
            if offset >= self.tail.size().await {
                return Err(Error::ItemOutOfRange(pos));
            }
            &self.tail
        } else {
            self.blobs.get(&blob_index).ok_or(Error::ItemPruned(pos))?
        };

        let read = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        Self::verify_integrity(read.as_ref())
    }

    /// Verify the integrity of the Array + checksum in `buf`, returning:
    /// - The array if it is valid,
    /// - Error::ChecksumMismatch if the checksum is invalid, or
    /// - Error::Codec if the array could not be decoded after passing the checksum check.
    ///
    /// Error::Codec likely indicates a logic error rather than a corruption issue.
    fn verify_integrity(buf: &[u8]) -> Result<A, Error> {
        let stored_checksum = u32::from_be_bytes(buf[A::SIZE..].try_into().unwrap());
        let checksum = crc32fast::hash(&buf[..A::SIZE]);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        A::decode(&buf[..A::SIZE]).map_err(Error::Codec)
    }

    /// Returns an ordered stream of all items in the journal with position >= `start_pos`.
    ///
    /// # Panics
    ///
    /// Panics if `start_pos` exceeds the journal's size.
    ///
    /// # Integrity
    ///
    /// If any corrupted data is found, the stream will return an error.
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        assert!(start_pos <= self.size());

        // Collect all blobs to replay paired with their index.
        let items_per_blob = self.cfg.items_per_blob.get();
        let start_blob = start_pos / items_per_blob;
        assert!(start_blob <= self.tail_index);
        let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
        let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
        let mut blob_plus = blobs
            .into_iter()
            .map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
            .collect::<Vec<_>>();

        // Include the tail blob.
        self.tail.sync().await?; // make sure no data is buffered
        let tail_size = self.tail.size().await;
        blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
        let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;

        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
        // memory with buffered data).
        let stream = stream::iter(blob_plus).flat_map(move |(blob_index, blob, size)| {
            // Create a new reader and buffer for each blob. Preallocating the buffer here to avoid
            // a per-iteration allocation improves performance by ~20%.
            let mut reader = Read::new(blob, size, buffer);
            let buf = vec![0u8; Self::CHUNK_SIZE];
            let initial_offset = if blob_index == start_blob {
                // If this is the very first blob then we need to seek to the starting position.
                reader.seek_to(start_offset).expect("invalid start_pos");
                start_offset
            } else {
                0
            };

            stream::unfold(
                (buf, reader, initial_offset),
                move |(mut buf, mut reader, offset)| async move {
                    if offset >= reader.blob_size() {
                        return None;
                    }

                    // Even though we are reusing the buffer, `read_exact` will overwrite any
                    // previous data, so there's no need to explicitly clear it.
                    let item_pos = items_per_blob * blob_index + offset / Self::CHUNK_SIZE_U64;
                    match reader.read_exact(&mut buf, Self::CHUNK_SIZE).await {
                        Ok(()) => {
                            let next_offset = offset + Self::CHUNK_SIZE_U64;
                            let result = Self::verify_integrity(&buf).map(|item| (item_pos, item));
                            if result.is_err() {
                                warn!("corrupted item at {item_pos}");
                            }
                            Some((result, (buf, reader, next_offset)))
                        }
                        Err(err) => {
                            warn!(
                                item_pos,
                                err = err.to_string(),
                                "error reading item during replay"
                            );
                            Some((Err(Error::Runtime(err)), (buf, reader, size)))
                        }
                    }
                },
            )
        });

        Ok(stream)
    }

    /// Return the index of the blob containing the oldest retained items.
    fn oldest_blob_index(&self) -> u64 {
        if self.blobs.is_empty() {
            self.tail_index
        } else {
            *self.blobs.first_key_value().unwrap().0
        }
    }

    /// Allow the journal to prune items older than `min_item_pos`. The journal may not prune all
    /// such items in order to preserve blob boundaries, but the number of such items will always
    /// be less than the configured number of items per blob. Returns true if any items were
    /// pruned.
    ///
    /// Note that this operation may NOT be atomic; however, it is guaranteed not to leave gaps in
    /// the event of failure as items are always pruned in order from oldest to newest.
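    ///
    /// For example, with `items_per_blob = 2`, `prune(3)` removes only blob 0 (items 0 and 1);
    /// item 2 survives because its blob also contains item 3.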
    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
        let oldest_blob_index = self.oldest_blob_index();
        let new_oldest_blob =
            std::cmp::min(min_item_pos / self.cfg.items_per_blob, self.tail_index);

        let mut pruned = false;
        for index in oldest_blob_index..new_oldest_blob {
            pruned = true;
            let blob = self.blobs.remove(&index).unwrap();
            self.remove_blob(index, blob).await?;
            self.pruned.inc();
        }
        if pruned {
            self.pruning_boundary = new_oldest_blob * self.cfg.items_per_blob.get();
        }

        Ok(pruned)
    }

    /// Safely removes any previously tracked blob from underlying storage.
    async fn remove_blob(&mut self, index: u64, blob: Append<E::Blob>) -> Result<(), Error> {
        drop(blob);
        self.context
            .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
            .await?;
        debug!(blob = index, "removed blob");
        self.tracked.dec();

        Ok(())
    }

    /// Syncs and closes all open blobs.
    pub async fn close(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            blob.sync().await?;
            debug!(blob = i, "synced blob");
        }
        self.tail.sync().await?;
        debug!(blob = self.tail_index, "synced tail");

        Ok(())
    }

    /// Remove any underlying blobs created by the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            debug!(blob = i, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }

        drop(self.tail);
        debug!(blob = self.tail_index, "destroyed blob");
        self.context
            .remove(&self.cfg.partition, Some(&self.tail_index.to_be_bytes()))
            .await?;

        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }

        Ok(())
    }
}

// Implement Contiguous trait for fixed-length journals
impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> super::Contiguous for Journal<E, A> {
    type Item = A;

    fn size(&self) -> u64 {
        Self::size(self)
    }

    fn oldest_retained_pos(&self) -> Option<u64> {
        Self::oldest_retained_pos(self)
    }

    fn pruning_boundary(&self) -> u64 {
        Self::pruning_boundary(self)
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, buffer, start_pos).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> MutableContiguous for Journal<E, A> {
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> PersistableContiguous for Journal<E, A> {
    async fn commit(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn close(self) -> Result<(), Error> {
        Self::close(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self, Context},
        Blob, Runner, Storage,
    };
    use commonware_utils::{NZUsize, NZU64};
    use futures::{pin_mut, StreamExt};

    const PAGE_SIZE: NonZeroUsize = NZUsize!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    /// Generate a SHA-256 digest for the given value.
    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test_partition".into(),
            items_per_blob,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Append an item to the journal
            let mut pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), 1);

            // Append two more items to the journal to trigger a new blob creation
            pos = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Read the items back
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            // Sync the journal
            journal.sync().await.expect("failed to sync journal");
            let buffer = context.encode();
            assert!(buffer.contains("synced_total 1"));

            // Pruning to 1 should be a no-op because there's no blob with only older items.
            journal.prune(1).await.expect("failed to prune journal 1");
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Pruning to 2 should allow the first blob to be pruned.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos(), Some(2));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 1"));

            // Reading from the first blob should fail since it's now pruned
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // Third item should still be readable
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Should be able to continue to append items
            for i in 3..10 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Check no-op pruning
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.oldest_blob_index(), 1);
            assert_eq!(journal.tail_index, 5);
            assert_eq!(journal.oldest_retained_pos(), Some(2));

            // Prune first 3 blobs (6 items)
            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos(), Some(6));
            let buffer = context.encode();
            assert_eq!(journal.oldest_blob_index(), 3);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("pruned_total 3"));

            // Try pruning (more than) everything in the journal.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let buffer = context.encode();
            let size = journal.size();
            assert_eq!(size, 10);
            assert_eq!(journal.oldest_blob_index(), 5);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 5"));
            // Since the size of the journal is currently a multiple of items_per_blob, the newest blob
            // will be empty, and there will be no retained items.
            assert_eq!(journal.oldest_retained_pos(), None);
            // Pruning boundary should equal size when oldest_retained is None.
            assert_eq!(journal.pruning_boundary(), size);

            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }

    /// Append a lot of data to make sure we exercise buffer pool paging boundaries.
    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // Append 2 blobs worth of items.
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            // Close, reopen, then read back.
            journal.close().await.expect("failed to close journal");
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Read them back the usual way.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay should return all items
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }
            journal.close().await.expect("Failed to close journal");

            // Corrupt one of the checksums and make sure it's detected.
            let checksum_offset = Digest::SIZE as u64
                + (ITEMS_PER_BLOB.get() / 2) * (Digest::SIZE + u32::SIZE) as u64;
            let (blob, _) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Write incorrect checksum
            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset)
                .await
                .expect("Failed to write incorrect checksum");
            let corrupted_item_pos = 40 * ITEMS_PER_BLOB.get() + ITEMS_PER_BLOB.get() / 2;
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Make sure reading the corrupted item fails with appropriate error.
            let err = journal.read(corrupted_item_pos).await.unwrap_err();
            assert!(matches!(err, Error::ChecksumMismatch(x, _) if x == bad_checksum));

            // Replay all items, making sure the checksum mismatch error is handled correctly.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                // Result will be missing only the one corrupted value.
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2 - 1
                );
            }
            journal.close().await.expect("Failed to close journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.close().await.expect("Failed to close journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Manually truncate a non-tail blob to make sure it's detected during initialization.
            let (blob, size) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(
                result.err().unwrap(),
                Error::InvalidBlobSize(_, _)
            ));

            // Delete a blob and make sure the gap is detected during initialization.
            context
                .remove(&cfg.partition, Some(&40u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result.err().unwrap(), Error::MissingBlob(n) if n == 40));
        });
    }

    #[test_traced]
    fn test_fixed_journal_test_trim_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill one blob and put 3 items in the second.
            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), item_count);
            journal.close().await.expect("Failed to close journal");

            // Truncate the tail blob by one byte, which should result in the 3rd item being
            // trimmed.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");

            // Write incorrect checksum into the second item in the blob, which should result in the
            // second item being trimmed.
            let checksum_offset = Digest::SIZE + u32::SIZE + Digest::SIZE;

            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset as u64)
                .await
                .expect("Failed to write incorrect checksum");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Confirm 2 items were trimmed.
            assert_eq!(journal.size(), item_count - 2);

            // Corrupt the last item, ensuring last blob is trimmed to empty state.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Confirm last item in blob was trimmed.
            assert_eq!(journal.size(), item_count - 3);

            // Cleanup.
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // 53 % 7 = 4, which will trigger a non-trivial seek in the starting blob to reach the
        // starting position.
        const START_POS: u64 = 53;

        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Replay should return all items except the first `START_POS`.
            {
                let stream = journal
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert!(pos >= START_POS, "pos={pos}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 3 items per blob.
            let cfg = test_cfg(NZU64!(3));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), 5);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Manually truncate most recent blob to simulate a partial write.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // truncate the most recent blob by 1 byte which corrupts the most recent item
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            // the last corrupted item should get discarded
            assert_eq!(journal.size(), 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Delete the tail blob to simulate a sync() that wrote the last blob at the point it
            // was entirely full, but a crash happened before the next empty blob could be created.
            context
                .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size(), 3);
            let buffer = context.encode();
            // Even though it was deleted, tail blob should be re-created and left empty by the
            // recovery code. This means we have 2 blobs total, with 3 items in the first, and none
            // in the tail.
            assert!(buffer.contains("tracked 2"));
            assert_eq!(journal.size(), 3);

            journal.destroy().await.unwrap();
        });
    }
1287
1288    #[test_traced]
1289    fn test_fixed_journal_recover_to_empty_from_partial_write() {
1290        let executor = deterministic::Runner::default();
1291        executor.start(|context| async move {
1292            // Initialize the journal, allowing a max of 10 items per blob.
1293            let cfg = test_cfg(NZU64!(10));
1294            let mut journal = Journal::init(context.clone(), cfg.clone())
1295                .await
1296                .expect("failed to initialize journal");
1297            // Add only a single item
1298            journal
1299                .append(test_digest(0))
1300                .await
1301                .expect("failed to append data");
1302            assert_eq!(journal.size(), 1);
1303            journal.close().await.expect("Failed to close journal");
1304
            // Manually truncate the most recent blob to simulate a partial write.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Truncate the most recent blob by 1 byte, corrupting the one appended item.
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart.
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Since the only appended item was corrupted, recovery should leave the
            // journal empty.
            assert_eq!(journal.size(), 0);
            assert_eq!(journal.oldest_retained_pos(), None);
            // Make sure the journal still works for appending.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);

            journal.destroy().await.unwrap();
        });
    }

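    // A minimal helper sketch (hypothetical; not used by the tests in this module): the
    // "truncate the last byte" pattern repeated in the recovery tests above could be
    // factored out like this, assuming only the `Storage`/`Blob` calls already exercised
    // here (`open`, `resize`, `sync`).
    #[allow(dead_code)]
    async fn corrupt_last_byte(context: &impl Storage, partition: &str, name: u64) {
        // Open the blob, truncate its final byte to simulate a torn write, and persist
        // the truncation.
        let (blob, size) = context
            .open(partition, &name.to_be_bytes())
            .await
            .expect("Failed to open blob");
        blob.resize(size - 1).await.expect("Failed to corrupt blob");
        blob.sync().await.expect("Failed to sync blob");
    }
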
    #[test_traced]
    fn test_fixed_journal_recover_from_unwritten_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 10 items per blob.
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Add only a single item.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);
            journal.close().await.expect("Failed to close journal");

            // Manually extend the blob by at least a multiple of the chunk size to
            // simulate a failure where the file was extended but the new bytes were never
            // written.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; Digest::SIZE * 3 - 1], size)
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart.
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

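            // The appended zeros cannot form a checksum-valid entry (the CRC32 of a
            // zeroed item is nonzero, while the stored checksum bytes are zero), so
            // recovery should trim the unwritten region.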
            // Ensure we've recovered to the state of a single item.
            assert_eq!(journal.size(), 1);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            // Make sure the journal still works for appending.
            journal
                .append(test_digest(1))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 2);

            // Get the value of the first item.
            let item = journal.read(0).await.unwrap();
            assert_eq!(item, test_digest(0));

            // Get the value of the new item.
            let item = journal.read(1).await.unwrap();
            assert_eq!(item, test_digest(1));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
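            // `rewind(n)` truncates the journal back to a size of `n` items; a target
            // beyond the current size is invalid.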
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            // Append an item to the journal.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size(), 1);
            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be a no-op
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size(), 0);

            // Append 7 items.
            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            let buffer = context.encode();
            assert!(buffer.contains("tracked 4"));
            assert_eq!(journal.size(), 7);

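            // With 2 items per blob, the 7 items occupy blobs [0,1], [2,3], [4,5], and a
            // tail blob holding [6], hence the 4 tracked blobs above.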
            // Rewind back to item #4, which should prune 2 blobs.
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size(), 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));

            // Rewind back to empty and ensure all blobs are rewound over.
            assert!(matches!(journal.rewind(0).await, Ok(())));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert_eq!(journal.size(), 0);

            // Stress test: add 100 items, rewind by 49, repeat 10 times.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size() - 49).await.unwrap();
            }
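            // Each round nets 100 - 49 = 51 items, so 10 rounds leave 510 behind.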
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Repeat with a different blob size (3 items per blob).
            let mut cfg = test_cfg(NZU64!(3));
            cfg.partition = "test_partition_2".into();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size() - 49).await.unwrap();
            }
            assert_eq!(journal.size(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Make sure the re-opened journal is as expected.
            let mut journal: Journal<_, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), ITEMS_REMAINING);

            // Make sure rewinding works after pruning.
            journal.prune(300).await.expect("pruning failed");
            assert_eq!(journal.size(), ITEMS_REMAINING);
            // Rewinding prior to our prune point should fail.
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            // Rewinding to the prune point should work, since positions up to the prune
            // point always remain in the journal's history.
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert_eq!(journal.size(), 300);
            assert_eq!(journal.oldest_retained_pos(), None);

            journal.destroy().await.unwrap();
        });
    }

    /// Protect against accidental changes to the journal disk format.
    #[test_traced]
    fn test_journal_conformance() {
        // Initialize the deterministic context.
        let executor = deterministic::Runner::default();

        // Start the test within the executor.
        executor.start(|context| async move {
            // Create a journal configuration.
            let cfg = test_cfg(NZU64!(60));

            // Initialize the journal.
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append 100 items to the journal.
            for i in 0..100 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("Failed to append data");
            }

            // Close the journal.
            journal.close().await.expect("Failed to close journal");

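            // With 60 items per blob, the 100 items span two blobs: 60 entries in blob 0
            // and 40 in blob 1. Hash each blob's raw contents against pinned digests.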
            // Hash blob contents.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ed2ea67208cde2ee8c16cca5aa4f369f55b1402258c6b7760e5baf134e38944a",
            );
            blob.sync().await.expect("Failed to sync blob");
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "cc7efd4fc999aff36b9fd4213ba8da5810dc1849f92ae2ddf7c6dc40545f9aff",
            );
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<Context, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.destroy().await.unwrap();
        });
    }
}