// commonware_storage/journal/segmented/variable.rs
//! An append-only log for storing arbitrary variable length items.
//!
//! `segmented::Journal` is an append-only log for storing arbitrary variable length data on disk. In
//! addition to replay, stored items can be directly retrieved given their section number and offset
//! within the section.
//!
//! # Format
//!
//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
//! The particular [Blob] in which data is stored is identified by a `section` number (`u64`).
//! Within a `section`, data is appended as an `item` with the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+
//! |       0 ~ 4       |    ...    |
//! +---+---+---+---+---+---+---+---+
//! | Size (varint u32) |   Data    |
//! +---+---+---+---+---+---+---+---+
//! ```
//!
//! # Open Blobs
//!
//! `Journal` uses 1 `commonware-storage::Blob` per `section` to store data. All `Blobs` in a given
//! `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound the
//! number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
//!
//! # Pruning
//!
//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
//! could be used, for example, by some blockchain application to prune old blocks.
//!
//! # Replay
//!
//! During application initialization, it is very common to replay data from `Journal` to recover
//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
//! method to produce a stream of all items in the `Journal` in order of their `section` and
//! `offset`.
//!
//! # Compression
//!
//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
//! `compression` field in the `Config` struct to a valid `zstd` compression level. This setting can
//! be changed between initializations of `Journal`, however, it must remain populated if any data
//! was written with compression enabled.
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::{NZUsize, NZU16};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a page cache
//!     let page_cache = CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10));
//!
//!     // Create a journal
//!     let mut journal = Journal::init(context, Config {
//!         partition: "partition".into(),
//!         compression: None,
//!         codec_config: (),
//!         page_cache,
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append data to the journal
//!     journal.append(1, &128).await.unwrap();
//!
//!     // Sync the journal
//!     journal.sync_all().await.unwrap();
//! });
//! ```

83use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
84use crate::journal::Error;
85use commonware_codec::{
86    varint::{UInt, MAX_U32_VARINT_SIZE},
87    Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
88};
89use commonware_runtime::{
90    buffer::paged::{Append, CacheRef, Replay},
91    Blob, Buf, BufMut, IoBuf, IoBufMut, Metrics, Storage,
92};
93use futures::stream::{self, Stream, StreamExt};
94use std::{io::Cursor, num::NonZeroUsize};
95use tracing::{trace, warn};
96use zstd::{bulk::compress, decode_all};
97
/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config<C> {
    /// The `commonware-runtime::Storage` partition to use
    /// for storing journal blobs.
    pub partition: String,

    /// Optional compression level (using `zstd`) to apply to data before storing.
    /// Must remain set across restarts if any data was written compressed.
    pub compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding items.
    pub codec_config: C,

    /// The page cache to use for caching data read from blobs.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}
117
118/// Decodes a varint length prefix from a buffer.
119/// Returns (item_size, varint_len).
120#[inline]
121fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
122    let initial = buf.remaining();
123    let size = UInt::<u32>::read(buf)?.0 as usize;
124    let varint_len = initial - buf.remaining();
125    Ok((size, varint_len))
126}
127
/// Result of finding an item in a buffer (offsets/lengths, not slices).
enum ItemInfo {
    /// All item data is available in the buffer.
    Complete {
        /// Length of the varint prefix.
        varint_len: usize,
        /// Length of the item data.
        data_len: usize,
    },
    /// Only some item data is available; the rest must be read separately.
    Incomplete {
        /// Length of the varint prefix.
        varint_len: usize,
        /// Bytes of item data available in buffer.
        prefix_len: usize,
        /// Full size of the item.
        total_len: usize,
    },
}
147
148/// Find an item in a buffer by decoding its length prefix.
149///
150/// Returns (next_offset, item_info). The buffer is advanced past the varint.
151fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
152    let available = buf.remaining();
153    let (size, varint_len) = decode_length_prefix(buf)?;
154    let next_offset = offset
155        .checked_add(varint_len as u64)
156        .ok_or(Error::OffsetOverflow)?
157        .checked_add(size as u64)
158        .ok_or(Error::OffsetOverflow)?;
159    let buffered = available.saturating_sub(varint_len);
160
161    let item = if buffered >= size {
162        ItemInfo::Complete {
163            varint_len,
164            data_len: size,
165        }
166    } else {
167        ItemInfo::Incomplete {
168            varint_len,
169            prefix_len: buffered,
170            total_len: size,
171        }
172    };
173
174    Ok((next_offset, item))
175}
176
/// State for replaying a single section's blob, threaded through the
/// replay `unfold` loop.
struct ReplayState<B: Blob, C> {
    /// Section number being replayed.
    section: u64,
    /// Handle to the section's blob (retained so it can be truncated on repair).
    blob: Append<B>,
    /// Sequential reader over the blob's contents.
    replay: Replay<B>,
    /// Bytes still to skip before decoding begins (non-zero start offset).
    skip_bytes: u64,
    /// Current byte offset within the section.
    offset: u64,
    /// Offset just past the last successfully decoded item (truncation target).
    valid_offset: u64,
    /// Codec configuration used to decode items.
    codec_config: C,
    /// Whether items were written with zstd compression.
    compressed: bool,
    /// Set once this section is fully consumed or an error ends its replay.
    done: bool,
}
189
190/// Decode item data with optional decompression.
191fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
192    if compressed {
193        let decompressed =
194            decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
195        V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
196    } else {
197        V::decode_cfg(item_data, cfg).map_err(Error::Codec)
198    }
199}
200
/// A segmented journal with variable-size entries.
///
/// Each section is stored in a separate blob. Items are length-prefixed with a varint.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
/// replay (not init) because any blob could have trailing bytes.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// Manages the per-section blobs backing the journal.
    manager: Manager<E, AppendFactory>,

    /// Compression level (if enabled).
    compression: Option<u8>,

    /// Codec configuration used when encoding/decoding items.
    codec_config: V::Cfg,
}
223
224impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
225    /// Initialize a new `Journal` instance.
226    ///
227    /// All backing blobs are opened but not read during
228    /// initialization. The `replay` method can be used
229    /// to iterate over all items in the `Journal`.
230    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
231        let manager_cfg = ManagerConfig {
232            partition: cfg.partition,
233            factory: AppendFactory {
234                write_buffer: cfg.write_buffer,
235                page_cache_ref: cfg.page_cache,
236            },
237        };
238        let manager = Manager::init(context, manager_cfg).await?;
239
240        Ok(Self {
241            manager,
242            compression: cfg.compression,
243            codec_config: cfg.codec_config,
244        })
245    }
246
    /// Reads an item from the blob at the given offset.
    ///
    /// Returns `(next_offset, item_size, item)` where `next_offset` is the byte
    /// offset immediately after the item and `item_size` is the length of the
    /// stored (possibly compressed) payload, excluding the varint prefix.
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u64,
    ) -> Result<(u64, u32, V), Error> {
        // Read varint header (max 5 bytes for u32). The read may return more
        // than the header (up to MAX_U32_VARINT_SIZE bytes), which can already
        // contain some (or all) of the item data.
        let (buf, available) = blob
            .read_up_to(
                offset,
                MAX_U32_VARINT_SIZE,
                IoBufMut::with_capacity(MAX_U32_VARINT_SIZE),
            )
            .await?;
        let buf = buf.freeze();
        let mut cursor = Cursor::new(buf.slice(..available));
        let (next_offset, item_info) = find_item(&mut cursor, offset)?;

        // Decode item - either directly from buffer or by chaining prefix with remainder
        let (item_size, decoded) = match item_info {
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => {
                // Data follows varint in buffer
                let data = buf.slice(varint_len..varint_len + data_len);
                let decoded = decode_item::<V>(data, cfg, compressed)?;
                (data_len as u32, decoded)
            }
            ItemInfo::Incomplete {
                varint_len,
                prefix_len,
                total_len,
            } => {
                // Read remainder and chain with prefix to avoid copying
                let prefix = buf.slice(varint_len..varint_len + prefix_len);
                let read_offset = offset + varint_len as u64 + prefix_len as u64;
                let remainder_len = total_len - prefix_len;
                let mut remainder = vec![0u8; remainder_len];
                blob.read_into(&mut remainder, read_offset).await?;
                let chained = prefix.chain(IoBuf::from(remainder));
                let decoded = decode_item::<V>(chained, cfg, compressed)?;
                (total_len as u32, decoded)
            }
        };

        Ok((next_offset, item_size, decoded))
    }
