commonware_storage/journal/segmented/variable.rs

1//! An append-only log for storing arbitrary variable length items.
2//!
3//! `segmented::Journal` is an append-only log for storing arbitrary variable length data on disk. In
4//! addition to replay, stored items can be directly retrieved given their section number and offset
5//! within the section.
6//!
7//! # Format
8//!
9//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
10//! The particular `Blob` in which data is stored is identified by a `section` number (`u64`).
11//! Within a `section`, data is appended as an `item` with the following format:
12//!
13//! ```text
14//! +---+---+---+---+---+---+---+---+---+---+---+
15//! | 0 | 1 | 2 | 3 |    ...    | 8 | 9 |10 |11 |
16//! +---+---+---+---+---+---+---+---+---+---+---+
17//! |   Size (u32)  |   Data    |    C(u32)     |
18//! +---+---+---+---+---+---+---+---+---+---+---+
19//!
20//! C = CRC32(Size | Data)
21//! ```
22//!
23//! _To ensure data returned by `Journal` is correct, a checksum (CRC32) is stored at the end of
24//! each item. If the checksum of the read data does not match the stored checksum, an error is
25//! returned. This checksum is only verified when data is accessed and not at startup (which would
26//! require reading all data in `Journal`)._
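//!
//! _For reference, a minimal sketch (not part of the API) of how a single item is laid out on
//! disk, assuming `crc32fast` for the checksum:_
//!
//! ```rust
//! // Hypothetical helper mirroring the layout above (Size | Data | CRC32).
//! fn encode_item(data: &[u8]) -> Vec<u8> {
//!     let mut buf = Vec::with_capacity(4 + data.len() + 4);
//!     buf.extend_from_slice(&(data.len() as u32).to_be_bytes()); // Size (u32)
//!     buf.extend_from_slice(data);                               // Data
//!     let checksum = crc32fast::hash(&buf);                      // C = CRC32(Size | Data)
//!     buf.extend_from_slice(&checksum.to_be_bytes());
//!     buf
//! }
//!
//! assert_eq!(encode_item(b"hello").len(), 4 + 5 + 4);
//! ```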
27//!
28//! # Open Blobs
29//!
30//! `Journal` uses one `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a given
31//! `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound the
32//! number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
33//! `sections`.
34//!
35//! # Offset Alignment
36//!
37//! In practice, `Journal` users won't store `u64::MAX` bytes of data in a given `section` (the max
38//! `Offset` provided by `Blob`). To reduce the memory usage for tracking offsets within `Journal`,
39//! offsets are thus `u32` (4 bytes) and aligned to 16 bytes. This means that the maximum size of
40//! any `section` is `u32::MAX * 17 = ~70GB` bytes (the last offset item can store up to `u32::MAX`
41//! bytes). If more data is written to a `section` past this max, an `OffsetOverflow` error is
42//! returned.
43//!
44//! # Sync
45//!
46//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
47//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
48//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
49//!
50//! # Pruning
51//!
52//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
53//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
54//! could be used, for example, by some blockchain application to prune old blocks.
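//!
//! _A sketch combining `sync` and `prune` (the partition name, per-section layout, and `i32`
//! items are illustrative choices):_
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "prune_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append one item per section (e.g. one section per block height)
//!     for section in 0u64..5 {
//!         journal.append(section, section as i32).await.unwrap();
//!         journal.sync(section).await.unwrap(); // force pending data to storage
//!     }
//!
//!     // Drop all sections below 3
//!     let pruned = journal.prune(3).await.unwrap();
//!     assert!(pruned);
//!
//!     journal.close().await.unwrap();
//! });
//! ```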
55//!
56//! # Replay
57//!
58//! During application initialization, it is very common to replay data from `Journal` to recover
59//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
60//! method to produce a stream of all items in the `Journal` in order of their `section` and
61//! `offset`.
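//!
//! _A sketch of replaying all items from the start of the journal (the stream handling mirrors
//! this module's tests; the partition name and `i32` item are illustrative):_
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//! use futures::{pin_mut, StreamExt};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "replay_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!     journal.append(1, 42).await.unwrap();
//!     journal.sync(1).await.unwrap();
//!
//!     // Rebuild in-memory state from everything on disk (section 0, offset 0 onward)
//!     let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
//!     pin_mut!(stream);
//!     while let Some(item) = stream.next().await {
//!         let (section, offset, size, value) = item.unwrap();
//!         assert_eq!((section, value), (1, 42));
//!         let _ = (offset, size);
//!     }
//!
//!     journal.close().await.unwrap();
//! });
//! ```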
62//!
63//! # Exact Reads
64//!
65//! To allow an item to be fetched in a single disk operation, `Journal` exposes a `get_exact`
66//! method that takes the item's exact on-disk size. This size must be cached by the caller (it is
67//! returned by `append` and provided during `replay`) and usage of an incorrect size will result
68//! in undefined behavior.
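//!
//! _A sketch contrasting `get` and `get_exact` (partition name and `i32` item are illustrative):_
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "get_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // `append` returns the offset and on-disk size of the stored item
//!     let (offset, size) = journal.append(7, 1234).await.unwrap();
//!     journal.sync(7).await.unwrap();
//!
//!     // Fetch by offset alone (reads the size prefix, then the payload)...
//!     assert_eq!(journal.get(7, offset).await.unwrap(), 1234);
//!     // ...or in a single read when the exact size is known
//!     assert_eq!(journal.get_exact(7, offset, size).await.unwrap(), 1234);
//!
//!     journal.close().await.unwrap();
//! });
//! ```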
69//!
70//! # Compression
71//!
72//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
73//! `compression` field in the `Config` struct to a valid `zstd` compression level. This setting can
74//! be changed between initializations of `Journal`, however, it must remain populated if any data
75//! was written with compression enabled.
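//!
//! _A sketch of a compressed configuration (level 3 is just an illustrative choice):_
//!
//! ```rust
//! use commonware_runtime::buffer::PoolRef;
//! use commonware_storage::journal::segmented::variable::Config;
//! use commonware_utils::NZUsize;
//!
//! let cfg: Config<()> = Config {
//!     partition: "partition".to_string(),
//!     compression: Some(3), // zstd level; must stay set once compressed data exists
//!     codec_config: (),
//!     buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!     write_buffer: NZUsize!(1024 * 1024),
//! };
//! ```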
76//!
77//! # Example
78//!
79//! ```rust
80//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
81//! use commonware_storage::journal::segmented::variable::{Journal, Config};
82//! use commonware_utils::NZUsize;
83//!
84//! let executor = deterministic::Runner::default();
85//! executor.start(|context| async move {
86//!     // Create a journal
87//!     let mut journal = Journal::init(context, Config{
88//!         partition: "partition".to_string(),
89//!         compression: None,
90//!         codec_config: (),
91//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
92//!         write_buffer: NZUsize!(1024 * 1024),
93//!     }).await.unwrap();
94//!
95//!     // Append data to the journal
96//!     journal.append(1, 128).await.unwrap();
97//!
98//!     // Close the journal
99//!     journal.close().await.unwrap();
100//! });
101//! ```
102
103use crate::journal::Error;
104use bytes::BufMut;
105use commonware_codec::Codec;
106use commonware_runtime::{
107    buffer::{Append, PoolRef, Read},
108    Blob, Error as RError, Metrics, Storage,
109};
110use commonware_utils::hex;
111use futures::stream::{self, Stream, StreamExt};
112use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
113use std::{
114    collections::{btree_map::Entry, BTreeMap},
115    io::Cursor,
116    marker::PhantomData,
117    num::NonZeroUsize,
118};
119use tracing::{debug, trace, warn};
120use zstd::{bulk::compress, decode_all};
121
122/// Configuration for `Journal` storage.
123#[derive(Clone)]
124pub struct Config<C> {
125    /// The `commonware-runtime::Storage` partition to use
126    /// for storing journal blobs.
127    pub partition: String,
128
129    /// Optional compression level (using `zstd`) to apply to data before storing.
130    pub compression: Option<u8>,
131
132    /// The codec configuration to use for encoding and decoding items.
133    pub codec_config: C,
134
135    /// The buffer pool to use for caching data.
136    pub buffer_pool: PoolRef,
137
138    /// The size of the write buffer to use for each blob.
139    pub write_buffer: NonZeroUsize,
140}
141
142pub(crate) const ITEM_ALIGNMENT: u64 = 16;
143
144/// Computes the next offset for an item using the underlying `u64`
145/// offset of `Blob`.
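///
/// For example, a raw byte offset of 20 is rounded up to 32 and mapped to the
/// aligned offset 2 (32 / 16).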
146#[inline]
147fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
148    let overage = offset % ITEM_ALIGNMENT;
149    if overage != 0 {
150        offset += ITEM_ALIGNMENT - overage;
151    }
152    let offset = offset / ITEM_ALIGNMENT;
153    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
154    Ok(aligned_offset)
155}
156
157/// Implementation of `Journal` storage.
158pub struct Journal<E: Storage + Metrics, V: Codec> {
159    pub(crate) context: E,
160    pub(crate) cfg: Config<V::Cfg>,
161
162    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,
163
164    /// A section number before which all sections have been pruned. This value is not persisted,
165    /// and is initialized to 0 at startup. It's updated only during calls to `prune` during the
166    /// current execution, and therefore provides only a best effort lower-bound on the true value.
167    pub(crate) oldest_retained_section: u64,
168
169    pub(crate) tracked: Gauge,
170    pub(crate) synced: Counter,
171    pub(crate) pruned: Counter,
172
173    pub(crate) _phantom: PhantomData<V>,
174}
175
176impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
177    /// Initialize a new `Journal` instance.
178    ///
179    /// All backing blobs are opened but not read during
180    /// initialization. The `replay` method can be used
181    /// to iterate over all items in the `Journal`.
182    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
183        // Iterate over blobs in partition
184        let mut blobs = BTreeMap::new();
185        let stored_blobs = match context.scan(&cfg.partition).await {
186            Ok(blobs) => blobs,
187            Err(RError::PartitionMissing(_)) => Vec::new(),
188            Err(err) => return Err(Error::Runtime(err)),
189        };
190        for name in stored_blobs {
191            let (blob, size) = context.open(&cfg.partition, &name).await?;
192            let hex_name = hex(&name);
193            let section = match name.try_into() {
194                Ok(section) => u64::from_be_bytes(section),
195                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
196            };
197            debug!(section, blob = hex_name, size, "loaded section");
198            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
199            blobs.insert(section, blob);
200        }
201
202        // Initialize metrics
203        let tracked = Gauge::default();
204        let synced = Counter::default();
205        let pruned = Counter::default();
206        context.register("tracked", "Number of blobs", tracked.clone());
207        context.register("synced", "Number of syncs", synced.clone());
208        context.register("pruned", "Number of blobs pruned", pruned.clone());
209        tracked.set(blobs.len() as i64);
210
211        // Create journal instance
212        Ok(Self {
213            context,
214            cfg,
215            blobs,
216            oldest_retained_section: 0,
217            tracked,
218            synced,
219            pruned,
220
221            _phantom: PhantomData,
222        })
223    }
224
225    /// Ensures that a section pruned during the current execution is not accessed.
226    fn prune_guard(&self, section: u64) -> Result<(), Error> {
227        if section < self.oldest_retained_section {
228            Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
229        } else {
230            Ok(())
231        }
232    }
233
234    /// Reads an item from the blob at the given offset.
235    pub(crate) async fn read(
236        compressed: bool,
237        cfg: &V::Cfg,
238        blob: &Append<E::Blob>,
239        offset: u32,
240    ) -> Result<(u32, u32, V), Error> {
241        // Read item size
242        let mut hasher = crc32fast::Hasher::new();
243        let offset = offset as u64 * ITEM_ALIGNMENT;
244        let size = blob.read_at(vec![0; 4], offset).await?;
245        hasher.update(size.as_ref());
246        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
247        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;
248
249        // Read remaining
250        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
251        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
252        let buf = buf.as_ref();
253        let offset = offset
254            .checked_add(buf_size as u64)
255            .ok_or(Error::OffsetOverflow)?;
256
257        // Read item
258        let item = &buf[..size];
259        hasher.update(item);
260
261        // Verify integrity
262        let checksum = hasher.finalize();
263        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
264        if checksum != stored_checksum {
265            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
266        }
267
268        // Compute next offset
269        let aligned_offset = compute_next_offset(offset)?;
270
271        // If compression is enabled, decompress the item
272        let item = if compressed {
273            let decompressed =
274                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
275            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
276        } else {
277            V::decode_cfg(item, cfg).map_err(Error::Codec)?
278        };
279
280        // Return item
281        Ok((aligned_offset, size as u32, item))
282    }
283
284    /// Helper function to read an item from a [Read].
285    async fn read_buffered(
286        reader: &mut Read<Append<E::Blob>>,
287        offset: u32,
288        cfg: &V::Cfg,
289        compressed: bool,
290    ) -> Result<(u32, u64, u32, V), Error> {
291        // Calculate absolute file offset from the item offset
292        let file_offset = offset as u64 * ITEM_ALIGNMENT;
293
294        // If we're not at the right position, seek to it
295        if reader.position() != file_offset {
296            reader.seek_to(file_offset).map_err(Error::Runtime)?;
297        }
298
299        // Read item size (4 bytes)
300        let mut hasher = crc32fast::Hasher::new();
301        let mut size_buf = [0u8; 4];
302        reader
303            .read_exact(&mut size_buf, 4)
304            .await
305            .map_err(Error::Runtime)?;
306        hasher.update(&size_buf);
307
308        // Read remaining
309        let size = u32::from_be_bytes(size_buf) as usize;
310        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
311        let mut buf = vec![0u8; buf_size];
312        reader
313            .read_exact(&mut buf, buf_size)
314            .await
315            .map_err(Error::Runtime)?;
316
317        // Read item
318        let item = &buf[..size];
319        hasher.update(item);
320
321        // Verify integrity
322        let checksum = hasher.finalize();
323        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
324        if checksum != stored_checksum {
325            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
326        }
327
328        // If compression is enabled, decompress the item
329        let item = if compressed {
330            let decompressed =
331                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
332            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
333        } else {
334            V::decode_cfg(item, cfg).map_err(Error::Codec)?
335        };
336
337        // Calculate next offset
338        let current_pos = reader.position();
339        let aligned_offset = compute_next_offset(current_pos)?;
340        Ok((aligned_offset, current_pos, size as u32, item))
341    }
342
343    /// Reads an item from the blob at the given offset and of a given size.
344    async fn read_exact(
345        compressed: bool,
346        cfg: &V::Cfg,
347        blob: &Append<E::Blob>,
348        offset: u32,
349        len: u32,
350    ) -> Result<V, Error> {
351        // Read buffer
352        let offset = offset as u64 * ITEM_ALIGNMENT;
353        let entry_size = 4 + len as usize + 4;
354        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;
355
356        // Check size
357        let mut hasher = crc32fast::Hasher::new();
358        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
359        hasher.update(&buf.as_ref()[..4]);
360        if disk_size != len {
361            return Err(Error::UnexpectedSize(disk_size, len));
362        }
363
364        // Verify integrity
365        let item = &buf.as_ref()[4..4 + len as usize];
366        hasher.update(item);
367        let checksum = hasher.finalize();
368        let stored_checksum =
369            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
370        if checksum != stored_checksum {
371            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
372        }
373
374        // Decompress item
375        let item = if compressed {
376            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
377        } else {
378            item.to_vec()
379        };
380
381        // Return item
382        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
383        Ok(item)
384    }
385
386    /// Returns an ordered stream of all items in the journal starting with the item at the given
387    /// `start_section` and `offset` into that section. Each item is returned as a tuple of
388    /// (section, offset, size, item).
389    ///
390    /// # Repair
391    ///
392    /// Like
393    /// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
394    /// and
395    /// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
396    /// the first invalid data read will be considered the new end of the journal (and the
397    /// underlying [Blob] will be truncated to the last valid item).
398    pub async fn replay(
399        &self,
400        start_section: u64,
401        mut offset: u32,
402        buffer: NonZeroUsize,
403    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
404        // Collect all blobs to replay
405        let codec_config = self.cfg.codec_config.clone();
406        let compressed = self.cfg.compression.is_some();
407        let mut blobs = Vec::with_capacity(self.blobs.len());
408        for (section, blob) in self.blobs.range(start_section..) {
409            let blob_size = blob.size().await;
410            let max_offset = compute_next_offset(blob_size)?;
411            blobs.push((
412                *section,
413                blob.clone(),
414                max_offset,
415                blob_size,
416                codec_config.clone(),
417                compressed,
418            ));
419        }
420
421        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
422        // memory with buffered data)
423        Ok(stream::iter(blobs).flat_map(
424            move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
425                // Created buffered reader
426                let mut reader = Read::new(blob, blob_size, buffer);
427                if section == start_section && offset != 0 {
428                    if let Err(err) = reader.seek_to(offset as u64 * ITEM_ALIGNMENT) {
429                        warn!(section, offset, ?err, "failed to seek to offset");
430                        // Return early with the error to terminate the entire stream
431                        return stream::once(async move { Err(err.into()) }).left_stream();
432                    }
433                } else {
434                    offset = 0;
435                }
436
437                // Stream the blob's items with `unfold`, tracking the end of the last valid item so corruption can be truncated
438                stream::unfold(
439                        (section, reader, offset, 0u64, codec_config, compressed),
440                        move |(
441                            section,
442                            mut reader,
443                            offset,
444                            valid_size,
445                            codec_config,
446                            compressed,
447                        )| async move {
448                            // Check if we are at the end of the blob
449                            if offset >= max_offset {
450                                return None;
451                            }
452
453                            // Read an item from the buffer
454                            match Self::read_buffered(
455                                &mut reader,
456                                offset,
457                                &codec_config,
458                                compressed,
459                            )
460                            .await
461                            {
462                                Ok((next_offset, next_valid_size, size, item)) => {
463                                    trace!(blob = section, cursor = offset, "replayed item");
464                                    Some((
465                                        Ok((section, offset, size, item)),
466                                        (
467                                            section,
468                                            reader,
469                                            next_offset,
470                                            next_valid_size,
471                                            codec_config,
472                                            compressed,
473                                        ),
474                                    ))
475                                }
476                                Err(Error::ChecksumMismatch(expected, found)) => {
477                                    // If we encounter corruption, we prune to the last valid item. This
478                                    // can happen during an unclean file close (where pending data is not
479                                    // fully synced to disk).
480                                    warn!(
481                                        blob = section,
482                                        bad_offset = offset,
483                                        new_size = valid_size,
484                                        expected,
485                                        found,
486                                        "corruption detected: truncating"
487                                    );
488                                    reader.resize(valid_size).await.ok()?;
489                                    None
490                                }
491                                Err(Error::Runtime(RError::BlobInsufficientLength)) => {
492                                    // If we encounter trailing bytes, we prune to the last
493                                    // valid item. This can happen during an unclean file close (where
494                                    // pending data is not fully synced to disk).
495                                    warn!(
496                                        blob = section,
497                                        bad_offset = offset,
498                                        new_size = valid_size,
499                                        "trailing bytes detected: truncating"
500                                    );
501                                    reader.resize(valid_size).await.ok()?;
502                                    None
503                                }
504                                Err(err) => {
505                                    // If we encounter an unexpected error, return it without attempting
506                                    // to fix anything.
507                                    warn!(
508                                        blob = section,
509                                        cursor = offset,
510                                        ?err,
511                                        "unexpected error"
512                                    );
513                                    Some((
514                                        Err(err),
515                                        (
516                                            section,
517                                            reader,
518                                            offset,
519                                            valid_size,
520                                            codec_config,
521                                            compressed,
522                                        ),
523                                    ))
524                                }
525                            }
526                        },
527                    ).right_stream()
528            },
529        ))
530    }
531
532    /// Appends an item to `Journal` in a given `section`, returning the offset
533    /// where the item was written and the size of the stored item (which may differ
534    /// from the codec's encoded size if compression is enabled).
535    ///
536    /// # Warning
537    ///
538    /// If there exist trailing bytes in the `Blob` of a particular `section` and
539    /// `replay` is not called before this, it is likely that subsequent data added
540    /// to the `Blob` will be considered corrupted (as the trailing bytes will fail
541    /// the checksum verification). It is recommended to call `replay` before calling
542    /// `append` to prevent this.
543    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
544        // Check last pruned
545        self.prune_guard(section)?;
546
547        // Create item
548        let encoded = item.encode();
549        let encoded = if let Some(compression) = self.cfg.compression {
550            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
551        } else {
552            encoded.into()
553        };
554
555        // Ensure item is not too large
556        let item_len = encoded.len();
557        let entry_len = 4 + item_len + 4;
558        let item_len = match item_len.try_into() {
559            Ok(len) => len,
560            Err(_) => return Err(Error::ItemTooLarge(item_len)),
561        };
562
563        // Get existing blob or create new one
564        let blob = match self.blobs.entry(section) {
565            Entry::Occupied(entry) => entry.into_mut(),
566            Entry::Vacant(entry) => {
567                let name = section.to_be_bytes();
568                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
569                let blob = Append::new(
570                    blob,
571                    size,
572                    self.cfg.write_buffer,
573                    self.cfg.buffer_pool.clone(),
574                )
575                .await?;
576                self.tracked.inc();
577                entry.insert(blob)
578            }
579        };
580
581        // Calculate alignment
582        let cursor = blob.size().await;
583        let offset = compute_next_offset(cursor)?;
584        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
585        let padding = (aligned_cursor - cursor) as usize;
586
587        // Populate buffer
588        let mut buf = Vec::with_capacity(padding + entry_len);
589
590        // Add padding bytes if necessary
591        if padding > 0 {
592            buf.resize(padding, 0);
593        }
594
595        // Add entry data
596        let entry_start = buf.len();
597        buf.put_u32(item_len);
598        buf.put_slice(&encoded);
599
600        // Calculate checksum only for the entry data (without padding)
601        let checksum = crc32fast::hash(&buf[entry_start..]);
602        buf.put_u32(checksum);
603        assert_eq!(buf[entry_start..].len(), entry_len);
604
605        // Append item to blob
606        blob.append(buf).await?;
607        trace!(blob = section, offset, "appended item");
608        Ok((offset, item_len))
609    }
610
611    /// Retrieves an item from `Journal` at a given `section` and `offset`.
612    ///
613    /// # Errors
614    ///  - [Error::AlreadyPrunedToSection] if the requested `section` has been pruned during the
615    ///    current execution.
616    ///  - [Error::SectionOutOfRange] if the requested `section` is empty (i.e. has never had any
617    ///    data appended to it, or has been pruned in a previous execution).
618    ///  - An invalid `offset` for a given section (that is, an offset that doesn't correspond to a
619    ///    previously appended item) will result in an error, with the specific type being
620    ///    undefined.
621    pub async fn get(&self, section: u64, offset: u32) -> Result<V, Error> {
622        self.prune_guard(section)?;
623        let blob = match self.blobs.get(&section) {
624            Some(blob) => blob,
625            None => return Err(Error::SectionOutOfRange(section)),
626        };
627
628        // Perform a multi-op read.
629        let (_, _, item) = Self::read(
630            self.cfg.compression.is_some(),
631            &self.cfg.codec_config,
632            blob,
633            offset,
634        )
635        .await?;
636        Ok(item)
637    }
638
639    /// Retrieves an item from `Journal` at a given `section` and `offset` with a given size.
640    pub async fn get_exact(&self, section: u64, offset: u32, size: u32) -> Result<V, Error> {
641        self.prune_guard(section)?;
642        let blob = match self.blobs.get(&section) {
643            Some(blob) => blob,
644            None => return Err(Error::SectionOutOfRange(section)),
645        };
646
647        // Perform a multi-op read.
648        let item = Self::read_exact(
649            self.cfg.compression.is_some(),
650            &self.cfg.codec_config,
651            blob,
652            offset,
653            size,
654        )
655        .await?;
656        Ok(item)
657    }
658
659    /// Gets the size of the journal for a specific section.
660    ///
661    /// Returns 0 if the section does not exist.
662    pub async fn size(&self, section: u64) -> Result<u64, Error> {
663        self.prune_guard(section)?;
664        match self.blobs.get(&section) {
665            Some(blob) => Ok(blob.size().await),
666            None => Ok(0),
667        }
668    }
669
670    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
671    ///
672    /// # Warnings
673    ///
674    /// * This operation is not guaranteed to survive restarts until sync is called.
675    /// * This operation is not atomic, but it will always leave the journal in a consistent state
676    ///   in the event of failure since blobs are always removed in reverse order of section.
677    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
678        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
679    }
680
681    /// Rewinds the journal to the given `section` and `size`.
682    ///
683    /// This removes any data beyond the specified `section` and `size`.
684    ///
685    /// # Warnings
686    ///
687    /// * This operation is not guaranteed to survive restarts until sync is called.
688    /// * This operation is not atomic, but it will always leave the journal in a consistent state
689    ///   in the event of failure since blobs are always removed in reverse order of section.
690    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
691        self.prune_guard(section)?;
692
693        // Remove any sections beyond the given section
694        let trailing: Vec<u64> = self
695            .blobs
696            .range((
697                std::ops::Bound::Excluded(section),
698                std::ops::Bound::Unbounded,
699            ))
700            .map(|(&section, _)| section)
701            .collect();
702        for index in trailing.iter().rev() {
703            // Remove the underlying blob from storage.
704            let blob = self.blobs.remove(index).unwrap();
705
706            // Destroy the blob
707            drop(blob);
708            self.context
709                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
710                .await?;
711            debug!(section = index, "removed section");
712            self.tracked.dec();
713        }
714
715        // If the section exists, truncate it to the given offset
716        let blob = match self.blobs.get_mut(&section) {
717            Some(blob) => blob,
718            None => return Ok(()),
719        };
720        let current = blob.size().await;
721        if size >= current {
722            return Ok(()); // Already smaller than or equal to target size
723        }
724        blob.resize(size).await?;
725        debug!(
726            section,
727            from = current,
728            to = size,
729            ?trailing,
730            "rewound journal"
731        );
732        Ok(())
733    }
734
735    /// Rewinds the `section` to the given `size`.
736    ///
737    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
738    ///
739    /// # Warning
740    ///
741    /// This operation is not guaranteed to survive restarts until sync is called.
742    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
743        self.prune_guard(section)?;
744
745        // Get the blob at the given section
746        let blob = match self.blobs.get_mut(&section) {
747            Some(blob) => blob,
748            None => return Ok(()),
749        };
750
751        // Truncate the blob to the given size
752        let current = blob.size().await;
753        if size >= current {
754            return Ok(()); // Already smaller than or equal to target size
755        }
756        blob.resize(size).await?;
757        debug!(section, from = current, to = size, "rewound section");
758        Ok(())
759    }
760
761    /// Ensures that all data in a given `section` is synced to the underlying store.
762    ///
763    /// If the `section` does not exist, no error will be returned.
764    pub async fn sync(&self, section: u64) -> Result<(), Error> {
765        self.prune_guard(section)?;
766        let blob = match self.blobs.get(&section) {
767            Some(blob) => blob,
768            None => return Ok(()),
769        };
770        self.synced.inc();
771        blob.sync().await.map_err(Error::Runtime)
772    }
773
774    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
775    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
776        // Prune any blobs that are smaller than the minimum
777        let mut pruned = false;
778        while let Some((&section, _)) = self.blobs.first_key_value() {
779            // Stop pruning if we reach the minimum
780            if section >= min {
781                break;
782            }
783
784            // Remove blob from journal
785            let blob = self.blobs.remove(&section).unwrap();
786            let size = blob.size().await;
787            drop(blob);
788
789            // Remove blob from storage
790            self.context
791                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
792                .await?;
793            pruned = true;
794
795            debug!(blob = section, size, "pruned blob");
796            self.tracked.dec();
797            self.pruned.inc();
798        }
799
800        if pruned {
801            self.oldest_retained_section = min;
802        }
803
804        Ok(pruned)
805    }
806
807    /// Syncs and closes all open sections.
808    pub async fn close(self) -> Result<(), Error> {
809        for (section, blob) in self.blobs.into_iter() {
810            let size = blob.size().await;
811            blob.sync().await?;
812            debug!(blob = section, size, "synced blob");
813        }
814        Ok(())
815    }
816
817    /// Returns the number of the oldest section in the journal.
818    pub fn oldest_section(&self) -> Option<u64> {
819        self.blobs.first_key_value().map(|(section, _)| *section)
820    }
821
822    /// Removes any underlying blobs created by the journal.
823    pub async fn destroy(self) -> Result<(), Error> {
824        for (i, blob) in self.blobs.into_iter() {
825            let size = blob.size().await;
826            drop(blob);
827            debug!(blob = i, size, "destroyed blob");
828            self.context
829                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
830                .await?;
831        }
832        match self.context.remove(&self.cfg.partition, None).await {
833            Ok(()) => {}
834            Err(RError::PartitionMissing(_)) => {
835                // Partition already removed or never existed.
836            }
837            Err(err) => return Err(Error::Runtime(err)),
838        }
839        Ok(())
840    }
841}
842
843#[cfg(test)]
844mod tests {
845    use super::*;
846    use bytes::BufMut;
847    use commonware_cryptography::{Hasher, Sha256};
848    use commonware_macros::test_traced;
849    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
850    use commonware_utils::{NZUsize, StableBuf};
851    use futures::{pin_mut, StreamExt};
852    use prometheus_client::registry::Metric;
853
854    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
855    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
856
857    #[test_traced]
858    fn test_journal_append_and_read() {
859        // Initialize the deterministic context
860        let executor = deterministic::Runner::default();
861
862        // Start the test within the executor
863        executor.start(|context| async move {
864            // Initialize the journal
865            let cfg = Config {
866                partition: "test_partition".into(),
867                compression: None,
868                codec_config: (),
869                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
870                write_buffer: NZUsize!(1024),
871            };
872            let index = 1u64;
873            let data = 10;
874            let mut journal = Journal::init(context.clone(), cfg.clone())
875                .await
876                .expect("Failed to initialize journal");
877
878            // Append an item to the journal
879            journal
880                .append(index, data)
881                .await
882                .expect("Failed to append data");
883
884            // Check metrics
885            let buffer = context.encode();
886            assert!(buffer.contains("tracked 1"));
887
888            // Close the journal
889            journal.close().await.expect("Failed to close journal");
890
891            // Re-initialize the journal to simulate a restart
892            let cfg = Config {
893                partition: "test_partition".into(),
894                compression: None,
895                codec_config: (),
896                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
897                write_buffer: NZUsize!(1024),
898            };
899            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
900                .await
901                .expect("Failed to re-initialize journal");
902
903            // Replay the journal and collect items
904            let mut items = Vec::new();
905            let stream = journal
906                .replay(0, 0, NZUsize!(1024))
907                .await
908                .expect("unable to setup replay");
909            pin_mut!(stream);
910            while let Some(result) = stream.next().await {
911                match result {
912                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
913                    Err(err) => panic!("Failed to read item: {err}"),
914                }
915            }
916
917            // Verify that the item was replayed correctly
918            assert_eq!(items.len(), 1);
919            assert_eq!(items[0].0, index);
920            assert_eq!(items[0].1, data);
921
922            // Check metrics
923            let buffer = context.encode();
924            assert!(buffer.contains("tracked 1"));
925        });
926    }
927
928    #[test_traced]
929    fn test_journal_multiple_appends_and_reads() {
930        // Initialize the deterministic context
931        let executor = deterministic::Runner::default();
932
933        // Start the test within the executor
934        executor.start(|context| async move {
935            // Create a journal configuration
936            let cfg = Config {
937                partition: "test_partition".into(),
938                compression: None,
939                codec_config: (),
940                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
941                write_buffer: NZUsize!(1024),
942            };
943
944            // Initialize the journal
945            let mut journal = Journal::init(context.clone(), cfg.clone())
946                .await
947                .expect("Failed to initialize journal");
948
949            // Append multiple items to different blobs
950            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
951            for (index, data) in &data_items {
952                journal
953                    .append(*index, *data)
954                    .await
955                    .expect("Failed to append data");
956                journal.sync(*index).await.expect("Failed to sync blob");
957            }
958
959            // Check metrics
960            let buffer = context.encode();
961            assert!(buffer.contains("tracked 3"));
962            assert!(buffer.contains("synced_total 4"));
963
964            // Close the journal
965            journal.close().await.expect("Failed to close journal");
966
967            // Re-initialize the journal to simulate a restart
968            let journal = Journal::init(context, cfg)
969                .await
970                .expect("Failed to re-initialize journal");
971
972            // Replay the journal and collect items
973            let mut items = Vec::<(u64, u32)>::new();
974            {
975                let stream = journal
976                    .replay(0, 0, NZUsize!(1024))
977                    .await
978                    .expect("unable to setup replay");
979                pin_mut!(stream);
980                while let Some(result) = stream.next().await {
981                    match result {
982                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
983                        Err(err) => panic!("Failed to read item: {err}"),
984                    }
985                }
986            }
987
988            // Verify that all items were replayed correctly
989            assert_eq!(items.len(), data_items.len());
990            for ((expected_index, expected_data), (actual_index, actual_data)) in
991                data_items.iter().zip(items.iter())
992            {
993                assert_eq!(actual_index, expected_index);
994                assert_eq!(actual_data, expected_data);
995            }
996
997            // Cleanup
998            journal.destroy().await.expect("Failed to destroy journal");
999        });
1000    }
1001
1002    #[test_traced]
1003    fn test_journal_prune_blobs() {
1004        // Initialize the deterministic context
1005        let executor = deterministic::Runner::default();
1006
1007        // Start the test within the executor
1008        executor.start(|context| async move {
1009            // Create a journal configuration
1010            let cfg = Config {
1011                partition: "test_partition".into(),
1012                compression: None,
1013                codec_config: (),
1014                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1015                write_buffer: NZUsize!(1024),
1016            };
1017
1018            // Initialize the journal
1019            let mut journal = Journal::init(context.clone(), cfg.clone())
1020                .await
1021                .expect("Failed to initialize journal");
1022
1023            // Append items to multiple blobs
1024            for index in 1u64..=5u64 {
1025                journal
1026                    .append(index, index)
1027                    .await
1028                    .expect("Failed to append data");
1029                journal.sync(index).await.expect("Failed to sync blob");
1030            }
1031
1032            // Add one item out-of-order
1033            let data = 99;
1034            journal
1035                .append(2u64, data)
1036                .await
1037                .expect("Failed to append data");
1038            journal.sync(2u64).await.expect("Failed to sync blob");
1039
1040            // Prune blobs with indices less than 3
1041            journal.prune(3).await.expect("Failed to prune blobs");
1042
1043            // Check metrics
1044            let buffer = context.encode();
1045            assert!(buffer.contains("pruned_total 2"));
1046
1047            // Prune again with a section less than the previous one, should be a no-op
1048            journal.prune(2).await.expect("Failed to no-op prune");
1049            let buffer = context.encode();
1050            assert!(buffer.contains("pruned_total 2"));
1051
1052            // Close the journal
1053            journal.close().await.expect("Failed to close journal");
1054
1055            // Re-initialize the journal to simulate a restart
1056            let mut journal = Journal::init(context.clone(), cfg.clone())
1057                .await
1058                .expect("Failed to re-initialize journal");
1059
1060            // Replay the journal and collect items
1061            let mut items = Vec::<(u64, u64)>::new();
1062            {
1063                let stream = journal
1064                    .replay(0, 0, NZUsize!(1024))
1065                    .await
1066                    .expect("unable to setup replay");
1067                pin_mut!(stream);
1068                while let Some(result) = stream.next().await {
1069                    match result {
1070                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1071                        Err(err) => panic!("Failed to read item: {err}"),
1072                    }
1073                }
1074            }
1075
1076            // Verify that items from blobs 1 and 2 are not present
1077            assert_eq!(items.len(), 3);
1078            let expected_indices = [3u64, 4u64, 5u64];
1079            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
1080                assert_eq!(item.0, *expected_index);
1081            }
1082
1083            // Prune all blobs
1084            journal.prune(6).await.expect("Failed to prune blobs");
1085
1086            // Close the journal
1087            journal.close().await.expect("Failed to close journal");
1088
1089            // Ensure no remaining blobs exist
1090            //
1091            // Note: We don't remove the partition, so this does not error
1092            // and instead returns an empty list of blobs.
1093            assert!(context
1094                .scan(&cfg.partition)
1095                .await
1096                .expect("Failed to list blobs")
1097                .is_empty());
1098        });
1099    }
1100
1101    #[test_traced]
1102    fn test_journal_prune_guard() {
1103        let executor = deterministic::Runner::default();
1104
1105        executor.start(|context| async move {
1106            let cfg = Config {
1107                partition: "test_partition".into(),
1108                compression: None,
1109                codec_config: (),
1110                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1111                write_buffer: NZUsize!(1024),
1112            };
1113
1114            let mut journal = Journal::init(context.clone(), cfg.clone())
1115                .await
1116                .expect("Failed to initialize journal");
1117
1118            // Append items to sections 1-5
1119            for section in 1u64..=5u64 {
1120                journal
1121                    .append(section, section as i32)
1122                    .await
1123                    .expect("Failed to append data");
1124                journal.sync(section).await.expect("Failed to sync");
1125            }
1126
1127            // Verify initial oldest_retained_section is 0
1128            assert_eq!(journal.oldest_retained_section, 0);
1129
1130            // Prune sections < 3
1131            journal.prune(3).await.expect("Failed to prune");
1132
1133            // Verify oldest_retained_section is updated
1134            assert_eq!(journal.oldest_retained_section, 3);
1135
1136            // Test that accessing pruned sections returns the correct error
1137
1138            // Test append on pruned section
1139            match journal.append(1, 100).await {
1140                Err(Error::AlreadyPrunedToSection(3)) => {}
1141                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1142            }
1143
1144            match journal.append(2, 100).await {
1145                Err(Error::AlreadyPrunedToSection(3)) => {}
1146                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1147            }
1148
1149            // Test get on pruned section
1150            match journal.get(1, 0).await {
1151                Err(Error::AlreadyPrunedToSection(3)) => {}
1152                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1153            }
1154
1155            // Test get_exact on pruned section
1156            match journal.get_exact(2, 0, 12).await {
1157                Err(Error::AlreadyPrunedToSection(3)) => {}
1158                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1159            }
1160
1161            // Test size on pruned section
1162            match journal.size(1).await {
1163                Err(Error::AlreadyPrunedToSection(3)) => {}
1164                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1165            }
1166
1167            // Test rewind on pruned section
1168            match journal.rewind(2, 0).await {
1169                Err(Error::AlreadyPrunedToSection(3)) => {}
1170                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1171            }
1172
1173            // Test rewind_section on pruned section
1174            match journal.rewind_section(1, 0).await {
1175                Err(Error::AlreadyPrunedToSection(3)) => {}
1176                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1177            }
1178
1179            // Test sync on pruned section
1180            match journal.sync(2).await {
1181                Err(Error::AlreadyPrunedToSection(3)) => {}
1182                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1183            }
1184
1185            // Test that accessing sections at or after the threshold works
1186            assert!(journal.get(3, 0).await.is_ok());
1187            assert!(journal.get(4, 0).await.is_ok());
1188            assert!(journal.get(5, 0).await.is_ok());
1189            assert!(journal.size(3).await.is_ok());
1190            assert!(journal.sync(4).await.is_ok());
1191
1192            // Append to section at threshold should work
1193            journal
1194                .append(3, 999)
1195                .await
1196                .expect("Should be able to append to section 3");
1197
1198            // Prune more sections
1199            journal.prune(5).await.expect("Failed to prune");
1200            assert_eq!(journal.oldest_retained_section, 5);
1201
1202            // Verify sections 3 and 4 are now pruned
1203            match journal.get(3, 0).await {
1204                Err(Error::AlreadyPrunedToSection(5)) => {}
1205                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
1206            }
1207
1208            match journal.get(4, 0).await {
1209                Err(Error::AlreadyPrunedToSection(5)) => {}
1210                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
1211            }
1212
1213            // Section 5 should still be accessible
1214            assert!(journal.get(5, 0).await.is_ok());
1215
1216            journal.close().await.expect("Failed to close journal");
1217        });
1218    }
1219
1220    #[test_traced]
1221    fn test_journal_prune_guard_across_restart() {
1222        let executor = deterministic::Runner::default();
1223
1224        executor.start(|context| async move {
1225            let cfg = Config {
1226                partition: "test_partition".into(),
1227                compression: None,
1228                codec_config: (),
1229                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1230                write_buffer: NZUsize!(1024),
1231            };
1232
1233            // First session: create and prune
1234            {
1235                let mut journal = Journal::init(context.clone(), cfg.clone())
1236                    .await
1237                    .expect("Failed to initialize journal");
1238
1239                for section in 1u64..=5u64 {
1240                    journal
1241                        .append(section, section as i32)
1242                        .await
1243                        .expect("Failed to append data");
1244                    journal.sync(section).await.expect("Failed to sync");
1245                }
1246
1247                journal.prune(3).await.expect("Failed to prune");
1248                assert_eq!(journal.oldest_retained_section, 3);
1249
1250                journal.close().await.expect("Failed to close journal");
1251            }
1252
1253            // Second session: verify oldest_retained_section is reset
1254            {
1255                let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
1256                    .await
1257                    .expect("Failed to re-initialize journal");
1258
1259                // After restart, oldest_retained_section should be back to 0
1260                // since it's not persisted
1261                assert_eq!(journal.oldest_retained_section, 0);
1262
1263                // But the actual sections 1 and 2 should be gone from storage
1264                // so get should return SectionOutOfRange, not AlreadyPrunedToSection
1265                match journal.get(1, 0).await {
1266                    Err(Error::SectionOutOfRange(1)) => {}
1267                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
1268                }
1269
1270                match journal.get(2, 0).await {
1271                    Err(Error::SectionOutOfRange(2)) => {}
1272                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
1273                }
1274
1275                // Sections 3-5 should still be accessible
1276                assert!(journal.get(3, 0).await.is_ok());
1277                assert!(journal.get(4, 0).await.is_ok());
1278                assert!(journal.get(5, 0).await.is_ok());
1279
1280                journal.close().await.expect("Failed to close journal");
1281            }
1282        });
1283    }
1284
1285    #[test_traced]
1286    fn test_journal_with_invalid_blob_name() {
1287        // Initialize the deterministic context
1288        let executor = deterministic::Runner::default();
1289
1290        // Start the test within the executor
1291        executor.start(|context| async move {
1292            // Create a journal configuration
1293            let cfg = Config {
1294                partition: "test_partition".into(),
1295                compression: None,
1296                codec_config: (),
1297                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1298                write_buffer: NZUsize!(1024),
1299            };
1300
1301            // Manually create a blob with an invalid name (not 8 bytes)
1302            let invalid_blob_name = b"invalid"; // Less than 8 bytes
1303            let (blob, _) = context
1304                .open(&cfg.partition, invalid_blob_name)
1305                .await
1306                .expect("Failed to create blob with invalid name");
1307            blob.sync().await.expect("Failed to sync blob");
1308
1309            // Attempt to initialize the journal
1310            let result = Journal::<_, u64>::init(context, cfg).await;
1311
1312            // Expect an error
1313            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1314        });
1315    }
1316
1317    #[test_traced]
1318    fn test_journal_read_size_missing() {
1319        // Initialize the deterministic context
1320        let executor = deterministic::Runner::default();
1321
1322        // Start the test within the executor
1323        executor.start(|context| async move {
1324            // Create a journal configuration
1325            let cfg = Config {
1326                partition: "test_partition".into(),
1327                compression: None,
1328                codec_config: (),
1329                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1330                write_buffer: NZUsize!(1024),
1331            };
1332
1333            // Manually create a blob with incomplete size data
1334            let section = 1u64;
1335            let blob_name = section.to_be_bytes();
1336            let (blob, _) = context
1337                .open(&cfg.partition, &blob_name)
1338                .await
1339                .expect("Failed to create blob");
1340
1341            // Write incomplete size data (less than 4 bytes)
1342            let incomplete_data = vec![0x00, 0x01]; // Less than 4 bytes
1343            blob.write_at(incomplete_data, 0)
1344                .await
1345                .expect("Failed to write incomplete data");
1346            blob.sync().await.expect("Failed to sync blob");
1347
1348            // Initialize the journal
1349            let journal = Journal::init(context, cfg)
1350                .await
1351                .expect("Failed to initialize journal");
1352
1353            // Attempt to replay the journal
1354            let stream = journal
1355                .replay(0, 0, NZUsize!(1024))
1356                .await
1357                .expect("unable to setup replay");
1358            pin_mut!(stream);
1359            let mut items = Vec::<(u64, u64)>::new();
1360            while let Some(result) = stream.next().await {
1361                match result {
1362                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1363                    Err(err) => panic!("Failed to read item: {err}"),
1364                }
1365            }
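            // Only 2 bytes were written, which is not enough for the 4-byte size prefix, so
            // replay treats the blob as a truncated tail and yields no items.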
1366            assert!(items.is_empty());
1367        });
1368    }
1369
1370    #[test_traced]
1371    fn test_journal_read_item_missing() {
1372        // Initialize the deterministic context
1373        let executor = deterministic::Runner::default();
1374
1375        // Start the test within the executor
1376        executor.start(|context| async move {
1377            // Create a journal configuration
1378            let cfg = Config {
1379                partition: "test_partition".into(),
1380                compression: None,
1381                codec_config: (),
1382                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1383                write_buffer: NZUsize!(1024),
1384            };
1385
1386            // Manually create a blob with missing item data
1387            let section = 1u64;
1388            let blob_name = section.to_be_bytes();
1389            let (blob, _) = context
1390                .open(&cfg.partition, &blob_name)
1391                .await
1392                .expect("Failed to create blob");
1393
1394            // Write the size prefix but only part of the item data (5 of the declared 10 bytes)
1395            let item_size: u32 = 10; // Declared size of the item
1396            let mut buf = Vec::new();
1397            buf.put_u32(item_size);
1398            let data = [2u8; 5];
1399            BufMut::put_slice(&mut buf, &data);
1400            blob.write_at(buf, 0)
1401                .await
1402                .expect("Failed to write item size");
1403            blob.sync().await.expect("Failed to sync blob");
1404
1405            // Initialize the journal
1406            let journal = Journal::init(context, cfg)
1407                .await
1408                .expect("Failed to initialize journal");
1409
1410            // Attempt to replay the journal
1411            let stream = journal
1412                .replay(0, 0, NZUsize!(1024))
1413                .await
1414                .expect("unable to setup replay");
1415            pin_mut!(stream);
1416            let mut items = Vec::<(u64, u64)>::new();
1417            while let Some(result) = stream.next().await {
1418                match result {
1419                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1420                    Err(err) => panic!("Failed to read item: {err}"),
1421                }
1422            }
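            // The size prefix declares a 10-byte item but only 5 bytes of data follow, so the
            // incomplete record is discarded during replay and nothing is yielded.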
1423            assert!(items.is_empty());
1424        });
1425    }
1426
1427    #[test_traced]
1428    fn test_journal_read_checksum_missing() {
1429        // Initialize the deterministic context
1430        let executor = deterministic::Runner::default();
1431
1432        // Start the test within the executor
1433        executor.start(|context| async move {
1434            // Create a journal configuration
1435            let cfg = Config {
1436                partition: "test_partition".into(),
1437                compression: None,
1438                codec_config: (),
1439                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1440                write_buffer: NZUsize!(1024),
1441            };
1442
1443            // Manually create a blob with missing checksum
1444            let section = 1u64;
1445            let blob_name = section.to_be_bytes();
1446            let (blob, _) = context
1447                .open(&cfg.partition, &blob_name)
1448                .await
1449                .expect("Failed to create blob");
1450
1451            // Prepare item data
1452            let item_data = b"Test data";
1453            let item_size = item_data.len() as u32;
1454
1455            // Write size
1456            let mut offset = 0;
1457            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1458                .await
1459                .expect("Failed to write item size");
1460            offset += 4;
1461
1462            // Write item data
1463            blob.write_at(item_data.to_vec(), offset)
1464                .await
1465                .expect("Failed to write item data");
1466            // Intentionally omit the checksum
1467
1468            blob.sync().await.expect("Failed to sync blob");
1469
1470            // Initialize the journal
1471            let journal = Journal::init(context, cfg)
1472                .await
1473                .expect("Failed to initialize journal");
1474
1475            // Attempt to replay the journal
1476            //
1477            // This will truncate the leftover bytes from our manual write.
1478            let stream = journal
1479                .replay(0, 0, NZUsize!(1024))
1480                .await
1481                .expect("unable to setup replay");
1482            pin_mut!(stream);
1483            let mut items = Vec::<(u64, u64)>::new();
1484            while let Some(result) = stream.next().await {
1485                match result {
1486                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1487                    Err(err) => panic!("Failed to read item: {err}"),
1488                }
1489            }
1490            assert!(items.is_empty());
1491        });
1492    }
1493
1494    #[test_traced]
1495    fn test_journal_read_checksum_mismatch() {
1496        // Initialize the deterministic context
1497        let executor = deterministic::Runner::default();
1498
1499        // Start the test within the executor
1500        executor.start(|context| async move {
1501            // Create a journal configuration
1502            let cfg = Config {
1503                partition: "test_partition".into(),
1504                compression: None,
1505                codec_config: (),
1506                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1507                write_buffer: NZUsize!(1024),
1508            };
1509
1510            // Manually create a blob with incorrect checksum
1511            let section = 1u64;
1512            let blob_name = section.to_be_bytes();
1513            let (blob, _) = context
1514                .open(&cfg.partition, &blob_name)
1515                .await
1516                .expect("Failed to create blob");
1517
1518            // Prepare item data
1519            let item_data = b"Test data";
1520            let item_size = item_data.len() as u32;
1521            let incorrect_checksum: u32 = 0xDEADBEEF;
1522
1523            // Write size
1524            let mut offset = 0;
1525            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1526                .await
1527                .expect("Failed to write item size");
1528            offset += 4;
1529
1530            // Write item data
1531            blob.write_at(item_data.to_vec(), offset)
1532                .await
1533                .expect("Failed to write item data");
1534            offset += item_data.len() as u64;
1535
1536            // Write incorrect checksum
1537            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
1538                .await
1539                .expect("Failed to write incorrect checksum");
1540
1541            blob.sync().await.expect("Failed to sync blob");
1542
1543            // Initialize the journal
1544            let journal = Journal::init(context.clone(), cfg.clone())
1545                .await
1546                .expect("Failed to initialize journal");
1547
1548            // Attempt to replay the journal
1549            {
1550                let stream = journal
1551                    .replay(0, 0, NZUsize!(1024))
1552                    .await
1553                    .expect("unable to setup replay");
1554                pin_mut!(stream);
1555                let mut items = Vec::<(u64, u64)>::new();
1556                while let Some(result) = stream.next().await {
1557                    match result {
1558                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1559                        Err(err) => panic!("Failed to read item: {err}"),
1560                    }
1561                }
1562                assert!(items.is_empty());
1563            }
1564            journal.close().await.expect("Failed to close journal");
1565
1566            // Confirm the blob has the expected length
1567            let (_, blob_size) = context
1568                .open(&cfg.partition, &section.to_be_bytes())
1569                .await
1570                .expect("Failed to open blob");
1571            assert_eq!(blob_size, 0);
1572        });
1573    }
1574
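    // A minimal sketch (not used by the tests above) of how a well-formed item is laid out on
    // disk per the format in the module docs: a big-endian u32 size, the raw item bytes, then a
    // CRC32 computed over the size and data. `crc32_ieee_sketch` is a local, illustrative
    // bitwise CRC-32/IEEE; the journal's actual CRC32 source may differ.
    #[allow(dead_code)]
    fn crc32_ieee_sketch(bytes: &[u8]) -> u32 {
        let mut crc = 0xFFFF_FFFFu32;
        for &b in bytes {
            crc ^= b as u32;
            for _ in 0..8 {
                let mask = (crc & 1).wrapping_neg();
                crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
            }
        }
        !crc
    }

    #[allow(dead_code)]
    fn encode_item_sketch(data: &[u8]) -> Vec<u8> {
        let mut buf = Vec::with_capacity(4 + data.len() + 4);
        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
        buf.extend_from_slice(data);
        let checksum = crc32_ieee_sketch(&buf); // C = CRC32(Size | Data)
        buf.extend_from_slice(&checksum.to_be_bytes());
        buf
    }
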
1575    #[test_traced]
1576    fn test_journal_handling_unaligned_truncated_data() {
1577        // Initialize the deterministic context
1578        let executor = deterministic::Runner::default();
1579
1580        // Start the test within the executor
1581        executor.start(|context| async move {
1582            // Create a journal configuration
1583            let cfg = Config {
1584                partition: "test_partition".into(),
1585                compression: None,
1586                codec_config: (),
1587                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1588                write_buffer: NZUsize!(1024),
1589            };
1590
1591            // Initialize the journal
1592            let mut journal = Journal::init(context.clone(), cfg.clone())
1593                .await
1594                .expect("Failed to initialize journal");
1595
1596            // Append 1 item to the first index
1597            journal.append(1, 1).await.expect("Failed to append data");
1598
1599            // Append multiple items to the second index (u32 values, whose 12-byte records do not end on a 16-byte boundary)
1600            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1601            for (index, data) in &data_items {
1602                journal
1603                    .append(*index, *data)
1604                    .await
1605                    .expect("Failed to append data");
1606                journal.sync(*index).await.expect("Failed to sync blob");
1607            }
1608
1609            // Close the journal
1610            journal.close().await.expect("Failed to close journal");
1611
1612            // Manually corrupt the end of the second blob
1613            let (blob, blob_size) = context
1614                .open(&cfg.partition, &2u64.to_be_bytes())
1615                .await
1616                .expect("Failed to open blob");
1617            blob.resize(blob_size - 4)
1618                .await
1619                .expect("Failed to corrupt blob");
1620            blob.sync().await.expect("Failed to sync blob");
1621
1622            // Re-initialize the journal to simulate a restart
1623            let journal = Journal::init(context.clone(), cfg.clone())
1624                .await
1625                .expect("Failed to re-initialize journal");
1626
1627            // Attempt to replay the journal
1628            let mut items = Vec::<(u64, u32)>::new();
1629            {
1630                let stream = journal
1631                    .replay(0, 0, NZUsize!(1024))
1632                    .await
1633                    .expect("unable to setup replay");
1634                pin_mut!(stream);
1635                while let Some(result) = stream.next().await {
1636                    match result {
1637                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1638                        Err(err) => panic!("Failed to read item: {err}"),
1639                    }
1640                }
1641            }
1642            journal.close().await.expect("Failed to close journal");
1643
1644            // Verify that only non-corrupted items were replayed
1645            assert_eq!(items.len(), 3);
1646            assert_eq!(items[0].0, 1);
1647            assert_eq!(items[0].1, 1);
1648            assert_eq!(items[1].0, data_items[0].0);
1649            assert_eq!(items[1].1, data_items[0].1);
1650            assert_eq!(items[2].0, data_items[1].0);
1651            assert_eq!(items[2].1, data_items[1].1);
1652
1653            // Confirm the blob has the expected length
1654            let (_, blob_size) = context
1655                .open(&cfg.partition, &2u64.to_be_bytes())
1656                .await
1657                .expect("Failed to open blob");
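            // 28 = one 16-byte aligned slot for the first surviving record plus a 12-byte second
            // record (4-byte size + 4-byte u32 value + 4-byte CRC); the truncated third record
            // was discarded by the replay above.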
1658            assert_eq!(blob_size, 28);
1659
1660            // Re-initialize the journal after the replay-driven truncation
1661            let mut journal = Journal::init(context.clone(), cfg.clone())
1662                .await
1663                .expect("Failed to re-initialize journal");
1664
1665            // Attempt to replay the journal
1666            let mut items = Vec::<(u64, u32)>::new();
1667            {
1668                let stream = journal
1669                    .replay(0, 0, NZUsize!(1024))
1670                    .await
1671                    .expect("unable to setup replay");
1672                pin_mut!(stream);
1673                while let Some(result) = stream.next().await {
1674                    match result {
1675                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1676                        Err(err) => panic!("Failed to read item: {err}"),
1677                    }
1678                }
1679            }
1680
1681            // Verify that only non-corrupted items were replayed
1682            assert_eq!(items.len(), 3);
1683            assert_eq!(items[0].0, 1);
1684            assert_eq!(items[0].1, 1);
1685            assert_eq!(items[1].0, data_items[0].0);
1686            assert_eq!(items[1].1, data_items[0].1);
1687            assert_eq!(items[2].0, data_items[1].0);
1688            assert_eq!(items[2].1, data_items[1].1);
1689
1690            // Append a new item to the truncated section
1691            journal.append(2, 5).await.expect("Failed to append data");
1692            journal.sync(2).await.expect("Failed to sync blob");
1693
1694            // Get the new item
1695            let item = journal.get(2, 2).await.expect("Failed to get item");
1696            assert_eq!(item, 5);
1697
1698            // Close the journal
1699            journal.close().await.expect("Failed to close journal");
1700
1701            // Confirm the blob has the expected length
1702            let (_, blob_size) = context
1703                .open(&cfg.partition, &2u64.to_be_bytes())
1704                .await
1705                .expect("Failed to open blob");
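            // 44 = the 28 surviving bytes padded to the next 16-byte boundary (32) plus the
            // newly appended 12-byte record.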
1706            assert_eq!(blob_size, 44);
1707
1708            // Re-initialize the journal to simulate a restart
1709            let journal = Journal::init(context.clone(), cfg.clone())
1710                .await
1711                .expect("Failed to re-initialize journal");
1712
1713            // Attempt to replay the journal
1714            let mut items = Vec::<(u64, u32)>::new();
1715            {
1716                let stream = journal
1717                    .replay(0, 0, NZUsize!(1024))
1718                    .await
1719                    .expect("unable to setup replay");
1720                pin_mut!(stream);
1721                while let Some(result) = stream.next().await {
1722                    match result {
1723                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1724                        Err(err) => panic!("Failed to read item: {err}"),
1725                    }
1726                }
1727            }
1728
1729            // Verify that the surviving items and the new append were replayed
1730            assert_eq!(items.len(), 4);
1731            assert_eq!(items[0].0, 1);
1732            assert_eq!(items[0].1, 1);
1733            assert_eq!(items[1].0, data_items[0].0);
1734            assert_eq!(items[1].1, data_items[0].1);
1735            assert_eq!(items[2].0, data_items[1].0);
1736            assert_eq!(items[2].1, data_items[1].1);
1737            assert_eq!(items[3].0, 2);
1738            assert_eq!(items[3].1, 5);
1739        });
1740    }
1741
1742    #[test_traced]
1743    fn test_journal_handling_aligned_truncated_data() {
1744        // Initialize the deterministic context
1745        let executor = deterministic::Runner::default();
1746
1747        // Start the test within the executor
1748        executor.start(|context| async move {
1749            // Create a journal configuration
1750            let cfg = Config {
1751                partition: "test_partition".into(),
1752                compression: None,
1753                codec_config: (),
1754                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1755                write_buffer: NZUsize!(1024),
1756            };
1757
1758            // Initialize the journal
1759            let mut journal = Journal::init(context.clone(), cfg.clone())
1760                .await
1761                .expect("Failed to initialize journal");
1762
1763            // Append 1 item to the first index
1764            journal.append(1, 1).await.expect("Failed to append data");
1765
1766            // Append multiple items to the second index (u64 values, whose 16-byte records end exactly on a 16-byte boundary)
1767            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1768            for (index, data) in &data_items {
1769                journal
1770                    .append(*index, *data)
1771                    .await
1772                    .expect("Failed to append data");
1773                journal.sync(*index).await.expect("Failed to sync blob");
1774            }
1775
1776            // Close the journal
1777            journal.close().await.expect("Failed to close journal");
1778
1779            // Manually corrupt the end of the second blob
1780            let (blob, blob_size) = context
1781                .open(&cfg.partition, &2u64.to_be_bytes())
1782                .await
1783                .expect("Failed to open blob");
1784            blob.resize(blob_size - 4)
1785                .await
1786                .expect("Failed to corrupt blob");
1787            blob.sync().await.expect("Failed to sync blob");
1788
1789            // Re-initialize the journal to simulate a restart
1790            let mut journal = Journal::init(context.clone(), cfg.clone())
1791                .await
1792                .expect("Failed to re-initialize journal");
1793
1794            // Attempt to replay the journal
1795            let mut items = Vec::<(u64, u64)>::new();
1796            {
1797                let stream = journal
1798                    .replay(0, 0, NZUsize!(1024))
1799                    .await
1800                    .expect("unable to setup replay");
1801                pin_mut!(stream);
1802                while let Some(result) = stream.next().await {
1803                    match result {
1804                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1805                        Err(err) => panic!("Failed to read item: {err}"),
1806                    }
1807                }
1808            }
1809
1810            // Verify that only non-corrupted items were replayed
1811            assert_eq!(items.len(), 3);
1812            assert_eq!(items[0].0, 1);
1813            assert_eq!(items[0].1, 1);
1814            assert_eq!(items[1].0, data_items[0].0);
1815            assert_eq!(items[1].1, data_items[0].1);
1816            assert_eq!(items[2].0, data_items[1].0);
1817            assert_eq!(items[2].1, data_items[1].1);
1818
1819            // Append a new item to the truncated section
1820            journal.append(2, 5).await.expect("Failed to append data");
1821            journal.sync(2).await.expect("Failed to sync blob");
1822
1823            // Get the new item
1824            let item = journal.get(2, 2).await.expect("Failed to get item");
1825            assert_eq!(item, 5);
1826
1827            // Close the journal
1828            journal.close().await.expect("Failed to close journal");
1829
1830            // Confirm the blob has the expected length
1831            let (_, blob_size) = context
1832                .open(&cfg.partition, &2u64.to_be_bytes())
1833                .await
1834                .expect("Failed to open blob");
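            // With u64 values each record is exactly 16 bytes (4-byte size + 8-byte value +
            // 4-byte CRC), so 48 = the two records that survived truncation plus the new one.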
1835            assert_eq!(blob_size, 48);
1836
1837            // Re-initialize the journal after the replay-driven truncation
1838            let journal = Journal::init(context, cfg)
1839                .await
1840                .expect("Failed to re-initialize journal");
1841
1842            // Attempt to replay the journal
1843            let mut items = Vec::<(u64, u64)>::new();
1844            {
1845                let stream = journal
1846                    .replay(0, 0, NZUsize!(1024))
1847                    .await
1848                    .expect("unable to setup replay");
1849                pin_mut!(stream);
1850                while let Some(result) = stream.next().await {
1851                    match result {
1852                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1853                        Err(err) => panic!("Failed to read item: {err}"),
1854                    }
1855                }
1856            }
1857            journal.close().await.expect("Failed to close journal");
1858
1859            // Verify that the surviving items and the new append were replayed
1860            assert_eq!(items.len(), 4);
1861            assert_eq!(items[0].0, 1);
1862            assert_eq!(items[0].1, 1);
1863            assert_eq!(items[1].0, data_items[0].0);
1864            assert_eq!(items[1].1, data_items[0].1);
1865            assert_eq!(items[2].0, data_items[1].0);
1866            assert_eq!(items[2].1, data_items[1].1);
1867            assert_eq!(items[3].0, 2);
1868            assert_eq!(items[3].1, 5);
1869        });
1870    }
1871
1872    #[test_traced]
1873    fn test_journal_handling_extra_data() {
1874        // Initialize the deterministic context
1875        let executor = deterministic::Runner::default();
1876
1877        // Start the test within the executor
1878        executor.start(|context| async move {
1879            // Create a journal configuration
1880            let cfg = Config {
1881                partition: "test_partition".into(),
1882                compression: None,
1883                codec_config: (),
1884                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1885                write_buffer: NZUsize!(1024),
1886            };
1887
1888            // Initialize the journal
1889            let mut journal = Journal::init(context.clone(), cfg.clone())
1890                .await
1891                .expect("Failed to initialize journal");
1892
1893            // Append 1 item to the first index
1894            journal.append(1, 1).await.expect("Failed to append data");
1895
1896            // Append multiple items to the second index
1897            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1898            for (index, data) in &data_items {
1899                journal
1900                    .append(*index, *data)
1901                    .await
1902                    .expect("Failed to append data");
1903                journal.sync(*index).await.expect("Failed to sync blob");
1904            }
1905
1906            // Close the journal
1907            journal.close().await.expect("Failed to close journal");
1908
1909            // Manually add extra data to the end of the second blob
1910            let (blob, blob_size) = context
1911                .open(&cfg.partition, &2u64.to_be_bytes())
1912                .await
1913                .expect("Failed to open blob");
1914            blob.write_at(vec![0u8; 16], blob_size)
1915                .await
1916                .expect("Failed to add extra data");
1917            blob.sync().await.expect("Failed to sync blob");
1918
1919            // Re-initialize the journal to simulate a restart
1920            let journal = Journal::init(context, cfg)
1921                .await
1922                .expect("Failed to re-initialize journal");
1923
1924            // Attempt to replay the journal
1925            let mut items = Vec::<(u64, i32)>::new();
1926            let stream = journal
1927                .replay(0, 0, NZUsize!(1024))
1928                .await
1929                .expect("unable to setup replay");
1930            pin_mut!(stream);
1931            while let Some(result) = stream.next().await {
1932                match result {
1933                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1934                    Err(err) => panic!("Failed to read item: {err}"),
1935                }
1936            }
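            // The trailing zero bytes do not form a valid record, so replay should recover
            // exactly the four appended items and ignore the rest (mirroring the truncation
            // behavior exercised above).
            assert_eq!(items.len(), 4);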
1937        });
1938    }
1939
1940    // Define a `MockBlob` whose operations are all no-ops; it is paired with `MockStorage`
1941    // below to simulate a blob with an arbitrary reported length
1941    #[derive(Clone)]
1942    struct MockBlob {}
1943
1944    impl Blob for MockBlob {
1945        async fn read_at(
1946            &self,
1947            buf: impl Into<StableBuf> + Send,
1948            _offset: u64,
1949        ) -> Result<StableBuf, RError> {
1950            Ok(buf.into())
1951        }
1952
1953        async fn write_at(
1954            &self,
1955            _buf: impl Into<StableBuf> + Send,
1956            _offset: u64,
1957        ) -> Result<(), RError> {
1958            Ok(())
1959        }
1960
1961        async fn resize(&self, _len: u64) -> Result<(), RError> {
1962            Ok(())
1963        }
1964
1965        async fn sync(&self) -> Result<(), RError> {
1966            Ok(())
1967        }
1968    }
1969
1970    // Define a `MockStorage` that returns a `MockBlob` with a configurable reported length
1971    #[derive(Clone)]
1972    struct MockStorage {
1973        len: u64,
1974    }
1975
1976    impl Storage for MockStorage {
1977        type Blob = MockBlob;
1978
1979        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
1980            Ok((MockBlob {}, self.len))
1981        }
1982
1983        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
1984            Ok(())
1985        }
1986
1987        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
1988            Ok(vec![])
1989        }
1990    }
1991
1992    impl Metrics for MockStorage {
1993        fn with_label(&self, _: &str) -> Self {
1994            self.clone()
1995        }
1996
1997        fn label(&self) -> String {
1998            String::new()
1999        }
2000
2001        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}
2002
2003        fn encode(&self) -> String {
2004            String::new()
2005        }
2006    }
2007
2008    // Redefine `INDEX_ALIGNMENT` explicitly so these tests fail if the implementation's
2009    // value is ever changed accidentally
2010    const INDEX_ALIGNMENT: u64 = 16;
2011
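    // A minimal sketch of the offset arithmetic the two overflow tests below rely on: stored
    // offsets are u32 values addressing 16-byte aligned positions, so the byte position of an
    // offset is `offset * INDEX_ALIGNMENT`. `offset_to_position` is illustrative only and not
    // part of the `Journal` API.
    #[allow(dead_code)]
    fn offset_to_position(offset: u32) -> u64 {
        offset as u64 * INDEX_ALIGNMENT
    }
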
2012    #[test_traced]
2013    fn test_journal_large_offset() {
2014        // Initialize the deterministic context
2015        let executor = deterministic::Runner::default();
2016        executor.start(|_| async move {
2017            // Create journal
2018            let cfg = Config {
2019                partition: "partition".to_string(),
2020                compression: None,
2021                codec_config: (),
2022                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2023                write_buffer: NZUsize!(1024),
2024            };
2025            let context = MockStorage {
2026                len: u32::MAX as u64 * INDEX_ALIGNMENT, // the last offset can still hold up to u32::MAX bytes
2027            };
2028            let mut journal = Journal::init(context, cfg).await.unwrap();
2029
2030            // Append data
2031            let data = 1;
2032            let (result, _) = journal
2033                .append(1, data)
2034                .await
2035                .expect("Failed to append data");
2036            assert_eq!(result, u32::MAX);
2037        });
2038    }
2039
2040    #[test_traced]
2041    fn test_journal_offset_overflow() {
2042        // Initialize the deterministic context
2043        let executor = deterministic::Runner::default();
2044        executor.start(|_| async move {
2045            // Create journal
2046            let cfg = Config {
2047                partition: "partition".to_string(),
2048                compression: None,
2049                codec_config: (),
2050                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2051                write_buffer: NZUsize!(1024),
2052            };
2053            let context = MockStorage {
2054                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
2055            };
2056            let mut journal = Journal::init(context, cfg).await.unwrap();
2057
2058            // Append data
2059            let data = 1;
2060            let result = journal.append(1, data).await;
2061            assert!(matches!(result, Err(Error::OffsetOverflow)));
2062        });
2063    }
2064
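    // The next two tests contrast `rewind` and `rewind_section`: as their assertions show,
    // `rewind(1, 0)` leaves both section 1 and the later section 2 empty, while
    // `rewind_section(1, 0)` empties only section 1 and leaves section 2 untouched.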
2065    #[test_traced]
2066    fn test_journal_rewind() {
2067        // Initialize the deterministic context
2068        let executor = deterministic::Runner::default();
2069        executor.start(|context| async move {
2070            // Create journal
2071            let cfg = Config {
2072                partition: "test_partition".to_string(),
2073                compression: None,
2074                codec_config: (),
2075                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2076                write_buffer: NZUsize!(1024),
2077            };
2078            let mut journal = Journal::init(context, cfg).await.unwrap();
2079
2080            // Check size of non-existent section
2081            let size = journal.size(1).await.unwrap();
2082            assert_eq!(size, 0);
2083
2084            // Append data to section 1
2085            journal.append(1, 42i32).await.unwrap();
2086
2087            // Check size of section 1 - should be greater than 0
2088            let size = journal.size(1).await.unwrap();
2089            assert!(size > 0);
2090
2091            // Append more data and verify size increases
2092            journal.append(1, 43i32).await.unwrap();
2093            let new_size = journal.size(1).await.unwrap();
2094            assert!(new_size > size);
2095
2096            // Check size of different section - should still be 0
2097            let size = journal.size(2).await.unwrap();
2098            assert_eq!(size, 0);
2099
2100            // Append data to section 2
2101            journal.append(2, 44i32).await.unwrap();
2102
2103            // Check size of section 2 - should be greater than 0
2104            let size = journal.size(2).await.unwrap();
2105            assert!(size > 0);
2106
2107            // Rewind to the start of section 1, rolling back everything in sections 1 and 2
2108            journal.rewind(1, 0).await.unwrap();
2109
2110            // Check size of section 1 - should be 0
2111            let size = journal.size(1).await.unwrap();
2112            assert_eq!(size, 0);
2113
2114            // Check size of section 2 - should be 0
2115            let size = journal.size(2).await.unwrap();
2116            assert_eq!(size, 0);
2117        });
2118    }
2119
2120    #[test_traced]
2121    fn test_journal_rewind_section() {
2122        // Initialize the deterministic context
2123        let executor = deterministic::Runner::default();
2124        executor.start(|context| async move {
2125            // Create journal
2126            let cfg = Config {
2127                partition: "test_partition".to_string(),
2128                compression: None,
2129                codec_config: (),
2130                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2131                write_buffer: NZUsize!(1024),
2132            };
2133            let mut journal = Journal::init(context, cfg).await.unwrap();
2134
2135            // Check size of non-existent section
2136            let size = journal.size(1).await.unwrap();
2137            assert_eq!(size, 0);
2138
2139            // Append data to section 1
2140            journal.append(1, 42i32).await.unwrap();
2141
2142            // Check size of section 1 - should be greater than 0
2143            let size = journal.size(1).await.unwrap();
2144            assert!(size > 0);
2145
2146            // Append more data and verify size increases
2147            journal.append(1, 43i32).await.unwrap();
2148            let new_size = journal.size(1).await.unwrap();
2149            assert!(new_size > size);
2150
2151            // Check size of different section - should still be 0
2152            let size = journal.size(2).await.unwrap();
2153            assert_eq!(size, 0);
2154
2155            // Append data to section 2
2156            journal.append(2, 44i32).await.unwrap();
2157
2158            // Check size of section 2 - should be greater than 0
2159            let size = journal.size(2).await.unwrap();
2160            assert!(size > 0);
2161
2162            // Roll back everything in section 1 only
2163            journal.rewind_section(1, 0).await.unwrap();
2164
2165            // Check size of section 1 - should be 0
2166            let size = journal.size(1).await.unwrap();
2167            assert_eq!(size, 0);
2168
2169            // Check size of section 2 - should be greater than 0
2170            let size = journal.size(2).await.unwrap();
2171            assert!(size > 0);
2172        });
2173    }
2174
2175    /// Protect against accidental changes to the journal disk format.
2176    #[test_traced]
2177    fn test_journal_conformance() {
2178        // Initialize the deterministic context
2179        let executor = deterministic::Runner::default();
2180
2181        // Start the test within the executor
2182        executor.start(|context| async move {
2183            // Create a journal configuration
2184            let cfg = Config {
2185                partition: "test_partition".into(),
2186                compression: None,
2187                codec_config: (),
2188                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2189                write_buffer: NZUsize!(1024),
2190            };
2191
2192            // Initialize the journal
2193            let mut journal = Journal::init(context.clone(), cfg.clone())
2194                .await
2195                .expect("Failed to initialize journal");
2196
2197            // Append 100 items to the journal
2198            for i in 0..100 {
2199                journal.append(1, i).await.expect("Failed to append data");
2200            }
2201            journal.sync(1).await.expect("Failed to sync blob");
2202
2203            // Close the journal
2204            journal.close().await.expect("Failed to close journal");
2205
2206            // Hash blob contents
2207            let (blob, size) = context
2208                .open(&cfg.partition, &1u64.to_be_bytes())
2209                .await
2210                .expect("Failed to open blob");
2211            assert!(size > 0);
2212            let buf = blob
2213                .read_at(vec![0u8; size as usize], 0)
2214                .await
2215                .expect("Failed to read blob");
2216            let digest = Sha256::hash(buf.as_ref());
2217            assert_eq!(
2218                hex(&digest),
2219                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
2220            );
2221        });
2222    }
2223}