commonware_storage/journal/
variable.rs

//! An append-only log for storing arbitrary variable-length items.
//!
//! `variable::Journal` is an append-only log for storing arbitrary variable-length data on disk. In
//! addition to replay, stored items can be directly retrieved given their section number and offset
//! within the section.
//!
//! # Format
//!
//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
//! The particular `Blob` in which data is stored is identified by a `section` number (`u64`).
//! Within a `section`, data is appended as an `item` with the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 |    ...    | 8 | 9 |10 |11 |
//! +---+---+---+---+---+---+---+---+---+---+---+
//! |   Size (u32)  |   Data    |    C(u32)     |
//! +---+---+---+---+---+---+---+---+---+---+---+
//!
//! C = CRC32(Size | Data)
//! ```
//!
//! _To ensure data returned by `Journal` is correct, a checksum (CRC32) is stored at the end of
//! each item. If the checksum of the read data does not match the stored checksum, an error is
//! returned. This checksum is only verified when data is accessed and not at startup (which would
//! require reading all data in `Journal`)._
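//!
//! For illustration, a minimal sketch (not part of the crate's API) of how a single item is laid
//! out before being written, assuming `crc32fast` (the checksum implementation used internally):
//!
//! ```rust
//! fn encode_item(data: &[u8]) -> Vec<u8> {
//!     let mut buf = Vec::with_capacity(4 + data.len() + 4);
//!     // 4-byte big-endian length prefix
//!     buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
//!     // Raw (possibly compressed) item bytes
//!     buf.extend_from_slice(data);
//!     // CRC32 over the length prefix and the data
//!     let checksum = crc32fast::hash(&buf);
//!     buf.extend_from_slice(&checksum.to_be_bytes());
//!     buf
//! }
//!
//! let encoded = encode_item(b"hello");
//! assert_eq!(encoded.len(), 4 + 5 + 4);
//! ```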
//!
//! # Open Blobs
//!
//! `Journal` uses 1 `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a given
//! `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound the
//! number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
//!
//! # Offset Alignment
//!
//! In practice, `Journal` users won't store `u64::MAX` bytes of data in a given `section` (the max
//! `Offset` provided by `Blob`). To reduce the memory usage for tracking offsets within `Journal`,
//! offsets are thus `u32` (4 bytes) and aligned to 16 bytes. This means that the maximum size of
//! any `section` is `u32::MAX * 17` bytes (~73 GB), since the item at the last offset can itself
//! store up to `u32::MAX` bytes. If more data is written to a `section` past this max, an
//! `OffsetOverflow` error is returned.
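//!
//! For example (illustrative arithmetic only):
//!
//! ```text
//! item ends at byte 70   ->  padded to byte 80 (next multiple of 16)
//! next item's Offset     =   80 / 16 = 5
//! its byte position      =   Offset * 16 = 80
//! ```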
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
//!
//! # Pruning
//!
//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
//! could be used, for example, by some blockchain application to prune old blocks.
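//!
//! For example, a hedged sketch (assuming `journal` currently holds sections `1..=5`):
//!
//! ```rust,ignore
//! // Remove all sections with a number less than 3 (sections 1 and 2 here)
//! let pruned = journal.prune(3).await?;
//! assert!(pruned);
//! ```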
//!
//! # Replay
//!
//! During application initialization, it is very common to replay data from `Journal` to recover
//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
//! method to produce a stream of all items in the `Journal` in order of their `section` and
//! `offset`.
//!
//! # Exact Reads
//!
//! To allow an item to be fetched in a single disk operation, `Journal` provides a `get_exact`
//! method that takes the item's exact size in addition to its `section` and `offset`. This size
//! must be cached by the caller (it is returned by `append` and surfaced during `replay`);
//! providing an incorrect size will not return the desired item (and typically results in an
//! error).
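//!
//! For example, a sketch mirroring the example below (the `(offset, size)` pair returned by
//! `append` is cached and later passed to `get_exact`):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "partition".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Cache the offset and size returned by `append`
//!     let (offset, size) = journal.append(1, 128).await.unwrap();
//!
//!     // Later, fetch the item with a single read using the cached values
//!     let item = journal.get_exact(1, offset, size).await.unwrap();
//!     assert_eq!(item, Some(128));
//!
//!     journal.close().await.unwrap();
//! });
//! ```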
//!
//! # Compression
//!
//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
//! `compression` field in the `Config` struct to a valid `zstd` compression level. This setting can
//! be changed between initializations of `Journal`; however, it must remain populated if any data
//! was written with compression enabled.
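//!
//! For instance, a configuration sketch with compression enabled (level 3 is an arbitrary choice
//! for illustration):
//!
//! ```rust
//! use commonware_runtime::buffer::PoolRef;
//! use commonware_storage::journal::variable::Config;
//! use commonware_utils::NZUsize;
//!
//! let cfg = Config {
//!     partition: "partition".to_string(),
//!     compression: Some(3), // zstd compression level
//!     codec_config: (),
//!     buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!     write_buffer: NZUsize!(1024 * 1024),
//! };
//! ```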
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a journal
//!     let mut journal = Journal::init(context, Config{
//!         partition: "partition".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append data to the journal
//!     journal.append(1, 128).await.unwrap();
//!
//!     // Close the journal
//!     journal.close().await.unwrap();
//! });
//! ```

103use super::Error;
104use bytes::BufMut;
105use commonware_codec::Codec;
106use commonware_runtime::{
107    buffer::{Append, PoolRef, Read},
108    Blob, Error as RError, Metrics, Storage,
109};
110use commonware_utils::hex;
111use futures::stream::{self, Stream, StreamExt};
112use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
113use std::{
114    collections::{btree_map::Entry, BTreeMap},
115    io::Cursor,
116    marker::PhantomData,
117    num::NonZeroUsize,
118};
119use tracing::{debug, trace, warn};
120use zstd::{bulk::compress, decode_all};
121
122/// Configuration for `Journal` storage.
123#[derive(Clone)]
124pub struct Config<C> {
125    /// The `commonware-runtime::Storage` partition to use
126    /// for storing journal blobs.
127    pub partition: String,
128
129    /// Optional compression level (using `zstd`) to apply to data before storing.
130    pub compression: Option<u8>,
131
132    /// The codec configuration to use for encoding and decoding items.
133    pub codec_config: C,
134
135    /// The buffer pool to use for caching data.
136    pub buffer_pool: PoolRef,
137
138    /// The size of the write buffer to use for each blob.
139    pub write_buffer: NonZeroUsize,
140}
141
142pub(crate) const ITEM_ALIGNMENT: u64 = 16;
143
144/// Computes the next offset for an item using the underlying `u64`
145/// offset of `Blob`.
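///
/// For example (illustrative arithmetic only):
///
/// ```text
/// compute_next_offset(16) == Ok(1)   // already aligned: 16 / 16
/// compute_next_offset(17) == Ok(2)   // padded to 32, then 32 / 16
/// ```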
146#[inline]
147fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
148    let overage = offset % ITEM_ALIGNMENT;
149    if overage != 0 {
150        offset += ITEM_ALIGNMENT - overage;
151    }
152    let offset = offset / ITEM_ALIGNMENT;
153    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
154    Ok(aligned_offset)
155}
156
157/// Implementation of `Journal` storage.
158pub struct Journal<E: Storage + Metrics, V: Codec> {
159    pub(crate) context: E,
160    pub(crate) cfg: Config<V::Cfg>,
161
162    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,
163
164    /// A section number before which all sections have been pruned. This value is not persisted,
165    /// and is initialized to 0 at startup. It's updated only during calls to `prune` during the
166    /// current execution, and therefore provides only a best effort lower-bound on the true value.
167    pub(crate) oldest_retained_section: u64,
168
169    pub(crate) tracked: Gauge,
170    pub(crate) synced: Counter,
171    pub(crate) pruned: Counter,
172
173    pub(crate) _phantom: PhantomData<V>,
174}
175
176impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
177    /// Initialize a new `Journal` instance.
178    ///
179    /// All backing blobs are opened but not read during
180    /// initialization. The `replay` method can be used
181    /// to iterate over all items in the `Journal`.
182    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
183        // Iterate over blobs in partition
184        let mut blobs = BTreeMap::new();
185        let stored_blobs = match context.scan(&cfg.partition).await {
186            Ok(blobs) => blobs,
187            Err(RError::PartitionMissing(_)) => Vec::new(),
188            Err(err) => return Err(Error::Runtime(err)),
189        };
190        for name in stored_blobs {
191            let (blob, size) = context.open(&cfg.partition, &name).await?;
192            let hex_name = hex(&name);
193            let section = match name.try_into() {
194                Ok(section) => u64::from_be_bytes(section),
195                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
196            };
197            debug!(section, blob = hex_name, size, "loaded section");
198            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
199            blobs.insert(section, blob);
200        }
201
202        // Initialize metrics
203        let tracked = Gauge::default();
204        let synced = Counter::default();
205        let pruned = Counter::default();
206        context.register("tracked", "Number of blobs", tracked.clone());
207        context.register("synced", "Number of syncs", synced.clone());
208        context.register("pruned", "Number of blobs pruned", pruned.clone());
209        tracked.set(blobs.len() as i64);
210
211        // Create journal instance
212        Ok(Self {
213            context,
214            cfg,
215            blobs,
216            oldest_retained_section: 0,
217            tracked,
218            synced,
219            pruned,
220
221            _phantom: PhantomData,
222        })
223    }
224
225    /// Ensures that a section pruned during the current execution is not accessed.
226    fn prune_guard(&self, section: u64) -> Result<(), Error> {
227        if section < self.oldest_retained_section {
228            Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
229        } else {
230            Ok(())
231        }
232    }
233
234    /// Reads an item from the blob at the given offset.
235    pub(crate) async fn read(
236        compressed: bool,
237        cfg: &V::Cfg,
238        blob: &Append<E::Blob>,
239        offset: u32,
240    ) -> Result<(u32, u32, V), Error> {
241        // Read item size
242        let mut hasher = crc32fast::Hasher::new();
243        let offset = offset as u64 * ITEM_ALIGNMENT;
244        let size = blob.read_at(vec![0; 4], offset).await?;
245        hasher.update(size.as_ref());
246        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
247        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;
248
249        // Read remaining
250        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
251        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
252        let buf = buf.as_ref();
253        let offset = offset
254            .checked_add(buf_size as u64)
255            .ok_or(Error::OffsetOverflow)?;
256
257        // Read item
258        let item = &buf[..size];
259        hasher.update(item);
260
261        // Verify integrity
262        let checksum = hasher.finalize();
263        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
264        if checksum != stored_checksum {
265            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
266        }
267
268        // Compute next offset
269        let aligned_offset = compute_next_offset(offset)?;
270
271        // If compression is enabled, decompress the item
272        let item = if compressed {
273            let decompressed =
274                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
275            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
276        } else {
277            V::decode_cfg(item, cfg).map_err(Error::Codec)?
278        };
279
280        // Return item
281        Ok((aligned_offset, size as u32, item))
282    }
283
284    /// Helper function to read an item from a [Read].
285    async fn read_buffered(
286        reader: &mut Read<Append<E::Blob>>,
287        offset: u32,
288        cfg: &V::Cfg,
289        compressed: bool,
290    ) -> Result<(u32, u64, u32, V), Error> {
291        // Calculate absolute file offset from the item offset
292        let file_offset = offset as u64 * ITEM_ALIGNMENT;
293
294        // If we're not at the right position, seek to it
295        if reader.position() != file_offset {
296            reader.seek_to(file_offset).map_err(Error::Runtime)?;
297        }
298
299        // Read item size (4 bytes)
300        let mut hasher = crc32fast::Hasher::new();
301        let mut size_buf = [0u8; 4];
302        reader
303            .read_exact(&mut size_buf, 4)
304            .await
305            .map_err(Error::Runtime)?;
306        hasher.update(&size_buf);
307
308        // Read remaining
309        let size = u32::from_be_bytes(size_buf) as usize;
310        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
311        let mut buf = vec![0u8; buf_size];
312        reader
313            .read_exact(&mut buf, buf_size)
314            .await
315            .map_err(Error::Runtime)?;
316
317        // Read item
318        let item = &buf[..size];
319        hasher.update(item);
320
321        // Verify integrity
322        let checksum = hasher.finalize();
323        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
324        if checksum != stored_checksum {
325            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
326        }
327
328        // If compression is enabled, decompress the item
329        let item = if compressed {
330            let decompressed =
331                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
332            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
333        } else {
334            V::decode_cfg(item, cfg).map_err(Error::Codec)?
335        };
336
337        // Calculate next offset
338        let current_pos = reader.position();
339        let aligned_offset = compute_next_offset(current_pos)?;
340        Ok((aligned_offset, current_pos, size as u32, item))
341    }
342
343    /// Reads an item from the blob at the given offset and of a given size.
344    async fn read_exact(
345        compressed: bool,
346        cfg: &V::Cfg,
347        blob: &Append<E::Blob>,
348        offset: u32,
349        len: u32,
350    ) -> Result<V, Error> {
351        // Read buffer
352        let offset = offset as u64 * ITEM_ALIGNMENT;
353        let entry_size = 4 + len as usize + 4;
354        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;
355
356        // Check size
357        let mut hasher = crc32fast::Hasher::new();
358        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
359        hasher.update(&buf.as_ref()[..4]);
360        if disk_size != len {
361            return Err(Error::UnexpectedSize(disk_size, len));
362        }
363
364        // Verify integrity
365        let item = &buf.as_ref()[4..4 + len as usize];
366        hasher.update(item);
367        let checksum = hasher.finalize();
368        let stored_checksum =
369            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
370        if checksum != stored_checksum {
371            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
372        }
373
374        // Decompress item
375        let item = if compressed {
376            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
377        } else {
378            item.to_vec()
379        };
380
381        // Return item
382        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
383        Ok(item)
384    }
385
386    /// Returns an ordered stream of all items in the journal starting with the item at the given
387    /// `start_section` and `offset` into that section. Each item is returned as a tuple of
388    /// (section, offset, size, item).
389    ///
390    /// # Repair
391    ///
392    /// Like
393    /// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
394    /// and
395    /// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
396    /// the first invalid data read will be considered the new end of the journal (and the
397    /// underlying [Blob] will be truncated to the last valid item).
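    ///
    /// # Example
    ///
    /// A sketch of driving the stream (mirroring the module-level example):
    ///
    /// ```rust
    /// use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
    /// use commonware_storage::journal::variable::{Journal, Config};
    /// use commonware_utils::NZUsize;
    /// use futures::{pin_mut, StreamExt};
    ///
    /// let executor = deterministic::Runner::default();
    /// executor.start(|context| async move {
    ///     let mut journal = Journal::init(context, Config{
    ///         partition: "partition".to_string(),
    ///         compression: None,
    ///         codec_config: (),
    ///         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
    ///         write_buffer: NZUsize!(1024 * 1024),
    ///     }).await.unwrap();
    ///     journal.append(1, 128).await.unwrap();
    ///
    ///     // Stream every item, starting from the first section and offset
    ///     {
    ///         let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
    ///         pin_mut!(stream);
    ///         while let Some(item) = stream.next().await {
    ///             let (_section, _offset, _size, _value) = item.unwrap();
    ///         }
    ///     }
    ///
    ///     journal.close().await.unwrap();
    /// });
    /// ```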
398    pub async fn replay(
399        &self,
400        start_section: u64,
401        mut offset: u32,
402        buffer: NonZeroUsize,
403    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
404        // Collect all blobs to replay
405        let codec_config = self.cfg.codec_config.clone();
406        let compressed = self.cfg.compression.is_some();
407        let mut blobs = Vec::with_capacity(self.blobs.len());
408        for (section, blob) in self.blobs.range(start_section..) {
409            let blob_size = blob.size().await;
410            let max_offset = compute_next_offset(blob_size)?;
411            blobs.push((
412                *section,
413                blob.clone(),
414                max_offset,
415                blob_size,
416                codec_config.clone(),
417                compressed,
418            ));
419        }
420
421        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
422        // memory with buffered data)
423        Ok(stream::iter(blobs).flat_map(
424            move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
425                // Create a buffered reader
426                let mut reader = Read::new(blob, blob_size, buffer);
427                if section == start_section && offset != 0 {
428                    if let Err(err) = reader.seek_to(offset as u64 * ITEM_ALIGNMENT) {
429                        warn!(section, offset, ?err, "failed to seek to offset");
430                        // Return early with the error to terminate the entire stream
431                        return stream::once(async move { Err(err.into()) }).left_stream();
432                    }
433                } else {
434                    offset = 0;
435                }
436
437                // Read over the blob
438                stream::unfold(
439                        (section, reader, offset, 0u64, codec_config, compressed),
440                        move |(
441                            section,
442                            mut reader,
443                            offset,
444                            valid_size,
445                            codec_config,
446                            compressed,
447                        )| async move {
448                            // Check if we are at the end of the blob
449                            if offset >= max_offset {
450                                return None;
451                            }
452
453                            // Read an item from the buffer
454                            match Self::read_buffered(
455                                &mut reader,
456                                offset,
457                                &codec_config,
458                                compressed,
459                            )
460                            .await
461                            {
462                                Ok((next_offset, next_valid_size, size, item)) => {
463                                    trace!(blob = section, cursor = offset, "replayed item");
464                                    Some((
465                                        Ok((section, offset, size, item)),
466                                        (
467                                            section,
468                                            reader,
469                                            next_offset,
470                                            next_valid_size,
471                                            codec_config,
472                                            compressed,
473                                        ),
474                                    ))
475                                }
476                                Err(Error::ChecksumMismatch(expected, found)) => {
477                                    // If we encounter corruption, we prune to the last valid item. This
478                                    // can happen during an unclean file close (where pending data is not
479                                    // fully synced to disk).
480                                    warn!(
481                                        blob = section,
482                                        bad_offset = offset,
483                                        new_size = valid_size,
484                                        expected,
485                                        found,
486                                        "corruption detected: truncating"
487                                    );
488                                    reader.resize(valid_size).await.ok()?;
489                                    None
490                                }
491                                Err(Error::Runtime(RError::BlobInsufficientLength)) => {
492                                    // If we encounter trailing bytes, we prune to the last
493                                    // valid item. This can happen during an unclean file close (where
494                                    // pending data is not fully synced to disk).
495                                    warn!(
496                                        blob = section,
497                                        bad_offset = offset,
498                                        new_size = valid_size,
499                                        "trailing bytes detected: truncating"
500                                    );
501                                    reader.resize(valid_size).await.ok()?;
502                                    None
503                                }
504                                Err(err) => {
505                                    // If we encounter an unexpected error, return it without attempting
506                                    // to fix anything.
507                                    warn!(
508                                        blob = section,
509                                        cursor = offset,
510                                        ?err,
511                                        "unexpected error"
512                                    );
513                                    Some((
514                                        Err(err),
515                                        (
516                                            section,
517                                            reader,
518                                            offset,
519                                            valid_size,
520                                            codec_config,
521                                            compressed,
522                                        ),
523                                    ))
524                                }
525                            }
526                        },
527                    ).right_stream()
528            },
529        ))
530    }
531
532    /// Appends an item to `Journal` in a given `section`, returning the offset
533    /// where the item was written and the size of the stored item (which may differ
534    /// from the codec-encoded size if compression is enabled).
535    ///
536    /// # Warning
537    ///
538    /// If there exist trailing bytes in the `Blob` of a particular `section` and
539    /// `replay` is not called before this, it is likely that subsequent data added
540    /// to the `Blob` will be considered corrupted (as the trailing bytes will fail
541    /// the checksum verification). It is recommended to call `replay` before calling
542    /// `append` to prevent this.
543    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
544        // Check last pruned
545        self.prune_guard(section)?;
546
547        // Create item
548        let encoded = item.encode();
549        let encoded = if let Some(compression) = self.cfg.compression {
550            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
551        } else {
552            encoded.into()
553        };
554
555        // Ensure item is not too large
556        let item_len = encoded.len();
557        let entry_len = 4 + item_len + 4;
558        let item_len = match item_len.try_into() {
559            Ok(len) => len,
560            Err(_) => return Err(Error::ItemTooLarge(item_len)),
561        };
562
563        // Get existing blob or create new one
564        let blob = match self.blobs.entry(section) {
565            Entry::Occupied(entry) => entry.into_mut(),
566            Entry::Vacant(entry) => {
567                let name = section.to_be_bytes();
568                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
569                let blob = Append::new(
570                    blob,
571                    size,
572                    self.cfg.write_buffer,
573                    self.cfg.buffer_pool.clone(),
574                )
575                .await?;
576                self.tracked.inc();
577                entry.insert(blob)
578            }
579        };
580
581        // Calculate alignment
582        let cursor = blob.size().await;
583        let offset = compute_next_offset(cursor)?;
584        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
585        let padding = (aligned_cursor - cursor) as usize;
586
587        // Populate buffer
588        let mut buf = Vec::with_capacity(padding + entry_len);
589
590        // Add padding bytes if necessary
591        if padding > 0 {
592            buf.resize(padding, 0);
593        }
594
595        // Add entry data
596        let entry_start = buf.len();
597        buf.put_u32(item_len);
598        buf.put_slice(&encoded);
599
600        // Calculate checksum only for the entry data (without padding)
601        let checksum = crc32fast::hash(&buf[entry_start..]);
602        buf.put_u32(checksum);
603        assert_eq!(buf[entry_start..].len(), entry_len);
604
605        // Append item to blob
606        blob.append(buf).await?;
607        trace!(blob = section, offset, "appended item");
608        Ok((offset, item_len))
609    }
610
611    /// Retrieves an item from `Journal` at a given `section` and `offset`.
612    pub async fn get(&self, section: u64, offset: u32) -> Result<Option<V>, Error> {
613        self.prune_guard(section)?;
614        let blob = match self.blobs.get(&section) {
615            Some(blob) => blob,
616            None => return Ok(None),
617        };
618
619        // Perform a multi-op read.
620        let (_, _, item) = Self::read(
621            self.cfg.compression.is_some(),
622            &self.cfg.codec_config,
623            blob,
624            offset,
625        )
626        .await?;
627        Ok(Some(item))
628    }
629
630    /// Retrieves an item from `Journal` at a given `section` and `offset` with a given size.
631    pub async fn get_exact(
632        &self,
633        section: u64,
634        offset: u32,
635        size: u32,
636    ) -> Result<Option<V>, Error> {
637        self.prune_guard(section)?;
638        let blob = match self.blobs.get(&section) {
639            Some(blob) => blob,
640            None => return Ok(None),
641        };
642
643        // Perform a multi-op read.
644        let item = Self::read_exact(
645            self.cfg.compression.is_some(),
646            &self.cfg.codec_config,
647            blob,
648            offset,
649            size,
650        )
651        .await?;
652        Ok(Some(item))
653    }
654
655    /// Gets the size of the journal for a specific section.
656    ///
657    /// Returns 0 if the section does not exist.
658    pub async fn size(&self, section: u64) -> Result<u64, Error> {
659        self.prune_guard(section)?;
660        match self.blobs.get(&section) {
661            Some(blob) => Ok(blob.size().await),
662            None => Ok(0),
663        }
664    }
665
666    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
667    ///
668    /// # Warnings
669    ///
670    /// * This operation is not guaranteed to survive restarts until sync is called.
671    /// * This operation is not atomic, but it will always leave the journal in a consistent state
672    ///   in the event of failure since blobs are always removed in reverse order of section.
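    ///
    /// For example, a hedged sketch (assuming `(section, offset)` was previously returned by
    /// `append`):
    ///
    /// ```rust,ignore
    /// // Drop the item at `offset` and everything after it in `section` (later sections are
    /// // removed entirely)
    /// journal.rewind_to_offset(section, offset).await?;
    /// ```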
673    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
674        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
675    }
676
677    /// Rewinds the journal to the given `section` and `size`.
678    ///
679    /// This removes any data beyond the specified `section` and `size`.
680    ///
681    /// # Warnings
682    ///
683    /// * This operation is not guaranteed to survive restarts until sync is called.
684    /// * This operation is not atomic, but it will always leave the journal in a consistent state
685    ///   in the event of failure since blobs are always removed in reverse order of section.
686    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
687        self.prune_guard(section)?;
688
689        // Remove any sections beyond the given section
690        let trailing: Vec<u64> = self
691            .blobs
692            .range((
693                std::ops::Bound::Excluded(section),
694                std::ops::Bound::Unbounded,
695            ))
696            .map(|(&section, _)| section)
697            .collect();
698        for index in trailing.iter().rev() {
699            // Remove the underlying blob from storage.
700            let blob = self.blobs.remove(index).unwrap();
701
702            // Destroy the blob
703            drop(blob);
704            self.context
705                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
706                .await?;
707            debug!(section = index, "removed section");
708            self.tracked.dec();
709        }
710
711        // If the section exists, truncate it to the given offset
712        let blob = match self.blobs.get_mut(&section) {
713            Some(blob) => blob,
714            None => return Ok(()),
715        };
716        let current = blob.size().await;
717        if size >= current {
718            return Ok(()); // Already smaller than or equal to target size
719        }
720        blob.resize(size).await?;
721        debug!(
722            section,
723            from = current,
724            to = size,
725            ?trailing,
726            "rewound journal"
727        );
728        Ok(())
729    }
730
731    /// Rewinds the `section` to the given `size`.
732    ///
733    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
734    ///
735    /// # Warning
736    ///
737    /// This operation is not guaranteed to survive restarts until sync is called.
738    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
739        self.prune_guard(section)?;
740
741        // Get the blob at the given section
742        let blob = match self.blobs.get_mut(&section) {
743            Some(blob) => blob,
744            None => return Ok(()),
745        };
746
747        // Truncate the blob to the given size
748        let current = blob.size().await;
749        if size >= current {
750            return Ok(()); // Already smaller than or equal to target size
751        }
752        blob.resize(size).await?;
753        debug!(section, from = current, to = size, "rewound section");
754        Ok(())
755    }
756
757    /// Ensures that all data in a given `section` is synced to the underlying store.
758    ///
759    /// If the `section` does not exist, no error will be returned.
760    pub async fn sync(&self, section: u64) -> Result<(), Error> {
761        self.prune_guard(section)?;
762        let blob = match self.blobs.get(&section) {
763            Some(blob) => blob,
764            None => return Ok(()),
765        };
766        self.synced.inc();
767        blob.sync().await.map_err(Error::Runtime)
768    }
769
770    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
771    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
772        // Prune any blobs that are smaller than the minimum
773        let mut pruned = false;
774        while let Some((&section, _)) = self.blobs.first_key_value() {
775            // Stop pruning if we reach the minimum
776            if section >= min {
777                break;
778            }
779
780            // Remove blob from journal
781            let blob = self.blobs.remove(&section).unwrap();
782            let size = blob.size().await;
783            drop(blob);
784
785            // Remove blob from storage
786            self.context
787                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
788                .await?;
789            pruned = true;
790
791            debug!(blob = section, size, "pruned blob");
792            self.tracked.dec();
793            self.pruned.inc();
794        }
795
796        Ok(pruned)
797    }
798
799    /// Syncs and closes all open sections.
800    pub async fn close(self) -> Result<(), Error> {
801        for (section, blob) in self.blobs.into_iter() {
802            let size = blob.size().await;
803            blob.sync().await?;
804            debug!(blob = section, size, "synced blob");
805        }
806        Ok(())
807    }
808
809    /// Returns the number of the oldest section in the journal.
810    pub fn oldest_section(&self) -> Option<u64> {
811        self.blobs.first_key_value().map(|(section, _)| *section)
812    }
813
814    /// Removes any underlying blobs created by the journal.
815    pub async fn destroy(self) -> Result<(), Error> {
816        for (i, blob) in self.blobs.into_iter() {
817            let size = blob.size().await;
818            drop(blob);
819            debug!(blob = i, size, "destroyed blob");
820            self.context
821                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
822                .await?;
823        }
824        match self.context.remove(&self.cfg.partition, None).await {
825            Ok(()) => {}
826            Err(RError::PartitionMissing(_)) => {
827                // Partition already removed or never existed.
828            }
829            Err(err) => return Err(Error::Runtime(err)),
830        }
831        Ok(())
832    }
833}
834
835#[cfg(test)]
836mod tests {
837    use super::*;
838    use bytes::BufMut;
839    use commonware_cryptography::{Hasher, Sha256};
840    use commonware_macros::test_traced;
841    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
842    use commonware_utils::{NZUsize, StableBuf};
843    use futures::{pin_mut, StreamExt};
844    use prometheus_client::registry::Metric;
845
846    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
847    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
848
849    #[test_traced]
850    fn test_journal_append_and_read() {
851        // Initialize the deterministic context
852        let executor = deterministic::Runner::default();
853
854        // Start the test within the executor
855        executor.start(|context| async move {
856            // Initialize the journal
857            let cfg = Config {
858                partition: "test_partition".into(),
859                compression: None,
860                codec_config: (),
861                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
862                write_buffer: NZUsize!(1024),
863            };
864            let index = 1u64;
865            let data = 10;
866            let mut journal = Journal::init(context.clone(), cfg.clone())
867                .await
868                .expect("Failed to initialize journal");
869
870            // Append an item to the journal
871            journal
872                .append(index, data)
873                .await
874                .expect("Failed to append data");
875
876            // Check metrics
877            let buffer = context.encode();
878            assert!(buffer.contains("tracked 1"));
879
880            // Close the journal
881            journal.close().await.expect("Failed to close journal");
882
883            // Re-initialize the journal to simulate a restart
884            let cfg = Config {
885                partition: "test_partition".into(),
886                compression: None,
887                codec_config: (),
888                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
889                write_buffer: NZUsize!(1024),
890            };
891            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
892                .await
893                .expect("Failed to re-initialize journal");
894
895            // Replay the journal and collect items
896            let mut items = Vec::new();
897            let stream = journal
898                .replay(0, 0, NZUsize!(1024))
899                .await
900                .expect("unable to setup replay");
901            pin_mut!(stream);
902            while let Some(result) = stream.next().await {
903                match result {
904                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
905                    Err(err) => panic!("Failed to read item: {err}"),
906                }
907            }
908
909            // Verify that the item was replayed correctly
910            assert_eq!(items.len(), 1);
911            assert_eq!(items[0].0, index);
912            assert_eq!(items[0].1, data);
913
914            // Check metrics
915            let buffer = context.encode();
916            assert!(buffer.contains("tracked 1"));
917        });
918    }
919
920    #[test_traced]
921    fn test_journal_multiple_appends_and_reads() {
922        // Initialize the deterministic context
923        let executor = deterministic::Runner::default();
924
925        // Start the test within the executor
926        executor.start(|context| async move {
927            // Create a journal configuration
928            let cfg = Config {
929                partition: "test_partition".into(),
930                compression: None,
931                codec_config: (),
932                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
933                write_buffer: NZUsize!(1024),
934            };
935
936            // Initialize the journal
937            let mut journal = Journal::init(context.clone(), cfg.clone())
938                .await
939                .expect("Failed to initialize journal");
940
941            // Append multiple items to different blobs
942            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
943            for (index, data) in &data_items {
944                journal
945                    .append(*index, *data)
946                    .await
947                    .expect("Failed to append data");
948                journal.sync(*index).await.expect("Failed to sync blob");
949            }
950
951            // Check metrics
952            let buffer = context.encode();
953            assert!(buffer.contains("tracked 3"));
954            assert!(buffer.contains("synced_total 4"));
955
956            // Close the journal
957            journal.close().await.expect("Failed to close journal");
958
959            // Re-initialize the journal to simulate a restart
960            let journal = Journal::init(context, cfg)
961                .await
962                .expect("Failed to re-initialize journal");
963
964            // Replay the journal and collect items
965            let mut items = Vec::<(u64, u32)>::new();
966            {
967                let stream = journal
968                    .replay(0, 0, NZUsize!(1024))
969                    .await
970                    .expect("unable to setup replay");
971                pin_mut!(stream);
972                while let Some(result) = stream.next().await {
973                    match result {
974                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
975                        Err(err) => panic!("Failed to read item: {err}"),
976                    }
977                }
978            }
979
980            // Verify that all items were replayed correctly
981            assert_eq!(items.len(), data_items.len());
982            for ((expected_index, expected_data), (actual_index, actual_data)) in
983                data_items.iter().zip(items.iter())
984            {
985                assert_eq!(actual_index, expected_index);
986                assert_eq!(actual_data, expected_data);
987            }
988
989            // Cleanup
990            journal.destroy().await.expect("Failed to destroy journal");
991        });
992    }
993
994    #[test_traced]
995    fn test_journal_prune_blobs() {
996        // Initialize the deterministic context
997        let executor = deterministic::Runner::default();
998
999        // Start the test within the executor
1000        executor.start(|context| async move {
1001            // Create a journal configuration
1002            let cfg = Config {
1003                partition: "test_partition".into(),
1004                compression: None,
1005                codec_config: (),
1006                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1007                write_buffer: NZUsize!(1024),
1008            };
1009
1010            // Initialize the journal
1011            let mut journal = Journal::init(context.clone(), cfg.clone())
1012                .await
1013                .expect("Failed to initialize journal");
1014
1015            // Append items to multiple blobs
1016            for index in 1u64..=5u64 {
1017                journal
1018                    .append(index, index)
1019                    .await
1020                    .expect("Failed to append data");
1021                journal.sync(index).await.expect("Failed to sync blob");
1022            }
1023
1024            // Add one item out-of-order
1025            let data = 99;
1026            journal
1027                .append(2u64, data)
1028                .await
1029                .expect("Failed to append data");
1030            journal.sync(2u64).await.expect("Failed to sync blob");
1031
1032            // Prune blobs with indices less than 3
1033            journal.prune(3).await.expect("Failed to prune blobs");
1034
1035            // Check metrics
1036            let buffer = context.encode();
1037            assert!(buffer.contains("pruned_total 2"));
1038
1039            // Prune again with a section less than the previous one, should be a no-op
1040            journal.prune(2).await.expect("Failed to no-op prune");
1041            let buffer = context.encode();
1042            assert!(buffer.contains("pruned_total 2"));
1043
1044            // Close the journal
1045            journal.close().await.expect("Failed to close journal");
1046
1047            // Re-initialize the journal to simulate a restart
1048            let mut journal = Journal::init(context.clone(), cfg.clone())
1049                .await
1050                .expect("Failed to re-initialize journal");
1051
1052            // Replay the journal and collect items
1053            let mut items = Vec::<(u64, u64)>::new();
1054            {
1055                let stream = journal
1056                    .replay(0, 0, NZUsize!(1024))
1057                    .await
1058                    .expect("unable to setup replay");
1059                pin_mut!(stream);
1060                while let Some(result) = stream.next().await {
1061                    match result {
1062                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1063                        Err(err) => panic!("Failed to read item: {err}"),
1064                    }
1065                }
1066            }
1067
1068            // Verify that items from blobs 1 and 2 are not present
1069            assert_eq!(items.len(), 3);
1070            let expected_indices = [3u64, 4u64, 5u64];
1071            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
1072                assert_eq!(item.0, *expected_index);
1073            }
1074
1075            // Prune all blobs
1076            journal.prune(6).await.expect("Failed to prune blobs");
1077
1078            // Close the journal
1079            journal.close().await.expect("Failed to close journal");
1080
1081            // Ensure no remaining blobs exist
1082            //
1083            // Note: We don't remove the partition, so this does not error
1084            // and instead returns an empty list of blobs.
1085            assert!(context
1086                .scan(&cfg.partition)
1087                .await
1088                .expect("Failed to list blobs")
1089                .is_empty());
1090        });
1091    }
1092
1093    #[test_traced]
1094    fn test_journal_with_invalid_blob_name() {
1095        // Initialize the deterministic context
1096        let executor = deterministic::Runner::default();
1097
1098        // Start the test within the executor
1099        executor.start(|context| async move {
1100            // Create a journal configuration
1101            let cfg = Config {
1102                partition: "test_partition".into(),
1103                compression: None,
1104                codec_config: (),
1105                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1106                write_buffer: NZUsize!(1024),
1107            };
1108
1109            // Manually create a blob with an invalid name (not 8 bytes)
1110            let invalid_blob_name = b"invalid"; // Less than 8 bytes
1111            let (blob, _) = context
1112                .open(&cfg.partition, invalid_blob_name)
1113                .await
1114                .expect("Failed to create blob with invalid name");
1115            blob.sync().await.expect("Failed to sync blob");
1116
1117            // Attempt to initialize the journal
1118            let result = Journal::<_, u64>::init(context, cfg).await;
1119
1120            // Expect an error
1121            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1122        });
1123    }
1124
1125    #[test_traced]
1126    fn test_journal_read_size_missing() {
1127        // Initialize the deterministic context
1128        let executor = deterministic::Runner::default();
1129
1130        // Start the test within the executor
1131        executor.start(|context| async move {
1132            // Create a journal configuration
1133            let cfg = Config {
1134                partition: "test_partition".into(),
1135                compression: None,
1136                codec_config: (),
1137                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1138                write_buffer: NZUsize!(1024),
1139            };
1140
1141            // Manually create a blob with incomplete size data
1142            let section = 1u64;
1143            let blob_name = section.to_be_bytes();
1144            let (blob, _) = context
1145                .open(&cfg.partition, &blob_name)
1146                .await
1147                .expect("Failed to create blob");
1148
1149            // Write incomplete size data (less than 4 bytes)
1150            let incomplete_data = vec![0x00, 0x01]; // Less than 4 bytes
1151            blob.write_at(incomplete_data, 0)
1152                .await
1153                .expect("Failed to write incomplete data");
1154            blob.sync().await.expect("Failed to sync blob");
1155
1156            // Initialize the journal
1157            let journal = Journal::init(context, cfg)
1158                .await
1159                .expect("Failed to initialize journal");
1160
1161            // Attempt to replay the journal
1162            let stream = journal
1163                .replay(0, 0, NZUsize!(1024))
1164                .await
1165                .expect("unable to setup replay");
1166            pin_mut!(stream);
1167            let mut items = Vec::<(u64, u64)>::new();
1168            while let Some(result) = stream.next().await {
1169                match result {
1170                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1171                    Err(err) => panic!("Failed to read item: {err}"),
1172                }
1173            }
1174            assert!(items.is_empty());
1175        });
1176    }
1177
1178    #[test_traced]
1179    fn test_journal_read_item_missing() {
1180        // Initialize the deterministic context
1181        let executor = deterministic::Runner::default();
1182
1183        // Start the test within the executor
1184        executor.start(|context| async move {
1185            // Create a journal configuration
1186            let cfg = Config {
1187                partition: "test_partition".into(),
1188                compression: None,
1189                codec_config: (),
1190                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1191                write_buffer: NZUsize!(1024),
1192            };
1193
1194            // Manually create a blob with missing item data
1195            let section = 1u64;
1196            let blob_name = section.to_be_bytes();
1197            let (blob, _) = context
1198                .open(&cfg.partition, &blob_name)
1199                .await
1200                .expect("Failed to create blob");
1201
1202            // Write size but no item data
1203            let item_size: u32 = 10; // Size of the item
1204            let mut buf = Vec::new();
1205            buf.put_u32(item_size);
1206            let data = [2u8; 5];
1207            BufMut::put_slice(&mut buf, &data);
1208            blob.write_at(buf, 0)
1209                .await
1210                .expect("Failed to write item size");
1211            blob.sync().await.expect("Failed to sync blob");
1212
1213            // Initialize the journal
1214            let journal = Journal::init(context, cfg)
1215                .await
1216                .expect("Failed to initialize journal");
1217
1218            // Attempt to replay the journal
1219            let stream = journal
1220                .replay(0, 0, NZUsize!(1024))
1221                .await
1222                .expect("unable to setup replay");
1223            pin_mut!(stream);
1224            let mut items = Vec::<(u64, u64)>::new();
1225            while let Some(result) = stream.next().await {
1226                match result {
1227                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1228                    Err(err) => panic!("Failed to read item: {err}"),
1229                }
1230            }
1231            assert!(items.is_empty());
1232        });
1233    }
1234
1235    #[test_traced]
1236    fn test_journal_read_checksum_missing() {
1237        // Initialize the deterministic context
1238        let executor = deterministic::Runner::default();
1239
1240        // Start the test within the executor
1241        executor.start(|context| async move {
1242            // Create a journal configuration
1243            let cfg = Config {
1244                partition: "test_partition".into(),
1245                compression: None,
1246                codec_config: (),
1247                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1248                write_buffer: NZUsize!(1024),
1249            };
1250
1251            // Manually create a blob with missing checksum
1252            let section = 1u64;
1253            let blob_name = section.to_be_bytes();
1254            let (blob, _) = context
1255                .open(&cfg.partition, &blob_name)
1256                .await
1257                .expect("Failed to create blob");
1258
1259            // Prepare item data
1260            let item_data = b"Test data";
1261            let item_size = item_data.len() as u32;
1262
1263            // Write size
1264            let mut offset = 0;
1265            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1266                .await
1267                .expect("Failed to write item size");
1268            offset += 4;
1269
1270            // Write item data
1271            blob.write_at(item_data.to_vec(), offset)
1272                .await
1273                .expect("Failed to write item data");
1274            // Do not write checksum (omit it)
1275
1276            blob.sync().await.expect("Failed to sync blob");
1277
1278            // Initialize the journal
1279            let journal = Journal::init(context, cfg)
1280                .await
1281                .expect("Failed to initialize journal");
1282
1283            // Attempt to replay the journal
1284            //
1285            // This will truncate the leftover bytes from our manual write.
1286            let stream = journal
1287                .replay(0, 0, NZUsize!(1024))
1288                .await
1289                .expect("unable to setup replay");
1290            pin_mut!(stream);
1291            let mut items = Vec::<(u64, u64)>::new();
1292            while let Some(result) = stream.next().await {
1293                match result {
1294                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1295                    Err(err) => panic!("Failed to read item: {err}"),
1296                }
1297            }
1298            assert!(items.is_empty());
1299        });
1300    }
1301
1302    #[test_traced]
1303    fn test_journal_read_checksum_mismatch() {
1304        // Initialize the deterministic context
1305        let executor = deterministic::Runner::default();
1306
1307        // Start the test within the executor
1308        executor.start(|context| async move {
1309            // Create a journal configuration
1310            let cfg = Config {
1311                partition: "test_partition".into(),
1312                compression: None,
1313                codec_config: (),
1314                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1315                write_buffer: NZUsize!(1024),
1316            };
1317
1318            // Manually create a blob with incorrect checksum
1319            let section = 1u64;
1320            let blob_name = section.to_be_bytes();
1321            let (blob, _) = context
1322                .open(&cfg.partition, &blob_name)
1323                .await
1324                .expect("Failed to create blob");
1325
1326            // Prepare item data
1327            let item_data = b"Test data";
1328            let item_size = item_data.len() as u32;
1329            let incorrect_checksum: u32 = 0xDEADBEEF;
1330
1331            // Write size
1332            let mut offset = 0;
1333            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1334                .await
1335                .expect("Failed to write item size");
1336            offset += 4;
1337
1338            // Write item data
1339            blob.write_at(item_data.to_vec(), offset)
1340                .await
1341                .expect("Failed to write item data");
1342            offset += item_data.len() as u64;
1343
1344            // Write incorrect checksum
1345            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
1346                .await
1347                .expect("Failed to write incorrect checksum");
1348
1349            blob.sync().await.expect("Failed to sync blob");
1350
1351            // Initialize the journal
1352            let journal = Journal::init(context.clone(), cfg.clone())
1353                .await
1354                .expect("Failed to initialize journal");
1355
1356            // Attempt to replay the journal
1357            {
1358                let stream = journal
1359                    .replay(0, 0, NZUsize!(1024))
1360                    .await
1361                    .expect("unable to setup replay");
1362                pin_mut!(stream);
1363                let mut items = Vec::<(u64, u64)>::new();
1364                while let Some(result) = stream.next().await {
1365                    match result {
1366                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1367                        Err(err) => panic!("Failed to read item: {err}"),
1368                    }
1369                }
1370                assert!(items.is_empty());
1371            }
1372            journal.close().await.expect("Failed to close journal");
1373
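            // The journal treats the trailing item with a bad checksum as a torn write:
            // replay yields nothing and the item is truncated, leaving the blob empty.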
1374            // Confirm blob is expected length
1375            let (_, blob_size) = context
1376                .open(&cfg.partition, &section.to_be_bytes())
1377                .await
1378                .expect("Failed to open blob");
1379            assert_eq!(blob_size, 0);
1380        });
1381    }
1382
1383    #[test_traced]
1384    fn test_journal_handling_unaligned_truncated_data() {
1385        // Initialize the deterministic context
1386        let executor = deterministic::Runner::default();
1387
1388        // Start the test within the executor
1389        executor.start(|context| async move {
1390            // Create a journal configuration
1391            let cfg = Config {
1392                partition: "test_partition".into(),
1393                compression: None,
1394                codec_config: (),
1395                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1396                write_buffer: NZUsize!(1024),
1397            };
1398
1399            // Initialize the journal
1400            let mut journal = Journal::init(context.clone(), cfg.clone())
1401                .await
1402                .expect("Failed to initialize journal");
1403
1404            // Append 1 item to the first index
1405            journal.append(1, 1).await.expect("Failed to append data");
1406
1407            // Append multiple items to the second index (with unaligned values)
1408            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1409            for (index, data) in &data_items {
1410                journal
1411                    .append(*index, *data)
1412                    .await
1413                    .expect("Failed to append data");
1414                journal.sync(*index).await.expect("Failed to sync blob");
1415            }
1416
1417            // Close the journal
1418            journal.close().await.expect("Failed to close journal");
1419
1420            // Manually corrupt the end of the second blob by truncating the last item's checksum
1421            let (blob, blob_size) = context
1422                .open(&cfg.partition, &2u64.to_be_bytes())
1423                .await
1424                .expect("Failed to open blob");
1425            blob.resize(blob_size - 4)
1426                .await
1427                .expect("Failed to corrupt blob");
1428            blob.sync().await.expect("Failed to sync blob");
1429
1430            // Re-initialize the journal to simulate a restart
1431            let journal = Journal::init(context.clone(), cfg.clone())
1432                .await
1433                .expect("Failed to re-initialize journal");
1434
1435            // Attempt to replay the journal
1436            let mut items = Vec::<(u64, u32)>::new();
1437            {
1438                let stream = journal
1439                    .replay(0, 0, NZUsize!(1024))
1440                    .await
1441                    .expect("unable to set up replay");
1442                pin_mut!(stream);
1443                while let Some(result) = stream.next().await {
1444                    match result {
1445                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1446                        Err(err) => panic!("Failed to read item: {err}"),
1447                    }
1448                }
1449            }
1450            journal.close().await.expect("Failed to close journal");
1451
1452            // Verify that only non-corrupted items were replayed
1453            assert_eq!(items.len(), 3);
1454            assert_eq!(items[0].0, 1);
1455            assert_eq!(items[0].1, 1);
1456            assert_eq!(items[1].0, data_items[0].0);
1457            assert_eq!(items[1].1, data_items[0].1);
1458            assert_eq!(items[2].0, data_items[1].0);
1459            assert_eq!(items[2].1, data_items[1].1);
1460
1461            // Confirm blob is expected length
1462            let (_, blob_size) = context
1463                .open(&cfg.partition, &2u64.to_be_bytes())
1464                .await
1465                .expect("Failed to open blob");
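            // Each u32 item is 4 (size) + 4 (data) + 4 (CRC) = 12 bytes. The first surviving
            // item is padded to the 16-byte offset alignment, the final item is not, so the
            // rewound blob is 16 + 12 = 28 bytes.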
1466            assert_eq!(blob_size, 28);
1467
1468            // Attempt to replay journal after truncation
1469            let mut journal = Journal::init(context.clone(), cfg.clone())
1470                .await
1471                .expect("Failed to re-initialize journal");
1472
1473            // Attempt to replay the journal
1474            let mut items = Vec::<(u64, u32)>::new();
1475            {
1476                let stream = journal
1477                    .replay(0, 0, NZUsize!(1024))
1478                    .await
1479                    .expect("unable to set up replay");
1480                pin_mut!(stream);
1481                while let Some(result) = stream.next().await {
1482                    match result {
1483                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1484                        Err(err) => panic!("Failed to read item: {err}"),
1485                    }
1486                }
1487            }
1488
1489            // Verify that only non-corrupted items were replayed
1490            assert_eq!(items.len(), 3);
1491            assert_eq!(items[0].0, 1);
1492            assert_eq!(items[0].1, 1);
1493            assert_eq!(items[1].0, data_items[0].0);
1494            assert_eq!(items[1].1, data_items[0].1);
1495            assert_eq!(items[2].0, data_items[1].0);
1496            assert_eq!(items[2].1, data_items[1].1);
1497
1498            // Append a new item to the truncated section
1499            journal.append(2, 5).await.expect("Failed to append data");
1500            journal.sync(2).await.expect("Failed to sync blob");
1501
1502            // Get the new item
1503            let item = journal
1504                .get(2, 2)
1505                .await
1506                .expect("Failed to get item")
1507                .expect("Item not found");
1508            assert_eq!(item, 5);
1509
1510            // Close the journal
1511            journal.close().await.expect("Failed to close journal");
1512
1513            // Confirm blob is expected length
1514            let (_, blob_size) = context
1515                .open(&cfg.partition, &2u64.to_be_bytes())
1516                .await
1517                .expect("Failed to open blob");
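            // The new item starts at the next aligned offset (32) and occupies 12 more bytes,
            // so the blob grows to 32 + 12 = 44 bytes.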
1518            assert_eq!(blob_size, 44);
1519
1520            // Re-initialize the journal to simulate a restart
1521            let journal = Journal::init(context.clone(), cfg.clone())
1522                .await
1523                .expect("Failed to re-initialize journal");
1524
1525            // Attempt to replay the journal
1526            let mut items = Vec::<(u64, u32)>::new();
1527            {
1528                let stream = journal
1529                    .replay(0, 0, NZUsize!(1024))
1530                    .await
1531                    .expect("unable to set up replay");
1532                pin_mut!(stream);
1533                while let Some(result) = stream.next().await {
1534                    match result {
1535                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1536                        Err(err) => panic!("Failed to read item: {err}"),
1537                    }
1538                }
1539            }
1540
1541            // Verify that the surviving items plus the newly appended item were replayed
1542            assert_eq!(items.len(), 4);
1543            assert_eq!(items[0].0, 1);
1544            assert_eq!(items[0].1, 1);
1545            assert_eq!(items[1].0, data_items[0].0);
1546            assert_eq!(items[1].1, data_items[0].1);
1547            assert_eq!(items[2].0, data_items[1].0);
1548            assert_eq!(items[2].1, data_items[1].1);
1549            assert_eq!(items[3].0, 2);
1550            assert_eq!(items[3].1, 5);
1551        });
1552    }
1553
1554    #[test_traced]
1555    fn test_journal_handling_aligned_truncated_data() {
1556        // Initialize the deterministic context
1557        let executor = deterministic::Runner::default();
1558
1559        // Start the test within the executor
1560        executor.start(|context| async move {
1561            // Create a journal configuration
1562            let cfg = Config {
1563                partition: "test_partition".into(),
1564                compression: None,
1565                codec_config: (),
1566                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1567                write_buffer: NZUsize!(1024),
1568            };
1569
1570            // Initialize the journal
1571            let mut journal = Journal::init(context.clone(), cfg.clone())
1572                .await
1573                .expect("Failed to initialize journal");
1574
1575            // Append 1 item to the first index
1576            journal.append(1, 1).await.expect("Failed to append data");
1577
1578            // Append multiple items to the second index (with aligned values)
1579            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1580            for (index, data) in &data_items {
1581                journal
1582                    .append(*index, *data)
1583                    .await
1584                    .expect("Failed to append data");
1585                journal.sync(*index).await.expect("Failed to sync blob");
1586            }
1587
1588            // Close the journal
1589            journal.close().await.expect("Failed to close journal");
1590
1591            // Manually corrupt the end of the second blob by truncating the last item's checksum
1592            let (blob, blob_size) = context
1593                .open(&cfg.partition, &2u64.to_be_bytes())
1594                .await
1595                .expect("Failed to open blob");
1596            blob.resize(blob_size - 4)
1597                .await
1598                .expect("Failed to corrupt blob");
1599            blob.sync().await.expect("Failed to sync blob");
1600
1601            // Re-initialize the journal to simulate a restart
1602            let mut journal = Journal::init(context.clone(), cfg.clone())
1603                .await
1604                .expect("Failed to re-initialize journal");
1605
1606            // Attempt to replay the journal
1607            let mut items = Vec::<(u64, u64)>::new();
1608            {
1609                let stream = journal
1610                    .replay(0, 0, NZUsize!(1024))
1611                    .await
1612                    .expect("unable to set up replay");
1613                pin_mut!(stream);
1614                while let Some(result) = stream.next().await {
1615                    match result {
1616                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1617                        Err(err) => panic!("Failed to read item: {err}"),
1618                    }
1619                }
1620            }
1621
1622            // Verify that only non-corrupted items were replayed
1623            assert_eq!(items.len(), 3);
1624            assert_eq!(items[0].0, 1);
1625            assert_eq!(items[0].1, 1);
1626            assert_eq!(items[1].0, data_items[0].0);
1627            assert_eq!(items[1].1, data_items[0].1);
1628            assert_eq!(items[2].0, data_items[1].0);
1629            assert_eq!(items[2].1, data_items[1].1);
1630
1631            // Append a new item to the truncated section
1632            journal.append(2, 5).await.expect("Failed to append data");
1633            journal.sync(2).await.expect("Failed to sync blob");
1634
1635            // Get the new item
1636            let item = journal
1637                .get(2, 2)
1638                .await
1639                .expect("Failed to get item")
1640                .expect("Item not found");
1641            assert_eq!(item, 5);
1642
1643            // Close the journal
1644            journal.close().await.expect("Failed to close journal");
1645
1646            // Confirm blob is expected length
1647            let (_, blob_size) = context
1648                .open(&cfg.partition, &2u64.to_be_bytes())
1649                .await
1650                .expect("Failed to open blob");
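            // u64 items are 4 (size) + 8 (data) + 4 (CRC) = 16 bytes and naturally aligned:
            // two surviving items (32 bytes) plus the newly appended one gives 48 bytes.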
1651            assert_eq!(blob_size, 48);
1652
1653            // Attempt to replay journal after truncation
1654            let journal = Journal::init(context, cfg)
1655                .await
1656                .expect("Failed to re-initialize journal");
1657
1658            // Attempt to replay the journal
1659            let mut items = Vec::<(u64, u64)>::new();
1660            {
1661                let stream = journal
1662                    .replay(0, 0, NZUsize!(1024))
1663                    .await
1664                    .expect("unable to set up replay");
1665                pin_mut!(stream);
1666                while let Some(result) = stream.next().await {
1667                    match result {
1668                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1669                        Err(err) => panic!("Failed to read item: {err}"),
1670                    }
1671                }
1672            }
1673            journal.close().await.expect("Failed to close journal");
1674
1675            // Verify that the surviving items plus the newly appended item were replayed
1676            assert_eq!(items.len(), 4);
1677            assert_eq!(items[0].0, 1);
1678            assert_eq!(items[0].1, 1);
1679            assert_eq!(items[1].0, data_items[0].0);
1680            assert_eq!(items[1].1, data_items[0].1);
1681            assert_eq!(items[2].0, data_items[1].0);
1682            assert_eq!(items[2].1, data_items[1].1);
1683            assert_eq!(items[3].0, 2);
1684            assert_eq!(items[3].1, 5);
1685        });
1686    }
1687
1688    #[test_traced]
1689    fn test_journal_handling_extra_data() {
1690        // Initialize the deterministic context
1691        let executor = deterministic::Runner::default();
1692
1693        // Start the test within the executor
1694        executor.start(|context| async move {
1695            // Create a journal configuration
1696            let cfg = Config {
1697                partition: "test_partition".into(),
1698                compression: None,
1699                codec_config: (),
1700                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1701                write_buffer: NZUsize!(1024),
1702            };
1703
1704            // Initialize the journal
1705            let mut journal = Journal::init(context.clone(), cfg.clone())
1706                .await
1707                .expect("Failed to initialize journal");
1708
1709            // Append 1 item to the first index
1710            journal.append(1, 1).await.expect("Failed to append data");
1711
1712            // Append multiple items to the second index
1713            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1714            for (index, data) in &data_items {
1715                journal
1716                    .append(*index, *data)
1717                    .await
1718                    .expect("Failed to append data");
1719                journal.sync(*index).await.expect("Failed to sync blob");
1720            }
1721
1722            // Close the journal
1723            journal.close().await.expect("Failed to close journal");
1724
1725            // Manually append extra data (16 zero bytes that cannot form a valid item) to the end of the second blob
1726            let (blob, blob_size) = context
1727                .open(&cfg.partition, &2u64.to_be_bytes())
1728                .await
1729                .expect("Failed to open blob");
1730            blob.write_at(vec![0u8; 16], blob_size)
1731                .await
1732                .expect("Failed to add extra data");
1733            blob.sync().await.expect("Failed to sync blob");
1734
1735            // Re-initialize the journal to simulate a restart
1736            let journal = Journal::init(context, cfg)
1737                .await
1738                .expect("Failed to re-initialize journal");
1739
1740            // Attempt to replay the journal
1741            let mut items = Vec::<(u64, i32)>::new();
1742            let stream = journal
1743                .replay(0, 0, NZUsize!(1024))
1744                .await
1745                .expect("unable to set up replay");
1746            pin_mut!(stream);
1747            while let Some(result) = stream.next().await {
1748                match result {
1749                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1750                    Err(err) => panic!("Failed to read item: {err}"),
1751                }
1752            }
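
            // The trailing zero bytes cannot form a valid item, so they should be discarded on
            // init and replay should return exactly the four appended items.
            assert_eq!(items.len(), 4);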
1753        });
1754    }
1755
1756    // Define a no-op `MockBlob` to pair with `MockStorage`, which reports blob sizes chosen to exercise the offset limit
1757    #[derive(Clone)]
1758    struct MockBlob {}
1759
1760    impl Blob for MockBlob {
1761        async fn read_at(
1762            &self,
1763            buf: impl Into<StableBuf> + Send,
1764            _offset: u64,
1765        ) -> Result<StableBuf, RError> {
1766            Ok(buf.into())
1767        }
1768
1769        async fn write_at(
1770            &self,
1771            _buf: impl Into<StableBuf> + Send,
1772            _offset: u64,
1773        ) -> Result<(), RError> {
1774            Ok(())
1775        }
1776
1777        async fn resize(&self, _len: u64) -> Result<(), RError> {
1778            Ok(())
1779        }
1780
1781        async fn sync(&self) -> Result<(), RError> {
1782            Ok(())
1783        }
1784    }
1785
1786    // Define `MockStorage` that returns a `MockBlob` with a configurable reported length
1787    #[derive(Clone)]
1788    struct MockStorage {
1789        len: u64,
1790    }
1791
1792    impl Storage for MockStorage {
1793        type Blob = MockBlob;
1794
1795        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
1796            Ok((MockBlob {}, self.len))
1797        }
1798
1799        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
1800            Ok(())
1801        }
1802
1803        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
1804            Ok(vec![])
1805        }
1806    }
1807
1808    impl Metrics for MockStorage {
1809        fn with_label(&self, _: &str) -> Self {
1810            self.clone()
1811        }
1812
1813        fn label(&self) -> String {
1814            String::new()
1815        }
1816
1817        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}
1818
1819        fn encode(&self) -> String {
1820            String::new()
1821        }
1822    }
1823
1824    // Redefine `INDEX_ALIGNMENT` explicitly so these tests catch any accidental
1825    // change to its value
1826    const INDEX_ALIGNMENT: u64 = 16;
1827
1828    #[test_traced]
1829    fn test_journal_large_offset() {
1830        // Initialize the deterministic context
1831        let executor = deterministic::Runner::default();
1832        executor.start(|_| async move {
1833            // Create journal
1834            let cfg = Config {
1835                partition: "partition".to_string(),
1836                compression: None,
1837                codec_config: (),
1838                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1839                write_buffer: NZUsize!(1024),
1840            };
1841            let context = MockStorage {
1842                len: u32::MAX as u64 * INDEX_ALIGNMENT, // the next item lands at offset u32::MAX, the largest valid offset
1843            };
1844            let mut journal = Journal::init(context, cfg).await.unwrap();
1845
1846            // Append data; the item should land at offset u32::MAX, the largest representable offset
1847            let data = 1;
1848            let (result, _) = journal
1849                .append(1, data)
1850                .await
1851                .expect("Failed to append data");
1852            assert_eq!(result, u32::MAX);
1853        });
1854    }
1855
1856    #[test_traced]
1857    fn test_journal_offset_overflow() {
1858        // Initialize the deterministic context
1859        let executor = deterministic::Runner::default();
1860        executor.start(|_| async move {
1861            // Create journal
1862            let cfg = Config {
1863                partition: "partition".to_string(),
1864                compression: None,
1865                codec_config: (),
1866                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1867                write_buffer: NZUsize!(1024),
1868            };
1869            let context = MockStorage {
1870                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
1871            };
1872            let mut journal = Journal::init(context, cfg).await.unwrap();
1873
1874            // Append data; the next aligned offset would exceed u32::MAX, so the append must fail with OffsetOverflow
1875            let data = 1;
1876            let result = journal.append(1, data).await;
1877            assert!(matches!(result, Err(Error::OffsetOverflow)));
1878        });
1879    }
1880
1881    #[test_traced]
1882    fn test_journal_rewind() {
1883        // Initialize the deterministic context
1884        let executor = deterministic::Runner::default();
1885        executor.start(|context| async move {
1886            // Create journal
1887            let cfg = Config {
1888                partition: "test_partition".to_string(),
1889                compression: None,
1890                codec_config: (),
1891                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1892                write_buffer: NZUsize!(1024),
1893            };
1894            let mut journal = Journal::init(context, cfg).await.unwrap();
1895
1896            // Check size of non-existent section
1897            let size = journal.size(1).await.unwrap();
1898            assert_eq!(size, 0);
1899
1900            // Append data to section 1
1901            journal.append(1, 42i32).await.unwrap();
1902
1903            // Check size of section 1 - should be greater than 0
1904            let size = journal.size(1).await.unwrap();
1905            assert!(size > 0);
1906
1907            // Append more data and verify size increases
1908            journal.append(1, 43i32).await.unwrap();
1909            let new_size = journal.size(1).await.unwrap();
1910            assert!(new_size > size);
1911
1912            // Check size of different section - should still be 0
1913            let size = journal.size(2).await.unwrap();
1914            assert_eq!(size, 0);
1915
1916            // Append data to section 2
1917            journal.append(2, 44i32).await.unwrap();
1918
1919            // Check size of section 2 - should be greater than 0
1920            let size = journal.size(2).await.unwrap();
1921            assert!(size > 0);
1922
1923            // Roll back to (section 1, offset 0), which empties section 1 and removes section 2 entirely
1924            journal.rewind(1, 0).await.unwrap();
1925
1926            // Check size of section 1 - should be 0
1927            let size = journal.size(1).await.unwrap();
1928            assert_eq!(size, 0);
1929
1930            // Check size of section 2 - should be 0
1931            let size = journal.size(2).await.unwrap();
1932            assert_eq!(size, 0);
1933        });
1934    }
1935
1936    #[test_traced]
1937    fn test_journal_rewind_section() {
1938        // Initialize the deterministic context
1939        let executor = deterministic::Runner::default();
1940        executor.start(|context| async move {
1941            // Create journal
1942            let cfg = Config {
1943                partition: "test_partition".to_string(),
1944                compression: None,
1945                codec_config: (),
1946                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1947                write_buffer: NZUsize!(1024),
1948            };
1949            let mut journal = Journal::init(context, cfg).await.unwrap();
1950
1951            // Check size of non-existent section
1952            let size = journal.size(1).await.unwrap();
1953            assert_eq!(size, 0);
1954
1955            // Append data to section 1
1956            journal.append(1, 42i32).await.unwrap();
1957
1958            // Check size of section 1 - should be greater than 0
1959            let size = journal.size(1).await.unwrap();
1960            assert!(size > 0);
1961
1962            // Append more data and verify size increases
1963            journal.append(1, 43i32).await.unwrap();
1964            let new_size = journal.size(1).await.unwrap();
1965            assert!(new_size > size);
1966
1967            // Check size of different section - should still be 0
1968            let size = journal.size(2).await.unwrap();
1969            assert_eq!(size, 0);
1970
1971            // Append data to section 2
1972            journal.append(2, 44i32).await.unwrap();
1973
1974            // Check size of section 2 - should be greater than 0
1975            let size = journal.size(2).await.unwrap();
1976            assert!(size > 0);
1977
1978            // Roll back everything in section 1; rewind_section leaves other sections untouched
1979            journal.rewind_section(1, 0).await.unwrap();
1980
1981            // Check size of section 1 - should be 0
1982            let size = journal.size(1).await.unwrap();
1983            assert_eq!(size, 0);
1984
1985            // Check size of section 2 - should be greater than 0
1986            let size = journal.size(2).await.unwrap();
1987            assert!(size > 0);
1988        });
1989    }
1990
1991    /// Protect against accidental changes to the journal disk format.
1992    #[test_traced]
1993    fn test_journal_conformance() {
1994        // Initialize the deterministic context
1995        let executor = deterministic::Runner::default();
1996
1997        // Start the test within the executor
1998        executor.start(|context| async move {
1999            // Create a journal configuration
2000            let cfg = Config {
2001                partition: "test_partition".into(),
2002                compression: None,
2003                codec_config: (),
2004                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2005                write_buffer: NZUsize!(1024),
2006            };
2007
2008            // Initialize the journal
2009            let mut journal = Journal::init(context.clone(), cfg.clone())
2010                .await
2011                .expect("Failed to initialize journal");
2012
2013            // Append 100 items to the journal
2014            for i in 0..100 {
2015                journal.append(1, i).await.expect("Failed to append data");
2016            }
2017            journal.sync(1).await.expect("Failed to sync blob");
2018
2019            // Close the journal
2020            journal.close().await.expect("Failed to close journal");
2021
2022            // Hash blob contents
2023            let (blob, size) = context
2024                .open(&cfg.partition, &1u64.to_be_bytes())
2025                .await
2026                .expect("Failed to open blob");
2027            assert!(size > 0);
2028            let buf = blob
2029                .read_at(vec![0u8; size as usize], 0)
2030                .await
2031                .expect("Failed to read blob");
2032            let digest = Sha256::hash(buf.as_ref());
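            // Any accidental change to how items are encoded on disk will change this digest
            // and fail the test.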
2033            assert_eq!(
2034                hex(&digest),
2035                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
2036            );
2037        });
2038    }
2039}