296
    /// Returns an ordered stream of all items in the journal starting with the item at the given
    /// `start_section` and `offset` into that section. Each item is returned as a tuple of
    /// (section, offset, size, item).
    ///
    /// Items are yielded in batches to amortize stream overhead. The first invalid entry
    /// encountered (bad varint or incomplete trailing item) is treated as the end of its
    /// section and the backing blob is truncated to the last valid item (see the [Journal]
    /// docs on repair).
    pub async fn replay(
        &self,
        start_section: u64,
        mut start_offset: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
        // Collect all blobs to replay (keeping blob reference for potential resize)
        let codec_config = self.codec_config.clone();
        let compressed = self.compression.is_some();
        let mut blobs = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            blobs.push((
                section,
                blob.clone(),
                blob.replay(buffer).await?,
                codec_config.clone(),
                compressed,
            ));
        }

        // Stream items as they are read to avoid occupying too much memory
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, replay, codec_config, compressed)| {
                // Calculate initial skip bytes for first blob; every later
                // section starts from offset 0.
                let skip_bytes = if section == start_section {
                    start_offset
                } else {
                    start_offset = 0;
                    0
                };

                stream::unfold(
                    ReplayState {
                        section,
                        blob,
                        replay,
                        skip_bytes,
                        offset: 0,
                        valid_offset: skip_bytes,
                        codec_config,
                        compressed,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let blob_size = state.replay.blob_size();
                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
                        loop {
                            // Ensure we have enough data for varint header.
                            // ensure() returns Ok(false) if exhausted with fewer bytes,
                            // but we still try to decode from remaining bytes.
                            match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Reader exhausted - check if buffer is empty
                                    if state.replay.remaining() == 0 {
                                        state.done = true;
                                        return if batch.is_empty() {
                                            None
                                        } else {
                                            Some((batch, state))
                                        };
                                    }
                                    // Buffer still has data - continue to try decoding
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Skip bytes if needed (for start_offset). Skipping may take
                            // several iterations if the skip spans multiple refills.
                            if state.skip_bytes > 0 {
                                let to_skip =
                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
                                state.replay.advance(to_skip);
                                state.skip_bytes -= to_skip as u64;
                                state.offset += to_skip as u64;
                                continue;
                            }

                            // Try to decode length prefix
                            let before_remaining = state.replay.remaining();
                            let (item_size, varint_len) =
                                match decode_length_prefix(&mut state.replay) {
                                    Ok(result) => result,
                                    Err(err) => {
                                        // Could be incomplete varint - check if reader exhausted
                                        if state.replay.is_exhausted()
                                            || before_remaining < MAX_U32_VARINT_SIZE
                                        {
                                            // Treat as trailing bytes: truncate the blob back
                                            // to the end of the last valid item.
                                            if state.valid_offset < blob_size
                                                && state.offset < blob_size
                                            {
                                                warn!(
                                                    blob = state.section,
                                                    bad_offset = state.offset,
                                                    new_size = state.valid_offset,
                                                    "trailing bytes detected: truncating"
                                                );
                                                state.blob.resize(state.valid_offset).await.ok()?;
                                            }
                                            state.done = true;
                                            return if batch.is_empty() {
                                                None
                                            } else {
                                                Some((batch, state))
                                            };
                                        }
                                        batch.push(Err(err));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                };

                            // Ensure we have enough data for item body
                            match state.replay.ensure(item_size).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Incomplete item at end - truncate
                                    warn!(
                                        blob = state.section,
                                        bad_offset = state.offset,
                                        new_size = state.valid_offset,
                                        "incomplete item at end: truncating"
                                    );
                                    state.blob.resize(state.valid_offset).await.ok()?;
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode item - use take() to limit bytes read
                            let item_offset = state.offset;
                            let next_offset = match state
                                .offset
                                .checked_add(varint_len as u64)
                                .and_then(|o| o.checked_add(item_size as u64))
                            {
                                Some(o) => o,
                                None => {
                                    batch.push(Err(Error::OffsetOverflow));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            };
                            match decode_item::<V>(
                                (&mut state.replay).take(item_size),
                                &state.codec_config,
                                state.compressed,
                            ) {
                                Ok(decoded) => {
                                    batch.push(Ok((
                                        state.section,
                                        item_offset,
                                        item_size as u32,
                                        decoded,
                                    )));
                                    // Only advance the truncation watermark after a
                                    // fully successful decode.
                                    state.valid_offset = next_offset;
                                    state.offset = next_offset;
                                }
                                Err(err) => {
                                    batch.push(Err(err));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Return batch if we have items and buffer is low
                            if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            },
        ))
    }
493
494    /// Encode an item.
495    ///
496    /// Returns `(buf, item_len)` where `item_len` is the length of the encoded (and
497    /// possibly compressed) payload, excluding the size prefix.
498    pub(crate) fn encode_item(compression: Option<u8>, item: &V) -> Result<(Vec<u8>, u32), Error> {
499        if let Some(compression) = compression {
500            // Compressed: encode first, then compress
501            let encoded = item.encode();
502            let compressed =
503                compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
504            let item_len = compressed.len();
505            let item_len_u32: u32 = match item_len.try_into() {
506                Ok(len) => len,
507                Err(_) => return Err(Error::ItemTooLarge(item_len)),
508            };
509            let size_len = UInt(item_len_u32).encode_size();
510            let entry_len = size_len
511                .checked_add(item_len)
512                .ok_or(Error::OffsetOverflow)?;
513
514            let mut buf = Vec::with_capacity(entry_len);
515            UInt(item_len_u32).write(&mut buf);
516            buf.put_slice(&compressed);
517
518            Ok((buf, item_len_u32))
519        } else {
520            // Uncompressed: pre-allocate exact size to avoid copying
521            let item_len = item.encode_size();
522            let item_len_u32: u32 = match item_len.try_into() {
523                Ok(len) => len,
524                Err(_) => return Err(Error::ItemTooLarge(item_len)),
525            };
526            let size_len = UInt(item_len_u32).encode_size();
527            let entry_len = size_len
528                .checked_add(item_len)
529                .ok_or(Error::OffsetOverflow)?;
530
531            let mut buf = Vec::with_capacity(entry_len);
532            UInt(item_len_u32).write(&mut buf);
533            item.write(&mut buf);
534
535            Ok((buf, item_len_u32))
536        }
537    }
538
539    /// Appends an item to `Journal` in a given `section`, returning the offset
540    /// where the item was written and the size of the item (which may differ
541    /// from the raw encoded size if compression is enabled).
542    pub async fn append(&mut self, section: u64, item: &V) -> Result<(u64, u32), Error> {
543        let (buf, item_len) = Self::encode_item(self.compression, item)?;
544        self.append_raw(section, &buf)
545            .await
546            .map(|offset| (offset, item_len))
547    }
548
549    /// Append pre-encoded bytes to the given section, returning the byte offset
550    /// where the data was written.
551    ///
552    /// The buffer must be in the on-disk format produced by [Self::encode_item].
553    pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<u64, Error> {
554        let blob = self.manager.get_or_create(section).await?;
555        let offset = blob.size().await;
556        blob.append(buf).await?;
557        trace!(blob = section, offset, "appended item");
558        Ok(offset)
559    }
560
561    /// Retrieves an item from `Journal` at a given `section` and `offset`.
562    ///
563    /// # Errors
564    ///  - [Error::AlreadyPrunedToSection] if the requested `section` has been pruned during the
565    ///    current execution.
566    ///  - [Error::SectionOutOfRange] if the requested `section` is empty (i.e. has never had any
567    ///    data appended to it, or has been pruned in a previous execution).
568    ///  - An invalid `offset` for a given section (that is, an offset that doesn't correspond to a
569    ///    previously appended item) will result in an error, with the specific type being
570    ///    undefined.
571    pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
572        let blob = self
573            .manager
574            .get(section)?
575            .ok_or(Error::SectionOutOfRange(section))?;
576
577        // Perform a multi-op read.
578        let (_, _, item) =
579            Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
580        Ok(item)
581    }
582
    /// Get an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
    ///
    /// Returns `None` for any reason the item cannot be served synchronously:
    /// missing/pruned section, offset past the end of the blob, data not
    /// available in cache, or a decode failure.
    pub fn try_get_sync(&self, section: u64, offset: u64) -> Option<V> {
        let blob = self.manager.get(section).ok()??;
        // Bytes available in the blob from `offset` onward.
        let remaining = blob.try_size()?.checked_sub(offset)?;
        let header_len = usize::try_from(remaining.min(MAX_U32_VARINT_SIZE as u64)).ok()?;
        if header_len == 0 {
            return None;
        }

        // Read the varint header to determine item size.
        let mut header = [0u8; MAX_U32_VARINT_SIZE];
        if !blob.try_read_sync(offset, &mut header[..header_len]) {
            return None;
        }
        let mut cursor = Cursor::new(&header[..header_len]);
        let (_, item_info) = find_item(&mut cursor, offset).ok()?;

        // Either variant gives us (varint_len, total data length): for
        // `Incomplete`, `total_len` is the full item size even though only a
        // prefix was buffered.
        let (varint_len, data_len) = match item_info {
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => (varint_len, data_len),
            ItemInfo::Incomplete {
                varint_len,
                total_len,
                ..
            } => (varint_len, total_len),
        };
        let item_len = varint_len.checked_add(data_len)?;
        // Bail if the full entry extends past the end of the blob.
        if item_len > usize::try_from(remaining).ok()? {
            return None;
        }

        // If the full item fits in the header read, decode directly.
        if item_len <= header_len {
            return decode_item::<V>(
                &header[varint_len..varint_len + data_len],
                &self.codec_config,
                self.compression.is_some(),
            )
            .ok();
        }

        // Otherwise try reading the full item from cache.
        let mut full = vec![0u8; item_len];
        if !blob.try_read_sync(offset, &mut full) {
            return None;
        }
        decode_item::<V>(
            &full[varint_len..varint_len + data_len],
            &self.codec_config,
            self.compression.is_some(),
        )
        .ok()
    }
638
    /// Gets the size of the journal for a specific section.
    ///
    /// Returns 0 if the section does not exist.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed in reverse order of section.
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        // Delegates to the same manager operation as [Self::rewind]; an
        // offset within a section is equivalent to that section's size.
        self.manager.rewind(section, offset).await
    }

    /// Rewinds the journal to the given `section` and `size`.
    ///
    /// This removes any data beyond the specified `section` and `size`.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed in reverse order of section.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind(section, size).await
    }

    /// Rewinds the `section` to the given `size`.
    ///
    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
    ///
    /// # Warning
    ///
    /// This operation is not guaranteed to survive restarts until sync is called.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Ensures that all data in a given `section` is synced to the underlying store.
    ///
    /// If the `section` does not exist, no error will be returned.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Syncs all open sections.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// Returns the number of the oldest section in the journal.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// Returns the number of the newest section in the journal.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Returns true if no sections exist.
    pub fn is_empty(&self) -> bool {
        self.manager.is_empty()
    }

    /// Returns the number of sections.
    pub fn num_sections(&self) -> usize {
        self.manager.num_sections()
    }

    /// Removes any underlying blobs created by the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }

    /// Clear all data, resetting the journal to an empty state.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }
729}
730
731#[cfg(test)]
732mod tests {
733    use super::*;
734    use commonware_macros::test_traced;
735    use commonware_runtime::{deterministic, Blob, BufMut, Metrics, Runner, Storage};
736    use commonware_utils::{NZUsize, NZU16};
737    use futures::{pin_mut, StreamExt};
738    use std::num::NonZeroU16;
739
    /// Page size handed to `CacheRef::from_pooler` in every test config below.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    /// Page-cache capacity handed to `CacheRef::from_pooler` — presumably a
    /// count of pages; confirm against `CacheRef`.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
742
743    #[test_traced]
744    fn test_journal_append_and_read() {
745        // Initialize the deterministic context
746        let executor = deterministic::Runner::default();
747
748        // Start the test within the executor
749        executor.start(|context| async move {
750            // Initialize the journal
751            let cfg = Config {
752                partition: "test-partition".into(),
753                compression: None,
754                codec_config: (),
755                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
756                write_buffer: NZUsize!(1024),
757            };
758            let index = 1u64;
759            let data = 10;
760            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
761                .await
762                .expect("Failed to initialize journal");
763
764            // Append an item to the journal
765            journal
766                .append(index, &data)
767                .await
768                .expect("Failed to append data");
769
770            // Check metrics
771            let buffer = context.encode();
772            assert!(buffer.contains("first_tracked 1"));
773
774            // Drop and re-open the journal to simulate a restart
775            journal.sync(index).await.expect("Failed to sync journal");
776            drop(journal);
777            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg)
778                .await
779                .expect("Failed to re-initialize journal");
780
781            // Replay the journal and collect items
782            let mut items = Vec::new();
783            let stream = journal
784                .replay(0, 0, NZUsize!(1024))
785                .await
786                .expect("unable to setup replay");
787            pin_mut!(stream);
788            while let Some(result) = stream.next().await {
789                match result {
790                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
791                    Err(err) => panic!("Failed to read item: {err}"),
792                }
793            }
794
795            // Verify that the item was replayed correctly
796            assert_eq!(items.len(), 1);
797            assert_eq!(items[0].0, index);
798            assert_eq!(items[0].1, data);
799
800            // Check metrics
801            let buffer = context.encode();
802            assert!(buffer.contains("second_tracked 1"));
803        });
804    }
805
806    #[test_traced]
807    fn test_journal_multiple_appends_and_reads() {
808        // Initialize the deterministic context
809        let executor = deterministic::Runner::default();
810
811        // Start the test within the executor
812        executor.start(|context| async move {
813            // Create a journal configuration
814            let cfg = Config {
815                partition: "test-partition".into(),
816                compression: None,
817                codec_config: (),
818                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
819                write_buffer: NZUsize!(1024),
820            };
821
822            // Initialize the journal
823            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
824                .await
825                .expect("Failed to initialize journal");
826
827            // Append multiple items to different blobs
828            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
829            for (index, data) in &data_items {
830                journal
831                    .append(*index, data)
832                    .await
833                    .expect("Failed to append data");
834                journal.sync(*index).await.expect("Failed to sync blob");
835            }
836
837            // Check metrics
838            let buffer = context.encode();
839            assert!(buffer.contains("first_tracked 3"));
840            assert!(buffer.contains("first_synced_total 4"));
841
842            // Drop and re-open the journal to simulate a restart
843            drop(journal);
844            let journal = Journal::init(context.with_label("second"), cfg)
845                .await
846                .expect("Failed to re-initialize journal");
847
848            // Replay the journal and collect items
849            let mut items = Vec::<(u64, u32)>::new();
850            {
851                let stream = journal
852                    .replay(0, 0, NZUsize!(1024))
853                    .await
854                    .expect("unable to setup replay");
855                pin_mut!(stream);
856                while let Some(result) = stream.next().await {
857                    match result {
858                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
859                        Err(err) => panic!("Failed to read item: {err}"),
860                    }
861                }
862            }
863
864            // Verify that all items were replayed correctly
865            assert_eq!(items.len(), data_items.len());
866            for ((expected_index, expected_data), (actual_index, actual_data)) in
867                data_items.iter().zip(items.iter())
868            {
869                assert_eq!(actual_index, expected_index);
870                assert_eq!(actual_data, expected_data);
871            }
872
873            // Cleanup
874            journal.destroy().await.expect("Failed to destroy journal");
875        });
876    }
877
878    #[test_traced]
879    fn test_journal_prune_blobs() {
880        // Initialize the deterministic context
881        let executor = deterministic::Runner::default();
882
883        // Start the test within the executor
884        executor.start(|context| async move {
885            // Create a journal configuration
886            let cfg = Config {
887                partition: "test-partition".into(),
888                compression: None,
889                codec_config: (),
890                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
891                write_buffer: NZUsize!(1024),
892            };
893
894            // Initialize the journal
895            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
896                .await
897                .expect("Failed to initialize journal");
898
899            // Append items to multiple blobs
900            for index in 1u64..=5u64 {
901                journal
902                    .append(index, &index)
903                    .await
904                    .expect("Failed to append data");
905                journal.sync(index).await.expect("Failed to sync blob");
906            }
907
908            // Add one item out-of-order
909            let data = 99;
910            journal
911                .append(2u64, &data)
912                .await
913                .expect("Failed to append data");
914            journal.sync(2u64).await.expect("Failed to sync blob");
915
916            // Prune blobs with indices less than 3
917            journal.prune(3).await.expect("Failed to prune blobs");
918
919            // Check metrics
920            let buffer = context.encode();
921            assert!(buffer.contains("first_pruned_total 2"));
922
923            // Prune again with a section less than the previous one, should be a no-op
924            journal.prune(2).await.expect("Failed to no-op prune");
925            let buffer = context.encode();
926            assert!(buffer.contains("first_pruned_total 2"));
927
928            // Drop and re-open the journal to simulate a restart
929            drop(journal);
930            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
931                .await
932                .expect("Failed to re-initialize journal");
933
934            // Replay the journal and collect items
935            let mut items = Vec::<(u64, u64)>::new();
936            {
937                let stream = journal
938                    .replay(0, 0, NZUsize!(1024))
939                    .await
940                    .expect("unable to setup replay");
941                pin_mut!(stream);
942                while let Some(result) = stream.next().await {
943                    match result {
944                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
945                        Err(err) => panic!("Failed to read item: {err}"),
946                    }
947                }
948            }
949
950            // Verify that items from blobs 1 and 2 are not present
951            assert_eq!(items.len(), 3);
952            let expected_indices = [3u64, 4u64, 5u64];
953            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
954                assert_eq!(item.0, *expected_index);
955            }
956
957            // Prune all blobs
958            journal.prune(6).await.expect("Failed to prune blobs");
959
960            // Drop the journal
961            drop(journal);
962
963            // Ensure no remaining blobs exist
964            //
965            // Note: We don't remove the partition, so this does not error
966            // and instead returns an empty list of blobs.
967            assert!(context
968                .scan(&cfg.partition)
969                .await
970                .expect("Failed to list blobs")
971                .is_empty());
972        });
973    }
974
975    #[test_traced]
976    fn test_journal_prune_guard() {
977        let executor = deterministic::Runner::default();
978
979        executor.start(|context| async move {
980            let cfg = Config {
981                partition: "test-partition".into(),
982                compression: None,
983                codec_config: (),
984                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
985                write_buffer: NZUsize!(1024),
986            };
987
988            let mut journal = Journal::init(context.clone(), cfg.clone())
989                .await
990                .expect("Failed to initialize journal");
991
992            // Append items to sections 1-5
993            for section in 1u64..=5u64 {
994                journal
995                    .append(section, &(section as i32))
996                    .await
997                    .expect("Failed to append data");
998                journal.sync(section).await.expect("Failed to sync");
999            }
1000
1001            // Prune sections < 3
1002            journal.prune(3).await.expect("Failed to prune");
1003
1004            // Test that accessing pruned sections returns the correct error
1005
1006            // Test append on pruned section
1007            match journal.append(1, &100).await {
1008                Err(Error::AlreadyPrunedToSection(3)) => {}
1009                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1010            }
1011
1012            match journal.append(2, &100).await {
1013                Err(Error::AlreadyPrunedToSection(3)) => {}
1014                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1015            }
1016
1017            // Test get on pruned section
1018            match journal.get(1, 0).await {
1019                Err(Error::AlreadyPrunedToSection(3)) => {}
1020                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1021            }
1022
1023            // Test size on pruned section
1024            match journal.size(1).await {
1025                Err(Error::AlreadyPrunedToSection(3)) => {}
1026                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1027            }
1028
1029            // Test rewind on pruned section
1030            match journal.rewind(2, 0).await {
1031                Err(Error::AlreadyPrunedToSection(3)) => {}
1032                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1033            }
1034
1035            // Test rewind_section on pruned section
1036            match journal.rewind_section(1, 0).await {
1037                Err(Error::AlreadyPrunedToSection(3)) => {}
1038                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1039            }
1040
1041            // Test sync on pruned section
1042            match journal.sync(2).await {
1043                Err(Error::AlreadyPrunedToSection(3)) => {}
1044                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
1045            }
1046
1047            // Test that accessing sections at or after the threshold works
1048            assert!(journal.get(3, 0).await.is_ok());
1049            assert!(journal.get(4, 0).await.is_ok());
1050            assert!(journal.get(5, 0).await.is_ok());
1051            assert!(journal.size(3).await.is_ok());
1052            assert!(journal.sync(4).await.is_ok());
1053
1054            // Append to section at threshold should work
1055            journal
1056                .append(3, &999)
1057                .await
1058                .expect("Should be able to append to section 3");
1059
1060            // Prune more sections
1061            journal.prune(5).await.expect("Failed to prune");
1062
1063            // Verify sections 3 and 4 are now pruned
1064            match journal.get(3, 0).await {
1065                Err(Error::AlreadyPrunedToSection(5)) => {}
1066                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
1067            }
1068
1069            match journal.get(4, 0).await {
1070                Err(Error::AlreadyPrunedToSection(5)) => {}
1071                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
1072            }
1073
1074            // Section 5 should still be accessible
1075            assert!(journal.get(5, 0).await.is_ok());
1076        });
1077    }
1078
1079    #[test_traced]
1080    fn test_journal_prune_guard_across_restart() {
1081        let executor = deterministic::Runner::default();
1082
1083        executor.start(|context| async move {
1084            let cfg = Config {
1085                partition: "test-partition".into(),
1086                compression: None,
1087                codec_config: (),
1088                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1089                write_buffer: NZUsize!(1024),
1090            };
1091
1092            // First session: create and prune
1093            {
1094                let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1095                    .await
1096                    .expect("Failed to initialize journal");
1097
1098                for section in 1u64..=5u64 {
1099                    journal
1100                        .append(section, &(section as i32))
1101                        .await
1102                        .expect("Failed to append data");
1103                    journal.sync(section).await.expect("Failed to sync");
1104                }
1105
1106                journal.prune(3).await.expect("Failed to prune");
1107            }
1108
1109            // Second session: verify oldest_retained_section is reset
1110            {
1111                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
1112                    .await
1113                    .expect("Failed to re-initialize journal");
1114
1115                // But the actual sections 1 and 2 should be gone from storage
1116                // so get should return SectionOutOfRange, not AlreadyPrunedToSection
1117                match journal.get(1, 0).await {
1118                    Err(Error::SectionOutOfRange(1)) => {}
1119                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
1120                }
1121
1122                match journal.get(2, 0).await {
1123                    Err(Error::SectionOutOfRange(2)) => {}
1124                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
1125                }
1126
1127                // Sections 3-5 should still be accessible
1128                assert!(journal.get(3, 0).await.is_ok());
1129                assert!(journal.get(4, 0).await.is_ok());
1130                assert!(journal.get(5, 0).await.is_ok());
1131            }
1132        });
1133    }
1134
1135    #[test_traced]
1136    fn test_journal_with_invalid_blob_name() {
1137        // Initialize the deterministic context
1138        let executor = deterministic::Runner::default();
1139
1140        // Start the test within the executor
1141        executor.start(|context| async move {
1142            // Create a journal configuration
1143            let cfg = Config {
1144                partition: "test-partition".into(),
1145                compression: None,
1146                codec_config: (),
1147                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1148                write_buffer: NZUsize!(1024),
1149            };
1150
1151            // Manually create a blob with an invalid name (not 8 bytes)
1152            let invalid_blob_name = b"invalid"; // Less than 8 bytes
1153            let (blob, _) = context
1154                .open(&cfg.partition, invalid_blob_name)
1155                .await
1156                .expect("Failed to create blob with invalid name");
1157            blob.sync().await.expect("Failed to sync blob");
1158
1159            // Attempt to initialize the journal
1160            let result = Journal::<_, u64>::init(context, cfg).await;
1161
1162            // Expect an error
1163            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1164        });
1165    }
1166
1167    #[test_traced]
1168    fn test_journal_read_size_missing() {
1169        // Initialize the deterministic context
1170        let executor = deterministic::Runner::default();
1171
1172        // Start the test within the executor
1173        executor.start(|context| async move {
1174            // Create a journal configuration
1175            let cfg = Config {
1176                partition: "test-partition".into(),
1177                compression: None,
1178                codec_config: (),
1179                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1180                write_buffer: NZUsize!(1024),
1181            };
1182
1183            // Manually create a blob with incomplete size data
1184            let section = 1u64;
1185            let blob_name = section.to_be_bytes();
1186            let (blob, _) = context
1187                .open(&cfg.partition, &blob_name)
1188                .await
1189                .expect("Failed to create blob");
1190
1191            // Write incomplete varint by encoding u32::MAX (5 bytes) and truncating to 1 byte
1192            let mut incomplete_data = Vec::new();
1193            UInt(u32::MAX).write(&mut incomplete_data);
1194            incomplete_data.truncate(1);
1195            blob.write_at(0, incomplete_data)
1196                .await
1197                .expect("Failed to write incomplete data");
1198            blob.sync().await.expect("Failed to sync blob");
1199
1200            // Initialize the journal
1201            let journal = Journal::init(context, cfg)
1202                .await
1203                .expect("Failed to initialize journal");
1204
1205            // Attempt to replay the journal
1206            let stream = journal
1207                .replay(0, 0, NZUsize!(1024))
1208                .await
1209                .expect("unable to setup replay");
1210            pin_mut!(stream);
1211            let mut items = Vec::<(u64, u64)>::new();
1212            while let Some(result) = stream.next().await {
1213                match result {
1214                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1215                    Err(err) => panic!("Failed to read item: {err}"),
1216                }
1217            }
1218            assert!(items.is_empty());
1219        });
1220    }
1221
1222    #[test_traced]
1223    fn test_journal_read_item_missing() {
1224        // Initialize the deterministic context
1225        let executor = deterministic::Runner::default();
1226
1227        // Start the test within the executor
1228        executor.start(|context| async move {
1229            // Create a journal configuration
1230            let cfg = Config {
1231                partition: "test-partition".into(),
1232                compression: None,
1233                codec_config: (),
1234                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1235                write_buffer: NZUsize!(1024),
1236            };
1237
1238            // Manually create a blob with missing item data
1239            let section = 1u64;
1240            let blob_name = section.to_be_bytes();
1241            let (blob, _) = context
1242                .open(&cfg.partition, &blob_name)
1243                .await
1244                .expect("Failed to create blob");
1245
1246            // Write size but incomplete item data
1247            let item_size: u32 = 10; // Size indicates 10 bytes of data
1248            let mut buf = Vec::new();
1249            UInt(item_size).write(&mut buf); // Varint encoding
1250            let data = [2u8; 5];
1251            BufMut::put_slice(&mut buf, &data);
1252            blob.write_at(0, buf)
1253                .await
1254                .expect("Failed to write incomplete item");
1255            blob.sync().await.expect("Failed to sync blob");
1256
1257            // Initialize the journal
1258            let journal = Journal::init(context, cfg)
1259                .await
1260                .expect("Failed to initialize journal");
1261
1262            // Attempt to replay the journal
1263            let stream = journal
1264                .replay(0, 0, NZUsize!(1024))
1265                .await
1266                .expect("unable to setup replay");
1267            pin_mut!(stream);
1268            let mut items = Vec::<(u64, u64)>::new();
1269            while let Some(result) = stream.next().await {
1270                match result {
1271                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1272                    Err(err) => panic!("Failed to read item: {err}"),
1273                }
1274            }
1275            assert!(items.is_empty());
1276        });
1277    }
1278
1279    #[test_traced]
1280    fn test_journal_read_checksum_missing() {
1281        // Initialize the deterministic context
1282        let executor = deterministic::Runner::default();
1283
1284        // Start the test within the executor
1285        executor.start(|context| async move {
1286            // Create a journal configuration
1287            let cfg = Config {
1288                partition: "test-partition".into(),
1289                compression: None,
1290                codec_config: (),
1291                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1292                write_buffer: NZUsize!(1024),
1293            };
1294
1295            // Manually create a blob with missing checksum
1296            let section = 1u64;
1297            let blob_name = section.to_be_bytes();
1298            let (blob, _) = context
1299                .open(&cfg.partition, &blob_name)
1300                .await
1301                .expect("Failed to create blob");
1302
1303            // Prepare item data
1304            let item_data = b"Test data";
1305            let item_size = item_data.len() as u32;
1306
1307            // Write size (varint) and data, but no checksum
1308            let mut buf = Vec::new();
1309            UInt(item_size).write(&mut buf);
1310            BufMut::put_slice(&mut buf, item_data);
1311            blob.write_at(0, buf)
1312                .await
1313                .expect("Failed to write item without checksum");
1314
1315            blob.sync().await.expect("Failed to sync blob");
1316
1317            // Initialize the journal
1318            let journal = Journal::init(context, cfg)
1319                .await
1320                .expect("Failed to initialize journal");
1321
1322            // Attempt to replay the journal
1323            //
1324            // This will truncate the leftover bytes from our manual write.
1325            let stream = journal
1326                .replay(0, 0, NZUsize!(1024))
1327                .await
1328                .expect("unable to setup replay");
1329            pin_mut!(stream);
1330            let mut items = Vec::<(u64, u64)>::new();
1331            while let Some(result) = stream.next().await {
1332                match result {
1333                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1334                    Err(err) => panic!("Failed to read item: {err}"),
1335                }
1336            }
1337            assert!(items.is_empty());
1338        });
1339    }
1340
1341    #[test_traced]
1342    fn test_journal_read_checksum_mismatch() {
1343        // Initialize the deterministic context
1344        let executor = deterministic::Runner::default();
1345
1346        // Start the test within the executor
1347        executor.start(|context| async move {
1348            // Create a journal configuration
1349            let cfg = Config {
1350                partition: "test-partition".into(),
1351                compression: None,
1352                codec_config: (),
1353                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1354                write_buffer: NZUsize!(1024),
1355            };
1356
1357            // Manually create a blob with incorrect checksum
1358            let section = 1u64;
1359            let blob_name = section.to_be_bytes();
1360            let (blob, _) = context
1361                .open(&cfg.partition, &blob_name)
1362                .await
1363                .expect("Failed to create blob");
1364
1365            // Prepare item data
1366            let item_data = b"Test data";
1367            let item_size = item_data.len() as u32;
1368            let incorrect_checksum: u32 = 0xDEADBEEF;
1369
1370            // Write size (varint), data, and incorrect checksum
1371            let mut buf = Vec::new();
1372            UInt(item_size).write(&mut buf);
1373            BufMut::put_slice(&mut buf, item_data);
1374            buf.put_u32(incorrect_checksum);
1375            blob.write_at(0, buf)
1376                .await
1377                .expect("Failed to write item with bad checksum");
1378
1379            blob.sync().await.expect("Failed to sync blob");
1380
1381            // Initialize the journal
1382            let journal = Journal::init(context.clone(), cfg.clone())
1383                .await
1384                .expect("Failed to initialize journal");
1385
1386            // Attempt to replay the journal
1387            {
1388                let stream = journal
1389                    .replay(0, 0, NZUsize!(1024))
1390                    .await
1391                    .expect("unable to setup replay");
1392                pin_mut!(stream);
1393                let mut items = Vec::<(u64, u64)>::new();
1394                while let Some(result) = stream.next().await {
1395                    match result {
1396                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1397                        Err(err) => panic!("Failed to read item: {err}"),
1398                    }
1399                }
1400                assert!(items.is_empty());
1401            }
1402            drop(journal);
1403
1404            // Confirm blob is expected length
1405            let (_, blob_size) = context
1406                .open(&cfg.partition, &section.to_be_bytes())
1407                .await
1408                .expect("Failed to open blob");
1409            assert_eq!(blob_size, 0);
1410        });
1411    }
1412
1413    #[test_traced]
1414    fn test_journal_truncation_recovery() {
1415        // Initialize the deterministic context
1416        let executor = deterministic::Runner::default();
1417
1418        // Start the test within the executor
1419        executor.start(|context| async move {
1420            // Create a journal configuration
1421            let cfg = Config {
1422                partition: "test-partition".into(),
1423                compression: None,
1424                codec_config: (),
1425                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1426                write_buffer: NZUsize!(1024),
1427            };
1428
1429            // Initialize the journal
1430            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1431                .await
1432                .expect("Failed to initialize journal");
1433
1434            // Append 1 item to the first index
1435            journal.append(1, &1).await.expect("Failed to append data");
1436
1437            // Append multiple items to the second section
1438            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1439            for (index, data) in &data_items {
1440                journal
1441                    .append(*index, data)
1442                    .await
1443                    .expect("Failed to append data");
1444                journal.sync(*index).await.expect("Failed to sync blob");
1445            }
1446
1447            // Sync all sections and drop the journal
1448            journal.sync_all().await.expect("Failed to sync");
1449            drop(journal);
1450
1451            // Manually corrupt the end of the second blob
1452            let (blob, blob_size) = context
1453                .open(&cfg.partition, &2u64.to_be_bytes())
1454                .await
1455                .expect("Failed to open blob");
1456            blob.resize(blob_size - 4)
1457                .await
1458                .expect("Failed to corrupt blob");
1459            blob.sync().await.expect("Failed to sync blob");
1460
1461            // Re-initialize the journal to simulate a restart
1462            let journal = Journal::init(context.with_label("second"), cfg.clone())
1463                .await
1464                .expect("Failed to re-initialize journal");
1465
1466            // Attempt to replay the journal
1467            let mut items = Vec::<(u64, u32)>::new();
1468            {
1469                let stream = journal
1470                    .replay(0, 0, NZUsize!(1024))
1471                    .await
1472                    .expect("unable to setup replay");
1473                pin_mut!(stream);
1474                while let Some(result) = stream.next().await {
1475                    match result {
1476                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1477                        Err(err) => panic!("Failed to read item: {err}"),
1478                    }
1479                }
1480            }
1481            drop(journal);
1482
1483            // Verify that replay stopped after corruption detected (the second blob).
1484            assert_eq!(items.len(), 1);
1485            assert_eq!(items[0].0, 1);
1486            assert_eq!(items[0].1, 1);
1487
1488            // Confirm second blob was truncated.
1489            let (_, blob_size) = context
1490                .open(&cfg.partition, &2u64.to_be_bytes())
1491                .await
1492                .expect("Failed to open blob");
1493            assert_eq!(blob_size, 0);
1494
1495            // Attempt to replay journal after truncation
1496            let mut journal = Journal::init(context.with_label("third"), cfg.clone())
1497                .await
1498                .expect("Failed to re-initialize journal");
1499
1500            // Attempt to replay the journal
1501            let mut items = Vec::<(u64, u32)>::new();
1502            {
1503                let stream = journal
1504                    .replay(0, 0, NZUsize!(1024))
1505                    .await
1506                    .expect("unable to setup replay");
1507                pin_mut!(stream);
1508                while let Some(result) = stream.next().await {
1509                    match result {
1510                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1511                        Err(err) => panic!("Failed to read item: {err}"),
1512                    }
1513                }
1514            }
1515
1516            // Verify that only non-corrupted items were replayed
1517            assert_eq!(items.len(), 1);
1518            assert_eq!(items[0].0, 1);
1519            assert_eq!(items[0].1, 1);
1520
1521            // Append a new item to truncated partition
1522            let (_offset, _) = journal.append(2, &5).await.expect("Failed to append data");
1523            journal.sync(2).await.expect("Failed to sync blob");
1524
1525            // Get the new item (offset is 0 since blob was truncated)
1526            let item = journal.get(2, 0).await.expect("Failed to get item");
1527            assert_eq!(item, 5);
1528
1529            // Drop the journal (data already synced)
1530            drop(journal);
1531
1532            // Re-initialize the journal to simulate a restart
1533            let journal = Journal::init(context.clone(), cfg.clone())
1534                .await
1535                .expect("Failed to re-initialize journal");
1536
1537            // Attempt to replay the journal
1538            let mut items = Vec::<(u64, u32)>::new();
1539            {
1540                let stream = journal
1541                    .replay(0, 0, NZUsize!(1024))
1542                    .await
1543                    .expect("unable to setup replay");
1544                pin_mut!(stream);
1545                while let Some(result) = stream.next().await {
1546                    match result {
1547                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1548                        Err(err) => panic!("Failed to read item: {err}"),
1549                    }
1550                }
1551            }
1552
1553            // Verify that only non-corrupted items were replayed
1554            assert_eq!(items.len(), 2);
1555            assert_eq!(items[0].0, 1);
1556            assert_eq!(items[0].1, 1);
1557            assert_eq!(items[1].0, 2);
1558            assert_eq!(items[1].1, 5);
1559        });
1560    }
1561
1562    #[test_traced]
1563    fn test_journal_handling_extra_data() {
1564        // Initialize the deterministic context
1565        let executor = deterministic::Runner::default();
1566
1567        // Start the test within the executor
1568        executor.start(|context| async move {
1569            // Create a journal configuration
1570            let cfg = Config {
1571                partition: "test-partition".into(),
1572                compression: None,
1573                codec_config: (),
1574                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1575                write_buffer: NZUsize!(1024),
1576            };
1577
1578            // Initialize the journal
1579            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1580                .await
1581                .expect("Failed to initialize journal");
1582
1583            // Append 1 item to the first index
1584            journal.append(1, &1).await.expect("Failed to append data");
1585
1586            // Append multiple items to the second index
1587            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1588            for (index, data) in &data_items {
1589                journal
1590                    .append(*index, data)
1591                    .await
1592                    .expect("Failed to append data");
1593                journal.sync(*index).await.expect("Failed to sync blob");
1594            }
1595
1596            // Sync all sections and drop the journal
1597            journal.sync_all().await.expect("Failed to sync");
1598            drop(journal);
1599
1600            // Manually add extra data to the end of the second blob
1601            let (blob, blob_size) = context
1602                .open(&cfg.partition, &2u64.to_be_bytes())
1603                .await
1604                .expect("Failed to open blob");
1605            blob.write_at(blob_size, vec![0u8; 16])
1606                .await
1607                .expect("Failed to add extra data");
1608            blob.sync().await.expect("Failed to sync blob");
1609
1610            // Re-initialize the journal to simulate a restart
1611            let journal = Journal::init(context.with_label("second"), cfg)
1612                .await
1613                .expect("Failed to re-initialize journal");
1614
1615            // Attempt to replay the journal
1616            let mut items = Vec::<(u64, i32)>::new();
1617            let stream = journal
1618                .replay(0, 0, NZUsize!(1024))
1619                .await
1620                .expect("unable to setup replay");
1621            pin_mut!(stream);
1622            while let Some(result) = stream.next().await {
1623                match result {
1624                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1625                    Err(err) => panic!("Failed to read item: {err}"),
1626                }
1627            }
1628        });
1629    }
1630
1631    #[test_traced]
1632    fn test_journal_rewind() {
1633        // Initialize the deterministic context
1634        let executor = deterministic::Runner::default();
1635        executor.start(|context| async move {
1636            // Create journal
1637            let cfg = Config {
1638                partition: "test-partition".into(),
1639                compression: None,
1640                codec_config: (),
1641                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1642                write_buffer: NZUsize!(1024),
1643            };
1644            let mut journal = Journal::init(context, cfg).await.unwrap();
1645
1646            // Check size of non-existent section
1647            let size = journal.size(1).await.unwrap();
1648            assert_eq!(size, 0);
1649
1650            // Append data to section 1
1651            journal.append(1, &42i32).await.unwrap();
1652
1653            // Check size of section 1 - should be greater than 0
1654            let size = journal.size(1).await.unwrap();
1655            assert!(size > 0);
1656
1657            // Append more data and verify size increases
1658            journal.append(1, &43i32).await.unwrap();
1659            let new_size = journal.size(1).await.unwrap();
1660            assert!(new_size > size);
1661
1662            // Check size of different section - should still be 0
1663            let size = journal.size(2).await.unwrap();
1664            assert_eq!(size, 0);
1665
1666            // Append data to section 2
1667            journal.append(2, &44i32).await.unwrap();
1668
1669            // Check size of section 2 - should be greater than 0
1670            let size = journal.size(2).await.unwrap();
1671            assert!(size > 0);
1672
1673            // Rollback everything in section 1 and 2
1674            journal.rewind(1, 0).await.unwrap();
1675
1676            // Check size of section 1 - should be 0
1677            let size = journal.size(1).await.unwrap();
1678            assert_eq!(size, 0);
1679
1680            // Check size of section 2 - should be 0
1681            let size = journal.size(2).await.unwrap();
1682            assert_eq!(size, 0);
1683        });
1684    }
1685
1686    #[test_traced]
1687    fn test_journal_rewind_section() {
1688        // Initialize the deterministic context
1689        let executor = deterministic::Runner::default();
1690        executor.start(|context| async move {
1691            // Create journal
1692            let cfg = Config {
1693                partition: "test-partition".into(),
1694                compression: None,
1695                codec_config: (),
1696                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1697                write_buffer: NZUsize!(1024),
1698            };
1699            let mut journal = Journal::init(context, cfg).await.unwrap();
1700
1701            // Check size of non-existent section
1702            let size = journal.size(1).await.unwrap();
1703            assert_eq!(size, 0);
1704
1705            // Append data to section 1
1706            journal.append(1, &42i32).await.unwrap();
1707
1708            // Check size of section 1 - should be greater than 0
1709            let size = journal.size(1).await.unwrap();
1710            assert!(size > 0);
1711
1712            // Append more data and verify size increases
1713            journal.append(1, &43i32).await.unwrap();
1714            let new_size = journal.size(1).await.unwrap();
1715            assert!(new_size > size);
1716
1717            // Check size of different section - should still be 0
1718            let size = journal.size(2).await.unwrap();
1719            assert_eq!(size, 0);
1720
1721            // Append data to section 2
1722            journal.append(2, &44i32).await.unwrap();
1723
1724            // Check size of section 2 - should be greater than 0
1725            let size = journal.size(2).await.unwrap();
1726            assert!(size > 0);
1727
1728            // Rollback everything in section 1
1729            journal.rewind_section(1, 0).await.unwrap();
1730
1731            // Check size of section 1 - should be 0
1732            let size = journal.size(1).await.unwrap();
1733            assert_eq!(size, 0);
1734
1735            // Check size of section 2 - should be greater than 0
1736            let size = journal.size(2).await.unwrap();
1737            assert!(size > 0);
1738        });
1739    }
1740
1741    #[test_traced]
1742    fn test_journal_small_items() {
1743        let executor = deterministic::Runner::default();
1744        executor.start(|context| async move {
1745            let cfg = Config {
1746                partition: "test-partition".into(),
1747                compression: None,
1748                codec_config: (),
1749                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1750                write_buffer: NZUsize!(1024),
1751            };
1752
1753            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1754                .await
1755                .expect("Failed to initialize journal");
1756
1757            // Append many small (1-byte) items to the same section
1758            let num_items = 100;
1759            let mut offsets = Vec::new();
1760            for i in 0..num_items {
1761                let (offset, size) = journal
1762                    .append(1, &(i as u8))
1763                    .await
1764                    .expect("Failed to append data");
1765                assert_eq!(size, 1, "u8 should encode to 1 byte");
1766                offsets.push(offset);
1767            }
1768            journal.sync(1).await.expect("Failed to sync");
1769
1770            // Read each item back via random access
1771            for (i, &offset) in offsets.iter().enumerate() {
1772                let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
1773                assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
1774            }
1775
1776            // Drop and reopen to test replay
1777            drop(journal);
1778            let journal = Journal::<_, u8>::init(context.with_label("second"), cfg)
1779                .await
1780                .expect("Failed to re-initialize journal");
1781
1782            // Replay and verify all items
1783            let stream = journal
1784                .replay(0, 0, NZUsize!(1024))
1785                .await
1786                .expect("Failed to setup replay");
1787            pin_mut!(stream);
1788
1789            let mut count = 0;
1790            while let Some(result) = stream.next().await {
1791                let (section, offset, size, item) = result.expect("Failed to replay item");
1792                assert_eq!(section, 1);
1793                assert_eq!(offset, offsets[count]);
1794                assert_eq!(size, 1);
1795                assert_eq!(item, count as u8);
1796                count += 1;
1797            }
1798            assert_eq!(count, num_items, "Should replay all items");
1799        });
1800    }
1801
1802    #[test_traced]
1803    fn test_journal_rewind_many_sections() {
1804        let executor = deterministic::Runner::default();
1805        executor.start(|context| async move {
1806            let cfg = Config {
1807                partition: "test-partition".into(),
1808                compression: None,
1809                codec_config: (),
1810                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1811                write_buffer: NZUsize!(1024),
1812            };
1813            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1814
1815            // Create sections 1-10 with data
1816            for section in 1u64..=10 {
1817                journal.append(section, &(section as i32)).await.unwrap();
1818            }
1819            journal.sync_all().await.unwrap();
1820
1821            // Verify all sections exist
1822            for section in 1u64..=10 {
1823                let size = journal.size(section).await.unwrap();
1824                assert!(size > 0, "section {section} should have data");
1825            }
1826
1827            // Rewind to section 5 (should remove sections 6-10)
1828            journal
1829                .rewind(5, journal.size(5).await.unwrap())
1830                .await
1831                .unwrap();
1832
1833            // Verify sections 1-5 still exist with correct data
1834            for section in 1u64..=5 {
1835                let size = journal.size(section).await.unwrap();
1836                assert!(size > 0, "section {section} should still have data");
1837            }
1838
1839            // Verify sections 6-10 are removed (size should be 0)
1840            for section in 6u64..=10 {
1841                let size = journal.size(section).await.unwrap();
1842                assert_eq!(size, 0, "section {section} should be removed");
1843            }
1844
1845            // Verify data integrity via replay
1846            {
1847                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1848                pin_mut!(stream);
1849                let mut items = Vec::new();
1850                while let Some(result) = stream.next().await {
1851                    let (section, _, _, item) = result.unwrap();
1852                    items.push((section, item));
1853                }
1854                assert_eq!(items.len(), 5);
1855                for (i, (section, item)) in items.iter().enumerate() {
1856                    assert_eq!(*section, (i + 1) as u64);
1857                    assert_eq!(*item, (i + 1) as i32);
1858                }
1859            }
1860
1861            journal.destroy().await.unwrap();
1862        });
1863    }
1864
1865    #[test_traced]
1866    fn test_journal_rewind_partial_truncation() {
1867        let executor = deterministic::Runner::default();
1868        executor.start(|context| async move {
1869            let cfg = Config {
1870                partition: "test-partition".into(),
1871                compression: None,
1872                codec_config: (),
1873                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1874                write_buffer: NZUsize!(1024),
1875            };
1876            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1877
1878            // Append 5 items and record sizes after each
1879            let mut sizes = Vec::new();
1880            for i in 0..5 {
1881                journal.append(1, &i).await.unwrap();
1882                journal.sync(1).await.unwrap();
1883                sizes.push(journal.size(1).await.unwrap());
1884            }
1885
1886            // Rewind to keep only first 3 items
1887            let target_size = sizes[2];
1888            journal.rewind(1, target_size).await.unwrap();
1889
1890            // Verify size is correct
1891            let new_size = journal.size(1).await.unwrap();
1892            assert_eq!(new_size, target_size);
1893
1894            // Verify first 3 items via replay
1895            {
1896                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1897                pin_mut!(stream);
1898                let mut items = Vec::new();
1899                while let Some(result) = stream.next().await {
1900                    let (_, _, _, item) = result.unwrap();
1901                    items.push(item);
1902                }
1903                assert_eq!(items.len(), 3);
1904                for (i, item) in items.iter().enumerate() {
1905                    assert_eq!(*item, i as i32);
1906                }
1907            }
1908
1909            journal.destroy().await.unwrap();
1910        });
1911    }
1912
1913    #[test_traced]
1914    fn test_journal_rewind_nonexistent_target() {
1915        let executor = deterministic::Runner::default();
1916        executor.start(|context| async move {
1917            let cfg = Config {
1918                partition: "test-partition".into(),
1919                compression: None,
1920                codec_config: (),
1921                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1922                write_buffer: NZUsize!(1024),
1923            };
1924            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1925
1926            // Create sections 5, 6, 7 (skip 1-4)
1927            for section in 5u64..=7 {
1928                journal.append(section, &(section as i32)).await.unwrap();
1929            }
1930            journal.sync_all().await.unwrap();
1931
1932            // Rewind to section 3 (doesn't exist)
1933            journal.rewind(3, 0).await.unwrap();
1934
1935            // Verify sections 5, 6, 7 are removed
1936            for section in 5u64..=7 {
1937                let size = journal.size(section).await.unwrap();
1938                assert_eq!(size, 0, "section {section} should be removed");
1939            }
1940
1941            // Verify replay returns nothing
1942            {
1943                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1944                pin_mut!(stream);
1945                let items: Vec<_> = stream.collect().await;
1946                assert!(items.is_empty());
1947            }
1948
1949            journal.destroy().await.unwrap();
1950        });
1951    }
1952
1953    #[test_traced]
1954    fn test_journal_rewind_persistence() {
1955        let executor = deterministic::Runner::default();
1956        executor.start(|context| async move {
1957            let cfg = Config {
1958                partition: "test-partition".into(),
1959                compression: None,
1960                codec_config: (),
1961                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1962                write_buffer: NZUsize!(1024),
1963            };
1964
1965            // Create sections 1-5 with data
1966            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1967                .await
1968                .unwrap();
1969            for section in 1u64..=5 {
1970                journal.append(section, &(section as i32)).await.unwrap();
1971            }
1972            journal.sync_all().await.unwrap();
1973
1974            // Rewind to section 2
1975            let size = journal.size(2).await.unwrap();
1976            journal.rewind(2, size).await.unwrap();
1977            journal.sync_all().await.unwrap();
1978            drop(journal);
1979
1980            // Re-init and verify only sections 1-2 exist
1981            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
1982                .await
1983                .unwrap();
1984
1985            // Verify sections 1-2 have data
1986            for section in 1u64..=2 {
1987                let size = journal.size(section).await.unwrap();
1988                assert!(size > 0, "section {section} should have data after restart");
1989            }
1990
1991            // Verify sections 3-5 are gone
1992            for section in 3u64..=5 {
1993                let size = journal.size(section).await.unwrap();
1994                assert_eq!(size, 0, "section {section} should be gone after restart");
1995            }
1996
1997            // Verify data integrity via replay
1998            {
1999                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2000                pin_mut!(stream);
2001                let mut items = Vec::new();
2002                while let Some(result) = stream.next().await {
2003                    let (section, _, _, item) = result.unwrap();
2004                    items.push((section, item));
2005                }
2006                assert_eq!(items.len(), 2);
2007                assert_eq!(items[0], (1, 1));
2008                assert_eq!(items[1], (2, 2));
2009            }
2010
2011            journal.destroy().await.unwrap();
2012        });
2013    }
2014
2015    #[test_traced]
2016    fn test_journal_rewind_to_zero_removes_all_newer() {
2017        let executor = deterministic::Runner::default();
2018        executor.start(|context| async move {
2019            let cfg = Config {
2020                partition: "test-partition".into(),
2021                compression: None,
2022                codec_config: (),
2023                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2024                write_buffer: NZUsize!(1024),
2025            };
2026            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
2027
2028            // Create sections 1, 2, 3
2029            for section in 1u64..=3 {
2030                journal.append(section, &(section as i32)).await.unwrap();
2031            }
2032            journal.sync_all().await.unwrap();
2033
2034            // Rewind section 1 to size 0
2035            journal.rewind(1, 0).await.unwrap();
2036
2037            // Verify section 1 exists but is empty
2038            let size = journal.size(1).await.unwrap();
2039            assert_eq!(size, 0, "section 1 should be empty");
2040
2041            // Verify sections 2, 3 are completely removed
2042            for section in 2u64..=3 {
2043                let size = journal.size(section).await.unwrap();
2044                assert_eq!(size, 0, "section {section} should be removed");
2045            }
2046
2047            // Verify replay returns nothing
2048            {
2049                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2050                pin_mut!(stream);
2051                let items: Vec<_> = stream.collect().await;
2052                assert!(items.is_empty());
2053            }
2054
2055            journal.destroy().await.unwrap();
2056        });
2057    }
2058
    #[test_traced]
    fn test_journal_replay_start_offset_with_trailing_bytes() {
        // Regression: valid_offset must be initialized to start_offset, not 0.
        // If replay starts at a non-zero offset and the first thing it sees is
        // trailing garbage, a valid_offset of 0 would truncate the blob to 0
        // and destroy the valid items written before start_offset.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append several items to build up valid data
            for i in 0..5i32 {
                journal.append(1, &i).await.unwrap();
            }
            journal.sync(1).await.unwrap();
            // Logical size of section 1 covering all valid items.
            let valid_logical_size = journal.size(1).await.unwrap();
            drop(journal);

            // Get the physical blob size before corruption
            let (blob, physical_size_before) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            // Write incomplete varint: 0xFF has continuation bit set, needs more bytes
            // This creates 2 trailing bytes that cannot form a valid item
            blob.write_at(physical_size_before, vec![0xFF, 0xFF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Reopen journal and replay starting PAST all valid items
            // (start_offset = valid_logical_size means we skip all valid data)
            // The first thing encountered will be the trailing corrupt bytes
            let start_offset = valid_logical_size;
            {
                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                    .await
                    .unwrap();

                let stream = journal
                    .replay(1, start_offset, NZUsize!(1024))
                    .await
                    .unwrap();
                pin_mut!(stream);

                // Consume the stream - should detect trailing bytes and truncate
                while let Some(_result) = stream.next().await {}
            }

            // Verify that valid data before start_offset was NOT lost
            let (_, physical_size_after) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            // The blob should have been truncated back to the valid physical size
            // (removing the trailing corrupt bytes) but NOT to 0
            assert!(
                physical_size_after >= physical_size_before,
                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
                 {physical_size_after}. Logical valid size was {valid_logical_size}. \
                 This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
            );
        });
    }
