commonware_storage/journal/segmented/variable.rs

//! An append-only log for storing arbitrary variable-length items.
//!
//! `segmented::Journal` is an append-only log for storing arbitrary variable-length data on disk. In
//! addition to replay, stored items can be directly retrieved given their section number and offset
//! within the section.
//!
//! # Format
//!
//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
//! The particular `Blob` in which data is stored is identified by a `section` number (`u64`).
//! Within a `section`, data is appended as an `item` with the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! |       0 ~ 4       |    ...    | 8 | 9 |10 |11 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! | Size (varint u32) |   Data    |    C(u32)     |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//!
//! Size = u32 as varint (1 to 5 bytes)
//! C = CRC32(Size | Data)
//! ```
//!
//! _To ensure data returned by `Journal` is correct, a checksum (CRC32) is stored at the end of
//! each item. If the checksum of the read data does not match the stored checksum, an error is
//! returned. This checksum is only verified when data is accessed and not at startup (which would
//! require reading all data in `Journal`)._
//!
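//! For illustration (arithmetic only, using a hypothetical 300-byte item): the size 300
//! encodes as a 2-byte varint, so the full entry occupies 2 + 300 + 4 = 306 bytes before any
//! alignment padding is added ahead of the next item:
//!
//! ```text
//! [ varint(300) = 2 bytes ][ data = 300 bytes ][ CRC32 = 4 bytes ]  => 306 bytes total
//! ```
//!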
//! # Open Blobs
//!
//! `Journal` uses 1 `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a given
//! `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound the
//! number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
//!
//! # Offset Alignment
//!
//! In practice, `Journal` users won't store `u64::MAX` bytes of data in a given `section` (the max
//! `Offset` provided by `Blob`). To reduce the memory usage for tracking offsets within `Journal`,
//! offsets are thus `u32` (4 bytes) and aligned to 16 bytes. This means that the maximum size of
//! any `section` is `u32::MAX * 17 = ~70GB` bytes (the item at the final aligned offset can itself
//! be up to `u32::MAX` bytes). If more data is written to a `section` past this max, an
//! `OffsetOverflow` error is returned.
//!
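//! As a worked example (illustrative arithmetic only): an item whose aligned offset is `5`
//! begins at byte `5 * 16 = 80` in the underlying `Blob`. If that entry occupies 306 bytes,
//! it ends at byte `386`, and the next item is padded to the next 16-byte boundary at byte
//! `400`, i.e. aligned offset `400 / 16 = 25`.
//!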
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
//!
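//! A minimal sketch (not compiled here), assuming `journal` and `section` are defined as in the
//! example below:
//!
//! ```rust,ignore
//! // Force any buffered writes for `section` to durable storage.
//! journal.sync(section).await?;
//! ```
//!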
//! # Pruning
//!
//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
//! could be used, for example, by some blockchain application to prune old blocks.
//!
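//! For example (sketch only, not compiled here), a caller that no longer needs sections below
//! `10` could call:
//!
//! ```rust,ignore
//! // Removes all sections numbered less than 10; returns true if anything was pruned.
//! let pruned = journal.prune(10).await?;
//! ```
//!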
//! # Replay
//!
//! During application initialization, it is very common to replay data from `Journal` to recover
//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
//! method to produce a stream of all items in the `Journal` in order of their `section` and
//! `offset`.
//!
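//! A minimal consumption sketch (not compiled here), assuming `journal` stores `u64` items and
//! that `futures::pin_mut` and `NZUsize` are in scope:
//!
//! ```rust,ignore
//! // Stream all items starting at section 0, offset 0, with a 1 KiB read buffer.
//! let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
//! pin_mut!(stream);
//! while let Some(item) = stream.next().await {
//!     let (section, offset, size, value) = item?;
//!     // Rebuild in-memory state from (section, offset, size, value) here.
//! }
//! ```
//!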
//! # Compression
//!
//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
//! `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of `Journal`; however, it must remain populated if any
//! data was written with compression enabled.
//!
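//! A configuration sketch (not compiled here) that enables level-3 `zstd` compression; the other
//! fields mirror the example below:
//!
//! ```rust,ignore
//! let cfg = Config {
//!     partition: "partition".to_string(),
//!     compression: Some(3), // zstd compression level 3
//!     codec_config: (),
//!     buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!     write_buffer: NZUsize!(1024 * 1024),
//! };
//! ```
//!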
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a journal
//!     let mut journal = Journal::init(context, Config{
//!         partition: "partition".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append data to the journal
//!     journal.append(1, 128).await.unwrap();
//!
//!     // Close the journal
//!     journal.close().await.unwrap();
//! });
//! ```

use crate::journal::Error;
use bytes::{Buf, BufMut};
use commonware_codec::{varint::UInt, Codec, EncodeSize, ReadExt, Write as CodecWrite};
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    telemetry::metrics::status::GaugeExt,
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::{btree_map::Entry, BTreeMap},
    io::Cursor,
    marker::PhantomData,
    num::NonZeroUsize,
};
use tracing::{debug, trace, warn};
use zstd::{bulk::compress, decode_all};

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config<C> {
    /// The `commonware-runtime::Storage` partition to use
    /// for storing journal blobs.
    pub partition: String,

    /// Optional compression level (using `zstd`) to apply to data before storing.
    pub compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding items.
    pub codec_config: C,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

pub(crate) const ITEM_ALIGNMENT: u64 = 16;

/// Minimum size of any item: 1 byte varint (size=0) + 0 bytes data + 4 bytes checksum.
/// This is also the max varint size for u32, so we can always read this many bytes
/// at the start of an item to get the complete varint.
const MIN_ITEM_SIZE: usize = 5;

/// Computes the next offset for an item using the underlying `u64`
/// offset of `Blob`.
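///
/// For example (illustrative arithmetic, with `ITEM_ALIGNMENT` of 16): a byte offset of 16
/// yields an aligned offset of 1, while a byte offset of 17 is rounded up to 32 and yields
/// an aligned offset of 2.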
#[inline]
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

/// Implementation of `Journal` storage.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    pub(crate) context: E,
    pub(crate) cfg: Config<V::Cfg>,

    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// A section number before which all sections have been pruned. This value is not persisted,
    /// and is initialized to 0 at startup. It is updated only by calls to `prune` made during the
    /// current execution, and therefore provides only a best-effort lower bound on the true value.
    pub(crate) oldest_retained_section: u64,

    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during
    /// initialization. The `replay` method can be used
    /// to iterate over all items in the `Journal`.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Iterate over blobs in partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, size, "loaded section");
            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
            blobs.insert(section, blob);
        }

        // Initialize metrics
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        let _ = tracked.try_set(blobs.len());

        // Create journal instance
        Ok(Self {
            context,
            cfg,
            blobs,
            oldest_retained_section: 0,
            tracked,
            synced,
            pruned,

            _phantom: PhantomData,
        })
    }

    /// Ensures that a section pruned during the current execution is not accessed.
    const fn prune_guard(&self, section: u64) -> Result<(), Error> {
        if section < self.oldest_retained_section {
            Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
        } else {
            Ok(())
        }
    }

    /// Reads an item from the blob at the given offset.
    pub(crate) async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
    ) -> Result<(u32, u32, V), Error> {
        // Read varint size (max 5 bytes for u32)
        let mut hasher = crc32fast::Hasher::new();
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let varint_buf = blob.read_at(vec![0; MIN_ITEM_SIZE], offset).await?;
        let mut varint = varint_buf.as_ref();
        let size = UInt::<u32>::read(&mut varint).map_err(Error::Codec)?.0 as usize;
        let varint_len = MIN_ITEM_SIZE - varint.remaining();
        hasher.update(&varint_buf.as_ref()[..varint_len]);
        let offset = offset
            .checked_add(varint_len as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Read remaining
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
        let buf = buf.as_ref();
        let offset = offset
            .checked_add(buf_size as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Read item
        let item = &buf[..size];
        hasher.update(item);

        // Verify integrity
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute next offset
        let aligned_offset = compute_next_offset(offset)?;

        // If compression is enabled, decompress the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        // Return item
        Ok((aligned_offset, size as u32, item))
    }

    /// Helper function to read an item from a [Read].
    async fn read_buffered(
        reader: &mut Read<Append<E::Blob>>,
        offset: u32,
        cfg: &V::Cfg,
        compressed: bool,
    ) -> Result<(u32, u64, u32, V), Error> {
        // Calculate absolute file offset from the item offset
        let file_offset = offset as u64 * ITEM_ALIGNMENT;

        // If we're not at the right position, seek to it
        if reader.position() != file_offset {
            reader.seek_to(file_offset).map_err(Error::Runtime)?;
        }

        // Read varint size (max 5 bytes for u32, and min item size is 5 bytes)
        let mut hasher = crc32fast::Hasher::new();
        let mut varint_buf = [0u8; MIN_ITEM_SIZE];
        reader
            .read_exact(&mut varint_buf, MIN_ITEM_SIZE)
            .await
            .map_err(Error::Runtime)?;
        let mut varint = varint_buf.as_ref();
        let size = UInt::<u32>::read(&mut varint).map_err(Error::Codec)?.0 as usize;
        let varint_len = MIN_ITEM_SIZE - varint.remaining();
        hasher.update(&varint_buf[..varint_len]);

        // Read remaining data+checksum (we already have some bytes from the varint read)
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let already_read = MIN_ITEM_SIZE - varint_len;
        let mut buf = vec![0u8; buf_size];
        buf[..already_read].copy_from_slice(&varint_buf[varint_len..]);
        if buf_size > already_read {
            reader
                .read_exact(&mut buf[already_read..], buf_size - already_read)
                .await
                .map_err(Error::Runtime)?;
        }

        // Read item
        let item = &buf[..size];
        hasher.update(item);

        // Verify integrity
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // If compression is enabled, decompress the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        // Calculate next offset
        let current_pos = reader.position();
        let aligned_offset = compute_next_offset(current_pos)?;
        Ok((aligned_offset, current_pos, size as u32, item))
    }

    /// Returns an ordered stream of all items in the journal starting with the item at the given
    /// `start_section` and `offset` into that section. Each item is returned as a tuple of
    /// (section, offset, size, item).
    ///
    /// # Repair
    ///
    /// Like
    /// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
    /// and
    /// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
    /// the first invalid data read will be considered the new end of the journal (and the
    /// underlying [Blob] will be truncated to the last valid item).
    pub async fn replay(
        &self,
        start_section: u64,
        mut offset: u32,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
        // Collect all blobs to replay
        let codec_config = self.cfg.codec_config.clone();
        let compressed = self.cfg.compression.is_some();
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.range(start_section..) {
            let blob_size = blob.size().await;
            let max_offset = compute_next_offset(blob_size)?;
            blobs.push((
                *section,
                blob.clone(),
                max_offset,
                blob_size,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
        // memory with buffered data)
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                // Create a buffered reader
                let mut reader = Read::new(blob, blob_size, buffer);
                if section == start_section && offset != 0 {
                    if let Err(err) = reader.seek_to(offset as u64 * ITEM_ALIGNMENT) {
                        warn!(section, offset, ?err, "failed to seek to offset");
                        // Return early with the error to terminate the entire stream
                        return stream::once(async move { Err(err.into()) }).left_stream();
                    }
                } else {
                    offset = 0;
                }

                // Read over the blob
                stream::unfold(
                        (section, reader, offset, 0u64, codec_config, compressed),
                        move |(
                            section,
                            mut reader,
                            offset,
                            valid_size,
                            codec_config,
                            compressed,
                        )| async move {
                            // Check if we are at the end of the blob
                            if offset >= max_offset {
                                return None;
                            }

                            // Read an item from the buffer
                            match Self::read_buffered(
                                &mut reader,
                                offset,
                                &codec_config,
                                compressed,
                            )
                            .await
                            {
                                Ok((next_offset, next_valid_size, size, item)) => {
                                    trace!(blob = section, cursor = offset, "replayed item");
                                    Some((
                                        Ok((section, offset, size, item)),
                                        (
                                            section,
                                            reader,
                                            next_offset,
                                            next_valid_size,
                                            codec_config,
                                            compressed,
                                        ),
                                    ))
                                }
                                Err(Error::ChecksumMismatch(expected, found)) => {
                                    // If we encounter corruption, we prune to the last valid item. This
                                    // can happen during an unclean file close (where pending data is not
                                    // fully synced to disk).
                                    warn!(
                                        blob = section,
                                        bad_offset = offset,
                                        new_size = valid_size,
                                        expected,
                                        found,
                                        "corruption detected: truncating"
                                    );
                                    reader.resize(valid_size).await.ok()?;
                                    None
                                }
                                Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                    // If we encounter trailing bytes, we prune to the last
                                    // valid item. This can happen during an unclean file close (where
                                    // pending data is not fully synced to disk).
                                    warn!(
                                        blob = section,
                                        bad_offset = offset,
                                        new_size = valid_size,
                                        "trailing bytes detected: truncating"
                                    );
                                    reader.resize(valid_size).await.ok()?;
                                    None
                                }
                                Err(err) => {
                                    // If we encounter an unexpected error, return it without attempting
                                    // to fix anything.
                                    warn!(
                                        blob = section,
                                        cursor = offset,
                                        ?err,
                                        "unexpected error"
                                    );
                                    Some((
                                        Err(err),
                                        (
                                            section,
                                            reader,
                                            offset,
                                            valid_size,
                                            codec_config,
                                            compressed,
                                        ),
                                    ))
                                }
                            }
                        },
                    ).right_stream()
            },
        ))
    }

    /// Appends an item to `Journal` in a given `section`, returning the offset
    /// where the item was written and the size of the stored item (which may differ
    /// from the codec's encoded size if compression is enabled).
    ///
    /// # Warning
    ///
    /// If there exist trailing bytes in the `Blob` of a particular `section` and
    /// `replay` is not called before this, it is likely that subsequent data added
    /// to the `Blob` will be considered corrupted (as the trailing bytes will fail
    /// the checksum verification). It is recommended to call `replay` before calling
    /// `append` to prevent this.
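    ///
    /// # Example
    ///
    /// A usage sketch (not compiled here), assuming a `journal` storing `u64` items:
    ///
    /// ```rust,ignore
    /// // Append an item to section 1 and remember where it landed.
    /// let (offset, _size) = journal.append(1, 42u64).await?;
    /// // The item can later be fetched directly by section and offset.
    /// let item = journal.get(1, offset).await?;
    /// assert_eq!(item, 42u64);
    /// ```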
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
        // Check last pruned
        self.prune_guard(section)?;

        // Create item
        let encoded = item.encode();
        let encoded = if let Some(compression) = self.cfg.compression {
            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
        } else {
            encoded.into()
        };

        // Ensure item is not too large
        let item_len = encoded.len();
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };
        let size_len = UInt(item_len).encode_size();
        let entry_len = size_len + item_len as usize + 4;

        // Get existing blob or create new one
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
                let blob = Append::new(
                    blob,
                    size,
                    self.cfg.write_buffer,
                    self.cfg.buffer_pool.clone(),
                )
                .await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Calculate alignment
        let cursor = blob.size().await;
        let offset = compute_next_offset(cursor)?;
        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
        let padding = (aligned_cursor - cursor) as usize;

        // Populate buffer
        let mut buf = Vec::with_capacity(padding + entry_len);

        // Add padding bytes if necessary
        if padding > 0 {
            buf.resize(padding, 0);
        }

        // Add entry data
        let entry_start = buf.len();
        UInt(item_len).write(&mut buf);
        buf.put_slice(&encoded);

        // Calculate checksum only for the entry data (without padding)
        let checksum = crc32fast::hash(&buf[entry_start..]);
        buf.put_u32(checksum);
        assert_eq!(buf[entry_start..].len(), entry_len);

        // Append item to blob
        blob.append(buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len))
    }

    /// Retrieves an item from `Journal` at a given `section` and `offset`.
    ///
    /// # Errors
    ///  - [Error::AlreadyPrunedToSection] if the requested `section` has been pruned during the
    ///    current execution.
    ///  - [Error::SectionOutOfRange] if the requested `section` is empty (i.e. has never had any
    ///    data appended to it, or has been pruned in a previous execution).
    ///  - An invalid `offset` for a given section (that is, an offset that doesn't correspond to a
    ///    previously appended item) will result in an error, with the specific type being
    ///    undefined.
    pub async fn get(&self, section: u64, offset: u32) -> Result<V, Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Err(Error::SectionOutOfRange(section)),
        };

        // Perform a multi-op read.
        let (_, _, item) = Self::read(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
        )
        .await?;
        Ok(item)
    }

    /// Gets the size of the journal for a specific section.
    ///
    /// Returns 0 if the section does not exist.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.prune_guard(section)?;
        match self.blobs.get(&section) {
            Some(blob) => Ok(blob.size().await),
            None => Ok(0),
        }
    }

    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed in reverse order of section.
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
    }

    /// Rewinds the journal to the given `section` and `size`.
    ///
    /// This removes any data beyond the specified `section` and `size`.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed in reverse order of section.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        // Remove any sections beyond the given section
        let trailing: Vec<u64> = self
            .blobs
            .range((
                std::ops::Bound::Excluded(section),
                std::ops::Bound::Unbounded,
            ))
            .map(|(&section, _)| section)
            .collect();
        for index in trailing.iter().rev() {
            // Remove the blob from the in-memory map.
            let blob = self.blobs.remove(index).unwrap();

            // Drop the handle and remove the underlying blob from storage
            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(section = index, "removed section");
            self.tracked.dec();
        }

        // If the section exists, truncate it to the given offset
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        let current = blob.size().await;
        if size >= current {
            return Ok(()); // Already smaller than or equal to target size
        }
        blob.resize(size).await?;
        debug!(
            section,
            from = current,
            to = size,
            ?trailing,
            "rewound journal"
        );
        Ok(())
    }

    /// Rewinds the `section` to the given `size`.
    ///
    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
    ///
    /// # Warning
    ///
    /// This operation is not guaranteed to survive restarts until sync is called.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        // Get the blob at the given section
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };

        // Truncate the blob to the given size
        let current = blob.size().await;
        if size >= current {
            return Ok(()); // Already smaller than or equal to target size
        }
        blob.resize(size).await?;
        debug!(section, from = current, to = size, "rewound section");
        Ok(())
    }

    /// Ensures that all data in a given `section` is synced to the underlying store.
    ///
    /// If the `section` does not exist, no error will be returned.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        // Prune any blobs that are smaller than the minimum
        let mut pruned = false;
        while let Some((&section, _)) = self.blobs.first_key_value() {
            // Stop pruning if we reach the minimum
            if section >= min {
                break;
            }

            // Remove blob from journal
            let blob = self.blobs.remove(&section).unwrap();
            let size = blob.size().await;
            drop(blob);

            // Remove blob from storage
            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            pruned = true;

            debug!(blob = section, size, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        if pruned {
            self.oldest_retained_section = min;
        }

        Ok(pruned)
    }

    /// Syncs and closes all open sections.
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.sync().await?;
            debug!(blob = section, size, "synced blob");
        }
        Ok(())
    }

    /// Returns the number of the oldest section in the journal.
    pub fn oldest_section(&self) -> Option<u64> {
        self.blobs.first_key_value().map(|(section, _)| *section)
    }

    /// Removes any underlying blobs created by the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            drop(blob);
            debug!(blob = i, size, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_cryptography::{Hasher, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
    use commonware_utils::{NZUsize, StableBuf};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify that the item was replayed correctly
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append multiple items to different blobs
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that all items were replayed correctly
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Cleanup
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to multiple blobs
            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one item out-of-order
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune blobs with indices less than 3
            journal.prune(3).await.expect("Failed to prune blobs");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Prune again with a section less than the previous one, should be a no-op
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that items from blobs 1 and 2 are not present
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune all blobs
            journal.prune(6).await.expect("Failed to prune blobs");

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Ensure no remaining blobs exist
            //
            // Note: We don't remove the partition, so this does not error
            // and instead returns an empty list of blobs.
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to sections 1-5
            for section in 1u64..=5u64 {
                journal
                    .append(section, section as i32)
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            // Verify initial oldest_retained_section is 0
            assert_eq!(journal.oldest_retained_section, 0);

            // Prune sections < 3
            journal.prune(3).await.expect("Failed to prune");

            // Verify oldest_retained_section is updated
            assert_eq!(journal.oldest_retained_section, 3);

            // Test that accessing pruned sections returns the correct error

            // Test append on pruned section
            match journal.append(1, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test get on pruned section
            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test size on pruned section
            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test rewind on pruned section
            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test rewind_section on pruned section
            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test sync on pruned section
            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test that accessing sections at or after the threshold works
            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            // Append to section at threshold should work
            journal
                .append(3, 999)
                .await
                .expect("Should be able to append to section 3");

            // Prune more sections
            journal.prune(5).await.expect("Failed to prune");
            assert_eq!(journal.oldest_retained_section, 5);

            // Verify sections 3 and 4 are now pruned
            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            // Section 5 should still be accessible
            assert!(journal.get(5, 0).await.is_ok());

            journal.close().await.expect("Failed to close journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // First session: create and prune
            {
                let mut journal = Journal::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, section as i32)
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
                assert_eq!(journal.oldest_retained_section, 3);

                journal.close().await.expect("Failed to close journal");
            }

            // Second session: verify oldest_retained_section is reset
            {
                let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                // After restart, oldest_retained_section should be back to 0
                // since it's not persisted
                assert_eq!(journal.oldest_retained_section, 0);

                // But the actual sections 1 and 2 should be gone from storage
                // so get should return SectionOutOfRange, not AlreadyPrunedToSection
                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                // Sections 3-5 should still be accessible
                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());

                journal.close().await.expect("Failed to close journal");
            }
        });
    }
1226
1227    #[test_traced]
1228    fn test_journal_with_invalid_blob_name() {
1229        // Initialize the deterministic context
1230        let executor = deterministic::Runner::default();
1231
1232        // Start the test within the executor
1233        executor.start(|context| async move {
1234            // Create a journal configuration
1235            let cfg = Config {
1236                partition: "test_partition".into(),
1237                compression: None,
1238                codec_config: (),
1239                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1240                write_buffer: NZUsize!(1024),
1241            };
1242
1243            // Manually create a blob with an invalid name (not 8 bytes)
1244            let invalid_blob_name = b"invalid"; // Less than 8 bytes
1245            let (blob, _) = context
1246                .open(&cfg.partition, invalid_blob_name)
1247                .await
1248                .expect("Failed to create blob with invalid name");
1249            blob.sync().await.expect("Failed to sync blob");
1250
1251            // Attempt to initialize the journal
1252            let result = Journal::<_, u64>::init(context, cfg).await;
1253
1254            // Expect an error
1255            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1256        });
1257    }
1258
1259    #[test_traced]
1260    fn test_journal_read_size_missing() {
1261        // Initialize the deterministic context
1262        let executor = deterministic::Runner::default();
1263
1264        // Start the test within the executor
1265        executor.start(|context| async move {
1266            // Create a journal configuration
1267            let cfg = Config {
1268                partition: "test_partition".into(),
1269                compression: None,
1270                codec_config: (),
1271                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1272                write_buffer: NZUsize!(1024),
1273            };
1274
1275            // Manually create a blob with incomplete size data
1276            let section = 1u64;
1277            let blob_name = section.to_be_bytes();
1278            let (blob, _) = context
1279                .open(&cfg.partition, &blob_name)
1280                .await
1281                .expect("Failed to create blob");
1282
1283            // Write incomplete varint by encoding u32::MAX (5 bytes) and truncating to 1 byte
1284            let mut incomplete_data = Vec::new();
1285            UInt(u32::MAX).write(&mut incomplete_data);
1286            incomplete_data.truncate(1);
1287            blob.write_at(incomplete_data, 0)
1288                .await
1289                .expect("Failed to write incomplete data");
1290            blob.sync().await.expect("Failed to sync blob");
1291
1292            // Initialize the journal
1293            let journal = Journal::init(context, cfg)
1294                .await
1295                .expect("Failed to initialize journal");
1296
1297            // Attempt to replay the journal
1298            let stream = journal
1299                .replay(0, 0, NZUsize!(1024))
1300                .await
1301                .expect("unable to setup replay");
1302            pin_mut!(stream);
1303            let mut items = Vec::<(u64, u64)>::new();
1304            while let Some(result) = stream.next().await {
1305                match result {
1306                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1307                    Err(err) => panic!("Failed to read item: {err}"),
1308                }
1309            }
1310            assert!(items.is_empty());
1311        });
1312    }
1313
1314    #[test_traced]
1315    fn test_journal_read_item_missing() {
1316        // Initialize the deterministic context
1317        let executor = deterministic::Runner::default();
1318
1319        // Start the test within the executor
1320        executor.start(|context| async move {
1321            // Create a journal configuration
1322            let cfg = Config {
1323                partition: "test_partition".into(),
1324                compression: None,
1325                codec_config: (),
1326                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1327                write_buffer: NZUsize!(1024),
1328            };
1329
1330            // Manually create a blob with missing item data
1331            let section = 1u64;
1332            let blob_name = section.to_be_bytes();
1333            let (blob, _) = context
1334                .open(&cfg.partition, &blob_name)
1335                .await
1336                .expect("Failed to create blob");
1337
1338            // Write size but incomplete item data
1339            let item_size: u32 = 10; // Size indicates 10 bytes of data
1340            let mut buf = Vec::new();
1341            UInt(item_size).write(&mut buf); // Varint encoding
1342            let data = [2u8; 5]; // Only 5 bytes, not 10 + 4 (checksum)
1343            BufMut::put_slice(&mut buf, &data);
1344            blob.write_at(buf, 0)
1345                .await
1346                .expect("Failed to write incomplete item");
1347            blob.sync().await.expect("Failed to sync blob");
1348
1349            // Initialize the journal
1350            let journal = Journal::init(context, cfg)
1351                .await
1352                .expect("Failed to initialize journal");
1353
1354            // Attempt to replay the journal
1355            let stream = journal
1356                .replay(0, 0, NZUsize!(1024))
1357                .await
1358                .expect("unable to setup replay");
1359            pin_mut!(stream);
1360            let mut items = Vec::<(u64, u64)>::new();
1361            while let Some(result) = stream.next().await {
1362                match result {
1363                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1364                    Err(err) => panic!("Failed to read item: {err}"),
1365                }
1366            }
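            // The declared size (10 bytes) exceeds the bytes actually written, so replay should yield no items.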
1367            assert!(items.is_empty());
1368        });
1369    }
1370
1371    #[test_traced]
1372    fn test_journal_read_checksum_missing() {
1373        // Initialize the deterministic context
1374        let executor = deterministic::Runner::default();
1375
1376        // Start the test within the executor
1377        executor.start(|context| async move {
1378            // Create a journal configuration
1379            let cfg = Config {
1380                partition: "test_partition".into(),
1381                compression: None,
1382                codec_config: (),
1383                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1384                write_buffer: NZUsize!(1024),
1385            };
1386
1387            // Manually create a blob with missing checksum
1388            let section = 1u64;
1389            let blob_name = section.to_be_bytes();
1390            let (blob, _) = context
1391                .open(&cfg.partition, &blob_name)
1392                .await
1393                .expect("Failed to create blob");
1394
1395            // Prepare item data
1396            let item_data = b"Test data";
1397            let item_size = item_data.len() as u32;
1398
1399            // Write size (varint) and data, but no checksum
1400            let mut buf = Vec::new();
1401            UInt(item_size).write(&mut buf);
1402            BufMut::put_slice(&mut buf, item_data);
1403            blob.write_at(buf, 0)
1404                .await
1405                .expect("Failed to write item without checksum");
1406
1407            blob.sync().await.expect("Failed to sync blob");
1408
1409            // Initialize the journal
1410            let journal = Journal::init(context, cfg)
1411                .await
1412                .expect("Failed to initialize journal");
1413
1414            // Attempt to replay the journal
1415            //
1416            // This will truncate the leftover bytes from our manual write.
1417            let stream = journal
1418                .replay(0, 0, NZUsize!(1024))
1419                .await
1420                .expect("unable to setup replay");
1421            pin_mut!(stream);
1422            let mut items = Vec::<(u64, u64)>::new();
1423            while let Some(result) = stream.next().await {
1424                match result {
1425                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1426                    Err(err) => panic!("Failed to read item: {err}"),
1427                }
1428            }
1429            assert!(items.is_empty());
1430        });
1431    }
1432
1433    #[test_traced]
1434    fn test_journal_read_checksum_mismatch() {
1435        // Initialize the deterministic context
1436        let executor = deterministic::Runner::default();
1437
1438        // Start the test within the executor
1439        executor.start(|context| async move {
1440            // Create a journal configuration
1441            let cfg = Config {
1442                partition: "test_partition".into(),
1443                compression: None,
1444                codec_config: (),
1445                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1446                write_buffer: NZUsize!(1024),
1447            };
1448
1449            // Manually create a blob with incorrect checksum
1450            let section = 1u64;
1451            let blob_name = section.to_be_bytes();
1452            let (blob, _) = context
1453                .open(&cfg.partition, &blob_name)
1454                .await
1455                .expect("Failed to create blob");
1456
1457            // Prepare item data
1458            let item_data = b"Test data";
1459            let item_size = item_data.len() as u32;
1460            let incorrect_checksum: u32 = 0xDEADBEEF;
1461
1462            // Write size (varint), data, and incorrect checksum
1463            let mut buf = Vec::new();
1464            UInt(item_size).write(&mut buf);
1465            BufMut::put_slice(&mut buf, item_data);
1466            buf.put_u32(incorrect_checksum);
1467            blob.write_at(buf, 0)
1468                .await
1469                .expect("Failed to write item with bad checksum");
1470
1471            blob.sync().await.expect("Failed to sync blob");
1472
1473            // Initialize the journal
1474            let journal = Journal::init(context.clone(), cfg.clone())
1475                .await
1476                .expect("Failed to initialize journal");
1477
1478            // Attempt to replay the journal
1479            {
1480                let stream = journal
1481                    .replay(0, 0, NZUsize!(1024))
1482                    .await
1483                    .expect("unable to setup replay");
1484                pin_mut!(stream);
1485                let mut items = Vec::<(u64, u64)>::new();
1486                while let Some(result) = stream.next().await {
1487                    match result {
1488                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1489                        Err(err) => panic!("Failed to read item: {err}"),
1490                    }
1491                }
1492                assert!(items.is_empty());
1493            }
1494            journal.close().await.expect("Failed to close journal");
1495
1496            // Confirm the corrupted entry was truncated during replay, leaving an empty blob
1497            let (_, blob_size) = context
1498                .open(&cfg.partition, &section.to_be_bytes())
1499                .await
1500                .expect("Failed to open blob");
1501            assert_eq!(blob_size, 0);
1502        });
1503    }
1504
1505    #[test_traced]
1506    fn test_journal_handling_unaligned_truncated_data() {
1507        // Initialize the deterministic context
1508        let executor = deterministic::Runner::default();
1509
1510        // Start the test within the executor
1511        executor.start(|context| async move {
1512            // Create a journal configuration
1513            let cfg = Config {
1514                partition: "test_partition".into(),
1515                compression: None,
1516                codec_config: (),
1517                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1518                write_buffer: NZUsize!(1024),
1519            };
1520
1521            // Initialize the journal
1522            let mut journal = Journal::init(context.clone(), cfg.clone())
1523                .await
1524                .expect("Failed to initialize journal");
1525
1526            // Append 1 item to the first index
1527            journal.append(1, 1).await.expect("Failed to append data");
1528
1529            // Append multiple items to the second index (with unaligned values)
1530            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1531            for (index, data) in &data_items {
1532                journal
1533                    .append(*index, *data)
1534                    .await
1535                    .expect("Failed to append data");
1536                journal.sync(*index).await.expect("Failed to sync blob");
1537            }
1538
1539            // Close the journal
1540            journal.close().await.expect("Failed to close journal");
1541
1542            // Manually corrupt the end of the second blob
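            // Removing the final 4 bytes leaves the last entry incomplete, so replay should drop it.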
1543            let (blob, blob_size) = context
1544                .open(&cfg.partition, &2u64.to_be_bytes())
1545                .await
1546                .expect("Failed to open blob");
1547            blob.resize(blob_size - 4)
1548                .await
1549                .expect("Failed to corrupt blob");
1550            blob.sync().await.expect("Failed to sync blob");
1551
1552            // Re-initialize the journal to simulate a restart
1553            let journal = Journal::init(context.clone(), cfg.clone())
1554                .await
1555                .expect("Failed to re-initialize journal");
1556
1557            // Attempt to replay the journal
1558            let mut items = Vec::<(u64, u32)>::new();
1559            {
1560                let stream = journal
1561                    .replay(0, 0, NZUsize!(1024))
1562                    .await
1563                    .expect("unable to setup replay");
1564                pin_mut!(stream);
1565                while let Some(result) = stream.next().await {
1566                    match result {
1567                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1568                        Err(err) => panic!("Failed to read item: {err}"),
1569                    }
1570                }
1571            }
1572            journal.close().await.expect("Failed to close journal");
1573
1574            // Verify that only non-corrupted items were replayed
1575            assert_eq!(items.len(), 3);
1576            assert_eq!(items[0].0, 1);
1577            assert_eq!(items[0].1, 1);
1578            assert_eq!(items[1].0, data_items[0].0);
1579            assert_eq!(items[1].1, data_items[0].1);
1580            assert_eq!(items[2].0, data_items[1].0);
1581            assert_eq!(items[2].1, data_items[1].1);
1582
1583            // Confirm blob is expected length
1584            // entry = 1 (varint for 4) + 4 (data) + 4 (checksum) = 9 bytes
1585            // Item 2 ends at position 16 + 9 = 25
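            // (entries start at 16-byte aligned offsets: 0 and 16 here, 32 after the next append)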
1586            let (_, blob_size) = context
1587                .open(&cfg.partition, &2u64.to_be_bytes())
1588                .await
1589                .expect("Failed to open blob");
1590            assert_eq!(blob_size, 25);
1591
1592            // Re-initialize the journal after truncation
1593            let mut journal = Journal::init(context.clone(), cfg.clone())
1594                .await
1595                .expect("Failed to re-initialize journal");
1596
1597            // Attempt to replay the journal
1598            let mut items = Vec::<(u64, u32)>::new();
1599            {
1600                let stream = journal
1601                    .replay(0, 0, NZUsize!(1024))
1602                    .await
1603                    .expect("unable to setup replay");
1604                pin_mut!(stream);
1605                while let Some(result) = stream.next().await {
1606                    match result {
1607                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1608                        Err(err) => panic!("Failed to read item: {err}"),
1609                    }
1610                }
1611            }
1612
1613            // Verify that only non-corrupted items were replayed
1614            assert_eq!(items.len(), 3);
1615            assert_eq!(items[0].0, 1);
1616            assert_eq!(items[0].1, 1);
1617            assert_eq!(items[1].0, data_items[0].0);
1618            assert_eq!(items[1].1, data_items[0].1);
1619            assert_eq!(items[2].0, data_items[1].0);
1620            assert_eq!(items[2].1, data_items[1].1);
1621
1622            // Append a new item to the truncated section
1623            journal.append(2, 5).await.expect("Failed to append data");
1624            journal.sync(2).await.expect("Failed to sync blob");
1625
1626            // Get the new item
1627            let item = journal.get(2, 2).await.expect("Failed to get item");
1628            assert_eq!(item, 5);
1629
1630            // Close the journal
1631            journal.close().await.expect("Failed to close journal");
1632
1633            // Confirm blob is expected length
1634            // Items 1 and 2 at positions 0 and 16, item 3 (value 5) at position 32
1635            // Item 3 = 1 (varint) + 4 (data) + 4 (checksum) = 9 bytes, ends at 41
1636            let (_, blob_size) = context
1637                .open(&cfg.partition, &2u64.to_be_bytes())
1638                .await
1639                .expect("Failed to open blob");
1640            assert_eq!(blob_size, 41);
1641
1642            // Re-initialize the journal to simulate a restart
1643            let journal = Journal::init(context.clone(), cfg.clone())
1644                .await
1645                .expect("Failed to re-initialize journal");
1646
1647            // Attempt to replay the journal
1648            let mut items = Vec::<(u64, u32)>::new();
1649            {
1650                let stream = journal
1651                    .replay(0, 0, NZUsize!(1024))
1652                    .await
1653                    .expect("unable to setup replay");
1654                pin_mut!(stream);
1655                while let Some(result) = stream.next().await {
1656                    match result {
1657                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1658                        Err(err) => panic!("Failed to read item: {err}"),
1659                    }
1660                }
1661            }
1662
1663            // Verify that the non-corrupted items and the newly appended item were replayed
1664            assert_eq!(items.len(), 4);
1665            assert_eq!(items[0].0, 1);
1666            assert_eq!(items[0].1, 1);
1667            assert_eq!(items[1].0, data_items[0].0);
1668            assert_eq!(items[1].1, data_items[0].1);
1669            assert_eq!(items[2].0, data_items[1].0);
1670            assert_eq!(items[2].1, data_items[1].1);
1671            assert_eq!(items[3].0, 2);
1672            assert_eq!(items[3].1, 5);
1673        });
1674    }
1675
1676    #[test_traced]
1677    fn test_journal_handling_aligned_truncated_data() {
1678        // Initialize the deterministic context
1679        let executor = deterministic::Runner::default();
1680
1681        // Start the test within the executor
1682        executor.start(|context| async move {
1683            // Create a journal configuration
1684            let cfg = Config {
1685                partition: "test_partition".into(),
1686                compression: None,
1687                codec_config: (),
1688                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1689                write_buffer: NZUsize!(1024),
1690            };
1691
1692            // Initialize the journal
1693            let mut journal = Journal::init(context.clone(), cfg.clone())
1694                .await
1695                .expect("Failed to initialize journal");
1696
1697            // Append 1 item to the first index
1698            journal.append(1, 1).await.expect("Failed to append data");
1699
1700            // Append multiple items to the second index
1701            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1702            for (index, data) in &data_items {
1703                journal
1704                    .append(*index, *data)
1705                    .await
1706                    .expect("Failed to append data");
1707                journal.sync(*index).await.expect("Failed to sync blob");
1708            }
1709
1710            // Close the journal
1711            journal.close().await.expect("Failed to close journal");
1712
1713            // Manually corrupt the end of the second blob
1714            let (blob, blob_size) = context
1715                .open(&cfg.partition, &2u64.to_be_bytes())
1716                .await
1717                .expect("Failed to open blob");
1718            blob.resize(blob_size - 4)
1719                .await
1720                .expect("Failed to corrupt blob");
1721            blob.sync().await.expect("Failed to sync blob");
1722
1723            // Re-initialize the journal to simulate a restart
1724            let mut journal = Journal::init(context.clone(), cfg.clone())
1725                .await
1726                .expect("Failed to re-initialize journal");
1727
1728            // Attempt to replay the journal
1729            let mut items = Vec::<(u64, u64)>::new();
1730            {
1731                let stream = journal
1732                    .replay(0, 0, NZUsize!(1024))
1733                    .await
1734                    .expect("unable to setup replay");
1735                pin_mut!(stream);
1736                while let Some(result) = stream.next().await {
1737                    match result {
1738                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1739                        Err(err) => panic!("Failed to read item: {err}"),
1740                    }
1741                }
1742            }
1743
1744            // Verify that only non-corrupted items were replayed
1745            assert_eq!(items.len(), 3);
1746            assert_eq!(items[0].0, 1);
1747            assert_eq!(items[0].1, 1);
1748            assert_eq!(items[1].0, data_items[0].0);
1749            assert_eq!(items[1].1, data_items[0].1);
1750            assert_eq!(items[2].0, data_items[1].0);
1751            assert_eq!(items[2].1, data_items[1].1);
1752
1753            // Append a new item to the truncated section
1754            journal.append(2, 5).await.expect("Failed to append data");
1755            journal.sync(2).await.expect("Failed to sync blob");
1756
1757            // Get the new item
1758            let item = journal.get(2, 2).await.expect("Failed to get item");
1759            assert_eq!(item, 5);
1760
1761            // Close the journal
1762            journal.close().await.expect("Failed to close journal");
1763
1764            // Confirm blob is expected length
1765            // entry = 1 (varint for 8) + 8 (u64 data) + 4 (checksum) = 13 bytes
1766            // Items at positions 0, 16, 32; item 3 ends at 32 + 13 = 45
1767            let (_, blob_size) = context
1768                .open(&cfg.partition, &2u64.to_be_bytes())
1769                .await
1770                .expect("Failed to open blob");
1771            assert_eq!(blob_size, 45);
1772
1773            // Re-initialize the journal after truncation
1774            let journal = Journal::init(context, cfg)
1775                .await
1776                .expect("Failed to re-initialize journal");
1777
1778            // Attempt to replay the journal
1779            let mut items = Vec::<(u64, u64)>::new();
1780            {
1781                let stream = journal
1782                    .replay(0, 0, NZUsize!(1024))
1783                    .await
1784                    .expect("unable to setup replay");
1785                pin_mut!(stream);
1786                while let Some(result) = stream.next().await {
1787                    match result {
1788                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1789                        Err(err) => panic!("Failed to read item: {err}"),
1790                    }
1791                }
1792            }
1793            journal.close().await.expect("Failed to close journal");
1794
1795            // Verify that the non-corrupted items and the newly appended item were replayed
1796            assert_eq!(items.len(), 4);
1797            assert_eq!(items[0].0, 1);
1798            assert_eq!(items[0].1, 1);
1799            assert_eq!(items[1].0, data_items[0].0);
1800            assert_eq!(items[1].1, data_items[0].1);
1801            assert_eq!(items[2].0, data_items[1].0);
1802            assert_eq!(items[2].1, data_items[1].1);
1803            assert_eq!(items[3].0, 2);
1804            assert_eq!(items[3].1, 5);
1805        });
1806    }
1807
1808    #[test_traced]
1809    fn test_journal_handling_extra_data() {
1810        // Initialize the deterministic context
1811        let executor = deterministic::Runner::default();
1812
1813        // Start the test within the executor
1814        executor.start(|context| async move {
1815            // Create a journal configuration
1816            let cfg = Config {
1817                partition: "test_partition".into(),
1818                compression: None,
1819                codec_config: (),
1820                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1821                write_buffer: NZUsize!(1024),
1822            };
1823
1824            // Initialize the journal
1825            let mut journal = Journal::init(context.clone(), cfg.clone())
1826                .await
1827                .expect("Failed to initialize journal");
1828
1829            // Append 1 item to the first index
1830            journal.append(1, 1).await.expect("Failed to append data");
1831
1832            // Append multiple items to the second index
1833            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1834            for (index, data) in &data_items {
1835                journal
1836                    .append(*index, *data)
1837                    .await
1838                    .expect("Failed to append data");
1839                journal.sync(*index).await.expect("Failed to sync blob");
1840            }
1841
1842            // Close the journal
1843            journal.close().await.expect("Failed to close journal");
1844
1845            // Manually add extra data to the end of the second blob
1846            let (blob, blob_size) = context
1847                .open(&cfg.partition, &2u64.to_be_bytes())
1848                .await
1849                .expect("Failed to open blob");
1850            blob.write_at(vec![0u8; 16], blob_size)
1851                .await
1852                .expect("Failed to add extra data");
1853            blob.sync().await.expect("Failed to sync blob");
1854
1855            // Re-initialize the journal to simulate a restart
1856            let journal = Journal::init(context, cfg)
1857                .await
1858                .expect("Failed to re-initialize journal");
1859
1860            // Attempt to replay the journal
1861            let mut items = Vec::<(u64, i32)>::new();
1862            let stream = journal
1863                .replay(0, 0, NZUsize!(1024))
1864                .await
1865                .expect("unable to setup replay");
1866            pin_mut!(stream);
1867            while let Some(result) = stream.next().await {
1868                match result {
1869                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1870                    Err(err) => panic!("Failed to read item: {err}"),
1871                }
1872            }
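            // No assertions on `items`: this test only checks that replay completes without error despite the extra trailing bytes.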
1873        });
1874    }
1875
1876    // Define a no-op `MockBlob`; paired with `MockStorage` below, it simulates blobs of arbitrary reported length without writing any data
1877    #[derive(Clone)]
1878    struct MockBlob {}
1879
1880    impl Blob for MockBlob {
1881        async fn read_at(
1882            &self,
1883            buf: impl Into<StableBuf> + Send,
1884            _offset: u64,
1885        ) -> Result<StableBuf, RError> {
1886            Ok(buf.into())
1887        }
1888
1889        async fn write_at(
1890            &self,
1891            _buf: impl Into<StableBuf> + Send,
1892            _offset: u64,
1893        ) -> Result<(), RError> {
1894            Ok(())
1895        }
1896
1897        async fn resize(&self, _len: u64) -> Result<(), RError> {
1898            Ok(())
1899        }
1900
1901        async fn sync(&self) -> Result<(), RError> {
1902            Ok(())
1903        }
1904    }
1905
1906    // Define `MockStorage` that returns `MockBlob`
1907    #[derive(Clone)]
1908    struct MockStorage {
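        // Length reported by `open` for every blob in this mock.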
1909        len: u64,
1910    }
1911
1912    impl Storage for MockStorage {
1913        type Blob = MockBlob;
1914
1915        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
1916            Ok((MockBlob {}, self.len))
1917        }
1918
1919        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
1920            Ok(())
1921        }
1922
1923        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
1924            Ok(vec![])
1925        }
1926    }
1927
1928    impl Metrics for MockStorage {
1929        fn with_label(&self, _: &str) -> Self {
1930            self.clone()
1931        }
1932
1933        fn label(&self) -> String {
1934            String::new()
1935        }
1936
1937        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}
1938
1939        fn encode(&self) -> String {
1940            String::new()
1941        }
1942    }
1943
1944    // Define the `INDEX_ALIGNMENT` again explicitly to ensure we catch any accidental
1945    // changes to the value
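    // A blob of u32::MAX * INDEX_ALIGNMENT bytes maps to the maximum offset (u32::MAX);
    // one more byte should overflow, as the two tests below verify.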
1946    const INDEX_ALIGNMENT: u64 = 16;
1947
1948    #[test_traced]
1949    fn test_journal_large_offset() {
1950        // Initialize the deterministic context
1951        let executor = deterministic::Runner::default();
1952        executor.start(|_| async move {
1953            // Create journal
1954            let cfg = Config {
1955                partition: "partition".to_string(),
1956                compression: None,
1957                codec_config: (),
1958                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1959                write_buffer: NZUsize!(1024),
1960            };
1961            let context = MockStorage {
1962                len: u32::MAX as u64 * INDEX_ALIGNMENT, // the item at the last offset can store up to u32::MAX bytes
1963            };
1964            let mut journal = Journal::init(context, cfg).await.unwrap();
1965
1966            // Append data
1967            let data = 1;
1968            let (result, _) = journal
1969                .append(1, data)
1970                .await
1971                .expect("Failed to append data");
1972            assert_eq!(result, u32::MAX);
1973        });
1974    }
1975
1976    #[test_traced]
1977    fn test_journal_offset_overflow() {
1978        // Initialize the deterministic context
1979        let executor = deterministic::Runner::default();
1980        executor.start(|_| async move {
1981            // Create journal
1982            let cfg = Config {
1983                partition: "partition".to_string(),
1984                compression: None,
1985                codec_config: (),
1986                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1987                write_buffer: NZUsize!(1024),
1988            };
1989            let context = MockStorage {
1990                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
1991            };
1992            let mut journal = Journal::init(context, cfg).await.unwrap();
1993
1994            // Append data
1995            let data = 1;
1996            let result = journal.append(1, data).await;
1997            assert!(matches!(result, Err(Error::OffsetOverflow)));
1998        });
1999    }
2000
2001    #[test_traced]
2002    fn test_journal_rewind() {
2003        // Initialize the deterministic context
2004        let executor = deterministic::Runner::default();
2005        executor.start(|context| async move {
2006            // Create journal
2007            let cfg = Config {
2008                partition: "test_partition".to_string(),
2009                compression: None,
2010                codec_config: (),
2011                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2012                write_buffer: NZUsize!(1024),
2013            };
2014            let mut journal = Journal::init(context, cfg).await.unwrap();
2015
2016            // Check size of non-existent section
2017            let size = journal.size(1).await.unwrap();
2018            assert_eq!(size, 0);
2019
2020            // Append data to section 1
2021            journal.append(1, 42i32).await.unwrap();
2022
2023            // Check size of section 1 - should be greater than 0
2024            let size = journal.size(1).await.unwrap();
2025            assert!(size > 0);
2026
2027            // Append more data and verify size increases
2028            journal.append(1, 43i32).await.unwrap();
2029            let new_size = journal.size(1).await.unwrap();
2030            assert!(new_size > size);
2031
2032            // Check size of different section - should still be 0
2033            let size = journal.size(2).await.unwrap();
2034            assert_eq!(size, 0);
2035
2036            // Append data to section 2
2037            journal.append(2, 44i32).await.unwrap();
2038
2039            // Check size of section 2 - should be greater than 0
2040            let size = journal.size(2).await.unwrap();
2041            assert!(size > 0);
2042
2043            // Rewind to offset 0 of section 1, which rolls back everything in sections 1 and 2
2044            journal.rewind(1, 0).await.unwrap();
2045
2046            // Check size of section 1 - should be 0
2047            let size = journal.size(1).await.unwrap();
2048            assert_eq!(size, 0);
2049
2050            // Check size of section 2 - should be 0
2051            let size = journal.size(2).await.unwrap();
2052            assert_eq!(size, 0);
2053        });
2054    }
2055
2056    #[test_traced]
2057    fn test_journal_rewind_section() {
2058        // Initialize the deterministic context
2059        let executor = deterministic::Runner::default();
2060        executor.start(|context| async move {
2061            // Create journal
2062            let cfg = Config {
2063                partition: "test_partition".to_string(),
2064                compression: None,
2065                codec_config: (),
2066                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2067                write_buffer: NZUsize!(1024),
2068            };
2069            let mut journal = Journal::init(context, cfg).await.unwrap();
2070
2071            // Check size of non-existent section
2072            let size = journal.size(1).await.unwrap();
2073            assert_eq!(size, 0);
2074
2075            // Append data to section 1
2076            journal.append(1, 42i32).await.unwrap();
2077
2078            // Check size of section 1 - should be greater than 0
2079            let size = journal.size(1).await.unwrap();
2080            assert!(size > 0);
2081
2082            // Append more data and verify size increases
2083            journal.append(1, 43i32).await.unwrap();
2084            let new_size = journal.size(1).await.unwrap();
2085            assert!(new_size > size);
2086
2087            // Check size of different section - should still be 0
2088            let size = journal.size(2).await.unwrap();
2089            assert_eq!(size, 0);
2090
2091            // Append data to section 2
2092            journal.append(2, 44i32).await.unwrap();
2093
2094            // Check size of section 2 - should be greater than 0
2095            let size = journal.size(2).await.unwrap();
2096            assert!(size > 0);
2097
2098            // Rollback everything in section 1
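            // Unlike `rewind`, `rewind_section` leaves other sections untouched.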
2099            journal.rewind_section(1, 0).await.unwrap();
2100
2101            // Check size of section 1 - should be 0
2102            let size = journal.size(1).await.unwrap();
2103            assert_eq!(size, 0);
2104
2105            // Check size of section 2 - should be greater than 0
2106            let size = journal.size(2).await.unwrap();
2107            assert!(size > 0);
2108        });
2109    }
2110
2111    /// Protect against accidental changes to the journal disk format.
2112    #[test_traced]
2113    fn test_journal_conformance() {
2114        // Initialize the deterministic context
2115        let executor = deterministic::Runner::default();
2116
2117        // Start the test within the executor
2118        executor.start(|context| async move {
2119            // Create a journal configuration
2120            let cfg = Config {
2121                partition: "test_partition".into(),
2122                compression: None,
2123                codec_config: (),
2124                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2125                write_buffer: NZUsize!(1024),
2126            };
2127
2128            // Initialize the journal
2129            let mut journal = Journal::init(context.clone(), cfg.clone())
2130                .await
2131                .expect("Failed to initialize journal");
2132
2133            // Append 100 items to the journal
2134            for i in 0..100 {
2135                journal.append(1, i).await.expect("Failed to append data");
2136            }
2137            journal.sync(1).await.expect("Failed to sync blob");
2138
2139            // Close the journal
2140            journal.close().await.expect("Failed to close journal");
2141
2142            // Hash blob contents
2143            let (blob, size) = context
2144                .open(&cfg.partition, &1u64.to_be_bytes())
2145                .await
2146                .expect("Failed to open blob");
2147            assert!(size > 0);
2148            let buf = blob
2149                .read_at(vec![0u8; size as usize], 0)
2150                .await
2151                .expect("Failed to read blob");
2152            let digest = Sha256::hash(buf.as_ref());
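            // Any change to the on-disk encoding will change this digest and fail the assertion below.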
2153            assert_eq!(
2154                hex(&digest),
2155                "f55bf27a59118603466fcf6a507ab012eea4cb2d6bdd06ce8f515513729af847",
2156            );
2157        });
2158    }
2159}