Skip to main content

commonware_storage/journal/segmented/
variable.rs

1//! An append-only log for storing arbitrary variable length items.
2//!
3//! `segmented::Journal` is an append-only log for storing arbitrary variable length data on disk. In
4//! addition to replay, stored items can be directly retrieved given their section number and offset
5//! within the section.
6//!
7//! # Format
8//!
9//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
10//! The particular [Blob] in which data is stored is identified by a `section` number (`u64`).
11//! Within a `section`, data is appended as an `item` with the following format:
12//!
13//! ```text
14//! +---+---+---+---+---+---+---+---+
15//! |       0 ~ 4       |    ...    |
16//! +---+---+---+---+---+---+---+---+
17//! | Size (varint u32) |   Data    |
18//! +---+---+---+---+---+---+---+---+
19//! ```
20//!
21//! # Open Blobs
22//!
//! `Journal` uses one `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a
//! given `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound
//! the number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
27//!
28//! # Sync
29//!
30//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
31//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
32//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
33//!
34//! # Pruning
35//!
36//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
37//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
38//! could be used, for example, by some blockchain application to prune old blocks.
39//!
40//! # Replay
41//!
42//! During application initialization, it is very common to replay data from `Journal` to recover
43//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
44//! method to produce a stream of all items in the `Journal` in order of their `section` and
45//! `offset`.
46//!
47//! # Compression
48//!
//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
//! `compression` field in the `Config` struct to a valid `zstd` compression level. The level may be
//! changed between initializations of `Journal`, but whether compression is enabled may not: every
//! stored item is decoded according to the current setting, so `compression` must remain populated
//! if any data was written with compression enabled, and must remain `None` if any data was written
//! without it.
53//!
54//! # Example
55//!
56//! ```rust
57//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
58//! use commonware_storage::journal::segmented::variable::{Journal, Config};
59//! use commonware_utils::{NZUsize, NZU16};
60//!
61//! let executor = deterministic::Runner::default();
62//! executor.start(|context| async move {
63//!     // Create a page cache
64//!     let page_cache = CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10));
65//!
66//!     // Create a journal
67//!     let mut journal = Journal::init(context, Config {
68//!         partition: "partition".into(),
69//!         compression: None,
70//!         codec_config: (),
71//!         page_cache,
72//!         write_buffer: NZUsize!(1024 * 1024),
73//!     }).await.unwrap();
74//!
75//!     // Append data to the journal
76//!     journal.append(1, &128).await.unwrap();
77//!
78//!     // Sync the journal
79//!     journal.sync_all().await.unwrap();
80//! });
81//! ```
82
83use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
84use crate::journal::Error;
85use commonware_codec::{
86    varint::{UInt, MAX_U32_VARINT_SIZE},
87    Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
88};
89use commonware_runtime::{
90    buffer::paged::{Append, CacheRef, Replay},
91    Blob, Buf, IoBuf, IoBufMut, Metrics, Storage,
92};
93use futures::stream::{self, Stream, StreamExt};
94use std::{io::Cursor, num::NonZeroUsize};
95use tracing::{trace, warn};
96use zstd::{bulk::compress, decode_all};
97
98/// Configuration for `Journal` storage.
99#[derive(Clone)]
100pub struct Config<C> {
101    /// The `commonware-runtime::Storage` partition to use
102    /// for storing journal blobs.
103    pub partition: String,
104
105    /// Optional compression level (using `zstd`) to apply to data before storing.
106    pub compression: Option<u8>,
107
108    /// The codec configuration to use for encoding and decoding items.
109    pub codec_config: C,
110
111    /// The page cache to use for caching data.
112    pub page_cache: CacheRef,
113
114    /// The size of the write buffer to use for each blob.
115    pub write_buffer: NonZeroUsize,
116}
117
118/// Decodes a varint length prefix from a buffer.
119/// Returns (item_size, varint_len).
120#[inline]
121fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
122    let initial = buf.remaining();
123    let size = UInt::<u32>::read(buf)?.0 as usize;
124    let varint_len = initial - buf.remaining();
125    Ok((size, varint_len))
126}
127
/// Where an item's payload lies relative to a buffer that begins at the item's length prefix.
///
/// Only offsets and lengths are recorded; callers slice the buffer themselves.
enum ItemInfo {
    /// The buffer holds the item's entire payload.
    Complete {
        /// Number of bytes taken by the varint length prefix.
        varint_len: usize,
        /// Number of payload bytes following the prefix.
        data_len: usize,
    },
    /// The buffer ends before the item's payload does.
    Incomplete {
        /// Number of bytes taken by the varint length prefix.
        varint_len: usize,
        /// Number of payload bytes present in the buffer after the prefix.
        prefix_len: usize,
        /// Total number of payload bytes the item has.
        total_len: usize,
    },
}
147
148/// Find an item in a buffer by decoding its length prefix.
149///
150/// Returns (next_offset, item_info). The buffer is advanced past the varint.
151fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
152    let available = buf.remaining();
153    let (size, varint_len) = decode_length_prefix(buf)?;
154    let next_offset = offset
155        .checked_add(varint_len as u64)
156        .ok_or(Error::OffsetOverflow)?
157        .checked_add(size as u64)
158        .ok_or(Error::OffsetOverflow)?;
159    let buffered = available.saturating_sub(varint_len);
160
161    let item = if buffered >= size {
162        ItemInfo::Complete {
163            varint_len,
164            data_len: size,
165        }
166    } else {
167        ItemInfo::Incomplete {
168            varint_len,
169            prefix_len: buffered,
170            total_len: size,
171        }
172    };
173
174    Ok((next_offset, item))
175}
176
177/// State for replaying a single section's blob.
178struct ReplayState<B: Blob, C> {
179    section: u64,
180    blob: Append<B>,
181    replay: Replay<B>,
182    skip_bytes: u64,
183    offset: u64,
184    valid_offset: u64,
185    codec_config: C,
186    compressed: bool,
187    done: bool,
188}
189
190/// Decode item data with optional decompression.
191fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
192    if compressed {
193        let decompressed =
194            decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
195        V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
196    } else {
197        V::decode_cfg(item_data, cfg).map_err(Error::Codec)
198    }
199}
200
201/// A segmented journal with variable-size entries.
202///
203/// Each section is stored in a separate blob. Items are length-prefixed with a varint.
204///
205/// # Repair
206///
207/// Like
208/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
209/// and
210/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
211/// the first invalid data read will be considered the new end of the journal (and the
212/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
213/// replay (not init) because any blob could have trailing bytes.
214pub struct Journal<E: Storage + Metrics, V: Codec> {
215    manager: Manager<E, AppendFactory>,
216
217    /// Compression level (if enabled).
218    compression: Option<u8>,
219
220    /// Codec configuration.
221    codec_config: V::Cfg,
222}
223
224impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
225    /// Initialize a new `Journal` instance.
226    ///
227    /// All backing blobs are opened but not read during
228    /// initialization. The `replay` method can be used
229    /// to iterate over all items in the `Journal`.
230    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
231        let manager_cfg = ManagerConfig {
232            partition: cfg.partition,
233            factory: AppendFactory {
234                write_buffer: cfg.write_buffer,
235                page_cache_ref: cfg.page_cache,
236            },
237        };
238        let manager = Manager::init(context, manager_cfg).await?;
239
240        Ok(Self {
241            manager,
242            compression: cfg.compression,
243            codec_config: cfg.codec_config,
244        })
245    }
246
247    /// Reads an item from the blob at the given offset.
248    async fn read(
249        compressed: bool,
250        cfg: &V::Cfg,
251        blob: &Append<E::Blob>,
252        offset: u64,
253    ) -> Result<(u64, u32, V), Error> {
254        // Read varint header (max 5 bytes for u32)
255        let (buf, available) = blob
256            .read_up_to(
257                offset,
258                MAX_U32_VARINT_SIZE,
259                IoBufMut::with_capacity(MAX_U32_VARINT_SIZE),
260            )
261            .await?;
262        let buf = buf.freeze();
263        let mut cursor = Cursor::new(buf.slice(..available));
264        let (next_offset, item_info) = find_item(&mut cursor, offset)?;
265
266        // Decode item - either directly from buffer or by chaining prefix with remainder
267        let (item_size, decoded) = match item_info {
268            ItemInfo::Complete {
269                varint_len,
270                data_len,
271            } => {
272                // Data follows varint in buffer
273                let data = buf.slice(varint_len..varint_len + data_len);
274                let decoded = decode_item::<V>(data, cfg, compressed)?;
275                (data_len as u32, decoded)
276            }
277            ItemInfo::Incomplete {
278                varint_len,
279                prefix_len,
280                total_len,
281            } => {
282                // Read remainder and chain with prefix to avoid copying
283                let prefix = buf.slice(varint_len..varint_len + prefix_len);
284                let read_offset = offset + varint_len as u64 + prefix_len as u64;
285                let remainder_len = total_len - prefix_len;
286                let mut remainder = vec![0u8; remainder_len];
287                blob.read_into(&mut remainder, read_offset).await?;
288                let chained = prefix.chain(IoBuf::from(remainder));
289                let decoded = decode_item::<V>(chained, cfg, compressed)?;
290                (total_len as u32, decoded)
291            }
292        };
293
294        Ok((next_offset, item_size, decoded))
295    }
296
297    /// Returns an ordered stream of all items in the journal starting with the item at the given
298    /// `start_section` and `offset` into that section. Each item is returned as a tuple of
299    /// (section, offset, size, item).
300    pub async fn replay(
301        &self,
302        start_section: u64,
303        mut start_offset: u64,
304        buffer: NonZeroUsize,
305    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
306        // Collect all blobs to replay (keeping blob reference for potential resize)
307        let codec_config = self.codec_config.clone();
308        let compressed = self.compression.is_some();
309        let mut blobs = Vec::new();
310        for (&section, blob) in self.manager.sections_from(start_section) {
311            blobs.push((
312                section,
313                blob.clone(),
314                blob.replay(buffer).await?,
315                codec_config.clone(),
316                compressed,
317            ));
318        }
319
320        // Stream items as they are read to avoid occupying too much memory
321        Ok(stream::iter(blobs).flat_map(
322            move |(section, blob, replay, codec_config, compressed)| {
323                // Calculate initial skip bytes for first blob
324                let skip_bytes = if section == start_section {
325                    start_offset
326                } else {
327                    start_offset = 0;
328                    0
329                };
330
331                stream::unfold(
332                    ReplayState {
333                        section,
334                        blob,
335                        replay,
336                        skip_bytes,
337                        offset: 0,
338                        valid_offset: skip_bytes,
339                        codec_config,
340                        compressed,
341                        done: false,
342                    },
343                    move |mut state| async move {
344                        if state.done {
345                            return None;
346                        }
347
348                        let blob_size = state.replay.blob_size();
349                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
350                        loop {
351                            // Ensure we have enough data for varint header.
352                            // ensure() returns Ok(false) if exhausted with fewer bytes,
353                            // but we still try to decode from remaining bytes.
354                            match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
355                                Ok(true) => {}
356                                Ok(false) => {
357                                    // Reader exhausted - check if buffer is empty
358                                    if state.replay.remaining() == 0 {
359                                        state.done = true;
360                                        return if batch.is_empty() {
361                                            None
362                                        } else {
363                                            Some((batch, state))
364                                        };
365                                    }
366                                    // Buffer still has data - continue to try decoding
367                                }
368                                Err(err) => {
369                                    batch.push(Err(err.into()));
370                                    state.done = true;
371                                    return Some((batch, state));
372                                }
373                            }
374
375                            // Skip bytes if needed (for start_offset)
376                            if state.skip_bytes > 0 {
377                                let to_skip =
378                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
379                                state.replay.advance(to_skip);
380                                state.skip_bytes -= to_skip as u64;
381                                state.offset += to_skip as u64;
382                                continue;
383                            }
384
385                            // Try to decode length prefix
386                            let before_remaining = state.replay.remaining();
387                            let (item_size, varint_len) =
388                                match decode_length_prefix(&mut state.replay) {
389                                    Ok(result) => result,
390                                    Err(err) => {
391                                        // Could be incomplete varint - check if reader exhausted
392                                        if state.replay.is_exhausted()
393                                            || before_remaining < MAX_U32_VARINT_SIZE
394                                        {
395                                            // Treat as trailing bytes
396                                            if state.valid_offset < blob_size
397                                                && state.offset < blob_size
398                                            {
399                                                warn!(
400                                                    blob = state.section,
401                                                    bad_offset = state.offset,
402                                                    new_size = state.valid_offset,
403                                                    "trailing bytes detected: truncating"
404                                                );
405                                                if let Err(err) =
406                                                    state.blob.resize(state.valid_offset).await
407                                                {
408                                                    batch.push(Err(err.into()));
409                                                    state.done = true;
410                                                    return Some((batch, state));
411                                                }
412                                            }
413                                            state.done = true;
414                                            return if batch.is_empty() {
415                                                None
416                                            } else {
417                                                Some((batch, state))
418                                            };
419                                        }
420                                        batch.push(Err(err));
421                                        state.done = true;
422                                        return Some((batch, state));
423                                    }
424                                };
425
426                            // Ensure we have enough data for item body
427                            match state.replay.ensure(item_size).await {
428                                Ok(true) => {}
429                                Ok(false) => {
430                                    // Incomplete item at end - truncate
431                                    warn!(
432                                        blob = state.section,
433                                        bad_offset = state.offset,
434                                        new_size = state.valid_offset,
435                                        "incomplete item at end: truncating"
436                                    );
437                                    if let Err(err) = state.blob.resize(state.valid_offset).await {
438                                        batch.push(Err(err.into()));
439                                        state.done = true;
440                                        return Some((batch, state));
441                                    }
442                                    state.done = true;
443                                    return if batch.is_empty() {
444                                        None
445                                    } else {
446                                        Some((batch, state))
447                                    };
448                                }
449                                Err(err) => {
450                                    batch.push(Err(err.into()));
451                                    state.done = true;
452                                    return Some((batch, state));
453                                }
454                            }
455
456                            // Decode item - use take() to limit bytes read
457                            let item_offset = state.offset;
458                            let next_offset = match state
459                                .offset
460                                .checked_add(varint_len as u64)
461                                .and_then(|o| o.checked_add(item_size as u64))
462                            {
463                                Some(o) => o,
464                                None => {
465                                    batch.push(Err(Error::OffsetOverflow));
466                                    state.done = true;
467                                    return Some((batch, state));
468                                }
469                            };
470                            match decode_item::<V>(
471                                (&mut state.replay).take(item_size),
472                                &state.codec_config,
473                                state.compressed,
474                            ) {
475                                Ok(decoded) => {
476                                    batch.push(Ok((
477                                        state.section,
478                                        item_offset,
479                                        item_size as u32,
480                                        decoded,
481                                    )));
482                                    state.valid_offset = next_offset;
483                                    state.offset = next_offset;
484                                }
485                                Err(err) => {
486                                    batch.push(Err(err));
487                                    state.done = true;
488                                    return Some((batch, state));
489                                }
490                            }
491
492                            // Return batch if we have items and buffer is low
493                            if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
494                                return Some((batch, state));
495                            }
496                        }
497                    },
498                )
499                .flat_map(stream::iter)
500            },
501        ))
502    }
503
504    /// Encode an item.
505    ///
506    /// Returns `(buf, item_len)` where `item_len` is the length of the encoded (and
507    /// possibly compressed) payload, excluding the size prefix.
508    pub(crate) fn encode_item(compression: Option<u8>, item: &V) -> Result<(Vec<u8>, u32), Error> {
509        let mut buf = Vec::new();
510        let item_len = Self::encode_item_into(compression, item, &mut buf)?;
511        Ok((buf, item_len))
512    }
513
514    /// Encode an item with its length prefix, appending the encoded bytes to `buf`.
515    ///
516    /// Existing contents of `buf` are preserved; this allows callers to accumulate
517    /// multiple encoded items into a single buffer.
518    ///
519    /// Returns the payload length, excluding the size prefix.
520    pub(crate) fn encode_item_into(
521        compression: Option<u8>,
522        item: &V,
523        buf: &mut Vec<u8>,
524    ) -> Result<u32, Error> {
525        if let Some(compression) = compression {
526            // Compressed: encode first, then compress
527            let encoded = item.encode();
528            let compressed =
529                compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
530            let item_len = compressed.len();
531            let item_len_u32: u32 = match item_len.try_into() {
532                Ok(len) => len,
533                Err(_) => return Err(Error::ItemTooLarge(item_len)),
534            };
535            let size_len = UInt(item_len_u32).encode_size();
536            let entry_len = size_len
537                .checked_add(item_len)
538                .ok_or(Error::OffsetOverflow)?;
539
540            buf.reserve(entry_len);
541            UInt(item_len_u32).write(buf);
542            buf.extend_from_slice(&compressed);
543
544            Ok(item_len_u32)
545        } else {
546            // Uncompressed: pre-allocate exact size to avoid copying
547            let item_len = item.encode_size();
548            let item_len_u32: u32 = match item_len.try_into() {
549                Ok(len) => len,
550                Err(_) => return Err(Error::ItemTooLarge(item_len)),
551            };
552            let size_len = UInt(item_len_u32).encode_size();
553            let entry_len = size_len
554                .checked_add(item_len)
555                .ok_or(Error::OffsetOverflow)?;
556
557            buf.reserve(entry_len);
558            UInt(item_len_u32).write(buf);
559            item.write(buf);
560
561            Ok(item_len_u32)
562        }
563    }
564
565    /// Appends an item to `Journal` in a given `section`, returning the offset
566    /// where the item was written and the size of the item (which may differ
567    /// from the raw encoded size if compression is enabled).
568    pub async fn append(&mut self, section: u64, item: &V) -> Result<(u64, u32), Error> {
569        let (buf, item_len) = Self::encode_item(self.compression, item)?;
570        self.append_raw(section, &buf)
571            .await
572            .map(|offset| (offset, item_len))
573    }
574
575    /// Append pre-encoded bytes to the given section, returning the byte offset
576    /// where the data was written.
577    ///
578    /// The buffer must be in the on-disk format produced by [Self::encode_item].
579    pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<u64, Error> {
580        let blob = self.manager.get_or_create(section).await?;
581        let offset = blob.size().await;
582        blob.append(buf).await?;
583        trace!(blob = section, offset, "appended item");
584        Ok(offset)
585    }
586
587    /// Retrieves an item from `Journal` at a given `section` and `offset`.
588    ///
589    /// # Errors
590    ///  - [Error::AlreadyPrunedToSection] if the requested `section` has been pruned during the
591    ///    current execution.
592    ///  - [Error::SectionOutOfRange] if the requested `section` is empty (i.e. has never had any
593    ///    data appended to it, or has been pruned in a previous execution).
594    ///  - An invalid `offset` for a given section (that is, an offset that doesn't correspond to a
595    ///    previously appended item) will result in an error, with the specific type being
596    ///    undefined.
597    pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
598        let blob = self
599            .manager
600            .get(section)?
601            .ok_or(Error::SectionOutOfRange(section))?;
602
603        // Perform a multi-op read.
604        let (_, _, item) =
605            Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
606        Ok(item)
607    }
608
609    /// Read multiple items from the same section.
610    ///
611    /// Offsets should be sorted in ascending order.
612    pub async fn get_many(&self, section: u64, offsets: &[u64]) -> Result<Vec<V>, Error> {
613        if offsets.is_empty() {
614            return Ok(Vec::new());
615        }
616        let blob = self
617            .manager
618            .get(section)?
619            .ok_or(Error::SectionOutOfRange(section))?;
620
621        let compressed = self.compression.is_some();
622        let cfg = &self.codec_config;
623        let mut items = Vec::with_capacity(offsets.len());
624        for &offset in offsets {
625            let (_, _, item) = Self::read(compressed, cfg, blob, offset).await?;
626            items.push(item);
627        }
628        Ok(items)
629    }
630
631    /// Read consecutive items from the same section. `offsets` must be sorted in strictly
632    /// ascending order and identify items that are adjacent in the section.
633    ///
634    /// # Errors
635    ///
636    /// Returns [`Error::OffsetDataMismatch`] if the on-disk varint at any offset reports a size
637    /// inconsistent with the gap to the next offset. This indicates either on-disk corruption or a
638    /// caller violation of the byte-adjacency precondition.
639    ///
640    /// # Panics
641    ///
642    /// Panics if `offsets` is not strictly increasing.
643    pub(crate) async fn get_many_consecutive(
644        &self,
645        section: u64,
646        offsets: &[u64],
647    ) -> Result<Vec<V>, Error> {
648        if offsets.len() <= 1 {
649            return self.get_many(section, offsets).await;
650        }
651        let blob = self
652            .manager
653            .get(section)?
654            .ok_or(Error::SectionOutOfRange(section))?;
655
656        let start = offsets[0];
657        let end = offsets[offsets.len() - 1];
658        if end <= start {
659            return self.get_many(section, offsets).await;
660        }
661        let range_len = usize::try_from(end - start).map_err(|_| Error::OffsetOverflow)?;
662        let bytes = blob.read_at(start, range_len).await?.coalesce();
663        let bytes = bytes.as_ref();
664
665        let compressed = self.compression.is_some();
666        let cfg = &self.codec_config;
667        let mut items = Vec::with_capacity(offsets.len());
668        let mut local_offset = 0usize;
669
670        for window in offsets.windows(2) {
671            let offset = window[0];
672            let next_offset = window[1];
673            assert!(offset < next_offset, "offsets must be strictly increasing");
674
675            let item_len =
676                usize::try_from(next_offset - offset).map_err(|_| Error::OffsetOverflow)?;
677
678            let mut cursor = Cursor::new(&bytes[local_offset..]);
679            let (size, varint_len) = decode_length_prefix(&mut cursor)?;
680            let actual_len = size + varint_len;
681            if actual_len != item_len {
682                return Err(Error::OffsetDataMismatch {
683                    section,
684                    offset,
685                    expected_len: item_len,
686                    actual_len,
687                });
688            }
689
690            let data_start = local_offset
691                .checked_add(varint_len)
692                .ok_or(Error::OffsetOverflow)?;
693            let data_end = local_offset
694                .checked_add(item_len)
695                .ok_or(Error::OffsetOverflow)?;
696
697            items.push(decode_item::<V>(
698                &bytes[data_start..data_end],
699                cfg,
700                compressed,
701            )?);
702
703            local_offset = data_end;
704        }
705
706        let (_, _, item) = Self::read(compressed, cfg, blob, end).await?;
707        items.push(item);
708        Ok(items)
709    }
710
711    /// Get an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
712    pub fn try_get_sync(&self, section: u64, offset: u64) -> Option<V> {
713        let mut buf = Vec::new();
714        self.try_get_sync_into(section, offset, &mut buf)
715    }
716
717    /// Get an item synchronously using caller-provided buffer.
718    pub fn try_get_sync_into(&self, section: u64, offset: u64, buf: &mut Vec<u8>) -> Option<V> {
719        let blob = self.manager.get(section).ok()??;
720        let remaining = blob.try_size()?.checked_sub(offset)?;
721        let header_len = usize::try_from(remaining.min(MAX_U32_VARINT_SIZE as u64)).ok()?;
722        if header_len == 0 {
723            return None;
724        }
725
726        // Read the varint header to determine item size.
727        let mut header = [0u8; MAX_U32_VARINT_SIZE];
728        if !blob.try_read_sync(offset, &mut header[..header_len]) {
729            return None;
730        }
731        let mut cursor = Cursor::new(&header[..header_len]);
732        let (_, item_info) = find_item(&mut cursor, offset).ok()?;
733
734        let (varint_len, data_len) = match item_info {
735            ItemInfo::Complete {
736                varint_len,
737                data_len,
738            } => (varint_len, data_len),
739            ItemInfo::Incomplete {
740                varint_len,
741                total_len,
742                ..
743            } => (varint_len, total_len),
744        };
745        let item_len = varint_len.checked_add(data_len)?;
746        if item_len > usize::try_from(remaining).ok()? {
747            return None;
748        }
749
750        // If the full item fits in the header read, decode directly.
751        if item_len <= header_len {
752            return decode_item::<V>(
753                &header[varint_len..varint_len + data_len],
754                &self.codec_config,
755                self.compression.is_some(),
756            )
757            .ok();
758        }
759
760        // Otherwise try reading the full item from cache.
761        buf.resize(item_len, 0);
762        if !blob.try_read_sync(offset, buf) {
763            return None;
764        }
765        decode_item::<V>(
766            &buf[varint_len..varint_len + data_len],
767            &self.codec_config,
768            self.compression.is_some(),
769        )
770        .ok()
771    }
772
773    /// Gets the size of the journal for a specific section.
774    ///
775    /// Returns 0 if the section does not exist.
776    pub async fn size(&self, section: u64) -> Result<u64, Error> {
777        self.manager.size(section).await
778    }
779
780    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
781    ///
782    /// # Warnings
783    ///
784    /// * This operation is not guaranteed to survive restarts until sync is called.
785    /// * This operation is not atomic, but it will always leave the journal in a consistent state
786    ///   in the event of failure since blobs are always removed in reverse order of section.
787    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
788        self.manager.rewind(section, offset).await
789    }
790
791    /// Rewinds the journal to the given `section` and `size`.
792    ///
793    /// This removes any data beyond the specified `section` and `size`.
794    ///
795    /// # Warnings
796    ///
797    /// * This operation is not guaranteed to survive restarts until sync is called.
798    /// * This operation is not atomic, but it will always leave the journal in a consistent state
799    ///   in the event of failure since blobs are always removed in reverse order of section.
800    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
801        self.manager.rewind(section, size).await
802    }
803
804    /// Rewinds the `section` to the given `size`.
805    ///
806    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
807    ///
808    /// # Warning
809    ///
810    /// This operation is not guaranteed to survive restarts until sync is called.
811    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
812        self.manager.rewind_section(section, size).await
813    }
814
815    /// Ensures that all data in a given `section` is synced to the underlying store.
816    ///
817    /// If the `section` does not exist, no error will be returned.
818    pub async fn sync(&self, section: u64) -> Result<(), Error> {
819        self.manager.sync(section).await
820    }
821
822    /// Syncs all open sections.
823    pub async fn sync_all(&self) -> Result<(), Error> {
824        self.manager.sync_all().await
825    }
826
827    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
828    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
829        self.manager.prune(min).await
830    }
831
832    /// Returns the number of the oldest section in the journal.
833    pub fn oldest_section(&self) -> Option<u64> {
834        self.manager.oldest_section()
835    }
836
837    /// Returns the number of the newest section in the journal.
838    pub fn newest_section(&self) -> Option<u64> {
839        self.manager.newest_section()
840    }
841
842    /// Returns true if no sections exist.
843    pub fn is_empty(&self) -> bool {
844        self.manager.is_empty()
845    }
846
847    /// Returns the number of sections.
848    pub fn num_sections(&self) -> usize {
849        self.manager.num_sections()
850    }
851
852    /// Removes any underlying blobs created by the journal.
853    pub async fn destroy(self) -> Result<(), Error> {
854        self.manager.destroy().await
855    }
856
857    /// Clear all data, resetting the journal to an empty state.
858    ///
859    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
860    pub async fn clear(&mut self) -> Result<(), Error> {
861        self.manager.clear().await
862    }
863}
864
865#[cfg(test)]
866mod tests {
867    use super::*;
868    use commonware_macros::test_traced;
869    use commonware_runtime::{deterministic, Blob, BufMut, Runner, Storage, Supervisor as _};
870    use commonware_utils::{NZUsize, NZU16};
871    use futures::{pin_mut, StreamExt};
872    use std::num::NonZeroU16;
873
    /// Page size passed to `CacheRef::from_pooler` when building each test's page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    /// Cache capacity passed to `CacheRef::from_pooler` (presumably a number of pages — confirm
    /// against `CacheRef`).
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
876
    /// Appends one item, restarts the journal, and checks that replay returns it. Also checks the
    /// `tracked` metric reported by each journal instance.
    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, &data)
                .await
                .expect("Failed to append data");

            // Check metrics: the "first" instance tracks the single section written above
            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 1"));

            // Sync, then drop and re-open the journal (under a new metrics label) to simulate a
            // restart
            journal.sync(index).await.expect("Failed to sync journal");
            drop(journal);
            let journal = Journal::<_, i32>::init(context.child("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify that the item was replayed correctly
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics: the re-opened instance also tracks the one persisted section
            let buffer = context.encode();
            assert!(buffer.contains("second_tracked 1"));
        });
    }