2131
2132    #[test_traced]
2133    fn test_journal_large_item_spanning_pages() {
2134        // 2048 bytes spans 2 full pages (PAGE_SIZE = 1024).
2135        const LARGE_SIZE: usize = 2048;
2136        type LargeItem = [u8; LARGE_SIZE];
2137
2138        let executor = deterministic::Runner::default();
2139        executor.start(|context| async move {
2140            let cfg = Config {
2141                partition: "test-partition".into(),
2142                compression: None,
2143                codec_config: (),
2144                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2145                write_buffer: NZUsize!(4096),
2146            };
2147            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2148                .await
2149                .expect("Failed to initialize journal");
2150
2151            // Create a large item that spans multiple pages.
2152            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
2153            for (i, byte) in large_data.iter_mut().enumerate() {
2154                *byte = (i % 256) as u8;
2155            }
2156            assert!(
2157                LARGE_SIZE > PAGE_SIZE.get() as usize,
2158                "Item must be larger than page size"
2159            );
2160
2161            // Append the large item
2162            let (offset, size) = journal
2163                .append(1, &large_data)
2164                .await
2165                .expect("Failed to append large item");
2166            assert_eq!(size as usize, LARGE_SIZE);
2167            journal.sync(1).await.expect("Failed to sync");
2168
2169            // Read the item back via random access
2170            let retrieved: LargeItem = journal
2171                .get(1, offset)
2172                .await
2173                .expect("Failed to get large item");
2174            assert_eq!(retrieved, large_data, "Random access read mismatch");
2175
2176            // Drop and reopen to test replay
2177            drop(journal);
2178            let journal = Journal::<_, LargeItem>::init(context.with_label("second"), cfg.clone())
2179                .await
2180                .expect("Failed to re-initialize journal");
2181
2182            // Replay and verify the large item
2183            {
2184                let stream = journal
2185                    .replay(0, 0, NZUsize!(1024))
2186                    .await
2187                    .expect("Failed to setup replay");
2188                pin_mut!(stream);
2189
2190                let mut items = Vec::new();
2191                while let Some(result) = stream.next().await {
2192                    let (section, off, sz, item) = result.expect("Failed to replay item");
2193                    items.push((section, off, sz, item));
2194                }
2195
2196                assert_eq!(items.len(), 1, "Should have exactly one item");
2197                let (section, off, sz, item) = &items[0];
2198                assert_eq!(*section, 1);
2199                assert_eq!(*off, offset);
2200                assert_eq!(*sz as usize, LARGE_SIZE);
2201                assert_eq!(*item, large_data, "Replay read mismatch");
2202            }
2203
2204            journal.destroy().await.unwrap();
2205        });
2206    }
2207
2208    #[test_traced]
2209    fn test_journal_non_contiguous_sections() {
2210        // Test that sections with gaps in numbering work correctly.
2211        // Sections 1, 5, 10 should all be independent and accessible.
2212        let executor = deterministic::Runner::default();
2213        executor.start(|context| async move {
2214            let cfg = Config {
2215                partition: "test-partition".into(),
2216                compression: None,
2217                codec_config: (),
2218                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2219                write_buffer: NZUsize!(1024),
2220            };
2221            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2222                .await
2223                .expect("Failed to initialize journal");
2224
2225            // Create sections with gaps: 1, 5, 10
2226            let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
2227            let mut offsets = Vec::new();
2228
2229            for (section, data) in &sections_and_data {
2230                let (offset, _) = journal
2231                    .append(*section, data)
2232                    .await
2233                    .expect("Failed to append");
2234                offsets.push(offset);
2235            }
2236            journal.sync_all().await.expect("Failed to sync");
2237
2238            // Verify random access to each section
2239            for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
2240                let retrieved: i32 = journal
2241                    .get(*section, offsets[i])
2242                    .await
2243                    .expect("Failed to get item");
2244                assert_eq!(retrieved, *expected_data);
2245            }
2246
2247            // Verify non-existent sections return appropriate errors
2248            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
2249                let result = journal.get(missing_section, 0).await;
2250                assert!(
2251                    matches!(result, Err(Error::SectionOutOfRange(_))),
2252                    "Expected SectionOutOfRange for section {}, got {:?}",
2253                    missing_section,
2254                    result
2255                );
2256            }
2257
2258            // Drop and reopen to test replay
2259            drop(journal);
2260            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
2261                .await
2262                .expect("Failed to re-initialize journal");
2263
2264            // Replay and verify all items in order
2265            {
2266                let stream = journal
2267                    .replay(0, 0, NZUsize!(1024))
2268                    .await
2269                    .expect("Failed to setup replay");
2270                pin_mut!(stream);
2271
2272                let mut items = Vec::new();
2273                while let Some(result) = stream.next().await {
2274                    let (section, _, _, item) = result.expect("Failed to replay item");
2275                    items.push((section, item));
2276                }
2277
2278                assert_eq!(items.len(), 3, "Should have 3 items");
2279                assert_eq!(items[0], (1, 100));
2280                assert_eq!(items[1], (5, 500));
2281                assert_eq!(items[2], (10, 1000));
2282            }
2283
2284            // Test replay starting from middle section (5)
2285            {
2286                let stream = journal
2287                    .replay(5, 0, NZUsize!(1024))
2288                    .await
2289                    .expect("Failed to setup replay from section 5");
2290                pin_mut!(stream);
2291
2292                let mut items = Vec::new();
2293                while let Some(result) = stream.next().await {
2294                    let (section, _, _, item) = result.expect("Failed to replay item");
2295                    items.push((section, item));
2296                }
2297
2298                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
2299                assert_eq!(items[0], (5, 500));
2300                assert_eq!(items[1], (10, 1000));
2301            }
2302
2303            // Test replay starting from non-existent section (should skip to next)
2304            {
2305                let stream = journal
2306                    .replay(3, 0, NZUsize!(1024))
2307                    .await
2308                    .expect("Failed to setup replay from section 3");
2309                pin_mut!(stream);
2310
2311                let mut items = Vec::new();
2312                while let Some(result) = stream.next().await {
2313                    let (section, _, _, item) = result.expect("Failed to replay item");
2314                    items.push((section, item));
2315                }
2316
2317                // Should get sections 5 and 10 (skipping non-existent 3, 4)
2318                assert_eq!(items.len(), 2);
2319                assert_eq!(items[0], (5, 500));
2320                assert_eq!(items[1], (10, 1000));
2321            }
2322
2323            journal.destroy().await.unwrap();
2324        });
2325    }
2326
2327    #[test_traced]
2328    fn test_journal_empty_section_in_middle() {
2329        // Test that replay correctly handles an empty section between sections with data.
2330        // Section 1 has data, section 2 is empty, section 3 has data.
2331        let executor = deterministic::Runner::default();
2332        executor.start(|context| async move {
2333            let cfg = Config {
2334                partition: "test-partition".into(),
2335                compression: None,
2336                codec_config: (),
2337                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2338                write_buffer: NZUsize!(1024),
2339            };
2340            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2341                .await
2342                .expect("Failed to initialize journal");
2343
2344            // Append to section 1
2345            journal.append(1, &100i32).await.expect("Failed to append");
2346
2347            // Create section 2 but don't append anything - just sync to create the blob
2348            // Actually, we need to append something and then rewind to make it empty
2349            journal.append(2, &200i32).await.expect("Failed to append");
2350            journal.sync(2).await.expect("Failed to sync");
2351            journal
2352                .rewind_section(2, 0)
2353                .await
2354                .expect("Failed to rewind");
2355
2356            // Append to section 3
2357            journal.append(3, &300i32).await.expect("Failed to append");
2358
2359            journal.sync_all().await.expect("Failed to sync");
2360
2361            // Verify section sizes
2362            assert!(journal.size(1).await.unwrap() > 0);
2363            assert_eq!(journal.size(2).await.unwrap(), 0);
2364            assert!(journal.size(3).await.unwrap() > 0);
2365
2366            // Drop and reopen to test replay
2367            drop(journal);
2368            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
2369                .await
2370                .expect("Failed to re-initialize journal");
2371
2372            // Replay all - should get items from sections 1 and 3, skipping empty section 2
2373            {
2374                let stream = journal
2375                    .replay(0, 0, NZUsize!(1024))
2376                    .await
2377                    .expect("Failed to setup replay");
2378                pin_mut!(stream);
2379
2380                let mut items = Vec::new();
2381                while let Some(result) = stream.next().await {
2382                    let (section, _, _, item) = result.expect("Failed to replay item");
2383                    items.push((section, item));
2384                }
2385
2386                assert_eq!(
2387                    items.len(),
2388                    2,
2389                    "Should have 2 items (skipping empty section)"
2390                );
2391                assert_eq!(items[0], (1, 100));
2392                assert_eq!(items[1], (3, 300));
2393            }
2394
2395            // Replay starting from empty section 2 - should get only section 3
2396            {
2397                let stream = journal
2398                    .replay(2, 0, NZUsize!(1024))
2399                    .await
2400                    .expect("Failed to setup replay from section 2");
2401                pin_mut!(stream);
2402
2403                let mut items = Vec::new();
2404                while let Some(result) = stream.next().await {
2405                    let (section, _, _, item) = result.expect("Failed to replay item");
2406                    items.push((section, item));
2407                }
2408
2409                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
2410                assert_eq!(items[0], (3, 300));
2411            }
2412
2413            journal.destroy().await.unwrap();
2414        });
2415    }
2416
2417    #[test_traced]
2418    fn test_journal_item_exactly_page_size() {
2419        // Test that items exactly equal to PAGE_SIZE work correctly.
2420        // This is a boundary condition where item fills exactly one page.
2421        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
2422        type ExactItem = [u8; ITEM_SIZE];
2423
2424        let executor = deterministic::Runner::default();
2425        executor.start(|context| async move {
2426            let cfg = Config {
2427                partition: "test-partition".into(),
2428                compression: None,
2429                codec_config: (),
2430                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2431                write_buffer: NZUsize!(4096),
2432            };
2433            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2434                .await
2435                .expect("Failed to initialize journal");
2436
2437            // Create an item exactly PAGE_SIZE bytes
2438            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
2439            for (i, byte) in exact_data.iter_mut().enumerate() {
2440                *byte = (i % 256) as u8;
2441            }
2442
2443            // Append the exact-size item
2444            let (offset, size) = journal
2445                .append(1, &exact_data)
2446                .await
2447                .expect("Failed to append exact item");
2448            assert_eq!(size as usize, ITEM_SIZE);
2449            journal.sync(1).await.expect("Failed to sync");
2450
2451            // Read the item back via random access
2452            let retrieved: ExactItem = journal
2453                .get(1, offset)
2454                .await
2455                .expect("Failed to get exact item");
2456            assert_eq!(retrieved, exact_data, "Random access read mismatch");
2457
2458            // Drop and reopen to test replay
2459            drop(journal);
2460            let journal = Journal::<_, ExactItem>::init(context.with_label("second"), cfg.clone())
2461                .await
2462                .expect("Failed to re-initialize journal");
2463
2464            // Replay and verify
2465            {
2466                let stream = journal
2467                    .replay(0, 0, NZUsize!(1024))
2468                    .await
2469                    .expect("Failed to setup replay");
2470                pin_mut!(stream);
2471
2472                let mut items = Vec::new();
2473                while let Some(result) = stream.next().await {
2474                    let (section, off, sz, item) = result.expect("Failed to replay item");
2475                    items.push((section, off, sz, item));
2476                }
2477
2478                assert_eq!(items.len(), 1, "Should have exactly one item");
2479                let (section, off, sz, item) = &items[0];
2480                assert_eq!(*section, 1);
2481                assert_eq!(*off, offset);
2482                assert_eq!(*sz as usize, ITEM_SIZE);
2483                assert_eq!(*item, exact_data, "Replay read mismatch");
2484            }
2485
2486            journal.destroy().await.unwrap();
2487        });
2488    }
2489
2490    #[test_traced]
2491    fn test_journal_varint_spanning_page_boundary() {
2492        // Test that items with data spanning page boundaries work correctly
2493        // when using a small page size.
2494        //
2495        // With PAGE_SIZE=16:
2496        // - Physical page = 16 + 12 = 28 bytes
2497        // - Each [u8; 128] item = 2-byte varint + 128 bytes data = 130 bytes
2498        // - This spans multiple 16-byte pages, testing cross-page reading
2499        const SMALL_PAGE: NonZeroU16 = NZU16!(16);
2500
2501        let executor = deterministic::Runner::default();
2502        executor.start(|context| async move {
2503            let cfg = Config {
2504                partition: "test-partition".into(),
2505                compression: None,
2506                codec_config: (),
2507                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE, PAGE_CACHE_SIZE),
2508                write_buffer: NZUsize!(1024),
2509            };
2510            let mut journal: Journal<_, [u8; 128]> =
2511                Journal::init(context.with_label("first"), cfg.clone())
2512                    .await
2513                    .expect("Failed to initialize journal");
2514
2515            // Create items that will span many 16-byte pages
2516            let item1: [u8; 128] = [1u8; 128];
2517            let item2: [u8; 128] = [2u8; 128];
2518            let item3: [u8; 128] = [3u8; 128];
2519
2520            // Append items - each is 130 bytes (2-byte varint + 128 data)
2521            // spanning ceil(130/16) = 9 pages worth of logical data
2522            let (offset1, _) = journal.append(1, &item1).await.expect("Failed to append");
2523            let (offset2, _) = journal.append(1, &item2).await.expect("Failed to append");
2524            let (offset3, _) = journal.append(1, &item3).await.expect("Failed to append");
2525
2526            journal.sync(1).await.expect("Failed to sync");
2527
2528            // Read items back via random access
2529            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
2530            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
2531            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
2532            assert_eq!(retrieved1, item1);
2533            assert_eq!(retrieved2, item2);
2534            assert_eq!(retrieved3, item3);
2535
2536            // Drop and reopen to test replay
2537            drop(journal);
2538            let journal: Journal<_, [u8; 128]> =
2539                Journal::init(context.with_label("second"), cfg.clone())
2540                    .await
2541                    .expect("Failed to re-initialize journal");
2542
2543            // Replay and verify all items
2544            {
2545                let stream = journal
2546                    .replay(0, 0, NZUsize!(64))
2547                    .await
2548                    .expect("Failed to setup replay");
2549                pin_mut!(stream);
2550
2551                let mut items = Vec::new();
2552                while let Some(result) = stream.next().await {
2553                    let (section, off, _, item) = result.expect("Failed to replay item");
2554                    items.push((section, off, item));
2555                }
2556
2557                assert_eq!(items.len(), 3, "Should have 3 items");
2558                assert_eq!(items[0], (1, offset1, item1));
2559                assert_eq!(items[1], (1, offset2, item2));
2560                assert_eq!(items[2], (1, offset3, item3));
2561            }
2562
2563            journal.destroy().await.unwrap();
2564        });
2565    }
2566
2567    #[test_traced]
2568    fn test_journal_clear() {
2569        let executor = deterministic::Runner::default();
2570        executor.start(|context| async move {
2571            let cfg = Config {
2572                partition: "clear-test".into(),
2573                compression: None,
2574                codec_config: (),
2575                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2576                write_buffer: NZUsize!(1024),
2577            };
2578
2579            let mut journal: Journal<_, u64> =
2580                Journal::init(context.with_label("journal"), cfg.clone())
2581                    .await
2582                    .expect("Failed to initialize journal");
2583
2584            // Append items across multiple sections
2585            for section in 0..5u64 {
2586                for i in 0..10u64 {
2587                    journal
2588                        .append(section, &(section * 1000 + i))
2589                        .await
2590                        .expect("Failed to append");
2591                }
2592                journal.sync(section).await.expect("Failed to sync");
2593            }
2594
2595            // Verify we have data
2596            assert_eq!(journal.get(0, 0).await.unwrap(), 0);
2597            assert_eq!(journal.get(4, 0).await.unwrap(), 4000);
2598
2599            // Clear the journal
2600            journal.clear().await.expect("Failed to clear");
2601
2602            // After clear, all reads should fail
2603            for section in 0..5u64 {
2604                assert!(matches!(
2605                    journal.get(section, 0).await,
2606                    Err(Error::SectionOutOfRange(s)) if s == section
2607                ));
2608            }
2609
2610            // Append new data after clear
2611            for i in 0..5u64 {
2612                journal
2613                    .append(10, &(i * 100))
2614                    .await
2615                    .expect("Failed to append after clear");
2616            }
2617            journal.sync(10).await.expect("Failed to sync after clear");
2618
2619            // New data should be readable
2620            assert_eq!(journal.get(10, 0).await.unwrap(), 0);
2621
2622            // Old sections should still be missing
2623            assert!(matches!(
2624                journal.get(0, 0).await,
2625                Err(Error::SectionOutOfRange(0))
2626            ));
2627
2628            journal.destroy().await.unwrap();
2629        });
2630    }
2631}