commonware_storage/journal/variable.rs

1//! An append-only log for storing arbitrary variable length items.
2//!
3//! `variable::Journal` is an append-only log for storing arbitrary variable length data on disk. In
4//! addition to replay, stored items can be directly retrieved given their section number and offset
5//! within the section.
6//!
7//! # Format
8//!
9//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
10//! The particular `Blob` in which data is stored is identified by a `section` number (`u64`).
11//! Within a `section`, data is appended as an `item` with the following format:
12//!
13//! ```text
14//! +---+---+---+---+---+---+---+---+---+---+---+
15//! | 0 | 1 | 2 | 3 |    ...    | 8 | 9 |10 |11 |
16//! +---+---+---+---+---+---+---+---+---+---+---+
17//! |   Size (u32)  |   Data    |    C(u32)     |
18//! +---+---+---+---+---+---+---+---+---+---+---+
19//!
20//! C = CRC32(Size | Data)
21//! ```
22//!
23//! _To ensure data returned by `Journal` is correct, a checksum (CRC32) is stored at the end of
24//! each item. If the checksum of the read data does not match the stored checksum, an error is
25//! returned. This checksum is only verified when data is accessed and not at startup (which would
26//! require reading all data in `Journal`)._
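//!
//! For illustration, a minimal sketch (not part of the `Journal` API) of how a single item is
//! framed before being written, mirroring the layout above and using `crc32fast` (the checksum
//! implementation used by this module):
//!
//! ```rust,ignore
//! let data: &[u8] = b"hello";
//! let mut item = Vec::new();
//! item.extend_from_slice(&(data.len() as u32).to_be_bytes()); // Size (u32, big-endian)
//! item.extend_from_slice(data);                                // Data
//! let checksum = crc32fast::hash(&item);                       // C = CRC32(Size | Data)
//! item.extend_from_slice(&checksum.to_be_bytes());
//! ```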
27//!
28//! # Open Blobs
29//!
//! `Journal` uses one `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a
//! given `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound
//! the number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
34//!
35//! # Offset Alignment
36//!
37//! In practice, `Journal` users won't store `u64::MAX` bytes of data in a given `section` (the max
38//! `Offset` provided by `Blob`). To reduce the memory usage for tracking offsets within `Journal`,
//! offsets are thus `u32` (4 bytes) and aligned to 16 bytes. This means that the maximum size of
//! any `section` is roughly `u32::MAX * 17` bytes (~70GB), since the item at the last aligned
//! offset can itself store up to `u32::MAX` bytes. If more data is written to a `section` past
//! this max, an `OffsetOverflow` error is returned.
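//!
//! For example, an item appended when the underlying blob is 20 bytes long is padded to the next
//! 16-byte boundary before being written:
//!
//! ```text
//! raw blob position = 20 bytes
//! padded position   = 32 bytes (next multiple of 16)
//! stored offset     = 32 / 16 = 2 (u32)
//! ```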
43//!
44//! # Sync
45//!
46//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
47//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
48//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
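//!
//! A minimal sketch (assuming a `journal` initialized as in the example below and some `batch` of
//! items) of forcing pending writes to `Storage`:
//!
//! ```rust,ignore
//! // Append a batch of items to section 10, then force them to durable storage.
//! for item in batch {
//!     journal.append(10, item).await?;
//! }
//! journal.sync(10).await?;
//! ```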
49//!
50//! # Pruning
51//!
52//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
53//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
54//! could be used, for example, by some blockchain application to prune old blocks.
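//!
//! A minimal sketch (assuming a `journal` initialized as in the example below):
//!
//! ```rust,ignore
//! // Remove all sections with a number less than 100; returns whether anything was pruned.
//! let pruned = journal.prune(100).await?;
//! ```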
55//!
56//! # Replay
57//!
58//! During application initialization, it is very common to replay data from `Journal` to recover
59//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
60//! method to produce a stream of all items in the `Journal` in order of their `section` and
61//! `offset`.
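//!
//! A minimal sketch (assuming a `journal` initialized as in the example below, with
//! `futures::StreamExt` in scope):
//!
//! ```rust,ignore
//! // Stream all items starting from section 0, offset 0, with a 1KB read buffer.
//! let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
//! futures::pin_mut!(stream);
//! while let Some(result) = stream.next().await {
//!     let (section, offset, size, item) = result?;
//!     // Rebuild in-memory state from `item`...
//! }
//! ```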
62//!
63//! # Exact Reads
64//!
//! To allow items to be fetched in a single disk operation, `Journal` provides a `get_exact`
//! method that takes the item's exact `size` in addition to its `section` and `offset`. This size
//! must be cached by the caller (it is returned by `append` and provided during `replay`), and
//! usage of an incorrect size will result in undefined behavior.
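//!
//! A minimal sketch (assuming a mutable `journal` as in the example below and some `item` of the
//! journal's codec type):
//!
//! ```rust,ignore
//! // `append` returns the offset and stored size needed for a single-read lookup later.
//! let (offset, size) = journal.append(10, item).await?;
//! journal.sync(10).await?;
//! let retrieved = journal.get_exact(10, offset, size).await?;
//! ```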
69//!
70//! # Compression
71//!
72//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
73//! `compression` field in the `Config` struct to a valid `zstd` compression level. This setting can
74//! be changed between initializations of `Journal`, however, it must remain populated if any data
75//! was written with compression enabled.
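//!
//! A configuration sketch (mirroring the example below) that enables `zstd` compression at
//! level 3:
//!
//! ```rust,ignore
//! let cfg = Config {
//!     partition: "partition".to_string(),
//!     compression: Some(3), // zstd compression level
//!     codec_config: (),
//!     buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!     write_buffer: NZUsize!(1024 * 1024),
//! };
//! ```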
76//!
77//! # Example
78//!
79//! ```rust
80//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
81//! use commonware_storage::journal::variable::{Journal, Config};
82//! use commonware_utils::NZUsize;
83//!
84//! let executor = deterministic::Runner::default();
85//! executor.start(|context| async move {
86//!     // Create a journal
87//!     let mut journal = Journal::init(context, Config{
88//!         partition: "partition".to_string(),
89//!         compression: None,
90//!         codec_config: (),
91//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
92//!         write_buffer: NZUsize!(1024 * 1024),
93//!     }).await.unwrap();
94//!
95//!     // Append data to the journal
96//!     journal.append(1, 128).await.unwrap();
97//!
98//!     // Close the journal
99//!     journal.close().await.unwrap();
100//! });
101//! ```
102
103use super::Error;
104use bytes::BufMut;
105use commonware_codec::Codec;
106use commonware_runtime::{
107    buffer::{Append, PoolRef, Read},
108    Blob, Error as RError, Metrics, Storage,
109};
110use commonware_utils::hex;
111use futures::stream::{self, Stream, StreamExt};
112use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
113use std::{
114    collections::{btree_map::Entry, BTreeMap},
115    io::Cursor,
116    marker::PhantomData,
117    num::NonZeroUsize,
118};
119use tracing::{debug, trace, warn};
120use zstd::{bulk::compress, decode_all};
121
122/// Configuration for `Journal` storage.
123#[derive(Clone)]
124pub struct Config<C> {
125    /// The `commonware-runtime::Storage` partition to use
126    /// for storing journal blobs.
127    pub partition: String,
128
129    /// Optional compression level (using `zstd`) to apply to data before storing.
130    pub compression: Option<u8>,
131
132    /// The codec configuration to use for encoding and decoding items.
133    pub codec_config: C,
134
135    /// The buffer pool to use for caching data.
136    pub buffer_pool: PoolRef,
137
138    /// The size of the write buffer to use for each blob.
139    pub write_buffer: NonZeroUsize,
140}
141
142pub(crate) const ITEM_ALIGNMENT: u64 = 16;
143
144/// Computes the next offset for an item using the underlying `u64`
145/// offset of `Blob`.
146#[inline]
147fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
148    let overage = offset % ITEM_ALIGNMENT;
149    if overage != 0 {
150        offset += ITEM_ALIGNMENT - overage;
151    }
152    let offset = offset / ITEM_ALIGNMENT;
153    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
154    Ok(aligned_offset)
155}
156
157/// Implementation of `Journal` storage.
158pub struct Journal<E: Storage + Metrics, V: Codec> {
159    pub(crate) context: E,
160    pub(crate) cfg: Config<V::Cfg>,
161
162    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,
163
164    /// A section number before which all sections have been pruned. This value is not persisted,
165    /// and is initialized to 0 at startup. It's updated only during calls to `prune` during the
166    /// current execution, and therefore provides only a best effort lower-bound on the true value.
167    pub(crate) oldest_retained_section: u64,
168
169    pub(crate) tracked: Gauge,
170    pub(crate) synced: Counter,
171    pub(crate) pruned: Counter,
172
173    pub(crate) _phantom: PhantomData<V>,
174}
175
176impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
177    /// Initialize a new `Journal` instance.
178    ///
179    /// All backing blobs are opened but not read during
180    /// initialization. The `replay` method can be used
181    /// to iterate over all items in the `Journal`.
182    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
183        // Iterate over blobs in partition
184        let mut blobs = BTreeMap::new();
185        let stored_blobs = match context.scan(&cfg.partition).await {
186            Ok(blobs) => blobs,
187            Err(RError::PartitionMissing(_)) => Vec::new(),
188            Err(err) => return Err(Error::Runtime(err)),
189        };
190        for name in stored_blobs {
191            let (blob, size) = context.open(&cfg.partition, &name).await?;
192            let hex_name = hex(&name);
193            let section = match name.try_into() {
194                Ok(section) => u64::from_be_bytes(section),
195                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
196            };
197            debug!(section, blob = hex_name, size, "loaded section");
198            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
199            blobs.insert(section, blob);
200        }
201
202        // Initialize metrics
203        let tracked = Gauge::default();
204        let synced = Counter::default();
205        let pruned = Counter::default();
206        context.register("tracked", "Number of blobs", tracked.clone());
207        context.register("synced", "Number of syncs", synced.clone());
208        context.register("pruned", "Number of blobs pruned", pruned.clone());
209        tracked.set(blobs.len() as i64);
210
211        // Create journal instance
212        Ok(Self {
213            context,
214            cfg,
215            blobs,
216            oldest_retained_section: 0,
217            tracked,
218            synced,
219            pruned,
220
221            _phantom: PhantomData,
222        })
223    }
224
225    /// Ensures that a section pruned during the current execution is not accessed.
226    fn prune_guard(&self, section: u64) -> Result<(), Error> {
227        if section < self.oldest_retained_section {
228            Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
229        } else {
230            Ok(())
231        }
232    }
233
234    /// Reads an item from the blob at the given offset.
235    pub(crate) async fn read(
236        compressed: bool,
237        cfg: &V::Cfg,
238        blob: &Append<E::Blob>,
239        offset: u32,
240    ) -> Result<(u32, u32, V), Error> {
241        // Read item size
242        let mut hasher = crc32fast::Hasher::new();
243        let offset = offset as u64 * ITEM_ALIGNMENT;
244        let size = blob.read_at(vec![0; 4], offset).await?;
245        hasher.update(size.as_ref());
246        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
247        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;
248
249        // Read remaining
250        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
251        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
252        let buf = buf.as_ref();
253        let offset = offset
254            .checked_add(buf_size as u64)
255            .ok_or(Error::OffsetOverflow)?;
256
257        // Read item
258        let item = &buf[..size];
259        hasher.update(item);
260
261        // Verify integrity
262        let checksum = hasher.finalize();
263        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
264        if checksum != stored_checksum {
265            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
266        }
267
268        // Compute next offset
269        let aligned_offset = compute_next_offset(offset)?;
270
271        // If compression is enabled, decompress the item
272        let item = if compressed {
273            let decompressed =
274                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
275            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
276        } else {
277            V::decode_cfg(item, cfg).map_err(Error::Codec)?
278        };
279
280        // Return item
281        Ok((aligned_offset, size as u32, item))
282    }
283
284    /// Helper function to read an item from a [Read].
285    async fn read_buffered(
286        reader: &mut Read<Append<E::Blob>>,
287        offset: u32,
288        cfg: &V::Cfg,
289        compressed: bool,
290    ) -> Result<(u32, u64, u32, V), Error> {
291        // Calculate absolute file offset from the item offset
292        let file_offset = offset as u64 * ITEM_ALIGNMENT;
293
294        // If we're not at the right position, seek to it
295        if reader.position() != file_offset {
296            reader.seek_to(file_offset).map_err(Error::Runtime)?;
297        }
298
299        // Read item size (4 bytes)
300        let mut hasher = crc32fast::Hasher::new();
301        let mut size_buf = [0u8; 4];
302        reader
303            .read_exact(&mut size_buf, 4)
304            .await
305            .map_err(Error::Runtime)?;
306        hasher.update(&size_buf);
307
308        // Read remaining
309        let size = u32::from_be_bytes(size_buf) as usize;
310        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
311        let mut buf = vec![0u8; buf_size];
312        reader
313            .read_exact(&mut buf, buf_size)
314            .await
315            .map_err(Error::Runtime)?;
316
317        // Read item
318        let item = &buf[..size];
319        hasher.update(item);
320
321        // Verify integrity
322        let checksum = hasher.finalize();
323        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
324        if checksum != stored_checksum {
325            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
326        }
327
328        // If compression is enabled, decompress the item
329        let item = if compressed {
330            let decompressed =
331                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
332            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
333        } else {
334            V::decode_cfg(item, cfg).map_err(Error::Codec)?
335        };
336
337        // Calculate next offset
338        let current_pos = reader.position();
339        let aligned_offset = compute_next_offset(current_pos)?;
340        Ok((aligned_offset, current_pos, size as u32, item))
341    }
342
343    /// Reads an item from the blob at the given offset and of a given size.
344    async fn read_exact(
345        compressed: bool,
346        cfg: &V::Cfg,
347        blob: &Append<E::Blob>,
348        offset: u32,
349        len: u32,
350    ) -> Result<V, Error> {
351        // Read buffer
352        let offset = offset as u64 * ITEM_ALIGNMENT;
353        let entry_size = 4 + len as usize + 4;
354        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;
355
356        // Check size
357        let mut hasher = crc32fast::Hasher::new();
358        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
359        hasher.update(&buf.as_ref()[..4]);
360        if disk_size != len {
361            return Err(Error::UnexpectedSize(disk_size, len));
362        }
363
364        // Verify integrity
365        let item = &buf.as_ref()[4..4 + len as usize];
366        hasher.update(item);
367        let checksum = hasher.finalize();
368        let stored_checksum =
369            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
370        if checksum != stored_checksum {
371            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
372        }
373
374        // Decompress item
375        let item = if compressed {
376            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
377        } else {
378            item.to_vec()
379        };
380
381        // Return item
382        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
383        Ok(item)
384    }
385
386    /// Returns an ordered stream of all items in the journal starting with the item at the given
387    /// `start_section` and `offset` into that section. Each item is returned as a tuple of
388    /// (section, offset, size, item).
389    ///
390    /// # Repair
391    ///
392    /// Like
393    /// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
394    /// and
395    /// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
396    /// the first invalid data read will be considered the new end of the journal (and the
397    /// underlying [Blob] will be truncated to the last valid item).
398    pub async fn replay(
399        &self,
400        start_section: u64,
401        mut offset: u32,
402        buffer: NonZeroUsize,
403    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
404        // Collect all blobs to replay
405        let codec_config = self.cfg.codec_config.clone();
406        let compressed = self.cfg.compression.is_some();
407        let mut blobs = Vec::with_capacity(self.blobs.len());
408        for (section, blob) in self.blobs.range(start_section..) {
409            let blob_size = blob.size().await;
410            let max_offset = compute_next_offset(blob_size)?;
411            blobs.push((
412                *section,
413                blob.clone(),
414                max_offset,
415                blob_size,
416                codec_config.clone(),
417                compressed,
418            ));
419        }
420
421        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
422        // memory with buffered data)
423        Ok(stream::iter(blobs).flat_map(
424            move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                // Create a buffered reader
426                let mut reader = Read::new(blob, blob_size, buffer);
427                if section == start_section && offset != 0 {
428                    if let Err(err) = reader.seek_to(offset as u64 * ITEM_ALIGNMENT) {
429                        warn!(section, offset, ?err, "failed to seek to offset");
430                        // Return early with the error to terminate the entire stream
431                        return stream::once(async move { Err(err.into()) }).left_stream();
432                    }
433                } else {
434                    offset = 0;
435                }
436
437                // Read over the blob
438                stream::unfold(
439                        (section, reader, offset, 0u64, codec_config, compressed),
440                        move |(
441                            section,
442                            mut reader,
443                            offset,
444                            valid_size,
445                            codec_config,
446                            compressed,
447                        )| async move {
448                            // Check if we are at the end of the blob
449                            if offset >= max_offset {
450                                return None;
451                            }
452
453                            // Read an item from the buffer
454                            match Self::read_buffered(
455                                &mut reader,
456                                offset,
457                                &codec_config,
458                                compressed,
459                            )
460                            .await
461                            {
462                                Ok((next_offset, next_valid_size, size, item)) => {
463                                    trace!(blob = section, cursor = offset, "replayed item");
464                                    Some((
465                                        Ok((section, offset, size, item)),
466                                        (
467                                            section,
468                                            reader,
469                                            next_offset,
470                                            next_valid_size,
471                                            codec_config,
472                                            compressed,
473                                        ),
474                                    ))
475                                }
476                                Err(Error::ChecksumMismatch(expected, found)) => {
477                                    // If we encounter corruption, we prune to the last valid item. This
478                                    // can happen during an unclean file close (where pending data is not
479                                    // fully synced to disk).
480                                    warn!(
481                                        blob = section,
482                                        bad_offset = offset,
483                                        new_size = valid_size,
484                                        expected,
485                                        found,
486                                        "corruption detected: truncating"
487                                    );
488                                    reader.resize(valid_size).await.ok()?;
489                                    None
490                                }
491                                Err(Error::Runtime(RError::BlobInsufficientLength)) => {
492                                    // If we encounter trailing bytes, we prune to the last
493                                    // valid item. This can happen during an unclean file close (where
494                                    // pending data is not fully synced to disk).
495                                    warn!(
496                                        blob = section,
497                                        bad_offset = offset,
498                                        new_size = valid_size,
499                                        "trailing bytes detected: truncating"
500                                    );
501                                    reader.resize(valid_size).await.ok()?;
502                                    None
503                                }
504                                Err(err) => {
505                                    // If we encounter an unexpected error, return it without attempting
506                                    // to fix anything.
507                                    warn!(
508                                        blob = section,
509                                        cursor = offset,
510                                        ?err,
511                                        "unexpected error"
512                                    );
513                                    Some((
514                                        Err(err),
515                                        (
516                                            section,
517                                            reader,
518                                            offset,
519                                            valid_size,
520                                            codec_config,
521                                            compressed,
522                                        ),
523                                    ))
524                                }
525                            }
526                        },
527                    ).right_stream()
528            },
529        ))
530    }
531
    /// Appends an item to `Journal` in a given `section`, returning the offset
    /// where the item was written and the size of the stored item (which may differ
    /// from the codec-encoded size if compression is enabled).
535    ///
536    /// # Warning
537    ///
538    /// If there exist trailing bytes in the `Blob` of a particular `section` and
539    /// `replay` is not called before this, it is likely that subsequent data added
540    /// to the `Blob` will be considered corrupted (as the trailing bytes will fail
541    /// the checksum verification). It is recommended to call `replay` before calling
542    /// `append` to prevent this.
543    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
544        // Check last pruned
545        self.prune_guard(section)?;
546
547        // Create item
548        let encoded = item.encode();
549        let encoded = if let Some(compression) = self.cfg.compression {
550            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
551        } else {
552            encoded.into()
553        };
554
555        // Ensure item is not too large
556        let item_len = encoded.len();
557        let entry_len = 4 + item_len + 4;
558        let item_len = match item_len.try_into() {
559            Ok(len) => len,
560            Err(_) => return Err(Error::ItemTooLarge(item_len)),
561        };
562
563        // Get existing blob or create new one
564        let blob = match self.blobs.entry(section) {
565            Entry::Occupied(entry) => entry.into_mut(),
566            Entry::Vacant(entry) => {
567                let name = section.to_be_bytes();
568                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
569                let blob = Append::new(
570                    blob,
571                    size,
572                    self.cfg.write_buffer,
573                    self.cfg.buffer_pool.clone(),
574                )
575                .await?;
576                self.tracked.inc();
577                entry.insert(blob)
578            }
579        };
580
581        // Calculate alignment
582        let cursor = blob.size().await;
583        let offset = compute_next_offset(cursor)?;
584        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
585        let padding = (aligned_cursor - cursor) as usize;
586
587        // Populate buffer
588        let mut buf = Vec::with_capacity(padding + entry_len);
589
590        // Add padding bytes if necessary
591        if padding > 0 {
592            buf.resize(padding, 0);
593        }
594
595        // Add entry data
596        let entry_start = buf.len();
597        buf.put_u32(item_len);
598        buf.put_slice(&encoded);
599
600        // Calculate checksum only for the entry data (without padding)
601        let checksum = crc32fast::hash(&buf[entry_start..]);
602        buf.put_u32(checksum);
603        assert_eq!(buf[entry_start..].len(), entry_len);
604
605        // Append item to blob
606        blob.append(buf).await?;
607        trace!(blob = section, offset, "appended item");
608        Ok((offset, item_len))
609    }
610
611    /// Retrieves an item from `Journal` at a given `section` and `offset`.
612    ///
613    /// # Errors
614    ///  - [Error::AlreadyPrunedToSection] if the requested `section` has been pruned during the
615    ///    current execution.
616    ///  - [Error::SectionOutOfRange] if the requested `section` is empty (i.e. has never had any
617    ///    data appended to it, or has been pruned in a previous execution).
618    ///  - An invalid `offset` for a given section (that is, an offset that doesn't correspond to a
619    ///    previously appended item) will result in an error, with the specific type being
620    ///    undefined.
621    pub async fn get(&self, section: u64, offset: u32) -> Result<V, Error> {
622        self.prune_guard(section)?;
623        let blob = match self.blobs.get(&section) {
624            Some(blob) => blob,
625            None => return Err(Error::SectionOutOfRange(section)),
626        };
627
628        // Perform a multi-op read.
629        let (_, _, item) = Self::read(
630            self.cfg.compression.is_some(),
631            &self.cfg.codec_config,
632            blob,
633            offset,
634        )
635        .await?;
636        Ok(item)
637    }
638
639    /// Retrieves an item from `Journal` at a given `section` and `offset` with a given size.
640    pub async fn get_exact(&self, section: u64, offset: u32, size: u32) -> Result<V, Error> {
641        self.prune_guard(section)?;
642        let blob = match self.blobs.get(&section) {
643            Some(blob) => blob,
644            None => return Err(Error::SectionOutOfRange(section)),
645        };
646
        // Perform a single-op read.
648        let item = Self::read_exact(
649            self.cfg.compression.is_some(),
650            &self.cfg.codec_config,
651            blob,
652            offset,
653            size,
654        )
655        .await?;
656        Ok(item)
657    }
658
659    /// Gets the size of the journal for a specific section.
660    ///
661    /// Returns 0 if the section does not exist.
662    pub async fn size(&self, section: u64) -> Result<u64, Error> {
663        self.prune_guard(section)?;
664        match self.blobs.get(&section) {
665            Some(blob) => Ok(blob.size().await),
666            None => Ok(0),
667        }
668    }
669
670    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
671    ///
672    /// # Warnings
673    ///
674    /// * This operation is not guaranteed to survive restarts until sync is called.
675    /// * This operation is not atomic, but it will always leave the journal in a consistent state
676    ///   in the event of failure since blobs are always removed in reverse order of section.
677    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
678        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
679    }
680
681    /// Rewinds the journal to the given `section` and `size`.
682    ///
683    /// This removes any data beyond the specified `section` and `size`.
684    ///
685    /// # Warnings
686    ///
687    /// * This operation is not guaranteed to survive restarts until sync is called.
688    /// * This operation is not atomic, but it will always leave the journal in a consistent state
689    ///   in the event of failure since blobs are always removed in reverse order of section.
690    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
691        self.prune_guard(section)?;
692
693        // Remove any sections beyond the given section
694        let trailing: Vec<u64> = self
695            .blobs
696            .range((
697                std::ops::Bound::Excluded(section),
698                std::ops::Bound::Unbounded,
699            ))
700            .map(|(&section, _)| section)
701            .collect();
702        for index in trailing.iter().rev() {
            // Remove the blob from the journal's in-memory map
            let blob = self.blobs.remove(index).unwrap();

            // Drop the handle and remove the underlying blob from storage
            drop(blob);
708            self.context
709                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
710                .await?;
711            debug!(section = index, "removed section");
712            self.tracked.dec();
713        }
714
        // If the section exists, truncate it to the given size
716        let blob = match self.blobs.get_mut(&section) {
717            Some(blob) => blob,
718            None => return Ok(()),
719        };
720        let current = blob.size().await;
721        if size >= current {
722            return Ok(()); // Already smaller than or equal to target size
723        }
724        blob.resize(size).await?;
725        debug!(
726            section,
727            from = current,
728            to = size,
729            ?trailing,
730            "rewound journal"
731        );
732        Ok(())
733    }
734
735    /// Rewinds the `section` to the given `size`.
736    ///
737    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
738    ///
739    /// # Warning
740    ///
741    /// This operation is not guaranteed to survive restarts until sync is called.
742    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
743        self.prune_guard(section)?;
744
745        // Get the blob at the given section
746        let blob = match self.blobs.get_mut(&section) {
747            Some(blob) => blob,
748            None => return Ok(()),
749        };
750
751        // Truncate the blob to the given size
752        let current = blob.size().await;
753        if size >= current {
754            return Ok(()); // Already smaller than or equal to target size
755        }
756        blob.resize(size).await?;
757        debug!(section, from = current, to = size, "rewound section");
758        Ok(())
759    }
760
761    /// Ensures that all data in a given `section` is synced to the underlying store.
762    ///
763    /// If the `section` does not exist, no error will be returned.
764    pub async fn sync(&self, section: u64) -> Result<(), Error> {
765        self.prune_guard(section)?;
766        let blob = match self.blobs.get(&section) {
767            Some(blob) => blob,
768            None => return Ok(()),
769        };
770        self.synced.inc();
771        blob.sync().await.map_err(Error::Runtime)
772    }
773
774    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
775    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        // Prune any blobs whose section number is less than the minimum
777        let mut pruned = false;
778        while let Some((&section, _)) = self.blobs.first_key_value() {
779            // Stop pruning if we reach the minimum
780            if section >= min {
781                break;
782            }
783
784            // Remove blob from journal
785            let blob = self.blobs.remove(&section).unwrap();
786            let size = blob.size().await;
787            drop(blob);
788
789            // Remove blob from storage
790            self.context
791                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
792                .await?;
793            pruned = true;
794
795            debug!(blob = section, size, "pruned blob");
796            self.tracked.dec();
797            self.pruned.inc();
798        }
799
800        Ok(pruned)
801    }
802
803    /// Syncs and closes all open sections.
804    pub async fn close(self) -> Result<(), Error> {
805        for (section, blob) in self.blobs.into_iter() {
806            let size = blob.size().await;
807            blob.sync().await?;
808            debug!(blob = section, size, "synced blob");
809        }
810        Ok(())
811    }
812
813    /// Returns the number of the oldest section in the journal.
814    pub fn oldest_section(&self) -> Option<u64> {
815        self.blobs.first_key_value().map(|(section, _)| *section)
816    }
817
818    /// Removes any underlying blobs created by the journal.
819    pub async fn destroy(self) -> Result<(), Error> {
820        for (i, blob) in self.blobs.into_iter() {
821            let size = blob.size().await;
822            drop(blob);
823            debug!(blob = i, size, "destroyed blob");
824            self.context
825                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
826                .await?;
827        }
828        match self.context.remove(&self.cfg.partition, None).await {
829            Ok(()) => {}
830            Err(RError::PartitionMissing(_)) => {
831                // Partition already removed or never existed.
832            }
833            Err(err) => return Err(Error::Runtime(err)),
834        }
835        Ok(())
836    }
837}
838
839#[cfg(test)]
840mod tests {
841    use super::*;
842    use bytes::BufMut;
843    use commonware_cryptography::{Hasher, Sha256};
844    use commonware_macros::test_traced;
845    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
846    use commonware_utils::{NZUsize, StableBuf};
847    use futures::{pin_mut, StreamExt};
848    use prometheus_client::registry::Metric;
849
850    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
851    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
852
853    #[test_traced]
854    fn test_journal_append_and_read() {
855        // Initialize the deterministic context
856        let executor = deterministic::Runner::default();
857
858        // Start the test within the executor
859        executor.start(|context| async move {
860            // Initialize the journal
861            let cfg = Config {
862                partition: "test_partition".into(),
863                compression: None,
864                codec_config: (),
865                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
866                write_buffer: NZUsize!(1024),
867            };
868            let index = 1u64;
869            let data = 10;
870            let mut journal = Journal::init(context.clone(), cfg.clone())
871                .await
872                .expect("Failed to initialize journal");
873
874            // Append an item to the journal
875            journal
876                .append(index, data)
877                .await
878                .expect("Failed to append data");
879
880            // Check metrics
881            let buffer = context.encode();
882            assert!(buffer.contains("tracked 1"));
883
884            // Close the journal
885            journal.close().await.expect("Failed to close journal");
886
887            // Re-initialize the journal to simulate a restart
888            let cfg = Config {
889                partition: "test_partition".into(),
890                compression: None,
891                codec_config: (),
892                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
893                write_buffer: NZUsize!(1024),
894            };
895            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
896                .await
897                .expect("Failed to re-initialize journal");
898
899            // Replay the journal and collect items
900            let mut items = Vec::new();
901            let stream = journal
902                .replay(0, 0, NZUsize!(1024))
903                .await
904                .expect("unable to setup replay");
905            pin_mut!(stream);
906            while let Some(result) = stream.next().await {
907                match result {
908                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
909                    Err(err) => panic!("Failed to read item: {err}"),
910                }
911            }
912
913            // Verify that the item was replayed correctly
914            assert_eq!(items.len(), 1);
915            assert_eq!(items[0].0, index);
916            assert_eq!(items[0].1, data);
917
918            // Check metrics
919            let buffer = context.encode();
920            assert!(buffer.contains("tracked 1"));
921        });
922    }
923
924    #[test_traced]
925    fn test_journal_multiple_appends_and_reads() {
926        // Initialize the deterministic context
927        let executor = deterministic::Runner::default();
928
929        // Start the test within the executor
930        executor.start(|context| async move {
931            // Create a journal configuration
932            let cfg = Config {
933                partition: "test_partition".into(),
934                compression: None,
935                codec_config: (),
936                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
937                write_buffer: NZUsize!(1024),
938            };
939
940            // Initialize the journal
941            let mut journal = Journal::init(context.clone(), cfg.clone())
942                .await
943                .expect("Failed to initialize journal");
944
945            // Append multiple items to different blobs
946            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
947            for (index, data) in &data_items {
948                journal
949                    .append(*index, *data)
950                    .await
951                    .expect("Failed to append data");
952                journal.sync(*index).await.expect("Failed to sync blob");
953            }
954
955            // Check metrics
956            let buffer = context.encode();
957            assert!(buffer.contains("tracked 3"));
958            assert!(buffer.contains("synced_total 4"));
959
960            // Close the journal
961            journal.close().await.expect("Failed to close journal");
962
963            // Re-initialize the journal to simulate a restart
964            let journal = Journal::init(context, cfg)
965                .await
966                .expect("Failed to re-initialize journal");
967
968            // Replay the journal and collect items
969            let mut items = Vec::<(u64, u32)>::new();
970            {
971                let stream = journal
972                    .replay(0, 0, NZUsize!(1024))
973                    .await
974                    .expect("unable to setup replay");
975                pin_mut!(stream);
976                while let Some(result) = stream.next().await {
977                    match result {
978                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
979                        Err(err) => panic!("Failed to read item: {err}"),
980                    }
981                }
982            }
983
984            // Verify that all items were replayed correctly
985            assert_eq!(items.len(), data_items.len());
986            for ((expected_index, expected_data), (actual_index, actual_data)) in
987                data_items.iter().zip(items.iter())
988            {
989                assert_eq!(actual_index, expected_index);
990                assert_eq!(actual_data, expected_data);
991            }
992
993            // Cleanup
994            journal.destroy().await.expect("Failed to destroy journal");
995        });
996    }
997
998    #[test_traced]
999    fn test_journal_prune_blobs() {
1000        // Initialize the deterministic context
1001        let executor = deterministic::Runner::default();
1002
1003        // Start the test within the executor
1004        executor.start(|context| async move {
1005            // Create a journal configuration
1006            let cfg = Config {
1007                partition: "test_partition".into(),
1008                compression: None,
1009                codec_config: (),
1010                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1011                write_buffer: NZUsize!(1024),
1012            };
1013
1014            // Initialize the journal
1015            let mut journal = Journal::init(context.clone(), cfg.clone())
1016                .await
1017                .expect("Failed to initialize journal");
1018
1019            // Append items to multiple blobs
1020            for index in 1u64..=5u64 {
1021                journal
1022                    .append(index, index)
1023                    .await
1024                    .expect("Failed to append data");
1025                journal.sync(index).await.expect("Failed to sync blob");
1026            }
1027
1028            // Add one item out-of-order
1029            let data = 99;
1030            journal
1031                .append(2u64, data)
1032                .await
1033                .expect("Failed to append data");
1034            journal.sync(2u64).await.expect("Failed to sync blob");
1035
1036            // Prune blobs with indices less than 3
1037            journal.prune(3).await.expect("Failed to prune blobs");
1038
1039            // Check metrics
1040            let buffer = context.encode();
1041            assert!(buffer.contains("pruned_total 2"));
1042
1043            // Prune again with a section less than the previous one, should be a no-op
1044            journal.prune(2).await.expect("Failed to no-op prune");
1045            let buffer = context.encode();
1046            assert!(buffer.contains("pruned_total 2"));
1047
1048            // Close the journal
1049            journal.close().await.expect("Failed to close journal");
1050
1051            // Re-initialize the journal to simulate a restart
1052            let mut journal = Journal::init(context.clone(), cfg.clone())
1053                .await
1054                .expect("Failed to re-initialize journal");
1055
1056            // Replay the journal and collect items
1057            let mut items = Vec::<(u64, u64)>::new();
1058            {
1059                let stream = journal
1060                    .replay(0, 0, NZUsize!(1024))
1061                    .await
1062                    .expect("unable to setup replay");
1063                pin_mut!(stream);
1064                while let Some(result) = stream.next().await {
1065                    match result {
1066                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1067                        Err(err) => panic!("Failed to read item: {err}"),
1068                    }
1069                }
1070            }
1071
1072            // Verify that items from blobs 1 and 2 are not present
1073            assert_eq!(items.len(), 3);
1074            let expected_indices = [3u64, 4u64, 5u64];
1075            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
1076                assert_eq!(item.0, *expected_index);
1077            }
1078
1079            // Prune all blobs
1080            journal.prune(6).await.expect("Failed to prune blobs");
1081
1082            // Close the journal
1083            journal.close().await.expect("Failed to close journal");
1084
1085            // Ensure no remaining blobs exist
1086            //
1087            // Note: We don't remove the partition, so this does not error
1088            // and instead returns an empty list of blobs.
1089            assert!(context
1090                .scan(&cfg.partition)
1091                .await
1092                .expect("Failed to list blobs")
1093                .is_empty());
1094        });
1095    }
1096
1097    #[test_traced]
1098    fn test_journal_with_invalid_blob_name() {
1099        // Initialize the deterministic context
1100        let executor = deterministic::Runner::default();
1101
1102        // Start the test within the executor
1103        executor.start(|context| async move {
1104            // Create a journal configuration
1105            let cfg = Config {
1106                partition: "test_partition".into(),
1107                compression: None,
1108                codec_config: (),
1109                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1110                write_buffer: NZUsize!(1024),
1111            };
1112
1113            // Manually create a blob with an invalid name (not 8 bytes)
1114            let invalid_blob_name = b"invalid"; // Less than 8 bytes
1115            let (blob, _) = context
1116                .open(&cfg.partition, invalid_blob_name)
1117                .await
1118                .expect("Failed to create blob with invalid name");
1119            blob.sync().await.expect("Failed to sync blob");
1120
1121            // Attempt to initialize the journal
1122            let result = Journal::<_, u64>::init(context, cfg).await;
1123
1124            // Expect an error
1125            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1126        });
1127    }
1128
1129    #[test_traced]
1130    fn test_journal_read_size_missing() {
1131        // Initialize the deterministic context
1132        let executor = deterministic::Runner::default();
1133
1134        // Start the test within the executor
1135        executor.start(|context| async move {
1136            // Create a journal configuration
1137            let cfg = Config {
1138                partition: "test_partition".into(),
1139                compression: None,
1140                codec_config: (),
1141                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1142                write_buffer: NZUsize!(1024),
1143            };
1144
1145            // Manually create a blob with incomplete size data
1146            let section = 1u64;
1147            let blob_name = section.to_be_bytes();
1148            let (blob, _) = context
1149                .open(&cfg.partition, &blob_name)
1150                .await
1151                .expect("Failed to create blob");
1152
1153            // Write incomplete size data (less than 4 bytes)
1154            let incomplete_data = vec![0x00, 0x01]; // Less than 4 bytes
1155            blob.write_at(incomplete_data, 0)
1156                .await
1157                .expect("Failed to write incomplete data");
1158            blob.sync().await.expect("Failed to sync blob");
1159
1160            // Initialize the journal
1161            let journal = Journal::init(context, cfg)
1162                .await
1163                .expect("Failed to initialize journal");
1164
1165            // Attempt to replay the journal
1166            let stream = journal
1167                .replay(0, 0, NZUsize!(1024))
1168                .await
1169                .expect("unable to setup replay");
1170            pin_mut!(stream);
1171            let mut items = Vec::<(u64, u64)>::new();
1172            while let Some(result) = stream.next().await {
1173                match result {
1174                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1175                    Err(err) => panic!("Failed to read item: {err}"),
1176                }
1177            }
1178            assert!(items.is_empty());
1179        });
1180    }
1181
1182    #[test_traced]
1183    fn test_journal_read_item_missing() {
1184        // Initialize the deterministic context
1185        let executor = deterministic::Runner::default();
1186
1187        // Start the test within the executor
1188        executor.start(|context| async move {
1189            // Create a journal configuration
1190            let cfg = Config {
1191                partition: "test_partition".into(),
1192                compression: None,
1193                codec_config: (),
1194                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1195                write_buffer: NZUsize!(1024),
1196            };
1197
1198            // Manually create a blob with missing item data
1199            let section = 1u64;
1200            let blob_name = section.to_be_bytes();
1201            let (blob, _) = context
1202                .open(&cfg.partition, &blob_name)
1203                .await
1204                .expect("Failed to create blob");
1205
            // Write a size of 10 but only 5 bytes of (incomplete) item data
1207            let item_size: u32 = 10; // Size of the item
1208            let mut buf = Vec::new();
1209            buf.put_u32(item_size);
1210            let data = [2u8; 5];
1211            BufMut::put_slice(&mut buf, &data);
1212            blob.write_at(buf, 0)
1213                .await
1214                .expect("Failed to write item size");
1215            blob.sync().await.expect("Failed to sync blob");
1216
1217            // Initialize the journal
1218            let journal = Journal::init(context, cfg)
1219                .await
1220                .expect("Failed to initialize journal");
1221
1222            // Attempt to replay the journal
1223            let stream = journal
1224                .replay(0, 0, NZUsize!(1024))
1225                .await
1226                .expect("unable to setup replay");
1227            pin_mut!(stream);
1228            let mut items = Vec::<(u64, u64)>::new();
1229            while let Some(result) = stream.next().await {
1230                match result {
1231                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1232                    Err(err) => panic!("Failed to read item: {err}"),
1233                }
1234            }
1235            assert!(items.is_empty());
1236        });
1237    }
1238
1239    #[test_traced]
1240    fn test_journal_read_checksum_missing() {
1241        // Initialize the deterministic context
1242        let executor = deterministic::Runner::default();
1243
1244        // Start the test within the executor
1245        executor.start(|context| async move {
1246            // Create a journal configuration
1247            let cfg = Config {
1248                partition: "test_partition".into(),
1249                compression: None,
1250                codec_config: (),
1251                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1252                write_buffer: NZUsize!(1024),
1253            };
1254
1255            // Manually create a blob with missing checksum
1256            let section = 1u64;
1257            let blob_name = section.to_be_bytes();
1258            let (blob, _) = context
1259                .open(&cfg.partition, &blob_name)
1260                .await
1261                .expect("Failed to create blob");
1262
1263            // Prepare item data
1264            let item_data = b"Test data";
1265            let item_size = item_data.len() as u32;
1266
1267            // Write size
1268            let mut offset = 0;
1269            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1270                .await
1271                .expect("Failed to write item size");
1272            offset += 4;
1273
1274            // Write item data
1275            blob.write_at(item_data.to_vec(), offset)
1276                .await
1277                .expect("Failed to write item data");
1278            // Do not write checksum (omit it)
1279
1280            blob.sync().await.expect("Failed to sync blob");
1281
1282            // Initialize the journal
1283            let journal = Journal::init(context, cfg)
1284                .await
1285                .expect("Failed to initialize journal");
1286
1287            // Attempt to replay the journal
1288            //
1289            // This will truncate the leftover bytes from our manual write.
1290            let stream = journal
1291                .replay(0, 0, NZUsize!(1024))
1292                .await
1293                .expect("unable to setup replay");
1294            pin_mut!(stream);
1295            let mut items = Vec::<(u64, u64)>::new();
1296            while let Some(result) = stream.next().await {
1297                match result {
1298                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1299                    Err(err) => panic!("Failed to read item: {err}"),
1300                }
1301            }
1302            assert!(items.is_empty());
1303        });
1304    }
1305
1306    #[test_traced]
1307    fn test_journal_read_checksum_mismatch() {
1308        // Initialize the deterministic context
1309        let executor = deterministic::Runner::default();
1310
1311        // Start the test within the executor
1312        executor.start(|context| async move {
1313            // Create a journal configuration
1314            let cfg = Config {
1315                partition: "test_partition".into(),
1316                compression: None,
1317                codec_config: (),
1318                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1319                write_buffer: NZUsize!(1024),
1320            };
1321
1322            // Manually create a blob with incorrect checksum
1323            let section = 1u64;
1324            let blob_name = section.to_be_bytes();
1325            let (blob, _) = context
1326                .open(&cfg.partition, &blob_name)
1327                .await
1328                .expect("Failed to create blob");
1329
1330            // Prepare item data
1331            let item_data = b"Test data";
1332            let item_size = item_data.len() as u32;
1333            let incorrect_checksum: u32 = 0xDEADBEEF;
1334
1335            // Write size
1336            let mut offset = 0;
1337            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1338                .await
1339                .expect("Failed to write item size");
1340            offset += 4;
1341
1342            // Write item data
1343            blob.write_at(item_data.to_vec(), offset)
1344                .await
1345                .expect("Failed to write item data");
1346            offset += item_data.len() as u64;
1347
1348            // Write incorrect checksum
1349            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
1350                .await
1351                .expect("Failed to write incorrect checksum");
1352
1353            blob.sync().await.expect("Failed to sync blob");
1354
1355            // Initialize the journal
1356            let journal = Journal::init(context.clone(), cfg.clone())
1357                .await
1358                .expect("Failed to initialize journal");
1359
1360            // Attempt to replay the journal
1361            {
1362                let stream = journal
1363                    .replay(0, 0, NZUsize!(1024))
1364                    .await
1365                    .expect("unable to setup replay");
1366                pin_mut!(stream);
1367                let mut items = Vec::<(u64, u64)>::new();
1368                while let Some(result) = stream.next().await {
1369                    match result {
1370                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1371                        Err(err) => panic!("Failed to read item: {err}"),
1372                    }
1373                }
1374                assert!(items.is_empty());
1375            }
1376            journal.close().await.expect("Failed to close journal");
1377
1378            // Confirm blob is expected length
1379            let (_, blob_size) = context
1380                .open(&cfg.partition, &section.to_be_bytes())
1381                .await
1382                .expect("Failed to open blob");
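            // The item with the invalid checksum is discarded, so after close the blob is back to empty.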
1383            assert_eq!(blob_size, 0);
1384        });
1385    }
1386
1387    #[test_traced]
1388    fn test_journal_handling_unaligned_truncated_data() {
1389        // Initialize the deterministic context
1390        let executor = deterministic::Runner::default();
1391
1392        // Start the test within the executor
1393        executor.start(|context| async move {
1394            // Create a journal configuration
1395            let cfg = Config {
1396                partition: "test_partition".into(),
1397                compression: None,
1398                codec_config: (),
1399                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1400                write_buffer: NZUsize!(1024),
1401            };
1402
1403            // Initialize the journal
1404            let mut journal = Journal::init(context.clone(), cfg.clone())
1405                .await
1406                .expect("Failed to initialize journal");
1407
1408            // Append 1 item to the first index
1409            journal.append(1, 1).await.expect("Failed to append data");
1410
1411            // Append multiple items to the second index (with unaligned values)
1412            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1413            for (index, data) in &data_items {
1414                journal
1415                    .append(*index, *data)
1416                    .await
1417                    .expect("Failed to append data");
1418                journal.sync(*index).await.expect("Failed to sync blob");
1419            }
1420
1421            // Close the journal
1422            journal.close().await.expect("Failed to close journal");
1423
1424            // Manually corrupt the end of the second blob by chopping off its last 4 bytes
1425            let (blob, blob_size) = context
1426                .open(&cfg.partition, &2u64.to_be_bytes())
1427                .await
1428                .expect("Failed to open blob");
1429            blob.resize(blob_size - 4)
1430                .await
1431                .expect("Failed to corrupt blob");
1432            blob.sync().await.expect("Failed to sync blob");
1433
1434            // Re-initialize the journal to simulate a restart
1435            let journal = Journal::init(context.clone(), cfg.clone())
1436                .await
1437                .expect("Failed to re-initialize journal");
1438
1439            // Attempt to replay the journal
1440            let mut items = Vec::<(u64, u32)>::new();
1441            {
1442                let stream = journal
1443                    .replay(0, 0, NZUsize!(1024))
1444                    .await
1445                    .expect("unable to setup replay");
1446                pin_mut!(stream);
1447                while let Some(result) = stream.next().await {
1448                    match result {
1449                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1450                        Err(err) => panic!("Failed to read item: {err}"),
1451                    }
1452                }
1453            }
1454            journal.close().await.expect("Failed to close journal");
1455
1456            // Verify that only non-corrupted items were replayed
1457            assert_eq!(items.len(), 3);
1458            assert_eq!(items[0].0, 1);
1459            assert_eq!(items[0].1, 1);
1460            assert_eq!(items[1].0, data_items[0].0);
1461            assert_eq!(items[1].1, data_items[0].1);
1462            assert_eq!(items[2].0, data_items[1].0);
1463            assert_eq!(items[2].1, data_items[1].1);
1464
1465            // Confirm blob is expected length
1466            let (_, blob_size) = context
1467                .open(&cfg.partition, &2u64.to_be_bytes())
1468                .await
1469                .expect("Failed to open blob");
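            // 28 bytes = one 12-byte item (4-byte size + 4-byte data + 4-byte checksum) padded to
            // the 16-byte offset alignment, plus a trailing unpadded 12-byte item.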
1470            assert_eq!(blob_size, 28);
1471
1472            // Re-initialize the journal after truncation
1473            let mut journal = Journal::init(context.clone(), cfg.clone())
1474                .await
1475                .expect("Failed to re-initialize journal");
1476
1477            // Attempt to replay the journal
1478            let mut items = Vec::<(u64, u32)>::new();
1479            {
1480                let stream = journal
1481                    .replay(0, 0, NZUsize!(1024))
1482                    .await
1483                    .expect("unable to setup replay");
1484                pin_mut!(stream);
1485                while let Some(result) = stream.next().await {
1486                    match result {
1487                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1488                        Err(err) => panic!("Failed to read item: {err}"),
1489                    }
1490                }
1491            }
1492
1493            // Verify that only non-corrupted items were replayed
1494            assert_eq!(items.len(), 3);
1495            assert_eq!(items[0].0, 1);
1496            assert_eq!(items[0].1, 1);
1497            assert_eq!(items[1].0, data_items[0].0);
1498            assert_eq!(items[1].1, data_items[0].1);
1499            assert_eq!(items[2].0, data_items[1].0);
1500            assert_eq!(items[2].1, data_items[1].1);
1501
1502            // Append a new item to the truncated section
1503            journal.append(2, 5).await.expect("Failed to append data");
1504            journal.sync(2).await.expect("Failed to sync blob");
1505
1506            // Get the new item
1507            let item = journal.get(2, 2).await.expect("Failed to get item");
1508            assert_eq!(item, 5);
1509
1510            // Close the journal
1511            journal.close().await.expect("Failed to close journal");
1512
1513            // Confirm blob is expected length
1514            let (_, blob_size) = context
1515                .open(&cfg.partition, &2u64.to_be_bytes())
1516                .await
1517                .expect("Failed to open blob");
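            // 44 bytes = the surviving 28 bytes, padded to the next 16-byte boundary (32), plus
            // the newly appended 12-byte item.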
1518            assert_eq!(blob_size, 44);
1519
1520            // Re-initialize the journal to simulate a restart
1521            let journal = Journal::init(context.clone(), cfg.clone())
1522                .await
1523                .expect("Failed to re-initialize journal");
1524
1525            // Attempt to replay the journal
1526            let mut items = Vec::<(u64, u32)>::new();
1527            {
1528                let stream = journal
1529                    .replay(0, 0, NZUsize!(1024))
1530                    .await
1531                    .expect("unable to setup replay");
1532                pin_mut!(stream);
1533                while let Some(result) = stream.next().await {
1534                    match result {
1535                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1536                        Err(err) => panic!("Failed to read item: {err}"),
1537                    }
1538                }
1539            }
1540
1541            // Verify that the surviving items and the newly appended item were replayed
1542            assert_eq!(items.len(), 4);
1543            assert_eq!(items[0].0, 1);
1544            assert_eq!(items[0].1, 1);
1545            assert_eq!(items[1].0, data_items[0].0);
1546            assert_eq!(items[1].1, data_items[0].1);
1547            assert_eq!(items[2].0, data_items[1].0);
1548            assert_eq!(items[2].1, data_items[1].1);
1549            assert_eq!(items[3].0, 2);
1550            assert_eq!(items[3].1, 5);
1551        });
1552    }
1553
1554    #[test_traced]
1555    fn test_journal_handling_aligned_truncated_data() {
1556        // Initialize the deterministic context
1557        let executor = deterministic::Runner::default();
1558
1559        // Start the test within the executor
1560        executor.start(|context| async move {
1561            // Create a journal configuration
1562            let cfg = Config {
1563                partition: "test_partition".into(),
1564                compression: None,
1565                codec_config: (),
1566                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1567                write_buffer: NZUsize!(1024),
1568            };
1569
1570            // Initialize the journal
1571            let mut journal = Journal::init(context.clone(), cfg.clone())
1572                .await
1573                .expect("Failed to initialize journal");
1574
1575            // Append 1 item to the first index
1576            journal.append(1, 1).await.expect("Failed to append data");
1577
1578            // Append multiple items to the second index (with aligned values)
1579            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1580            for (index, data) in &data_items {
1581                journal
1582                    .append(*index, *data)
1583                    .await
1584                    .expect("Failed to append data");
1585                journal.sync(*index).await.expect("Failed to sync blob");
1586            }
1587
1588            // Close the journal
1589            journal.close().await.expect("Failed to close journal");
1590
1591            // Manually corrupt the end of the second blob by chopping off its last 4 bytes
1592            let (blob, blob_size) = context
1593                .open(&cfg.partition, &2u64.to_be_bytes())
1594                .await
1595                .expect("Failed to open blob");
1596            blob.resize(blob_size - 4)
1597                .await
1598                .expect("Failed to corrupt blob");
1599            blob.sync().await.expect("Failed to sync blob");
1600
1601            // Re-initialize the journal to simulate a restart
1602            let mut journal = Journal::init(context.clone(), cfg.clone())
1603                .await
1604                .expect("Failed to re-initialize journal");
1605
1606            // Attempt to replay the journal
1607            let mut items = Vec::<(u64, u64)>::new();
1608            {
1609                let stream = journal
1610                    .replay(0, 0, NZUsize!(1024))
1611                    .await
1612                    .expect("unable to setup replay");
1613                pin_mut!(stream);
1614                while let Some(result) = stream.next().await {
1615                    match result {
1616                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1617                        Err(err) => panic!("Failed to read item: {err}"),
1618                    }
1619                }
1620            }
1621
1622            // Verify that only non-corrupted items were replayed
1623            assert_eq!(items.len(), 3);
1624            assert_eq!(items[0].0, 1);
1625            assert_eq!(items[0].1, 1);
1626            assert_eq!(items[1].0, data_items[0].0);
1627            assert_eq!(items[1].1, data_items[0].1);
1628            assert_eq!(items[2].0, data_items[1].0);
1629            assert_eq!(items[2].1, data_items[1].1);
1630
1631            // Append a new item to the truncated section
1632            journal.append(2, 5).await.expect("Failed to append data");
1633            journal.sync(2).await.expect("Failed to sync blob");
1634
1635            // Get the new item
1636            let item = journal.get(2, 2).await.expect("Failed to get item");
1637            assert_eq!(item, 5);
1638
1639            // Close the journal
1640            journal.close().await.expect("Failed to close journal");
1641
1642            // Confirm blob is expected length
1643            let (_, blob_size) = context
1644                .open(&cfg.partition, &2u64.to_be_bytes())
1645                .await
1646                .expect("Failed to open blob");
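            // 48 bytes = two surviving 16-byte items (4-byte size + 8-byte data + 4-byte checksum,
            // already aligned) plus the newly appended 16-byte item.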
1647            assert_eq!(blob_size, 48);
1648
1649            // Re-initialize the journal after truncation
1650            let journal = Journal::init(context, cfg)
1651                .await
1652                .expect("Failed to re-initialize journal");
1653
1654            // Attempt to replay the journal
1655            let mut items = Vec::<(u64, u64)>::new();
1656            {
1657                let stream = journal
1658                    .replay(0, 0, NZUsize!(1024))
1659                    .await
1660                    .expect("unable to setup replay");
1661                pin_mut!(stream);
1662                while let Some(result) = stream.next().await {
1663                    match result {
1664                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1665                        Err(err) => panic!("Failed to read item: {err}"),
1666                    }
1667                }
1668            }
1669            journal.close().await.expect("Failed to close journal");
1670
1671            // Verify that the surviving items and the newly appended item were replayed
1672            assert_eq!(items.len(), 4);
1673            assert_eq!(items[0].0, 1);
1674            assert_eq!(items[0].1, 1);
1675            assert_eq!(items[1].0, data_items[0].0);
1676            assert_eq!(items[1].1, data_items[0].1);
1677            assert_eq!(items[2].0, data_items[1].0);
1678            assert_eq!(items[2].1, data_items[1].1);
1679            assert_eq!(items[3].0, 2);
1680            assert_eq!(items[3].1, 5);
1681        });
1682    }
1683
1684    #[test_traced]
1685    fn test_journal_handling_extra_data() {
1686        // Initialize the deterministic context
1687        let executor = deterministic::Runner::default();
1688
1689        // Start the test within the executor
1690        executor.start(|context| async move {
1691            // Create a journal configuration
1692            let cfg = Config {
1693                partition: "test_partition".into(),
1694                compression: None,
1695                codec_config: (),
1696                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1697                write_buffer: NZUsize!(1024),
1698            };
1699
1700            // Initialize the journal
1701            let mut journal = Journal::init(context.clone(), cfg.clone())
1702                .await
1703                .expect("Failed to initialize journal");
1704
1705            // Append 1 item to the first index
1706            journal.append(1, 1).await.expect("Failed to append data");
1707
1708            // Append multiple items to the second index
1709            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1710            for (index, data) in &data_items {
1711                journal
1712                    .append(*index, *data)
1713                    .await
1714                    .expect("Failed to append data");
1715                journal.sync(*index).await.expect("Failed to sync blob");
1716            }
1717
1718            // Close the journal
1719            journal.close().await.expect("Failed to close journal");
1720
1721            // Manually add extra data to the end of the second blob
1722            let (blob, blob_size) = context
1723                .open(&cfg.partition, &2u64.to_be_bytes())
1724                .await
1725                .expect("Failed to open blob");
1726            blob.write_at(vec![0u8; 16], blob_size)
1727                .await
1728                .expect("Failed to add extra data");
1729            blob.sync().await.expect("Failed to sync blob");
1730
1731            // Re-initialize the journal to simulate a restart
1732            let journal = Journal::init(context, cfg)
1733                .await
1734                .expect("Failed to re-initialize journal");
1735
1736            // Attempt to replay the journal
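            //
            // The 16 trailing zero bytes should be treated as an incomplete item and ignored, so
            // replay is expected to complete without returning an error.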
1737            let mut items = Vec::<(u64, i32)>::new();
1738            let stream = journal
1739                .replay(0, 0, NZUsize!(1024))
1740                .await
1741                .expect("unable to setup replay");
1742            pin_mut!(stream);
1743            while let Some(result) = stream.next().await {
1744                match result {
1745                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1746                    Err(err) => panic!("Failed to read item: {err}"),
1747                }
1748            }
1749        });
1750    }
1751
1752    // Define a `MockBlob` whose operations all succeed without performing any I/O
1753    #[derive(Clone)]
1754    struct MockBlob {}
1755
1756    impl Blob for MockBlob {
1757        async fn read_at(
1758            &self,
1759            buf: impl Into<StableBuf> + Send,
1760            _offset: u64,
1761        ) -> Result<StableBuf, RError> {
1762            Ok(buf.into())
1763        }
1764
1765        async fn write_at(
1766            &self,
1767            _buf: impl Into<StableBuf> + Send,
1768            _offset: u64,
1769        ) -> Result<(), RError> {
1770            Ok(())
1771        }
1772
1773        async fn resize(&self, _len: u64) -> Result<(), RError> {
1774            Ok(())
1775        }
1776
1777        async fn sync(&self) -> Result<(), RError> {
1778            Ok(())
1779        }
1780    }
1781
1782    // Define a `MockStorage` that returns a `MockBlob` with a caller-specified length
1783    #[derive(Clone)]
1784    struct MockStorage {
1785        len: u64,
1786    }
1787
1788    impl Storage for MockStorage {
1789        type Blob = MockBlob;
1790
1791        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
1792            Ok((MockBlob {}, self.len))
1793        }
1794
1795        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
1796            Ok(())
1797        }
1798
1799        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
1800            Ok(vec![])
1801        }
1802    }
1803
1804    impl Metrics for MockStorage {
1805        fn with_label(&self, _: &str) -> Self {
1806            self.clone()
1807        }
1808
1809        fn label(&self) -> String {
1810            String::new()
1811        }
1812
1813        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}
1814
1815        fn encode(&self) -> String {
1816            String::new()
1817        }
1818    }
1819
1820    // Redefine `INDEX_ALIGNMENT` explicitly (rather than importing it) so that any accidental
1821    // change to the journal's alignment value causes these tests to fail
1822    const INDEX_ALIGNMENT: u64 = 16;
1823
1824    #[test_traced]
1825    fn test_journal_large_offset() {
1826        // Initialize the deterministic context
1827        let executor = deterministic::Runner::default();
1828        executor.start(|_| async move {
1829            // Create journal
1830            let cfg = Config {
1831                partition: "partition".to_string(),
1832                compression: None,
1833                codec_config: (),
1834                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1835                write_buffer: NZUsize!(1024),
1836            };
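            // With a section of exactly u32::MAX * INDEX_ALIGNMENT bytes, the next append lands at
            // the largest representable offset (u32::MAX).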
1837            let context = MockStorage {
1838                len: u32::MAX as u64 * INDEX_ALIGNMENT, // can store up to u32::MAX bytes at the last offset
1839            };
1840            let mut journal = Journal::init(context, cfg).await.unwrap();
1841
1842            // Append data
1843            let data = 1;
1844            let (result, _) = journal
1845                .append(1, data)
1846                .await
1847                .expect("Failed to append data");
1848            assert_eq!(result, u32::MAX);
1849        });
1850    }
1851
1852    #[test_traced]
1853    fn test_journal_offset_overflow() {
1854        // Initialize the deterministic context
1855        let executor = deterministic::Runner::default();
1856        executor.start(|_| async move {
1857            // Create journal
1858            let cfg = Config {
1859                partition: "partition".to_string(),
1860                compression: None,
1861                codec_config: (),
1862                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1863                write_buffer: NZUsize!(1024),
1864            };
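            // One byte past the aligned maximum: the next append would require an offset larger
            // than u32::MAX, so it must fail with Error::OffsetOverflow.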
1865            let context = MockStorage {
1866                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
1867            };
1868            let mut journal = Journal::init(context, cfg).await.unwrap();
1869
1870            // Append data
1871            let data = 1;
1872            let result = journal.append(1, data).await;
1873            assert!(matches!(result, Err(Error::OffsetOverflow)));
1874        });
1875    }
1876
1877    #[test_traced]
1878    fn test_journal_rewind() {
1879        // Initialize the deterministic context
1880        let executor = deterministic::Runner::default();
1881        executor.start(|context| async move {
1882            // Create journal
1883            let cfg = Config {
1884                partition: "test_partition".to_string(),
1885                compression: None,
1886                codec_config: (),
1887                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1888                write_buffer: NZUsize!(1024),
1889            };
1890            let mut journal = Journal::init(context, cfg).await.unwrap();
1891
1892            // Check size of non-existent section
1893            let size = journal.size(1).await.unwrap();
1894            assert_eq!(size, 0);
1895
1896            // Append data to section 1
1897            journal.append(1, 42i32).await.unwrap();
1898
1899            // Check size of section 1 - should be greater than 0
1900            let size = journal.size(1).await.unwrap();
1901            assert!(size > 0);
1902
1903            // Append more data and verify size increases
1904            journal.append(1, 43i32).await.unwrap();
1905            let new_size = journal.size(1).await.unwrap();
1906            assert!(new_size > size);
1907
1908            // Check size of different section - should still be 0
1909            let size = journal.size(2).await.unwrap();
1910            assert_eq!(size, 0);
1911
1912            // Append data to section 2
1913            journal.append(2, 44i32).await.unwrap();
1914
1915            // Check size of section 2 - should be greater than 0
1916            let size = journal.size(2).await.unwrap();
1917            assert!(size > 0);
1918
1919            // Roll back everything in sections 1 and 2 by rewinding to the start of section 1
1920            journal.rewind(1, 0).await.unwrap();
1921
1922            // Check size of section 1 - should be 0
1923            let size = journal.size(1).await.unwrap();
1924            assert_eq!(size, 0);
1925
1926            // Check size of section 2 - should be 0
1927            let size = journal.size(2).await.unwrap();
1928            assert_eq!(size, 0);
1929        });
1930    }
1931
1932    #[test_traced]
1933    fn test_journal_rewind_section() {
1934        // Initialize the deterministic context
1935        let executor = deterministic::Runner::default();
1936        executor.start(|context| async move {
1937            // Create journal
1938            let cfg = Config {
1939                partition: "test_partition".to_string(),
1940                compression: None,
1941                codec_config: (),
1942                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1943                write_buffer: NZUsize!(1024),
1944            };
1945            let mut journal = Journal::init(context, cfg).await.unwrap();
1946
1947            // Check size of non-existent section
1948            let size = journal.size(1).await.unwrap();
1949            assert_eq!(size, 0);
1950
1951            // Append data to section 1
1952            journal.append(1, 42i32).await.unwrap();
1953
1954            // Check size of section 1 - should be greater than 0
1955            let size = journal.size(1).await.unwrap();
1956            assert!(size > 0);
1957
1958            // Append more data and verify size increases
1959            journal.append(1, 43i32).await.unwrap();
1960            let new_size = journal.size(1).await.unwrap();
1961            assert!(new_size > size);
1962
1963            // Check size of different section - should still be 0
1964            let size = journal.size(2).await.unwrap();
1965            assert_eq!(size, 0);
1966
1967            // Append data to section 2
1968            journal.append(2, 44i32).await.unwrap();
1969
1970            // Check size of section 2 - should be greater than 0
1971            let size = journal.size(2).await.unwrap();
1972            assert!(size > 0);
1973
1974            // Roll back everything in section 1 only (rewind_section leaves other sections untouched)
1975            journal.rewind_section(1, 0).await.unwrap();
1976
1977            // Check size of section 1 - should be 0
1978            let size = journal.size(1).await.unwrap();
1979            assert_eq!(size, 0);
1980
1981            // Check size of section 2 - should be greater than 0
1982            let size = journal.size(2).await.unwrap();
1983            assert!(size > 0);
1984        });
1985    }
1986
1987    /// Protect against accidental changes to the journal disk format.
1988    #[test_traced]
1989    fn test_journal_conformance() {
1990        // Initialize the deterministic context
1991        let executor = deterministic::Runner::default();
1992
1993        // Start the test within the executor
1994        executor.start(|context| async move {
1995            // Create a journal configuration
1996            let cfg = Config {
1997                partition: "test_partition".into(),
1998                compression: None,
1999                codec_config: (),
2000                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2001                write_buffer: NZUsize!(1024),
2002            };
2003
2004            // Initialize the journal
2005            let mut journal = Journal::init(context.clone(), cfg.clone())
2006                .await
2007                .expect("Failed to initialize journal");
2008
2009            // Append 100 items to the journal
2010            for i in 0..100 {
2011                journal.append(1, i).await.expect("Failed to append data");
2012            }
2013            journal.sync(1).await.expect("Failed to sync blob");
2014
2015            // Close the journal
2016            journal.close().await.expect("Failed to close journal");
2017
2018            // Hash blob contents
2019            let (blob, size) = context
2020                .open(&cfg.partition, &1u64.to_be_bytes())
2021                .await
2022                .expect("Failed to open blob");
2023            assert!(size > 0);
2024            let buf = blob
2025                .read_at(vec![0u8; size as usize], 0)
2026                .await
2027                .expect("Failed to read blob");
2028            let digest = Sha256::hash(buf.as_ref());
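            // This digest pins the exact on-disk byte layout; it should only change when the disk
            // format is intentionally revised.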
2029            assert_eq!(
2030                hex(&digest),
2031                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
2032            );
2033        });
2034    }
2035}