939
    /// Appends items to several sections, syncing after each append, then restarts. Checks that
    /// replay returns every item in the original order and that the `tracked`/`synced_total`
    /// metrics match.
    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append multiple items to different blobs (section 1 receives two items)
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics: 3 distinct sections are tracked and one sync was counted per append
            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 3"));
            assert!(buffer.contains("first_synced_total 4"));

            // Drop and re-open the journal to simulate a restart
            drop(journal);
            let journal = Journal::init(context.child("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that all items were replayed correctly and in the order they were appended
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Cleanup
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
1011
1012    #[test_traced]
1013    fn test_journal_prune_blobs() {
1014        // Initialize the deterministic context
1015        let executor = deterministic::Runner::default();
1016
1017        // Start the test within the executor
1018        executor.start(|context| async move {
1019            // Create a journal configuration
1020            let cfg = Config {
1021                partition: "test-partition".into(),
1022                compression: None,
1023                codec_config: (),
1024                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1025                write_buffer: NZUsize!(1024),
1026            };
1027
1028            // Initialize the journal
1029            let mut journal = Journal::init(context.child("first"), cfg.clone())
1030                .await
1031                .expect("Failed to initialize journal");
1032
1033            // Append items to multiple blobs
1034            for index in 1u64..=5u64 {
1035                journal
1036                    .append(index, &index)
1037                    .await
1038                    .expect("Failed to append data");
1039                journal.sync(index).await.expect("Failed to sync blob");
1040            }
1041
1042            // Add one item out-of-order
1043            let data = 99;
1044            journal
1045                .append(2u64, &data)
1046                .await
1047                .expect("Failed to append data");
1048            journal.sync(2u64).await.expect("Failed to sync blob");
1049
1050            // Prune blobs with indices less than 3
1051            journal.prune(3).await.expect("Failed to prune blobs");
1052
1053            // Check metrics
1054            let buffer = context.encode();
1055            assert!(buffer.contains("first_pruned_total 2"));
1056
1057            // Prune again with a section less than the previous one, should be a no-op
1058            journal.prune(2).await.expect("Failed to no-op prune");
1059            let buffer = context.encode();
1060            assert!(buffer.contains("first_pruned_total 2"));
1061
1062            // Drop and re-open the journal to simulate a restart
1063            drop(journal);
1064            let mut journal = Journal::init(context.child("second"), cfg.clone())
1065                .await
1066                .expect("Failed to re-initialize journal");
1067
1068            // Replay the journal and collect items
1069            let mut items = Vec::<(u64, u64)>::new();
1070            {
1071                let stream = journal
1072                    .replay(0, 0, NZUsize!(1024))
1073                    .await
1074                    .expect("unable to setup replay");
1075                pin_mut!(stream);
1076                while let Some(result) = stream.next().await {
1077                    match result {
1078                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1079                        Err(err) => panic!("Failed to read item: {err}"),
1080                    }
1081                }
1082            }
1083
1084            // Verify that items from blobs 1 and 2 are not present
1085            assert_eq!(items.len(), 3);
1086            let expected_indices = [3u64, 4u64, 5u64];
1087            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
1088                assert_eq!(item.0, *expected_index);
1089            }
1090
1091            // Prune all blobs
1092            journal.prune(6).await.expect("Failed to prune blobs");
1093
1094            // Drop the journal
1095            drop(journal);
1096
1097            // Ensure no remaining blobs exist
1098            //
1099            // Note: We don't remove the partition, so this does not error
1100            // and instead returns an empty list of blobs.
1101            assert!(context
1102                .scan(&cfg.partition)
1103                .await
1104                .expect("Failed to list blobs")
1105                .is_empty());
1106        });
1107    }
1108
    /// Checks that, once sections are pruned, every per-section operation (append, get, size,
    /// rewind, rewind_section, sync) on a pruned section fails with `AlreadyPrunedToSection`.
    /// Sections at or above the threshold keep working, and a later prune raises the threshold.
    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.child("storage"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to sections 1-5
            for section in 1u64..=5u64 {
                journal
                    .append(section, &(section as i32))
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            // Prune sections < 3
            journal.prune(3).await.expect("Failed to prune");

            // Test that accessing pruned sections returns the correct error (the error carries
            // the current prune threshold, 3)

            // Test append on pruned section
            match journal.append(1, &100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, &100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test get on pruned section
            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test size on pruned section
            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test rewind on pruned section
            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test rewind_section on pruned section
            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test sync on pruned section
            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test that accessing sections at or after the threshold works
            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            // Append to section at threshold should work
            journal
                .append(3, &999)
                .await
                .expect("Should be able to append to section 3");

            // Prune more sections
            journal.prune(5).await.expect("Failed to prune");

            // Verify sections 3 and 4 are now pruned and the error reports the new threshold
            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            // Section 5 should still be accessible
            assert!(journal.get(5, 0).await.is_ok());
        });
    }
1212
    /// Checks how pruned sections appear after a restart. The prune threshold is not restored,
    /// so the pruned sections are simply absent (`SectionOutOfRange`) rather than reported as
    /// `AlreadyPrunedToSection`. Retained sections remain readable.
    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // First session: create and prune
            {
                let mut journal = Journal::init(context.child("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, &(section as i32))
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
            }

            // Second session: the in-memory prune threshold from the first session is not
            // carried over (oldest_retained_section is reset)
            {
                let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                // Sections 1 and 2 were removed from storage by the earlier prune, so `get`
                // reports them as SectionOutOfRange rather than AlreadyPrunedToSection
                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                // Sections 3-5 should still be accessible
                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());
            }
        });
    }
1268
    /// Checks that `Journal::init` rejects a partition containing a blob whose name is not an
    /// 8-byte section number, returning `InvalidBlobName`.
    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with an invalid name (not 8 bytes)
            let invalid_blob_name = b"invalid"; // Less than 8 bytes
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            // Attempt to initialize the journal
            let result = Journal::<_, u64>::init(context, cfg).await;

            // Expect an error
            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }
1300
    /// Checks that a section holding only a truncated varint length prefix replays as empty,
    /// without surfacing an error.
    #[test_traced]
    fn test_journal_read_size_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with incomplete size data (blob names are big-endian
            // section numbers)
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write incomplete varint by encoding u32::MAX (5 bytes) and truncating to 1 byte
            let mut incomplete_data = Vec::new();
            UInt(u32::MAX).write(&mut incomplete_data);
            incomplete_data.truncate(1);
            blob.write_at_sync(0, incomplete_data)
                .await
                .expect("Failed to write incomplete data");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal; any error here fails the test
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }
1354
    /// Checks that replay yields the valid item and then surfaces a storage error when repairing
    /// trailing garbage requires a physical resize that fails (injected via
    /// `resize_rate: Some(1.0)`). After the error, the stream must end.
    #[test_traced]
    fn test_journal_replay_reports_resize_error_on_trailing_bytes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Leave one byte in the first page so the trailing bytes below cross the page
            // boundary and repair must issue a physical resize.
            let section = 1u64;
            let item = [10u8; 1021];
            // Record size = varint length prefix + encoded item
            let item_record_size =
                UInt(item.encode_size() as u32).encode_size() + item.encode_size();
            assert_eq!(item_record_size, PAGE_SIZE.get() as usize - 1);

            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");
            journal
                .append(section, &item)
                .await
                .expect("Failed to append item");
            // Two raw bytes that do not form a valid record
            journal
                .append_raw(section, &[0xFF, 0xFF])
                .await
                .expect("Failed to append trailing bytes");
            journal.sync(section).await.expect("Failed to sync journal");
            drop(journal);

            let journal = Journal::init(context.child("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");
            // Inject the resize fault only after init, so it affects the replay-time repair
            *context.storage_fault_config().write() = deterministic::FaultConfig {
                resize_rate: Some(1.0),
                ..Default::default()
            };

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);

            let first = stream
                .next()
                .await
                .expect("expected item before trailing bytes")
                .expect("failed to replay valid item");
            assert_eq!(first, (section, 0, item.encode_size() as u32, item));

            // The trailing bytes cross the page boundary, so repair must issue a physical resize.
            match stream.next().await {
                Some(Err(_)) => {}
                other => {
                    panic!("expected resize error while repairing trailing bytes, got {other:?}")
                }
            }
            assert!(stream.next().await.is_none());
        });
    }
1420
    /// Checks that a record whose length prefix promises more data than the section contains
    /// replays as empty, without surfacing an error.
    #[test_traced]
    fn test_journal_read_item_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with missing item data
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write size but incomplete item data (prefix says 10 bytes, only 5 follow)
            let item_size: u32 = 10; // Size indicates 10 bytes of data
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf); // Varint encoding
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at_sync(0, buf)
                .await
                .expect("Failed to write incomplete item");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal; any error here fails the test
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }
1476
    /// Checks that a record written directly to the blob (varint length prefix plus data,
    /// bypassing `Journal::append`) replays as empty, without surfacing an error.
    ///
    /// NOTE(review): the item format documented at the top of this file is only
    /// `Size (varint u32) | Data`, with no per-item checksum. This test's name and its
    /// "no checksum" comment may predate that format. The empty replay is presumably caused by
    /// the raw write bypassing the journal's own blob framing. Confirm what is actually being
    /// exercised here.
    #[test_traced]
    fn test_journal_read_checksum_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with missing checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Prepare item data
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Write size (varint) and data, but no checksum
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            blob.write_at_sync(0, buf)
                .await
                .expect("Failed to write item without checksum");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            //
            // This will truncate the leftover bytes from our manual write.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }
1536
1537    #[test_traced]
1538    fn test_journal_read_checksum_mismatch() {
1539        // Initialize the deterministic context
1540        let executor = deterministic::Runner::default();
1541
1542        // Start the test within the executor
1543        executor.start(|context| async move {
1544            // Create a journal configuration
1545            let cfg = Config {
1546                partition: "test-partition".into(),
1547                compression: None,
1548                codec_config: (),
1549                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1550                write_buffer: NZUsize!(1024),
1551            };
1552
1553            // Manually create a blob with incorrect checksum
1554            let section = 1u64;
1555            let blob_name = section.to_be_bytes();
1556            let (blob, _) = context
1557                .open(&cfg.partition, &blob_name)
1558                .await
1559                .expect("Failed to create blob");
1560
1561            // Prepare item data
1562            let item_data = b"Test data";
1563            let item_size = item_data.len() as u32;
1564            let incorrect_checksum: u32 = 0xDEADBEEF;
1565
1566            // Write size (varint), data, and incorrect checksum
1567            let mut buf = Vec::new();
1568            UInt(item_size).write(&mut buf);
1569            BufMut::put_slice(&mut buf, item_data);
1570            buf.put_u32(incorrect_checksum);
1571            blob.write_at_sync(0, buf)
1572                .await
1573                .expect("Failed to write item with bad checksum");
1574
1575            // Initialize the journal
1576            let journal = Journal::init(context.child("storage"), cfg.clone())
1577                .await
1578                .expect("Failed to initialize journal");
1579
1580            // Attempt to replay the journal
1581            {
1582                let stream = journal
1583                    .replay(0, 0, NZUsize!(1024))
1584                    .await
1585                    .expect("unable to setup replay");
1586                pin_mut!(stream);
1587                let mut items = Vec::<(u64, u64)>::new();
1588                while let Some(result) = stream.next().await {
1589                    match result {
1590                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1591                        Err(err) => panic!("Failed to read item: {err}"),
1592                    }
1593                }
1594                assert!(items.is_empty());
1595            }
1596            drop(journal);
1597
1598            // Confirm blob is expected length
1599            let (_, blob_size) = context
1600                .open(&cfg.partition, &section.to_be_bytes())
1601                .await
1602                .expect("Failed to open blob");
1603            assert_eq!(blob_size, 0);
1604        });
1605    }
1606
1607    #[test_traced]
1608    fn test_journal_truncation_recovery() {
1609        // Initialize the deterministic context
1610        let executor = deterministic::Runner::default();
1611
1612        // Start the test within the executor
1613        executor.start(|context| async move {
1614            // Create a journal configuration
1615            let cfg = Config {
1616                partition: "test-partition".into(),
1617                compression: None,
1618                codec_config: (),
1619                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1620                write_buffer: NZUsize!(1024),
1621            };
1622
1623            // Initialize the journal
1624            let mut journal = Journal::init(context.child("first"), cfg.clone())
1625                .await
1626                .expect("Failed to initialize journal");
1627
1628            // Append 1 item to the first index
1629            journal.append(1, &1).await.expect("Failed to append data");
1630
1631            // Append multiple items to the second section
1632            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1633            for (index, data) in &data_items {
1634                journal
1635                    .append(*index, data)
1636                    .await
1637                    .expect("Failed to append data");
1638                journal.sync(*index).await.expect("Failed to sync blob");
1639            }
1640
1641            // Sync all sections and drop the journal
1642            journal.sync_all().await.expect("Failed to sync");
1643            drop(journal);
1644
1645            // Manually corrupt the end of the second blob
1646            let (blob, blob_size) = context
1647                .open(&cfg.partition, &2u64.to_be_bytes())
1648                .await
1649                .expect("Failed to open blob");
1650            blob.resize(blob_size - 4)
1651                .await
1652                .expect("Failed to corrupt blob");
1653            blob.sync().await.expect("Failed to sync blob");
1654
1655            // Re-initialize the journal to simulate a restart
1656            let journal = Journal::init(context.child("second"), cfg.clone())
1657                .await
1658                .expect("Failed to re-initialize journal");
1659
1660            // Attempt to replay the journal
1661            let mut items = Vec::<(u64, u32)>::new();
1662            {
1663                let stream = journal
1664                    .replay(0, 0, NZUsize!(1024))
1665                    .await
1666                    .expect("unable to setup replay");
1667                pin_mut!(stream);
1668                while let Some(result) = stream.next().await {
1669                    match result {
1670                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1671                        Err(err) => panic!("Failed to read item: {err}"),
1672                    }
1673                }
1674            }
1675            drop(journal);
1676
1677            // Verify that replay stopped after corruption detected (the second blob).
1678            assert_eq!(items.len(), 1);
1679            assert_eq!(items[0].0, 1);
1680            assert_eq!(items[0].1, 1);
1681
1682            // Confirm second blob was truncated.
1683            let (_, blob_size) = context
1684                .open(&cfg.partition, &2u64.to_be_bytes())
1685                .await
1686                .expect("Failed to open blob");
1687            assert_eq!(blob_size, 0);
1688
1689            // Attempt to replay journal after truncation
1690            let mut journal = Journal::init(context.child("third"), cfg.clone())
1691                .await
1692                .expect("Failed to re-initialize journal");
1693
1694            // Attempt to replay the journal
1695            let mut items = Vec::<(u64, u32)>::new();
1696            {
1697                let stream = journal
1698                    .replay(0, 0, NZUsize!(1024))
1699                    .await
1700                    .expect("unable to setup replay");
1701                pin_mut!(stream);
1702                while let Some(result) = stream.next().await {
1703                    match result {
1704                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1705                        Err(err) => panic!("Failed to read item: {err}"),
1706                    }
1707                }
1708            }
1709
1710            // Verify that only non-corrupted items were replayed
1711            assert_eq!(items.len(), 1);
1712            assert_eq!(items[0].0, 1);
1713            assert_eq!(items[0].1, 1);
1714
1715            // Append a new item to truncated partition
1716            let (_offset, _) = journal.append(2, &5).await.expect("Failed to append data");
1717            journal.sync(2).await.expect("Failed to sync blob");
1718
1719            // Get the new item (offset is 0 since blob was truncated)
1720            let item = journal.get(2, 0).await.expect("Failed to get item");
1721            assert_eq!(item, 5);
1722
1723            // Drop the journal (data already synced)
1724            drop(journal);
1725
1726            // Re-initialize the journal to simulate a restart
1727            let journal = Journal::init(context.child("storage"), cfg.clone())
1728                .await
1729                .expect("Failed to re-initialize journal");
1730
1731            // Attempt to replay the journal
1732            let mut items = Vec::<(u64, u32)>::new();
1733            {
1734                let stream = journal
1735                    .replay(0, 0, NZUsize!(1024))
1736                    .await
1737                    .expect("unable to setup replay");
1738                pin_mut!(stream);
1739                while let Some(result) = stream.next().await {
1740                    match result {
1741                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1742                        Err(err) => panic!("Failed to read item: {err}"),
1743                    }
1744                }
1745            }
1746
1747            // Verify that only non-corrupted items were replayed
1748            assert_eq!(items.len(), 2);
1749            assert_eq!(items[0].0, 1);
1750            assert_eq!(items[0].1, 1);
1751            assert_eq!(items[1].0, 2);
1752            assert_eq!(items[1].1, 5);
1753        });
1754    }
1755
1756    #[test_traced]
1757    fn test_journal_handling_extra_data() {
1758        // Initialize the deterministic context
1759        let executor = deterministic::Runner::default();
1760
1761        // Start the test within the executor
1762        executor.start(|context| async move {
1763            // Create a journal configuration
1764            let cfg = Config {
1765                partition: "test-partition".into(),
1766                compression: None,
1767                codec_config: (),
1768                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1769                write_buffer: NZUsize!(1024),
1770            };
1771
1772            // Initialize the journal
1773            let mut journal = Journal::init(context.child("first"), cfg.clone())
1774                .await
1775                .expect("Failed to initialize journal");
1776
1777            // Append 1 item to the first index
1778            journal.append(1, &1).await.expect("Failed to append data");
1779
1780            // Append multiple items to the second index
1781            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1782            for (index, data) in &data_items {
1783                journal
1784                    .append(*index, data)
1785                    .await
1786                    .expect("Failed to append data");
1787                journal.sync(*index).await.expect("Failed to sync blob");
1788            }
1789
1790            // Sync all sections and drop the journal
1791            journal.sync_all().await.expect("Failed to sync");
1792            drop(journal);
1793
1794            // Manually add extra data to the end of the second blob
1795            let (blob, blob_size) = context
1796                .open(&cfg.partition, &2u64.to_be_bytes())
1797                .await
1798                .expect("Failed to open blob");
1799            blob.write_at_sync(blob_size, vec![0u8; 16])
1800                .await
1801                .expect("Failed to add extra data");
1802
1803            // Re-initialize the journal to simulate a restart
1804            let journal = Journal::init(context.child("second"), cfg)
1805                .await
1806                .expect("Failed to re-initialize journal");
1807
1808            // Attempt to replay the journal
1809            let mut items = Vec::<(u64, i32)>::new();
1810            let stream = journal
1811                .replay(0, 0, NZUsize!(1024))
1812                .await
1813                .expect("unable to setup replay");
1814            pin_mut!(stream);
1815            while let Some(result) = stream.next().await {
1816                match result {
1817                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1818                    Err(err) => panic!("Failed to read item: {err}"),
1819                }
1820            }
1821        });
1822    }
1823
1824    #[test_traced]
1825    fn test_journal_rewind() {
1826        // Initialize the deterministic context
1827        let executor = deterministic::Runner::default();
1828        executor.start(|context| async move {
1829            // Create journal
1830            let cfg = Config {
1831                partition: "test-partition".into(),
1832                compression: None,
1833                codec_config: (),
1834                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1835                write_buffer: NZUsize!(1024),
1836            };
1837            let mut journal = Journal::init(context, cfg).await.unwrap();
1838
1839            // Check size of non-existent section
1840            let size = journal.size(1).await.unwrap();
1841            assert_eq!(size, 0);
1842
1843            // Append data to section 1
1844            journal.append(1, &42i32).await.unwrap();
1845
1846            // Check size of section 1 - should be greater than 0
1847            let size = journal.size(1).await.unwrap();
1848            assert!(size > 0);
1849
1850            // Append more data and verify size increases
1851            journal.append(1, &43i32).await.unwrap();
1852            let new_size = journal.size(1).await.unwrap();
1853            assert!(new_size > size);
1854
1855            // Check size of different section - should still be 0
1856            let size = journal.size(2).await.unwrap();
1857            assert_eq!(size, 0);
1858
1859            // Append data to section 2
1860            journal.append(2, &44i32).await.unwrap();
1861
1862            // Check size of section 2 - should be greater than 0
1863            let size = journal.size(2).await.unwrap();
1864            assert!(size > 0);
1865
1866            // Rollback everything in section 1 and 2
1867            journal.rewind(1, 0).await.unwrap();
1868
1869            // Check size of section 1 - should be 0
1870            let size = journal.size(1).await.unwrap();
1871            assert_eq!(size, 0);
1872
1873            // Check size of section 2 - should be 0
1874            let size = journal.size(2).await.unwrap();
1875            assert_eq!(size, 0);
1876        });
1877    }
1878
1879    #[test_traced]
1880    fn test_journal_rewind_section() {
1881        // Initialize the deterministic context
1882        let executor = deterministic::Runner::default();
1883        executor.start(|context| async move {
1884            // Create journal
1885            let cfg = Config {
1886                partition: "test-partition".into(),
1887                compression: None,
1888                codec_config: (),
1889                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1890                write_buffer: NZUsize!(1024),
1891            };
1892            let mut journal = Journal::init(context, cfg).await.unwrap();
1893
1894            // Check size of non-existent section
1895            let size = journal.size(1).await.unwrap();
1896            assert_eq!(size, 0);
1897
1898            // Append data to section 1
1899            journal.append(1, &42i32).await.unwrap();
1900
1901            // Check size of section 1 - should be greater than 0
1902            let size = journal.size(1).await.unwrap();
1903            assert!(size > 0);
1904
1905            // Append more data and verify size increases
1906            journal.append(1, &43i32).await.unwrap();
1907            let new_size = journal.size(1).await.unwrap();
1908            assert!(new_size > size);
1909
1910            // Check size of different section - should still be 0
1911            let size = journal.size(2).await.unwrap();
1912            assert_eq!(size, 0);
1913
1914            // Append data to section 2
1915            journal.append(2, &44i32).await.unwrap();
1916
1917            // Check size of section 2 - should be greater than 0
1918            let size = journal.size(2).await.unwrap();
1919            assert!(size > 0);
1920
1921            // Rollback everything in section 1
1922            journal.rewind_section(1, 0).await.unwrap();
1923
1924            // Check size of section 1 - should be 0
1925            let size = journal.size(1).await.unwrap();
1926            assert_eq!(size, 0);
1927
1928            // Check size of section 2 - should be greater than 0
1929            let size = journal.size(2).await.unwrap();
1930            assert!(size > 0);
1931        });
1932    }
1933
1934    #[test_traced]
1935    fn test_journal_small_items() {
1936        let executor = deterministic::Runner::default();
1937        executor.start(|context| async move {
1938            let cfg = Config {
1939                partition: "test-partition".into(),
1940                compression: None,
1941                codec_config: (),
1942                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1943                write_buffer: NZUsize!(1024),
1944            };
1945
1946            let mut journal = Journal::init(context.child("first"), cfg.clone())
1947                .await
1948                .expect("Failed to initialize journal");
1949
1950            // Append many small (1-byte) items to the same section
1951            let num_items = 100;
1952            let mut offsets = Vec::new();
1953            for i in 0..num_items {
1954                let (offset, size) = journal
1955                    .append(1, &(i as u8))
1956                    .await
1957                    .expect("Failed to append data");
1958                assert_eq!(size, 1, "u8 should encode to 1 byte");
1959                offsets.push(offset);
1960            }
1961            journal.sync(1).await.expect("Failed to sync");
1962
1963            // Read each item back via random access
1964            for (i, &offset) in offsets.iter().enumerate() {
1965                let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
1966                assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
1967            }
1968
1969            // Drop and reopen to test replay
1970            drop(journal);
1971            let journal = Journal::<_, u8>::init(context.child("second"), cfg)
1972                .await
1973                .expect("Failed to re-initialize journal");
1974
1975            // Replay and verify all items
1976            let stream = journal
1977                .replay(0, 0, NZUsize!(1024))
1978                .await
1979                .expect("Failed to setup replay");
1980            pin_mut!(stream);
1981
1982            let mut count = 0;
1983            while let Some(result) = stream.next().await {
1984                let (section, offset, size, item) = result.expect("Failed to replay item");
1985                assert_eq!(section, 1);
1986                assert_eq!(offset, offsets[count]);
1987                assert_eq!(size, 1);
1988                assert_eq!(item, count as u8);
1989                count += 1;
1990            }
1991            assert_eq!(count, num_items, "Should replay all items");
1992        });
1993    }
1994
1995    #[test_traced]
1996    fn test_journal_rewind_many_sections() {
1997        let executor = deterministic::Runner::default();
1998        executor.start(|context| async move {
1999            let cfg = Config {
2000                partition: "test-partition".into(),
2001                compression: None,
2002                codec_config: (),
2003                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2004                write_buffer: NZUsize!(1024),
2005            };
2006            let mut journal = Journal::init(context.child("storage"), cfg.clone())
2007                .await
2008                .unwrap();
2009
2010            // Create sections 1-10 with data
2011            for section in 1u64..=10 {
2012                journal.append(section, &(section as i32)).await.unwrap();
2013            }
2014            journal.sync_all().await.unwrap();
2015
2016            // Verify all sections exist
2017            for section in 1u64..=10 {
2018                let size = journal.size(section).await.unwrap();
2019                assert!(size > 0, "section {section} should have data");
2020            }
2021
2022            // Rewind to section 5 (should remove sections 6-10)
2023            journal
2024                .rewind(5, journal.size(5).await.unwrap())
2025                .await
2026                .unwrap();
2027
2028            // Verify sections 1-5 still exist with correct data
2029            for section in 1u64..=5 {
2030                let size = journal.size(section).await.unwrap();
2031                assert!(size > 0, "section {section} should still have data");
2032            }
2033
2034            // Verify sections 6-10 are removed (size should be 0)
2035            for section in 6u64..=10 {
2036                let size = journal.size(section).await.unwrap();
2037                assert_eq!(size, 0, "section {section} should be removed");
2038            }
2039
2040            // Verify data integrity via replay
2041            {
2042                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2043                pin_mut!(stream);
2044                let mut items = Vec::new();
2045                while let Some(result) = stream.next().await {
2046                    let (section, _, _, item) = result.unwrap();
2047                    items.push((section, item));
2048                }
2049                assert_eq!(items.len(), 5);
2050                for (i, (section, item)) in items.iter().enumerate() {
2051                    assert_eq!(*section, (i + 1) as u64);
2052                    assert_eq!(*item, (i + 1) as i32);
2053                }
2054            }
2055
2056            journal.destroy().await.unwrap();
2057        });
2058    }
2059
2060    #[test_traced]
2061    fn test_journal_rewind_partial_truncation() {
2062        let executor = deterministic::Runner::default();
2063        executor.start(|context| async move {
2064            let cfg = Config {
2065                partition: "test-partition".into(),
2066                compression: None,
2067                codec_config: (),
2068                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2069                write_buffer: NZUsize!(1024),
2070            };
2071            let mut journal = Journal::init(context.child("storage"), cfg.clone())
2072                .await
2073                .unwrap();
2074
2075            // Append 5 items and record sizes after each
2076            let mut sizes = Vec::new();
2077            for i in 0..5 {
2078                journal.append(1, &i).await.unwrap();
2079                journal.sync(1).await.unwrap();
2080                sizes.push(journal.size(1).await.unwrap());
2081            }
2082
2083            // Rewind to keep only first 3 items
2084            let target_size = sizes[2];
2085            journal.rewind(1, target_size).await.unwrap();
2086
2087            // Verify size is correct
2088            let new_size = journal.size(1).await.unwrap();
2089            assert_eq!(new_size, target_size);
2090
2091            // Verify first 3 items via replay
2092            {
2093                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2094                pin_mut!(stream);
2095                let mut items = Vec::new();
2096                while let Some(result) = stream.next().await {
2097                    let (_, _, _, item) = result.unwrap();
2098                    items.push(item);
2099                }
2100                assert_eq!(items.len(), 3);
2101                for (i, item) in items.iter().enumerate() {
2102                    assert_eq!(*item, i as i32);
2103                }
2104            }
2105
2106            journal.destroy().await.unwrap();
2107        });
2108    }
2109
2110    #[test_traced]
2111    fn test_journal_rewind_nonexistent_target() {
2112        let executor = deterministic::Runner::default();
2113        executor.start(|context| async move {
2114            let cfg = Config {
2115                partition: "test-partition".into(),
2116                compression: None,
2117                codec_config: (),
2118                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2119                write_buffer: NZUsize!(1024),
2120            };
2121            let mut journal = Journal::init(context.child("storage"), cfg.clone())
2122                .await
2123                .unwrap();
2124
2125            // Create sections 5, 6, 7 (skip 1-4)
2126            for section in 5u64..=7 {
2127                journal.append(section, &(section as i32)).await.unwrap();
2128            }
2129            journal.sync_all().await.unwrap();
2130
2131            // Rewind to section 3 (doesn't exist)
2132            journal.rewind(3, 0).await.unwrap();
2133
2134            // Verify sections 5, 6, 7 are removed
2135            for section in 5u64..=7 {
2136                let size = journal.size(section).await.unwrap();
2137                assert_eq!(size, 0, "section {section} should be removed");
2138            }
2139
2140            // Verify replay returns nothing
2141            {
2142                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2143                pin_mut!(stream);
2144                let items: Vec<_> = stream.collect().await;
2145                assert!(items.is_empty());
2146            }
2147
2148            journal.destroy().await.unwrap();
2149        });
2150    }
2151
2152    #[test_traced]
2153    fn test_journal_rewind_persistence() {
2154        let executor = deterministic::Runner::default();
2155        executor.start(|context| async move {
2156            let cfg = Config {
2157                partition: "test-partition".into(),
2158                compression: None,
2159                codec_config: (),
2160                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2161                write_buffer: NZUsize!(1024),
2162            };
2163
2164            // Create sections 1-5 with data
2165            let mut journal = Journal::init(context.child("first"), cfg.clone())
2166                .await
2167                .unwrap();
2168            for section in 1u64..=5 {
2169                journal.append(section, &(section as i32)).await.unwrap();
2170            }
2171            journal.sync_all().await.unwrap();
2172
2173            // Rewind to section 2
2174            let size = journal.size(2).await.unwrap();
2175            journal.rewind(2, size).await.unwrap();
2176            journal.sync_all().await.unwrap();
2177            drop(journal);
2178
2179            // Re-init and verify only sections 1-2 exist
2180            let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
2181                .await
2182                .unwrap();
2183
2184            // Verify sections 1-2 have data
2185            for section in 1u64..=2 {
2186                let size = journal.size(section).await.unwrap();
2187                assert!(size > 0, "section {section} should have data after restart");
2188            }
2189
2190            // Verify sections 3-5 are gone
2191            for section in 3u64..=5 {
2192                let size = journal.size(section).await.unwrap();
2193                assert_eq!(size, 0, "section {section} should be gone after restart");
2194            }
2195
2196            // Verify data integrity via replay
2197            {
2198                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2199                pin_mut!(stream);
2200                let mut items = Vec::new();
2201                while let Some(result) = stream.next().await {
2202                    let (section, _, _, item) = result.unwrap();
2203                    items.push((section, item));
2204                }
2205                assert_eq!(items.len(), 2);
2206                assert_eq!(items[0], (1, 1));
2207                assert_eq!(items[1], (2, 2));
2208            }
2209
2210            journal.destroy().await.unwrap();
2211        });
2212    }
2213
2214    #[test_traced]
2215    fn test_journal_rewind_to_zero_removes_all_newer() {
2216        let executor = deterministic::Runner::default();
2217        executor.start(|context| async move {
2218            let cfg = Config {
2219                partition: "test-partition".into(),
2220                compression: None,
2221                codec_config: (),
2222                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2223                write_buffer: NZUsize!(1024),
2224            };
2225            let mut journal = Journal::init(context.child("storage"), cfg.clone())
2226                .await
2227                .unwrap();
2228
2229            // Create sections 1, 2, 3
2230            for section in 1u64..=3 {
2231                journal.append(section, &(section as i32)).await.unwrap();
2232            }
2233            journal.sync_all().await.unwrap();
2234
2235            // Rewind section 1 to size 0
2236            journal.rewind(1, 0).await.unwrap();
2237
2238            // Verify section 1 exists but is empty
2239            let size = journal.size(1).await.unwrap();
2240            assert_eq!(size, 0, "section 1 should be empty");
2241
2242            // Verify sections 2, 3 are completely removed
2243            for section in 2u64..=3 {
2244                let size = journal.size(section).await.unwrap();
2245                assert_eq!(size, 0, "section {section} should be removed");
2246            }
2247
2248            // Verify replay returns nothing
2249            {
2250                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2251                pin_mut!(stream);
2252                let items: Vec<_> = stream.collect().await;
2253                assert!(items.is_empty());
2254            }
2255
2256            journal.destroy().await.unwrap();
2257        });
2258    }
2259
2260    #[test_traced]
2261    fn test_journal_replay_start_offset_with_trailing_bytes() {
2262        // Regression: valid_offset must be initialized to start_offset, not 0.
2263        let executor = deterministic::Runner::default();
2264        executor.start(|context| async move {
2265            let cfg = Config {
2266                partition: "test-partition".into(),
2267                compression: None,
2268                codec_config: (),
2269                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2270                write_buffer: NZUsize!(1024),
2271            };
2272            let mut journal = Journal::init(context.child("first"), cfg.clone())
2273                .await
2274                .expect("Failed to initialize journal");
2275
2276            // Append several items to build up valid data
2277            for i in 0..5i32 {
2278                journal.append(1, &i).await.unwrap();
2279            }
2280            journal.sync(1).await.unwrap();
2281            let valid_logical_size = journal.size(1).await.unwrap();
2282            drop(journal);
2283
2284            // Get the physical blob size before corruption
2285            let (blob, physical_size_before) = context
2286                .open(&cfg.partition, &1u64.to_be_bytes())
2287                .await
2288                .unwrap();
2289
2290            // Write incomplete varint: 0xFF has continuation bit set, needs more bytes
2291            // This creates 2 trailing bytes that cannot form a valid item
2292            blob.write_at_sync(physical_size_before, vec![0xFF, 0xFF])
2293                .await
2294                .unwrap();
2295
2296            // Reopen journal and replay starting PAST all valid items
2297            // (start_offset = valid_logical_size means we skip all valid data)
2298            // The first thing encountered will be the trailing corrupt bytes
2299            let start_offset = valid_logical_size;
2300            {
2301                let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
2302                    .await
2303                    .unwrap();
2304
2305                let stream = journal
2306                    .replay(1, start_offset, NZUsize!(1024))
2307                    .await
2308                    .unwrap();
2309                pin_mut!(stream);
2310
2311                // Consume the stream - should detect trailing bytes and truncate
2312                while let Some(_result) = stream.next().await {}
2313            }
2314
2315            // Verify that valid data before start_offset was NOT lost
2316            let (_, physical_size_after) = context
2317                .open(&cfg.partition, &1u64.to_be_bytes())
2318                .await
2319                .unwrap();
2320
2321            // The blob should have been truncated back to the valid physical size
2322            // (removing the trailing corrupt bytes) but NOT to 0
2323            assert!(
2324                physical_size_after >= physical_size_before,
2325                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
2326                 {physical_size_after}. Logical valid size was {valid_logical_size}. \
2327                 This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
2328            );
2329        });
2330    }
2331
2332    #[test_traced]
2333    fn test_journal_large_item_spanning_pages() {
2334        // 2048 bytes spans 2 full pages (PAGE_SIZE = 1024).
2335        const LARGE_SIZE: usize = 2048;
2336        type LargeItem = [u8; LARGE_SIZE];
2337
2338        let executor = deterministic::Runner::default();
2339        executor.start(|context| async move {
2340            let cfg = Config {
2341                partition: "test-partition".into(),
2342                compression: None,
2343                codec_config: (),
2344                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2345                write_buffer: NZUsize!(4096),
2346            };
2347            let mut journal = Journal::init(context.child("first"), cfg.clone())
2348                .await
2349                .expect("Failed to initialize journal");
2350
2351            // Create a large item that spans multiple pages.
2352            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
2353            for (i, byte) in large_data.iter_mut().enumerate() {
2354                *byte = (i % 256) as u8;
2355            }
2356            assert!(
2357                LARGE_SIZE > PAGE_SIZE.get() as usize,
2358                "Item must be larger than page size"
2359            );
2360
2361            // Append the large item
2362            let (offset, size) = journal
2363                .append(1, &large_data)
2364                .await
2365                .expect("Failed to append large item");
2366            assert_eq!(size as usize, LARGE_SIZE);
2367            journal.sync(1).await.expect("Failed to sync");
2368
2369            // Read the item back via random access
2370            let retrieved: LargeItem = journal
2371                .get(1, offset)
2372                .await
2373                .expect("Failed to get large item");
2374            assert_eq!(retrieved, large_data, "Random access read mismatch");
2375
2376            // Drop and reopen to test replay
2377            drop(journal);
2378            let journal = Journal::<_, LargeItem>::init(context.child("second"), cfg.clone())
2379                .await
2380                .expect("Failed to re-initialize journal");
2381
2382            // Replay and verify the large item
2383            {
2384                let stream = journal
2385                    .replay(0, 0, NZUsize!(1024))
2386                    .await
2387                    .expect("Failed to setup replay");
2388                pin_mut!(stream);
2389
2390                let mut items = Vec::new();
2391                while let Some(result) = stream.next().await {
2392                    let (section, off, sz, item) = result.expect("Failed to replay item");
2393                    items.push((section, off, sz, item));
2394                }
2395
2396                assert_eq!(items.len(), 1, "Should have exactly one item");
2397                let (section, off, sz, item) = &items[0];
2398                assert_eq!(*section, 1);
2399                assert_eq!(*off, offset);
2400                assert_eq!(*sz as usize, LARGE_SIZE);
2401                assert_eq!(*item, large_data, "Replay read mismatch");
2402            }
2403
2404            journal.destroy().await.unwrap();
2405        });
2406    }
2407
2408    #[test_traced]
2409    fn test_journal_non_contiguous_sections() {
2410        // Test that sections with gaps in numbering work correctly.
2411        // Sections 1, 5, 10 should all be independent and accessible.
2412        let executor = deterministic::Runner::default();
2413        executor.start(|context| async move {
2414            let cfg = Config {
2415                partition: "test-partition".into(),
2416                compression: None,
2417                codec_config: (),
2418                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2419                write_buffer: NZUsize!(1024),
2420            };
2421            let mut journal = Journal::init(context.child("first"), cfg.clone())
2422                .await
2423                .expect("Failed to initialize journal");
2424
2425            // Create sections with gaps: 1, 5, 10
2426            let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
2427            let mut offsets = Vec::new();
2428
2429            for (section, data) in &sections_and_data {
2430                let (offset, _) = journal
2431                    .append(*section, data)
2432                    .await
2433                    .expect("Failed to append");
2434                offsets.push(offset);
2435            }
2436            journal.sync_all().await.expect("Failed to sync");
2437
2438            // Verify random access to each section
2439            for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
2440                let retrieved: i32 = journal
2441                    .get(*section, offsets[i])
2442                    .await
2443                    .expect("Failed to get item");
2444                assert_eq!(retrieved, *expected_data);
2445            }
2446
2447            // Verify non-existent sections return appropriate errors
2448            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
2449                let result = journal.get(missing_section, 0).await;
2450                assert!(
2451                    matches!(result, Err(Error::SectionOutOfRange(_))),
2452                    "Expected SectionOutOfRange for section {}, got {:?}",
2453                    missing_section,
2454                    result
2455                );
2456            }
2457
2458            // Drop and reopen to test replay
2459            drop(journal);
2460            let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
2461                .await
2462                .expect("Failed to re-initialize journal");
2463
2464            // Replay and verify all items in order
2465            {
2466                let stream = journal
2467                    .replay(0, 0, NZUsize!(1024))
2468                    .await
2469                    .expect("Failed to setup replay");
2470                pin_mut!(stream);
2471
2472                let mut items = Vec::new();
2473                while let Some(result) = stream.next().await {
2474                    let (section, _, _, item) = result.expect("Failed to replay item");
2475                    items.push((section, item));
2476                }
2477
2478                assert_eq!(items.len(), 3, "Should have 3 items");
2479                assert_eq!(items[0], (1, 100));
2480                assert_eq!(items[1], (5, 500));
2481                assert_eq!(items[2], (10, 1000));
2482            }
2483
2484            // Test replay starting from middle section (5)
2485            {
2486                let stream = journal
2487                    .replay(5, 0, NZUsize!(1024))
2488                    .await
2489                    .expect("Failed to setup replay from section 5");
2490                pin_mut!(stream);
2491
2492                let mut items = Vec::new();
2493                while let Some(result) = stream.next().await {
2494                    let (section, _, _, item) = result.expect("Failed to replay item");
2495                    items.push((section, item));
2496                }
2497
2498                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
2499                assert_eq!(items[0], (5, 500));
2500                assert_eq!(items[1], (10, 1000));
2501            }
2502
2503            // Test replay starting from non-existent section (should skip to next)
2504            {
2505                let stream = journal
2506                    .replay(3, 0, NZUsize!(1024))
2507                    .await
2508                    .expect("Failed to setup replay from section 3");
2509                pin_mut!(stream);
2510
2511                let mut items = Vec::new();
2512                while let Some(result) = stream.next().await {
2513                    let (section, _, _, item) = result.expect("Failed to replay item");
2514                    items.push((section, item));
2515                }
2516
2517                // Should get sections 5 and 10 (skipping non-existent 3, 4)
2518                assert_eq!(items.len(), 2);
2519                assert_eq!(items[0], (5, 500));
2520                assert_eq!(items[1], (10, 1000));
2521            }
2522
2523            journal.destroy().await.unwrap();
2524        });
2525    }
2526
2527    #[test_traced]
2528    fn test_journal_empty_section_in_middle() {
2529        // Test that replay correctly handles an empty section between sections with data.
2530        // Section 1 has data, section 2 is empty, section 3 has data.
2531        let executor = deterministic::Runner::default();
2532        executor.start(|context| async move {
2533            let cfg = Config {
2534                partition: "test-partition".into(),
2535                compression: None,
2536                codec_config: (),
2537                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2538                write_buffer: NZUsize!(1024),
2539            };
2540            let mut journal = Journal::init(context.child("first"), cfg.clone())
2541                .await
2542                .expect("Failed to initialize journal");
2543
2544            // Append to section 1
2545            journal.append(1, &100i32).await.expect("Failed to append");
2546
2547            // Create section 2 but don't append anything - just sync to create the blob
2548            // Actually, we need to append something and then rewind to make it empty
2549            journal.append(2, &200i32).await.expect("Failed to append");
2550            journal.sync(2).await.expect("Failed to sync");
2551            journal
2552                .rewind_section(2, 0)
2553                .await
2554                .expect("Failed to rewind");
2555
2556            // Append to section 3
2557            journal.append(3, &300i32).await.expect("Failed to append");
2558
2559            journal.sync_all().await.expect("Failed to sync");
2560
2561            // Verify section sizes
2562            assert!(journal.size(1).await.unwrap() > 0);
2563            assert_eq!(journal.size(2).await.unwrap(), 0);
2564            assert!(journal.size(3).await.unwrap() > 0);
2565
2566            // Drop and reopen to test replay
2567            drop(journal);
2568            let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
2569                .await
2570                .expect("Failed to re-initialize journal");
2571
2572            // Replay all - should get items from sections 1 and 3, skipping empty section 2
2573            {
2574                let stream = journal
2575                    .replay(0, 0, NZUsize!(1024))
2576                    .await
2577                    .expect("Failed to setup replay");
2578                pin_mut!(stream);
2579
2580                let mut items = Vec::new();
2581                while let Some(result) = stream.next().await {
2582                    let (section, _, _, item) = result.expect("Failed to replay item");
2583                    items.push((section, item));
2584                }
2585
2586                assert_eq!(
2587                    items.len(),
2588                    2,
2589                    "Should have 2 items (skipping empty section)"
2590                );
2591                assert_eq!(items[0], (1, 100));
2592                assert_eq!(items[1], (3, 300));
2593            }
2594
2595            // Replay starting from empty section 2 - should get only section 3
2596            {
2597                let stream = journal
2598                    .replay(2, 0, NZUsize!(1024))
2599                    .await
2600                    .expect("Failed to setup replay from section 2");
2601                pin_mut!(stream);
2602
2603                let mut items = Vec::new();
2604                while let Some(result) = stream.next().await {
2605                    let (section, _, _, item) = result.expect("Failed to replay item");
2606                    items.push((section, item));
2607                }
2608
2609                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
2610                assert_eq!(items[0], (3, 300));
2611            }
2612
2613            journal.destroy().await.unwrap();
2614        });
2615    }
2616
2617    #[test_traced]
2618    fn test_journal_item_exactly_page_size() {
2619        // Test that items exactly equal to PAGE_SIZE work correctly.
2620        // This is a boundary condition where item fills exactly one page.
2621        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
2622        type ExactItem = [u8; ITEM_SIZE];
2623
2624        let executor = deterministic::Runner::default();
2625        executor.start(|context| async move {
2626            let cfg = Config {
2627                partition: "test-partition".into(),
2628                compression: None,
2629                codec_config: (),
2630                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2631                write_buffer: NZUsize!(4096),
2632            };
2633            let mut journal = Journal::init(context.child("first"), cfg.clone())
2634                .await
2635                .expect("Failed to initialize journal");
2636
2637            // Create an item exactly PAGE_SIZE bytes
2638            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
2639            for (i, byte) in exact_data.iter_mut().enumerate() {
2640                *byte = (i % 256) as u8;
2641            }
2642
2643            // Append the exact-size item
2644            let (offset, size) = journal
2645                .append(1, &exact_data)
2646                .await
2647                .expect("Failed to append exact item");
2648            assert_eq!(size as usize, ITEM_SIZE);
2649            journal.sync(1).await.expect("Failed to sync");
2650
2651            // Read the item back via random access
2652            let retrieved: ExactItem = journal
2653                .get(1, offset)
2654                .await
2655                .expect("Failed to get exact item");
2656            assert_eq!(retrieved, exact_data, "Random access read mismatch");
2657
2658            // Drop and reopen to test replay
2659            drop(journal);
2660            let journal = Journal::<_, ExactItem>::init(context.child("second"), cfg.clone())
2661                .await
2662                .expect("Failed to re-initialize journal");
2663
2664            // Replay and verify
2665            {
2666                let stream = journal
2667                    .replay(0, 0, NZUsize!(1024))
2668                    .await
2669                    .expect("Failed to setup replay");
2670                pin_mut!(stream);
2671
2672                let mut items = Vec::new();
2673                while let Some(result) = stream.next().await {
2674                    let (section, off, sz, item) = result.expect("Failed to replay item");
2675                    items.push((section, off, sz, item));
2676                }
2677
2678                assert_eq!(items.len(), 1, "Should have exactly one item");
2679                let (section, off, sz, item) = &items[0];
2680                assert_eq!(*section, 1);
2681                assert_eq!(*off, offset);
2682                assert_eq!(*sz as usize, ITEM_SIZE);
2683                assert_eq!(*item, exact_data, "Replay read mismatch");
2684            }
2685
2686            journal.destroy().await.unwrap();
2687        });
2688    }
2689
2690    #[test_traced]
2691    fn test_journal_varint_spanning_page_boundary() {
2692        // Test that items with data spanning page boundaries work correctly
2693        // when using a small page size.
2694        //
2695        // With PAGE_SIZE=16:
2696        // - Physical page = 16 + 12 = 28 bytes
2697        // - Each [u8; 128] item = 2-byte varint + 128 bytes data = 130 bytes
2698        // - This spans multiple 16-byte pages, testing cross-page reading
2699        const SMALL_PAGE: NonZeroU16 = NZU16!(16);
2700
2701        let executor = deterministic::Runner::default();
2702        executor.start(|context| async move {
2703            let cfg = Config {
2704                partition: "test-partition".into(),
2705                compression: None,
2706                codec_config: (),
2707                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE, PAGE_CACHE_SIZE),
2708                write_buffer: NZUsize!(1024),
2709            };
2710            let mut journal: Journal<_, [u8; 128]> =
2711                Journal::init(context.child("first"), cfg.clone())
2712                    .await
2713                    .expect("Failed to initialize journal");
2714
2715            // Create items that will span many 16-byte pages
2716            let item1: [u8; 128] = [1u8; 128];
2717            let item2: [u8; 128] = [2u8; 128];
2718            let item3: [u8; 128] = [3u8; 128];
2719
2720            // Append items - each is 130 bytes (2-byte varint + 128 data)
2721            // spanning ceil(130/16) = 9 pages worth of logical data
2722            let (offset1, _) = journal.append(1, &item1).await.expect("Failed to append");
2723            let (offset2, _) = journal.append(1, &item2).await.expect("Failed to append");
2724            let (offset3, _) = journal.append(1, &item3).await.expect("Failed to append");
2725
2726            journal.sync(1).await.expect("Failed to sync");
2727
2728            // Read items back via random access
2729            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
2730            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
2731            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
2732            assert_eq!(retrieved1, item1);
2733            assert_eq!(retrieved2, item2);
2734            assert_eq!(retrieved3, item3);
2735
2736            // Drop and reopen to test replay
2737            drop(journal);
2738            let journal: Journal<_, [u8; 128]> =
2739                Journal::init(context.child("second"), cfg.clone())
2740                    .await
2741                    .expect("Failed to re-initialize journal");
2742
2743            // Replay and verify all items
2744            {
2745                let stream = journal
2746                    .replay(0, 0, NZUsize!(64))
2747                    .await
2748                    .expect("Failed to setup replay");
2749                pin_mut!(stream);
2750
2751                let mut items = Vec::new();
2752                while let Some(result) = stream.next().await {
2753                    let (section, off, _, item) = result.expect("Failed to replay item");
2754                    items.push((section, off, item));
2755                }
2756
2757                assert_eq!(items.len(), 3, "Should have 3 items");
2758                assert_eq!(items[0], (1, offset1, item1));
2759                assert_eq!(items[1], (1, offset2, item2));
2760                assert_eq!(items[2], (1, offset3, item3));
2761            }
2762
2763            journal.destroy().await.unwrap();
2764        });
2765    }
2766
2767    #[test_traced]
2768    fn test_journal_clear() {
2769        let executor = deterministic::Runner::default();
2770        executor.start(|context| async move {
2771            let cfg = Config {
2772                partition: "clear-test".into(),
2773                compression: None,
2774                codec_config: (),
2775                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2776                write_buffer: NZUsize!(1024),
2777            };
2778
2779            let mut journal: Journal<_, u64> = Journal::init(context.child("journal"), cfg.clone())
2780                .await
2781                .expect("Failed to initialize journal");
2782
2783            // Append items across multiple sections
2784            for section in 0..5u64 {
2785                for i in 0..10u64 {
2786                    journal
2787                        .append(section, &(section * 1000 + i))
2788                        .await
2789                        .expect("Failed to append");
2790                }
2791                journal.sync(section).await.expect("Failed to sync");
2792            }
2793
2794            // Verify we have data
2795            assert_eq!(journal.get(0, 0).await.unwrap(), 0);
2796            assert_eq!(journal.get(4, 0).await.unwrap(), 4000);
2797
2798            // Clear the journal
2799            journal.clear().await.expect("Failed to clear");
2800
2801            // After clear, all reads should fail
2802            for section in 0..5u64 {
2803                assert!(matches!(
2804                    journal.get(section, 0).await,
2805                    Err(Error::SectionOutOfRange(s)) if s == section
2806                ));
2807            }
2808
2809            // Append new data after clear
2810            for i in 0..5u64 {
2811                journal
2812                    .append(10, &(i * 100))
2813                    .await
2814                    .expect("Failed to append after clear");
2815            }
2816            journal.sync(10).await.expect("Failed to sync after clear");
2817
2818            // New data should be readable
2819            assert_eq!(journal.get(10, 0).await.unwrap(), 0);
2820
2821            // Old sections should still be missing
2822            assert!(matches!(
2823                journal.get(0, 0).await,
2824                Err(Error::SectionOutOfRange(0))
2825            ));
2826
2827            journal.destroy().await.unwrap();
2828        });
2829    }
2830}