// commonware-storage: journal/segmented/variable.rs
1//! An append-only log for storing arbitrary variable length items.
2//!
3//! `segmented::Journal` is an append-only log for storing arbitrary variable length data on disk. In
4//! addition to replay, stored items can be directly retrieved given their section number and offset
5//! within the section.
6//!
7//! # Format
8//!
9//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
10//! The particular [Blob] in which data is stored is identified by a `section` number (`u64`).
11//! Within a `section`, data is appended as an `item` with the following format:
12//!
13//! ```text
14//! +---+---+---+---+---+---+---+---+
15//! |       0 ~ 4       |    ...    |
16//! +---+---+---+---+---+---+---+---+
17//! | Size (varint u32) |   Data    |
18//! +---+---+---+---+---+---+---+---+
19//! ```
20//!
21//! # Open Blobs
22//!
//! `Journal` uses 1 `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a given
24//! `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound the
25//! number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
26//! `sections`.
27//!
28//! # Sync
29//!
30//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
31//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
32//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
33//!
34//! # Pruning
35//!
36//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
37//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
38//! could be used, for example, by some blockchain application to prune old blocks.
39//!
40//! # Replay
41//!
42//! During application initialization, it is very common to replay data from `Journal` to recover
43//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
44//! method to produce a stream of all items in the `Journal` in order of their `section` and
45//! `offset`.
46//!
47//! # Compression
48//!
49//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
50//! `compression` field in the `Config` struct to a valid `zstd` compression level. This setting can
51//! be changed between initializations of `Journal`, however, it must remain populated if any data
52//! was written with compression enabled.
53//!
54//! # Example
55//!
56//! ```rust
57//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
58//! use commonware_storage::journal::segmented::variable::{Journal, Config};
59//! use commonware_utils::{NZUsize, NZU16};
60//!
61//! let executor = deterministic::Runner::default();
62//! executor.start(|context| async move {
63//!     // Create a page cache
64//!     let page_cache = CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10));
65//!
66//!     // Create a journal
67//!     let mut journal = Journal::init(context, Config {
68//!         partition: "partition".into(),
69//!         compression: None,
70//!         codec_config: (),
71//!         page_cache,
72//!         write_buffer: NZUsize!(1024 * 1024),
73//!     }).await.unwrap();
74//!
75//!     // Append data to the journal
76//!     journal.append(1, &128).await.unwrap();
77//!
78//!     // Sync the journal
79//!     journal.sync_all().await.unwrap();
80//! });
81//! ```
82
83use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
84use crate::journal::Error;
85use commonware_codec::{
86    varint::{UInt, MAX_U32_VARINT_SIZE},
87    Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
88};
89use commonware_runtime::{
90    buffer::paged::{Append, CacheRef, Replay},
91    Blob, Buf, BufMut, IoBuf, IoBufMut, Metrics, Storage,
92};
93use futures::stream::{self, Stream, StreamExt};
94use std::{io::Cursor, num::NonZeroUsize};
95use tracing::{trace, warn};
96use zstd::{bulk::compress, decode_all};
97
/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config<C> {
    /// The `commonware-runtime::Storage` partition to use
    /// for storing journal blobs.
    pub partition: String,

    /// Optional compression level (using `zstd`) to apply to data before storing.
    ///
    /// This may change between initializations, but it must remain populated if
    /// any data was written with compression enabled (see the module-level
    /// `# Compression` docs).
    pub compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding items.
    pub codec_config: C,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}
117
118/// Decodes a varint length prefix from a buffer.
119/// Returns (item_size, varint_len).
120#[inline]
121fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
122    let initial = buf.remaining();
123    let size = UInt::<u32>::read(buf)?.0 as usize;
124    let varint_len = initial - buf.remaining();
125    Ok((size, varint_len))
126}
127
/// Result of finding an item in a buffer (offsets/lengths, not slices).
///
/// Produced by [find_item]; `Complete` means the entire payload is buffered,
/// `Incomplete` means only a prefix of the payload is buffered and the rest
/// must be read from the blob.
enum ItemInfo {
    /// All item data is available in the buffer.
    Complete {
        /// Length of the varint prefix.
        varint_len: usize,
        /// Length of the item data.
        data_len: usize,
    },
    /// Only some item data is available.
    Incomplete {
        /// Length of the varint prefix.
        varint_len: usize,
        /// Bytes of item data available in buffer.
        prefix_len: usize,
        /// Full size of the item.
        total_len: usize,
    },
}
147
148/// Find an item in a buffer by decoding its length prefix.
149///
150/// Returns (next_offset, item_info). The buffer is advanced past the varint.
151fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
152    let available = buf.remaining();
153    let (size, varint_len) = decode_length_prefix(buf)?;
154    let next_offset = offset
155        .checked_add(varint_len as u64)
156        .ok_or(Error::OffsetOverflow)?
157        .checked_add(size as u64)
158        .ok_or(Error::OffsetOverflow)?;
159    let buffered = available.saturating_sub(varint_len);
160
161    let item = if buffered >= size {
162        ItemInfo::Complete {
163            varint_len,
164            data_len: size,
165        }
166    } else {
167        ItemInfo::Incomplete {
168            varint_len,
169            prefix_len: buffered,
170            total_len: size,
171        }
172    };
173
174    Ok((next_offset, item))
175}
176
/// State for replaying a single section's blob.
struct ReplayState<B: Blob, C> {
    /// Section number being replayed (reported alongside each item).
    section: u64,
    /// Handle to the underlying blob (kept so it can be truncated on repair).
    blob: Append<B>,
    /// Sequential reader over the blob's contents.
    replay: Replay<B>,
    /// Bytes still to skip before decoding (from the caller's start offset).
    skip_bytes: u64,
    /// Current absolute offset within the blob.
    offset: u64,
    /// Offset just past the last fully-decoded item; truncation target on repair.
    valid_offset: u64,
    /// Codec configuration used to decode items.
    codec_config: C,
    /// Whether items were written with zstd compression.
    compressed: bool,
    /// Whether this section's replay has finished (successfully or not).
    done: bool,
}
189
190/// Decode item data with optional decompression.
191fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
192    if compressed {
193        let decompressed =
194            decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
195        V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
196    } else {
197        V::decode_cfg(item_data, cfg).map_err(Error::Codec)
198    }
199}
200
/// A segmented journal with variable-size entries.
///
/// Each section is stored in a separate blob. Items are length-prefixed with a varint.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
/// replay (not init) because any blob could have trailing bytes.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// Manages the per-section blobs (open, create, prune, destroy).
    manager: Manager<E, AppendFactory>,

    /// Compression level (if enabled).
    compression: Option<u8>,

    /// Codec configuration.
    codec_config: V::Cfg,
}
223
224impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
225    /// Initialize a new `Journal` instance.
226    ///
227    /// All backing blobs are opened but not read during
228    /// initialization. The `replay` method can be used
229    /// to iterate over all items in the `Journal`.
230    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
231        let manager_cfg = ManagerConfig {
232            partition: cfg.partition,
233            factory: AppendFactory {
234                write_buffer: cfg.write_buffer,
235                page_cache_ref: cfg.page_cache,
236            },
237        };
238        let manager = Manager::init(context, manager_cfg).await?;
239
240        Ok(Self {
241            manager,
242            compression: cfg.compression,
243            codec_config: cfg.codec_config,
244        })
245    }
246
    /// Reads an item from the blob at the given offset.
    ///
    /// Returns `(next_offset, item_size, item)` where `next_offset` is the
    /// offset immediately after the item (varint prefix + data) and
    /// `item_size` is the on-disk size of the item data (post-compression,
    /// if enabled).
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u64,
    ) -> Result<(u64, u32, V), Error> {
        // Read varint header (max 5 bytes for u32). `available` is the number
        // of bytes actually read — presumably fewer than requested near the end
        // of the blob (TODO confirm `read_up_to` semantics against its docs).
        let (buf, available) = blob
            .read_up_to(
                offset,
                MAX_U32_VARINT_SIZE,
                IoBufMut::with_capacity(MAX_U32_VARINT_SIZE),
            )
            .await?;
        let buf = buf.freeze();
        let mut cursor = Cursor::new(buf.slice(..available));
        let (next_offset, item_info) = find_item(&mut cursor, offset)?;

        // Decode item - either directly from buffer or by chaining prefix with remainder
        let (item_size, decoded) = match item_info {
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => {
                // Data follows varint in buffer
                let data = buf.slice(varint_len..varint_len + data_len);
                let decoded = decode_item::<V>(data, cfg, compressed)?;
                (data_len as u32, decoded)
            }
            ItemInfo::Incomplete {
                varint_len,
                prefix_len,
                total_len,
            } => {
                // Read remainder and chain with prefix to avoid copying.
                let prefix = buf.slice(varint_len..varint_len + prefix_len);
                // Cannot overflow: find_item already validated
                // offset + varint_len + total_len, and prefix_len <= total_len.
                let read_offset = offset + varint_len as u64 + prefix_len as u64;
                let remainder_len = total_len - prefix_len;
                let mut remainder = vec![0u8; remainder_len];
                blob.read_into(&mut remainder, read_offset).await?;
                // Chain the buffered prefix with the freshly-read remainder so
                // the codec sees one contiguous logical buffer without a copy.
                let chained = prefix.chain(IoBuf::from(remainder));
                let decoded = decode_item::<V>(chained, cfg, compressed)?;
                (total_len as u32, decoded)
            }
        };

        Ok((next_offset, item_size, decoded))
    }
296
    /// Returns an ordered stream of all items in the journal starting with the item at the given
    /// `start_section` and `offset` into that section. Each item is returned as a tuple of
    /// (section, offset, size, item).
    ///
    /// # Repair
    ///
    /// Trailing or incomplete bytes at the end of a blob are treated as the end
    /// of that section: the blob is truncated to just past the last fully-decoded
    /// item (see the type-level `# Repair` docs).
    pub async fn replay(
        &self,
        start_section: u64,
        mut start_offset: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
        // Collect all blobs to replay (keeping blob reference for potential resize)
        let codec_config = self.codec_config.clone();
        let compressed = self.compression.is_some();
        let mut blobs = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            blobs.push((
                section,
                blob.clone(),
                blob.replay(buffer).await?,
                codec_config.clone(),
                compressed,
            ));
        }

        // Stream items as they are read to avoid occupying too much memory
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, replay, codec_config, compressed)| {
                // Calculate initial skip bytes for first blob. Only the starting
                // section honors `start_offset`; every later section replays
                // from offset 0.
                let skip_bytes = if section == start_section {
                    start_offset
                } else {
                    start_offset = 0;
                    0
                };

                stream::unfold(
                    // `valid_offset` tracks the end of the last fully-decoded
                    // item; it starts at the caller-provided skip target.
                    ReplayState {
                        section,
                        blob,
                        replay,
                        skip_bytes,
                        offset: 0,
                        valid_offset: skip_bytes,
                        codec_config,
                        compressed,
                        done: false,
                    },
                    // Each unfold step yields a batch of decoded items (or a
                    // terminal error) to amortize the async machinery.
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let blob_size = state.replay.blob_size();
                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
                        loop {
                            // Ensure we have enough data for varint header.
                            // ensure() returns Ok(false) if exhausted with fewer bytes,
                            // but we still try to decode from remaining bytes.
                            match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Reader exhausted - check if buffer is empty
                                    if state.replay.remaining() == 0 {
                                        state.done = true;
                                        return if batch.is_empty() {
                                            None
                                        } else {
                                            Some((batch, state))
                                        };
                                    }
                                    // Buffer still has data - continue to try decoding
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Skip bytes if needed (for start_offset)
                            if state.skip_bytes > 0 {
                                let to_skip =
                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
                                state.replay.advance(to_skip);
                                state.skip_bytes -= to_skip as u64;
                                state.offset += to_skip as u64;
                                continue;
                            }

                            // Try to decode length prefix
                            let before_remaining = state.replay.remaining();
                            let (item_size, varint_len) =
                                match decode_length_prefix(&mut state.replay) {
                                    Ok(result) => result,
                                    Err(err) => {
                                        // Could be incomplete varint - check if reader exhausted
                                        if state.replay.is_exhausted()
                                            || before_remaining < MAX_U32_VARINT_SIZE
                                        {
                                            // Treat as trailing bytes
                                            if state.valid_offset < blob_size
                                                && state.offset < blob_size
                                            {
                                                warn!(
                                                    blob = state.section,
                                                    bad_offset = state.offset,
                                                    new_size = state.valid_offset,
                                                    "trailing bytes detected: truncating"
                                                );
                                                // NOTE: a failed resize ends the
                                                // stream without surfacing the
                                                // error (it is dropped by ok()).
                                                state.blob.resize(state.valid_offset).await.ok()?;
                                            }
                                            state.done = true;
                                            return if batch.is_empty() {
                                                None
                                            } else {
                                                Some((batch, state))
                                            };
                                        }
                                        batch.push(Err(err));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                };

                            // Ensure we have enough data for item body
                            match state.replay.ensure(item_size).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Incomplete item at end - truncate
                                    warn!(
                                        blob = state.section,
                                        bad_offset = state.offset,
                                        new_size = state.valid_offset,
                                        "incomplete item at end: truncating"
                                    );
                                    // NOTE: a failed resize ends the stream
                                    // without surfacing the error.
                                    state.blob.resize(state.valid_offset).await.ok()?;
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode item - use take() to limit bytes read
                            let item_offset = state.offset;
                            let next_offset = match state
                                .offset
                                .checked_add(varint_len as u64)
                                .and_then(|o| o.checked_add(item_size as u64))
                            {
                                Some(o) => o,
                                None => {
                                    batch.push(Err(Error::OffsetOverflow));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            };
                            match decode_item::<V>(
                                (&mut state.replay).take(item_size),
                                &state.codec_config,
                                state.compressed,
                            ) {
                                Ok(decoded) => {
                                    batch.push(Ok((
                                        state.section,
                                        item_offset,
                                        item_size as u32,
                                        decoded,
                                    )));
                                    // Item fully decoded: advance the repair
                                    // watermark along with the read offset.
                                    state.valid_offset = next_offset;
                                    state.offset = next_offset;
                                }
                                Err(err) => {
                                    batch.push(Err(err));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Return batch if we have items and buffer is low
                            if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                // Flatten each batch back into individual stream items.
                .flat_map(stream::iter)
            },
        ))
    }
493
494    /// Appends an item to `Journal` in a given `section`, returning the offset
495    /// where the item was written and the size of the item (which may now be smaller
496    /// than the encoded size from the codec, if compression is enabled).
497    pub async fn append(&mut self, section: u64, item: &V) -> Result<(u64, u32), Error> {
498        // Create buffer with item data (no checksum, no alignment)
499        let (buf, item_len) = if let Some(compression) = self.compression {
500            // Compressed: encode first, then compress
501            let encoded = item.encode();
502            let compressed =
503                compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
504            let item_len = compressed.len();
505            let item_len_u32: u32 = match item_len.try_into() {
506                Ok(len) => len,
507                Err(_) => return Err(Error::ItemTooLarge(item_len)),
508            };
509            let size_len = UInt(item_len_u32).encode_size();
510            let entry_len = size_len
511                .checked_add(item_len)
512                .ok_or(Error::OffsetOverflow)?;
513
514            let mut buf = Vec::with_capacity(entry_len);
515            UInt(item_len_u32).write(&mut buf);
516            buf.put_slice(&compressed);
517
518            (buf, item_len)
519        } else {
520            // Uncompressed: pre-allocate exact size to avoid copying
521            let item_len = item.encode_size();
522            let item_len_u32: u32 = match item_len.try_into() {
523                Ok(len) => len,
524                Err(_) => return Err(Error::ItemTooLarge(item_len)),
525            };
526            let size_len = UInt(item_len_u32).encode_size();
527            let entry_len = size_len
528                .checked_add(item_len)
529                .ok_or(Error::OffsetOverflow)?;
530
531            let mut buf = Vec::with_capacity(entry_len);
532            UInt(item_len_u32).write(&mut buf);
533            item.write(&mut buf);
534
535            (buf, item_len)
536        };
537
538        // Get or create blob
539        let blob = self.manager.get_or_create(section).await?;
540
541        // Get current position - this is where we'll write (no alignment)
542        let offset = blob.size().await;
543
544        // Append item to blob
545        blob.append(&buf).await?;
546        trace!(blob = section, offset, "appended item");
547        Ok((offset, item_len as u32))
548    }
549
550    /// Retrieves an item from `Journal` at a given `section` and `offset`.
551    ///
552    /// # Errors
553    ///  - [Error::AlreadyPrunedToSection] if the requested `section` has been pruned during the
554    ///    current execution.
555    ///  - [Error::SectionOutOfRange] if the requested `section` is empty (i.e. has never had any
556    ///    data appended to it, or has been pruned in a previous execution).
557    ///  - An invalid `offset` for a given section (that is, an offset that doesn't correspond to a
558    ///    previously appended item) will result in an error, with the specific type being
559    ///    undefined.
560    pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
561        let blob = self
562            .manager
563            .get(section)?
564            .ok_or(Error::SectionOutOfRange(section))?;
565
566        // Perform a multi-op read.
567        let (_, _, item) =
568            Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
569        Ok(item)
570    }
571
    /// Gets the size of the journal for a specific section.
    ///
    /// Returns 0 if the section does not exist.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        // Delegates to the section manager.
        self.manager.size(section).await
    }
578
    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
    ///
    /// Since entries are tightly packed (no alignment), an offset is the same as a
    /// size here; this delegates identically to [Self::rewind] and exists for
    /// callers that think in terms of offsets.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed in reverse order of section.
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }
589
    /// Rewinds the journal to the given `section` and `size`.
    ///
    /// This removes any data beyond the specified `section` and `size`.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed in reverse order of section.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        // Delegates to the section manager.
        self.manager.rewind(section, size).await
    }
602
    /// Rewinds the `section` to the given `size`.
    ///
    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
    ///
    /// # Warning
    ///
    /// This operation is not guaranteed to survive restarts until sync is called.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        // Delegates to the section manager.
        self.manager.rewind_section(section, size).await
    }
613
    /// Ensures that all data in a given `section` is synced to the underlying store.
    ///
    /// If the `section` does not exist, no error will be returned.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        // Delegates to the section manager.
        self.manager.sync(section).await
    }
620
    /// Syncs all open sections to the underlying store.
    pub async fn sync_all(&self) -> Result<(), Error> {
        // Delegates to the section manager.
        self.manager.sync_all().await
    }
625
    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        // Delegates to the section manager.
        self.manager.prune(min).await
    }
630
    /// Returns the number of the oldest section in the journal,
    /// or `None` (per the manager's definition — presumably when no
    /// sections exist; confirm against `Manager::oldest_section`).
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }
635
    /// Returns the number of the newest section in the journal,
    /// or `None` (per the manager's definition — presumably when no
    /// sections exist; confirm against `Manager::newest_section`).
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }
640
    /// Returns true if no sections exist.
    pub fn is_empty(&self) -> bool {
        // Delegates to the section manager.
        self.manager.is_empty()
    }
645
    /// Returns the number of sections currently tracked by the journal.
    pub fn num_sections(&self) -> usize {
        // Delegates to the section manager.
        self.manager.num_sections()
    }
650
    /// Removes any underlying blobs created by the journal, consuming it.
    pub async fn destroy(self) -> Result<(), Error> {
        // Delegates to the section manager.
        self.manager.destroy().await
    }
655
    /// Clear all data, resetting the journal to an empty state.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    pub async fn clear(&mut self) -> Result<(), Error> {
        // Delegates to the section manager.
        self.manager.clear().await
    }
662}
663
664#[cfg(test)]
665mod tests {
666    use super::*;
667    use commonware_macros::test_traced;
668    use commonware_runtime::{deterministic, Blob, BufMut, Metrics, Runner, Storage};
669    use commonware_utils::{NZUsize, NZU16};
670    use futures::{pin_mut, StreamExt};
671    use std::num::NonZeroU16;
672
673    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
674    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
675
    /// Appends one item, simulates a restart, and verifies the item is
    /// recovered via `replay` (with metrics checked at each stage).
    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, &data)
                .await
                .expect("Failed to append data");

            // Check metrics: one blob tracked under the "first" label
            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 1"));

            // Sync, then drop and re-open the journal to simulate a restart
            journal.sync(index).await.expect("Failed to sync journal");
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal (from the beginning) and collect items
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify that the item was replayed correctly
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics: one blob tracked under the "second" label
            let buffer = context.encode();
            assert!(buffer.contains("second_tracked 1"));
        });
    }
738
739    #[test_traced]
740    fn test_journal_multiple_appends_and_reads() {
741        // Initialize the deterministic context
742        let executor = deterministic::Runner::default();
743
744        // Start the test within the executor
745        executor.start(|context| async move {
746            // Create a journal configuration
747            let cfg = Config {
748                partition: "test-partition".into(),
749                compression: None,
750                codec_config: (),
751                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
752                write_buffer: NZUsize!(1024),
753            };
754
755            // Initialize the journal
756            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
757                .await
758                .expect("Failed to initialize journal");
759
760            // Append multiple items to different blobs
761            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
762            for (index, data) in &data_items {
763                journal
764                    .append(*index, data)
765                    .await
766                    .expect("Failed to append data");
767                journal.sync(*index).await.expect("Failed to sync blob");
768            }
769
770            // Check metrics
771            let buffer = context.encode();
772            assert!(buffer.contains("first_tracked 3"));
773            assert!(buffer.contains("first_synced_total 4"));
774
775            // Drop and re-open the journal to simulate a restart
776            drop(journal);
777            let journal = Journal::init(context.with_label("second"), cfg)
778                .await
779                .expect("Failed to re-initialize journal");
780
781            // Replay the journal and collect items
782            let mut items = Vec::<(u64, u32)>::new();
783            {
784                let stream = journal
785                    .replay(0, 0, NZUsize!(1024))
786                    .await
787                    .expect("unable to setup replay");
788                pin_mut!(stream);
789                while let Some(result) = stream.next().await {
790                    match result {
791                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
792                        Err(err) => panic!("Failed to read item: {err}"),
793                    }
794                }
795            }
796
797            // Verify that all items were replayed correctly
798            assert_eq!(items.len(), data_items.len());
799            for ((expected_index, expected_data), (actual_index, actual_data)) in
800                data_items.iter().zip(items.iter())
801            {
802                assert_eq!(actual_index, expected_index);
803                assert_eq!(actual_data, expected_data);
804            }
805
806            // Cleanup
807            journal.destroy().await.expect("Failed to destroy journal");
808        });
809    }
810
811    #[test_traced]
812    fn test_journal_prune_blobs() {
813        // Initialize the deterministic context
814        let executor = deterministic::Runner::default();
815
816        // Start the test within the executor
817        executor.start(|context| async move {
818            // Create a journal configuration
819            let cfg = Config {
820                partition: "test-partition".into(),
821                compression: None,
822                codec_config: (),
823                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
824                write_buffer: NZUsize!(1024),
825            };
826
827            // Initialize the journal
828            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
829                .await
830                .expect("Failed to initialize journal");
831
832            // Append items to multiple blobs
833            for index in 1u64..=5u64 {
834                journal
835                    .append(index, &index)
836                    .await
837                    .expect("Failed to append data");
838                journal.sync(index).await.expect("Failed to sync blob");
839            }
840
841            // Add one item out-of-order
842            let data = 99;
843            journal
844                .append(2u64, &data)
845                .await
846                .expect("Failed to append data");
847            journal.sync(2u64).await.expect("Failed to sync blob");
848
849            // Prune blobs with indices less than 3
850            journal.prune(3).await.expect("Failed to prune blobs");
851
852            // Check metrics
853            let buffer = context.encode();
854            assert!(buffer.contains("first_pruned_total 2"));
855
856            // Prune again with a section less than the previous one, should be a no-op
857            journal.prune(2).await.expect("Failed to no-op prune");
858            let buffer = context.encode();
859            assert!(buffer.contains("first_pruned_total 2"));
860
861            // Drop and re-open the journal to simulate a restart
862            drop(journal);
863            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
864                .await
865                .expect("Failed to re-initialize journal");
866
867            // Replay the journal and collect items
868            let mut items = Vec::<(u64, u64)>::new();
869            {
870                let stream = journal
871                    .replay(0, 0, NZUsize!(1024))
872                    .await
873                    .expect("unable to setup replay");
874                pin_mut!(stream);
875                while let Some(result) = stream.next().await {
876                    match result {
877                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
878                        Err(err) => panic!("Failed to read item: {err}"),
879                    }
880                }
881            }
882
883            // Verify that items from blobs 1 and 2 are not present
884            assert_eq!(items.len(), 3);
885            let expected_indices = [3u64, 4u64, 5u64];
886            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
887                assert_eq!(item.0, *expected_index);
888            }
889
890            // Prune all blobs
891            journal.prune(6).await.expect("Failed to prune blobs");
892
893            // Drop the journal
894            drop(journal);
895
896            // Ensure no remaining blobs exist
897            //
898            // Note: We don't remove the partition, so this does not error
899            // and instead returns an empty list of blobs.
900            assert!(context
901                .scan(&cfg.partition)
902                .await
903                .expect("Failed to list blobs")
904                .is_empty());
905        });
906    }
907
908    #[test_traced]
909    fn test_journal_prune_guard() {
910        let executor = deterministic::Runner::default();
911
912        executor.start(|context| async move {
913            let cfg = Config {
914                partition: "test-partition".into(),
915                compression: None,
916                codec_config: (),
917                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
918                write_buffer: NZUsize!(1024),
919            };
920
921            let mut journal = Journal::init(context.clone(), cfg.clone())
922                .await
923                .expect("Failed to initialize journal");
924
925            // Append items to sections 1-5
926            for section in 1u64..=5u64 {
927                journal
928                    .append(section, &(section as i32))
929                    .await
930                    .expect("Failed to append data");
931                journal.sync(section).await.expect("Failed to sync");
932            }
933
934            // Prune sections < 3
935            journal.prune(3).await.expect("Failed to prune");
936
937            // Test that accessing pruned sections returns the correct error
938
939            // Test append on pruned section
940            match journal.append(1, &100).await {
941                Err(Error::AlreadyPrunedToSection(3)) => {}
942                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
943            }
944
945            match journal.append(2, &100).await {
946                Err(Error::AlreadyPrunedToSection(3)) => {}
947                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
948            }
949
950            // Test get on pruned section
951            match journal.get(1, 0).await {
952                Err(Error::AlreadyPrunedToSection(3)) => {}
953                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
954            }
955
956            // Test size on pruned section
957            match journal.size(1).await {
958                Err(Error::AlreadyPrunedToSection(3)) => {}
959                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
960            }
961
962            // Test rewind on pruned section
963            match journal.rewind(2, 0).await {
964                Err(Error::AlreadyPrunedToSection(3)) => {}
965                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
966            }
967
968            // Test rewind_section on pruned section
969            match journal.rewind_section(1, 0).await {
970                Err(Error::AlreadyPrunedToSection(3)) => {}
971                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
972            }
973
974            // Test sync on pruned section
975            match journal.sync(2).await {
976                Err(Error::AlreadyPrunedToSection(3)) => {}
977                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
978            }
979
980            // Test that accessing sections at or after the threshold works
981            assert!(journal.get(3, 0).await.is_ok());
982            assert!(journal.get(4, 0).await.is_ok());
983            assert!(journal.get(5, 0).await.is_ok());
984            assert!(journal.size(3).await.is_ok());
985            assert!(journal.sync(4).await.is_ok());
986
987            // Append to section at threshold should work
988            journal
989                .append(3, &999)
990                .await
991                .expect("Should be able to append to section 3");
992
993            // Prune more sections
994            journal.prune(5).await.expect("Failed to prune");
995
996            // Verify sections 3 and 4 are now pruned
997            match journal.get(3, 0).await {
998                Err(Error::AlreadyPrunedToSection(5)) => {}
999                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
1000            }
1001
1002            match journal.get(4, 0).await {
1003                Err(Error::AlreadyPrunedToSection(5)) => {}
1004                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
1005            }
1006
1007            // Section 5 should still be accessible
1008            assert!(journal.get(5, 0).await.is_ok());
1009        });
1010    }
1011
1012    #[test_traced]
1013    fn test_journal_prune_guard_across_restart() {
1014        let executor = deterministic::Runner::default();
1015
1016        executor.start(|context| async move {
1017            let cfg = Config {
1018                partition: "test-partition".into(),
1019                compression: None,
1020                codec_config: (),
1021                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1022                write_buffer: NZUsize!(1024),
1023            };
1024
1025            // First session: create and prune
1026            {
1027                let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1028                    .await
1029                    .expect("Failed to initialize journal");
1030
1031                for section in 1u64..=5u64 {
1032                    journal
1033                        .append(section, &(section as i32))
1034                        .await
1035                        .expect("Failed to append data");
1036                    journal.sync(section).await.expect("Failed to sync");
1037                }
1038
1039                journal.prune(3).await.expect("Failed to prune");
1040            }
1041
1042            // Second session: verify oldest_retained_section is reset
1043            {
1044                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
1045                    .await
1046                    .expect("Failed to re-initialize journal");
1047
1048                // But the actual sections 1 and 2 should be gone from storage
1049                // so get should return SectionOutOfRange, not AlreadyPrunedToSection
1050                match journal.get(1, 0).await {
1051                    Err(Error::SectionOutOfRange(1)) => {}
1052                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
1053                }
1054
1055                match journal.get(2, 0).await {
1056                    Err(Error::SectionOutOfRange(2)) => {}
1057                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
1058                }
1059
1060                // Sections 3-5 should still be accessible
1061                assert!(journal.get(3, 0).await.is_ok());
1062                assert!(journal.get(4, 0).await.is_ok());
1063                assert!(journal.get(5, 0).await.is_ok());
1064            }
1065        });
1066    }
1067
1068    #[test_traced]
1069    fn test_journal_with_invalid_blob_name() {
1070        // Initialize the deterministic context
1071        let executor = deterministic::Runner::default();
1072
1073        // Start the test within the executor
1074        executor.start(|context| async move {
1075            // Create a journal configuration
1076            let cfg = Config {
1077                partition: "test-partition".into(),
1078                compression: None,
1079                codec_config: (),
1080                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1081                write_buffer: NZUsize!(1024),
1082            };
1083
1084            // Manually create a blob with an invalid name (not 8 bytes)
1085            let invalid_blob_name = b"invalid"; // Less than 8 bytes
1086            let (blob, _) = context
1087                .open(&cfg.partition, invalid_blob_name)
1088                .await
1089                .expect("Failed to create blob with invalid name");
1090            blob.sync().await.expect("Failed to sync blob");
1091
1092            // Attempt to initialize the journal
1093            let result = Journal::<_, u64>::init(context, cfg).await;
1094
1095            // Expect an error
1096            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1097        });
1098    }
1099
1100    #[test_traced]
1101    fn test_journal_read_size_missing() {
1102        // Initialize the deterministic context
1103        let executor = deterministic::Runner::default();
1104
1105        // Start the test within the executor
1106        executor.start(|context| async move {
1107            // Create a journal configuration
1108            let cfg = Config {
1109                partition: "test-partition".into(),
1110                compression: None,
1111                codec_config: (),
1112                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1113                write_buffer: NZUsize!(1024),
1114            };
1115
1116            // Manually create a blob with incomplete size data
1117            let section = 1u64;
1118            let blob_name = section.to_be_bytes();
1119            let (blob, _) = context
1120                .open(&cfg.partition, &blob_name)
1121                .await
1122                .expect("Failed to create blob");
1123
1124            // Write incomplete varint by encoding u32::MAX (5 bytes) and truncating to 1 byte
1125            let mut incomplete_data = Vec::new();
1126            UInt(u32::MAX).write(&mut incomplete_data);
1127            incomplete_data.truncate(1);
1128            blob.write_at(0, incomplete_data)
1129                .await
1130                .expect("Failed to write incomplete data");
1131            blob.sync().await.expect("Failed to sync blob");
1132
1133            // Initialize the journal
1134            let journal = Journal::init(context, cfg)
1135                .await
1136                .expect("Failed to initialize journal");
1137
1138            // Attempt to replay the journal
1139            let stream = journal
1140                .replay(0, 0, NZUsize!(1024))
1141                .await
1142                .expect("unable to setup replay");
1143            pin_mut!(stream);
1144            let mut items = Vec::<(u64, u64)>::new();
1145            while let Some(result) = stream.next().await {
1146                match result {
1147                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1148                    Err(err) => panic!("Failed to read item: {err}"),
1149                }
1150            }
1151            assert!(items.is_empty());
1152        });
1153    }
1154
1155    #[test_traced]
1156    fn test_journal_read_item_missing() {
1157        // Initialize the deterministic context
1158        let executor = deterministic::Runner::default();
1159
1160        // Start the test within the executor
1161        executor.start(|context| async move {
1162            // Create a journal configuration
1163            let cfg = Config {
1164                partition: "test-partition".into(),
1165                compression: None,
1166                codec_config: (),
1167                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1168                write_buffer: NZUsize!(1024),
1169            };
1170
1171            // Manually create a blob with missing item data
1172            let section = 1u64;
1173            let blob_name = section.to_be_bytes();
1174            let (blob, _) = context
1175                .open(&cfg.partition, &blob_name)
1176                .await
1177                .expect("Failed to create blob");
1178
1179            // Write size but incomplete item data
1180            let item_size: u32 = 10; // Size indicates 10 bytes of data
1181            let mut buf = Vec::new();
1182            UInt(item_size).write(&mut buf); // Varint encoding
1183            let data = [2u8; 5];
1184            BufMut::put_slice(&mut buf, &data);
1185            blob.write_at(0, buf)
1186                .await
1187                .expect("Failed to write incomplete item");
1188            blob.sync().await.expect("Failed to sync blob");
1189
1190            // Initialize the journal
1191            let journal = Journal::init(context, cfg)
1192                .await
1193                .expect("Failed to initialize journal");
1194
1195            // Attempt to replay the journal
1196            let stream = journal
1197                .replay(0, 0, NZUsize!(1024))
1198                .await
1199                .expect("unable to setup replay");
1200            pin_mut!(stream);
1201            let mut items = Vec::<(u64, u64)>::new();
1202            while let Some(result) = stream.next().await {
1203                match result {
1204                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1205                    Err(err) => panic!("Failed to read item: {err}"),
1206                }
1207            }
1208            assert!(items.is_empty());
1209        });
1210    }
1211
1212    #[test_traced]
1213    fn test_journal_read_checksum_missing() {
1214        // Initialize the deterministic context
1215        let executor = deterministic::Runner::default();
1216
1217        // Start the test within the executor
1218        executor.start(|context| async move {
1219            // Create a journal configuration
1220            let cfg = Config {
1221                partition: "test-partition".into(),
1222                compression: None,
1223                codec_config: (),
1224                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1225                write_buffer: NZUsize!(1024),
1226            };
1227
1228            // Manually create a blob with missing checksum
1229            let section = 1u64;
1230            let blob_name = section.to_be_bytes();
1231            let (blob, _) = context
1232                .open(&cfg.partition, &blob_name)
1233                .await
1234                .expect("Failed to create blob");
1235
1236            // Prepare item data
1237            let item_data = b"Test data";
1238            let item_size = item_data.len() as u32;
1239
1240            // Write size (varint) and data, but no checksum
1241            let mut buf = Vec::new();
1242            UInt(item_size).write(&mut buf);
1243            BufMut::put_slice(&mut buf, item_data);
1244            blob.write_at(0, buf)
1245                .await
1246                .expect("Failed to write item without checksum");
1247
1248            blob.sync().await.expect("Failed to sync blob");
1249
1250            // Initialize the journal
1251            let journal = Journal::init(context, cfg)
1252                .await
1253                .expect("Failed to initialize journal");
1254
1255            // Attempt to replay the journal
1256            //
1257            // This will truncate the leftover bytes from our manual write.
1258            let stream = journal
1259                .replay(0, 0, NZUsize!(1024))
1260                .await
1261                .expect("unable to setup replay");
1262            pin_mut!(stream);
1263            let mut items = Vec::<(u64, u64)>::new();
1264            while let Some(result) = stream.next().await {
1265                match result {
1266                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1267                    Err(err) => panic!("Failed to read item: {err}"),
1268                }
1269            }
1270            assert!(items.is_empty());
1271        });
1272    }
1273
1274    #[test_traced]
1275    fn test_journal_read_checksum_mismatch() {
1276        // Initialize the deterministic context
1277        let executor = deterministic::Runner::default();
1278
1279        // Start the test within the executor
1280        executor.start(|context| async move {
1281            // Create a journal configuration
1282            let cfg = Config {
1283                partition: "test-partition".into(),
1284                compression: None,
1285                codec_config: (),
1286                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1287                write_buffer: NZUsize!(1024),
1288            };
1289
1290            // Manually create a blob with incorrect checksum
1291            let section = 1u64;
1292            let blob_name = section.to_be_bytes();
1293            let (blob, _) = context
1294                .open(&cfg.partition, &blob_name)
1295                .await
1296                .expect("Failed to create blob");
1297
1298            // Prepare item data
1299            let item_data = b"Test data";
1300            let item_size = item_data.len() as u32;
1301            let incorrect_checksum: u32 = 0xDEADBEEF;
1302
1303            // Write size (varint), data, and incorrect checksum
1304            let mut buf = Vec::new();
1305            UInt(item_size).write(&mut buf);
1306            BufMut::put_slice(&mut buf, item_data);
1307            buf.put_u32(incorrect_checksum);
1308            blob.write_at(0, buf)
1309                .await
1310                .expect("Failed to write item with bad checksum");
1311
1312            blob.sync().await.expect("Failed to sync blob");
1313
1314            // Initialize the journal
1315            let journal = Journal::init(context.clone(), cfg.clone())
1316                .await
1317                .expect("Failed to initialize journal");
1318
1319            // Attempt to replay the journal
1320            {
1321                let stream = journal
1322                    .replay(0, 0, NZUsize!(1024))
1323                    .await
1324                    .expect("unable to setup replay");
1325                pin_mut!(stream);
1326                let mut items = Vec::<(u64, u64)>::new();
1327                while let Some(result) = stream.next().await {
1328                    match result {
1329                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1330                        Err(err) => panic!("Failed to read item: {err}"),
1331                    }
1332                }
1333                assert!(items.is_empty());
1334            }
1335            drop(journal);
1336
1337            // Confirm blob is expected length
1338            let (_, blob_size) = context
1339                .open(&cfg.partition, &section.to_be_bytes())
1340                .await
1341                .expect("Failed to open blob");
1342            assert_eq!(blob_size, 0);
1343        });
1344    }
1345
    /// Exercises crash recovery: a section blob whose tail was truncated
    /// mid-item must be detected during replay, truncated back to a valid
    /// state on disk, and remain usable for subsequent appends and restarts.
    #[test_traced]
    fn test_journal_truncation_recovery() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append 1 item to the first index
            journal.append(1, &1).await.expect("Failed to append data");

            // Append multiple items to the second section
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Sync all sections and drop the journal
            journal.sync_all().await.expect("Failed to sync");
            drop(journal);

            // Manually corrupt the end of the second blob by chopping off its
            // final 4 bytes (presumably the last item's trailing u32 checksum,
            // as written by the checksum tests above -- TODO confirm layout).
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Attempt to replay the journal
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            drop(journal);

            // Verify that replay stopped after corruption detected (the second blob).
            // Only the single item from section 1 survives.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);

            // Confirm second blob was truncated.
            //
            // NOTE(review): the blob ends up empty (size 0), i.e. the entire
            // section is discarded, not just the item whose bytes were cut.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);

            // Attempt to replay journal after truncation
            let mut journal = Journal::init(context.with_label("third"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Attempt to replay the journal
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that only non-corrupted items were replayed
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);

            // Append a new item to truncated partition
            let (_offset, _) = journal.append(2, &5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            // Get the new item (offset is 0 since blob was truncated)
            let item = journal.get(2, 0).await.expect("Failed to get item");
            assert_eq!(item, 5);

            // Drop the journal (data already synced)
            drop(journal);

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Attempt to replay the journal
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that only non-corrupted items were replayed: the original
            // item in section 1 plus the freshly appended item in section 2.
            assert_eq!(items.len(), 2);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, 2);
            assert_eq!(items[1].1, 5);
        });
    }
1494
1495    #[test_traced]
1496    fn test_journal_handling_extra_data() {
1497        // Initialize the deterministic context
1498        let executor = deterministic::Runner::default();
1499
1500        // Start the test within the executor
1501        executor.start(|context| async move {
1502            // Create a journal configuration
1503            let cfg = Config {
1504                partition: "test-partition".into(),
1505                compression: None,
1506                codec_config: (),
1507                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1508                write_buffer: NZUsize!(1024),
1509            };
1510
1511            // Initialize the journal
1512            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1513                .await
1514                .expect("Failed to initialize journal");
1515
1516            // Append 1 item to the first index
1517            journal.append(1, &1).await.expect("Failed to append data");
1518
1519            // Append multiple items to the second index
1520            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1521            for (index, data) in &data_items {
1522                journal
1523                    .append(*index, data)
1524                    .await
1525                    .expect("Failed to append data");
1526                journal.sync(*index).await.expect("Failed to sync blob");
1527            }
1528
1529            // Sync all sections and drop the journal
1530            journal.sync_all().await.expect("Failed to sync");
1531            drop(journal);
1532
1533            // Manually add extra data to the end of the second blob
1534            let (blob, blob_size) = context
1535                .open(&cfg.partition, &2u64.to_be_bytes())
1536                .await
1537                .expect("Failed to open blob");
1538            blob.write_at(blob_size, vec![0u8; 16])
1539                .await
1540                .expect("Failed to add extra data");
1541            blob.sync().await.expect("Failed to sync blob");
1542
1543            // Re-initialize the journal to simulate a restart
1544            let journal = Journal::init(context.with_label("second"), cfg)
1545                .await
1546                .expect("Failed to re-initialize journal");
1547
1548            // Attempt to replay the journal
1549            let mut items = Vec::<(u64, i32)>::new();
1550            let stream = journal
1551                .replay(0, 0, NZUsize!(1024))
1552                .await
1553                .expect("unable to setup replay");
1554            pin_mut!(stream);
1555            while let Some(result) = stream.next().await {
1556                match result {
1557                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1558                    Err(err) => panic!("Failed to read item: {err}"),
1559                }
1560            }
1561        });
1562    }
1563
1564    #[test_traced]
1565    fn test_journal_rewind() {
1566        // Initialize the deterministic context
1567        let executor = deterministic::Runner::default();
1568        executor.start(|context| async move {
1569            // Create journal
1570            let cfg = Config {
1571                partition: "test-partition".into(),
1572                compression: None,
1573                codec_config: (),
1574                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1575                write_buffer: NZUsize!(1024),
1576            };
1577            let mut journal = Journal::init(context, cfg).await.unwrap();
1578
1579            // Check size of non-existent section
1580            let size = journal.size(1).await.unwrap();
1581            assert_eq!(size, 0);
1582
1583            // Append data to section 1
1584            journal.append(1, &42i32).await.unwrap();
1585
1586            // Check size of section 1 - should be greater than 0
1587            let size = journal.size(1).await.unwrap();
1588            assert!(size > 0);
1589
1590            // Append more data and verify size increases
1591            journal.append(1, &43i32).await.unwrap();
1592            let new_size = journal.size(1).await.unwrap();
1593            assert!(new_size > size);
1594
1595            // Check size of different section - should still be 0
1596            let size = journal.size(2).await.unwrap();
1597            assert_eq!(size, 0);
1598
1599            // Append data to section 2
1600            journal.append(2, &44i32).await.unwrap();
1601
1602            // Check size of section 2 - should be greater than 0
1603            let size = journal.size(2).await.unwrap();
1604            assert!(size > 0);
1605
1606            // Rollback everything in section 1 and 2
1607            journal.rewind(1, 0).await.unwrap();
1608
1609            // Check size of section 1 - should be 0
1610            let size = journal.size(1).await.unwrap();
1611            assert_eq!(size, 0);
1612
1613            // Check size of section 2 - should be 0
1614            let size = journal.size(2).await.unwrap();
1615            assert_eq!(size, 0);
1616        });
1617    }
1618
1619    #[test_traced]
1620    fn test_journal_rewind_section() {
1621        // Initialize the deterministic context
1622        let executor = deterministic::Runner::default();
1623        executor.start(|context| async move {
1624            // Create journal
1625            let cfg = Config {
1626                partition: "test-partition".into(),
1627                compression: None,
1628                codec_config: (),
1629                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1630                write_buffer: NZUsize!(1024),
1631            };
1632            let mut journal = Journal::init(context, cfg).await.unwrap();
1633
1634            // Check size of non-existent section
1635            let size = journal.size(1).await.unwrap();
1636            assert_eq!(size, 0);
1637
1638            // Append data to section 1
1639            journal.append(1, &42i32).await.unwrap();
1640
1641            // Check size of section 1 - should be greater than 0
1642            let size = journal.size(1).await.unwrap();
1643            assert!(size > 0);
1644
1645            // Append more data and verify size increases
1646            journal.append(1, &43i32).await.unwrap();
1647            let new_size = journal.size(1).await.unwrap();
1648            assert!(new_size > size);
1649
1650            // Check size of different section - should still be 0
1651            let size = journal.size(2).await.unwrap();
1652            assert_eq!(size, 0);
1653
1654            // Append data to section 2
1655            journal.append(2, &44i32).await.unwrap();
1656
1657            // Check size of section 2 - should be greater than 0
1658            let size = journal.size(2).await.unwrap();
1659            assert!(size > 0);
1660
1661            // Rollback everything in section 1
1662            journal.rewind_section(1, 0).await.unwrap();
1663
1664            // Check size of section 1 - should be 0
1665            let size = journal.size(1).await.unwrap();
1666            assert_eq!(size, 0);
1667
1668            // Check size of section 2 - should be greater than 0
1669            let size = journal.size(2).await.unwrap();
1670            assert!(size > 0);
1671        });
1672    }
1673
1674    #[test_traced]
1675    fn test_journal_small_items() {
1676        let executor = deterministic::Runner::default();
1677        executor.start(|context| async move {
1678            let cfg = Config {
1679                partition: "test-partition".into(),
1680                compression: None,
1681                codec_config: (),
1682                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1683                write_buffer: NZUsize!(1024),
1684            };
1685
1686            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1687                .await
1688                .expect("Failed to initialize journal");
1689
1690            // Append many small (1-byte) items to the same section
1691            let num_items = 100;
1692            let mut offsets = Vec::new();
1693            for i in 0..num_items {
1694                let (offset, size) = journal
1695                    .append(1, &(i as u8))
1696                    .await
1697                    .expect("Failed to append data");
1698                assert_eq!(size, 1, "u8 should encode to 1 byte");
1699                offsets.push(offset);
1700            }
1701            journal.sync(1).await.expect("Failed to sync");
1702
1703            // Read each item back via random access
1704            for (i, &offset) in offsets.iter().enumerate() {
1705                let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
1706                assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
1707            }
1708
1709            // Drop and reopen to test replay
1710            drop(journal);
1711            let journal = Journal::<_, u8>::init(context.with_label("second"), cfg)
1712                .await
1713                .expect("Failed to re-initialize journal");
1714
1715            // Replay and verify all items
1716            let stream = journal
1717                .replay(0, 0, NZUsize!(1024))
1718                .await
1719                .expect("Failed to setup replay");
1720            pin_mut!(stream);
1721
1722            let mut count = 0;
1723            while let Some(result) = stream.next().await {
1724                let (section, offset, size, item) = result.expect("Failed to replay item");
1725                assert_eq!(section, 1);
1726                assert_eq!(offset, offsets[count]);
1727                assert_eq!(size, 1);
1728                assert_eq!(item, count as u8);
1729                count += 1;
1730            }
1731            assert_eq!(count, num_items, "Should replay all items");
1732        });
1733    }
1734
1735    #[test_traced]
1736    fn test_journal_rewind_many_sections() {
1737        let executor = deterministic::Runner::default();
1738        executor.start(|context| async move {
1739            let cfg = Config {
1740                partition: "test-partition".into(),
1741                compression: None,
1742                codec_config: (),
1743                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1744                write_buffer: NZUsize!(1024),
1745            };
1746            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1747
1748            // Create sections 1-10 with data
1749            for section in 1u64..=10 {
1750                journal.append(section, &(section as i32)).await.unwrap();
1751            }
1752            journal.sync_all().await.unwrap();
1753
1754            // Verify all sections exist
1755            for section in 1u64..=10 {
1756                let size = journal.size(section).await.unwrap();
1757                assert!(size > 0, "section {section} should have data");
1758            }
1759
1760            // Rewind to section 5 (should remove sections 6-10)
1761            journal
1762                .rewind(5, journal.size(5).await.unwrap())
1763                .await
1764                .unwrap();
1765
1766            // Verify sections 1-5 still exist with correct data
1767            for section in 1u64..=5 {
1768                let size = journal.size(section).await.unwrap();
1769                assert!(size > 0, "section {section} should still have data");
1770            }
1771
1772            // Verify sections 6-10 are removed (size should be 0)
1773            for section in 6u64..=10 {
1774                let size = journal.size(section).await.unwrap();
1775                assert_eq!(size, 0, "section {section} should be removed");
1776            }
1777
1778            // Verify data integrity via replay
1779            {
1780                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1781                pin_mut!(stream);
1782                let mut items = Vec::new();
1783                while let Some(result) = stream.next().await {
1784                    let (section, _, _, item) = result.unwrap();
1785                    items.push((section, item));
1786                }
1787                assert_eq!(items.len(), 5);
1788                for (i, (section, item)) in items.iter().enumerate() {
1789                    assert_eq!(*section, (i + 1) as u64);
1790                    assert_eq!(*item, (i + 1) as i32);
1791                }
1792            }
1793
1794            journal.destroy().await.unwrap();
1795        });
1796    }
1797
1798    #[test_traced]
1799    fn test_journal_rewind_partial_truncation() {
1800        let executor = deterministic::Runner::default();
1801        executor.start(|context| async move {
1802            let cfg = Config {
1803                partition: "test-partition".into(),
1804                compression: None,
1805                codec_config: (),
1806                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1807                write_buffer: NZUsize!(1024),
1808            };
1809            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1810
1811            // Append 5 items and record sizes after each
1812            let mut sizes = Vec::new();
1813            for i in 0..5 {
1814                journal.append(1, &i).await.unwrap();
1815                journal.sync(1).await.unwrap();
1816                sizes.push(journal.size(1).await.unwrap());
1817            }
1818
1819            // Rewind to keep only first 3 items
1820            let target_size = sizes[2];
1821            journal.rewind(1, target_size).await.unwrap();
1822
1823            // Verify size is correct
1824            let new_size = journal.size(1).await.unwrap();
1825            assert_eq!(new_size, target_size);
1826
1827            // Verify first 3 items via replay
1828            {
1829                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1830                pin_mut!(stream);
1831                let mut items = Vec::new();
1832                while let Some(result) = stream.next().await {
1833                    let (_, _, _, item) = result.unwrap();
1834                    items.push(item);
1835                }
1836                assert_eq!(items.len(), 3);
1837                for (i, item) in items.iter().enumerate() {
1838                    assert_eq!(*item, i as i32);
1839                }
1840            }
1841
1842            journal.destroy().await.unwrap();
1843        });
1844    }
1845
1846    #[test_traced]
1847    fn test_journal_rewind_nonexistent_target() {
1848        let executor = deterministic::Runner::default();
1849        executor.start(|context| async move {
1850            let cfg = Config {
1851                partition: "test-partition".into(),
1852                compression: None,
1853                codec_config: (),
1854                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1855                write_buffer: NZUsize!(1024),
1856            };
1857            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1858
1859            // Create sections 5, 6, 7 (skip 1-4)
1860            for section in 5u64..=7 {
1861                journal.append(section, &(section as i32)).await.unwrap();
1862            }
1863            journal.sync_all().await.unwrap();
1864
1865            // Rewind to section 3 (doesn't exist)
1866            journal.rewind(3, 0).await.unwrap();
1867
1868            // Verify sections 5, 6, 7 are removed
1869            for section in 5u64..=7 {
1870                let size = journal.size(section).await.unwrap();
1871                assert_eq!(size, 0, "section {section} should be removed");
1872            }
1873
1874            // Verify replay returns nothing
1875            {
1876                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1877                pin_mut!(stream);
1878                let items: Vec<_> = stream.collect().await;
1879                assert!(items.is_empty());
1880            }
1881
1882            journal.destroy().await.unwrap();
1883        });
1884    }
1885
1886    #[test_traced]
1887    fn test_journal_rewind_persistence() {
1888        let executor = deterministic::Runner::default();
1889        executor.start(|context| async move {
1890            let cfg = Config {
1891                partition: "test-partition".into(),
1892                compression: None,
1893                codec_config: (),
1894                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1895                write_buffer: NZUsize!(1024),
1896            };
1897
1898            // Create sections 1-5 with data
1899            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1900                .await
1901                .unwrap();
1902            for section in 1u64..=5 {
1903                journal.append(section, &(section as i32)).await.unwrap();
1904            }
1905            journal.sync_all().await.unwrap();
1906
1907            // Rewind to section 2
1908            let size = journal.size(2).await.unwrap();
1909            journal.rewind(2, size).await.unwrap();
1910            journal.sync_all().await.unwrap();
1911            drop(journal);
1912
1913            // Re-init and verify only sections 1-2 exist
1914            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
1915                .await
1916                .unwrap();
1917
1918            // Verify sections 1-2 have data
1919            for section in 1u64..=2 {
1920                let size = journal.size(section).await.unwrap();
1921                assert!(size > 0, "section {section} should have data after restart");
1922            }
1923
1924            // Verify sections 3-5 are gone
1925            for section in 3u64..=5 {
1926                let size = journal.size(section).await.unwrap();
1927                assert_eq!(size, 0, "section {section} should be gone after restart");
1928            }
1929
1930            // Verify data integrity via replay
1931            {
1932                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1933                pin_mut!(stream);
1934                let mut items = Vec::new();
1935                while let Some(result) = stream.next().await {
1936                    let (section, _, _, item) = result.unwrap();
1937                    items.push((section, item));
1938                }
1939                assert_eq!(items.len(), 2);
1940                assert_eq!(items[0], (1, 1));
1941                assert_eq!(items[1], (2, 2));
1942            }
1943
1944            journal.destroy().await.unwrap();
1945        });
1946    }
1947
1948    #[test_traced]
1949    fn test_journal_rewind_to_zero_removes_all_newer() {
1950        let executor = deterministic::Runner::default();
1951        executor.start(|context| async move {
1952            let cfg = Config {
1953                partition: "test-partition".into(),
1954                compression: None,
1955                codec_config: (),
1956                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1957                write_buffer: NZUsize!(1024),
1958            };
1959            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1960
1961            // Create sections 1, 2, 3
1962            for section in 1u64..=3 {
1963                journal.append(section, &(section as i32)).await.unwrap();
1964            }
1965            journal.sync_all().await.unwrap();
1966
1967            // Rewind section 1 to size 0
1968            journal.rewind(1, 0).await.unwrap();
1969
1970            // Verify section 1 exists but is empty
1971            let size = journal.size(1).await.unwrap();
1972            assert_eq!(size, 0, "section 1 should be empty");
1973
1974            // Verify sections 2, 3 are completely removed
1975            for section in 2u64..=3 {
1976                let size = journal.size(section).await.unwrap();
1977                assert_eq!(size, 0, "section {section} should be removed");
1978            }
1979
1980            // Verify replay returns nothing
1981            {
1982                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1983                pin_mut!(stream);
1984                let items: Vec<_> = stream.collect().await;
1985                assert!(items.is_empty());
1986            }
1987
1988            journal.destroy().await.unwrap();
1989        });
1990    }
1991
    #[test_traced]
    fn test_journal_replay_start_offset_with_trailing_bytes() {
        // Regression: valid_offset must be initialized to start_offset, not 0.
        // If replay initializes its "last known good" offset to 0, detecting
        // trailing garbage would truncate the blob all the way back to 0 and
        // destroy the valid items that were skipped over by start_offset.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append several items to build up valid data
            for i in 0..5i32 {
                journal.append(1, &i).await.unwrap();
            }
            journal.sync(1).await.unwrap();
            // Record the logical size so we can start the replay exactly at
            // the end of the valid data below.
            let valid_logical_size = journal.size(1).await.unwrap();
            drop(journal);

            // Get the physical blob size before corruption
            let (blob, physical_size_before) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            // Write incomplete varint: 0xFF has continuation bit set, needs more bytes
            // This creates 2 trailing bytes that cannot form a valid item
            blob.write_at(physical_size_before, vec![0xFF, 0xFF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Reopen journal and replay starting PAST all valid items
            // (start_offset = valid_logical_size means we skip all valid data)
            // The first thing encountered will be the trailing corrupt bytes
            let start_offset = valid_logical_size;
            {
                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                    .await
                    .unwrap();

                let stream = journal
                    .replay(1, start_offset, NZUsize!(1024))
                    .await
                    .unwrap();
                pin_mut!(stream);

                // Consume the stream - should detect trailing bytes and truncate
                while let Some(_result) = stream.next().await {}
            }
            // Journal is dropped here so the blob can be reopened for
            // inspection below.

            // Verify that valid data before start_offset was NOT lost
            let (_, physical_size_after) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            // The blob should have been truncated back to the valid physical size
            // (removing the trailing corrupt bytes) but NOT to 0
            // NOTE(review): `>=` (rather than `==`) tolerates the corrupt tail
            // not being truncated yet at this point — the invariant under test
            // is only that no valid data is lost.
            assert!(
                physical_size_after >= physical_size_before,
                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
                 {physical_size_after}. Logical valid size was {valid_logical_size}. \
                 This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
            );
        });
    }
2064
2065    #[test_traced]
2066    fn test_journal_large_item_spanning_pages() {
2067        // 2048 bytes spans 2 full pages (PAGE_SIZE = 1024).
2068        const LARGE_SIZE: usize = 2048;
2069        type LargeItem = [u8; LARGE_SIZE];
2070
2071        let executor = deterministic::Runner::default();
2072        executor.start(|context| async move {
2073            let cfg = Config {
2074                partition: "test-partition".into(),
2075                compression: None,
2076                codec_config: (),
2077                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2078                write_buffer: NZUsize!(4096),
2079            };
2080            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2081                .await
2082                .expect("Failed to initialize journal");
2083
2084            // Create a large item that spans multiple pages.
2085            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
2086            for (i, byte) in large_data.iter_mut().enumerate() {
2087                *byte = (i % 256) as u8;
2088            }
2089            assert!(
2090                LARGE_SIZE > PAGE_SIZE.get() as usize,
2091                "Item must be larger than page size"
2092            );
2093
2094            // Append the large item
2095            let (offset, size) = journal
2096                .append(1, &large_data)
2097                .await
2098                .expect("Failed to append large item");
2099            assert_eq!(size as usize, LARGE_SIZE);
2100            journal.sync(1).await.expect("Failed to sync");
2101
2102            // Read the item back via random access
2103            let retrieved: LargeItem = journal
2104                .get(1, offset)
2105                .await
2106                .expect("Failed to get large item");
2107            assert_eq!(retrieved, large_data, "Random access read mismatch");
2108
2109            // Drop and reopen to test replay
2110            drop(journal);
2111            let journal = Journal::<_, LargeItem>::init(context.with_label("second"), cfg.clone())
2112                .await
2113                .expect("Failed to re-initialize journal");
2114
2115            // Replay and verify the large item
2116            {
2117                let stream = journal
2118                    .replay(0, 0, NZUsize!(1024))
2119                    .await
2120                    .expect("Failed to setup replay");
2121                pin_mut!(stream);
2122
2123                let mut items = Vec::new();
2124                while let Some(result) = stream.next().await {
2125                    let (section, off, sz, item) = result.expect("Failed to replay item");
2126                    items.push((section, off, sz, item));
2127                }
2128
2129                assert_eq!(items.len(), 1, "Should have exactly one item");
2130                let (section, off, sz, item) = &items[0];
2131                assert_eq!(*section, 1);
2132                assert_eq!(*off, offset);
2133                assert_eq!(*sz as usize, LARGE_SIZE);
2134                assert_eq!(*item, large_data, "Replay read mismatch");
2135            }
2136
2137            journal.destroy().await.unwrap();
2138        });
2139    }
2140
    #[test_traced]
    fn test_journal_non_contiguous_sections() {
        // Test that sections with gaps in numbering work correctly.
        // Sections 1, 5, 10 should all be independent and accessible.
        // Covers four behaviors: random access to each section, errors for
        // missing sections, full replay after restart, and replay starting
        // from both an existing and a non-existent section.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Create sections with gaps: 1, 5, 10
            let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
            let mut offsets = Vec::new();

            // offsets[i] is the within-section offset of sections_and_data[i].
            for (section, data) in &sections_and_data {
                let (offset, _) = journal
                    .append(*section, data)
                    .await
                    .expect("Failed to append");
                offsets.push(offset);
            }
            journal.sync_all().await.expect("Failed to sync");

            // Verify random access to each section
            for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
                let retrieved: i32 = journal
                    .get(*section, offsets[i])
                    .await
                    .expect("Failed to get item");
                assert_eq!(retrieved, *expected_data);
            }

            // Verify non-existent sections return appropriate errors
            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            // Drop and reopen to test replay
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay and verify all items in order
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, 100));
                assert_eq!(items[1], (5, 500));
                assert_eq!(items[2], (10, 1000));
            }

            // Test replay starting from middle section (5)
            // Items in sections before the start section must be excluded.
            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, 500));
                assert_eq!(items[1], (10, 1000));
            }

            // Test replay starting from non-existent section (should skip to next)
            {
                let stream = journal
                    .replay(3, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 3");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                // Should get sections 5 and 10 (skipping non-existent 3, 4)
                assert_eq!(items.len(), 2);
                assert_eq!(items[0], (5, 500));
                assert_eq!(items[1], (10, 1000));
            }

            journal.destroy().await.unwrap();
        });
    }
2259
2260    #[test_traced]
2261    fn test_journal_empty_section_in_middle() {
2262        // Test that replay correctly handles an empty section between sections with data.
2263        // Section 1 has data, section 2 is empty, section 3 has data.
2264        let executor = deterministic::Runner::default();
2265        executor.start(|context| async move {
2266            let cfg = Config {
2267                partition: "test-partition".into(),
2268                compression: None,
2269                codec_config: (),
2270                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2271                write_buffer: NZUsize!(1024),
2272            };
2273            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2274                .await
2275                .expect("Failed to initialize journal");
2276
2277            // Append to section 1
2278            journal.append(1, &100i32).await.expect("Failed to append");
2279
2280            // Create section 2 but don't append anything - just sync to create the blob
2281            // Actually, we need to append something and then rewind to make it empty
2282            journal.append(2, &200i32).await.expect("Failed to append");
2283            journal.sync(2).await.expect("Failed to sync");
2284            journal
2285                .rewind_section(2, 0)
2286                .await
2287                .expect("Failed to rewind");
2288
2289            // Append to section 3
2290            journal.append(3, &300i32).await.expect("Failed to append");
2291
2292            journal.sync_all().await.expect("Failed to sync");
2293
2294            // Verify section sizes
2295            assert!(journal.size(1).await.unwrap() > 0);
2296            assert_eq!(journal.size(2).await.unwrap(), 0);
2297            assert!(journal.size(3).await.unwrap() > 0);
2298
2299            // Drop and reopen to test replay
2300            drop(journal);
2301            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
2302                .await
2303                .expect("Failed to re-initialize journal");
2304
2305            // Replay all - should get items from sections 1 and 3, skipping empty section 2
2306            {
2307                let stream = journal
2308                    .replay(0, 0, NZUsize!(1024))
2309                    .await
2310                    .expect("Failed to setup replay");
2311                pin_mut!(stream);
2312
2313                let mut items = Vec::new();
2314                while let Some(result) = stream.next().await {
2315                    let (section, _, _, item) = result.expect("Failed to replay item");
2316                    items.push((section, item));
2317                }
2318
2319                assert_eq!(
2320                    items.len(),
2321                    2,
2322                    "Should have 2 items (skipping empty section)"
2323                );
2324                assert_eq!(items[0], (1, 100));
2325                assert_eq!(items[1], (3, 300));
2326            }
2327
2328            // Replay starting from empty section 2 - should get only section 3
2329            {
2330                let stream = journal
2331                    .replay(2, 0, NZUsize!(1024))
2332                    .await
2333                    .expect("Failed to setup replay from section 2");
2334                pin_mut!(stream);
2335
2336                let mut items = Vec::new();
2337                while let Some(result) = stream.next().await {
2338                    let (section, _, _, item) = result.expect("Failed to replay item");
2339                    items.push((section, item));
2340                }
2341
2342                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
2343                assert_eq!(items[0], (3, 300));
2344            }
2345
2346            journal.destroy().await.unwrap();
2347        });
2348    }
2349
2350    #[test_traced]
2351    fn test_journal_item_exactly_page_size() {
2352        // Test that items exactly equal to PAGE_SIZE work correctly.
2353        // This is a boundary condition where item fills exactly one page.
2354        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
2355        type ExactItem = [u8; ITEM_SIZE];
2356
2357        let executor = deterministic::Runner::default();
2358        executor.start(|context| async move {
2359            let cfg = Config {
2360                partition: "test-partition".into(),
2361                compression: None,
2362                codec_config: (),
2363                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2364                write_buffer: NZUsize!(4096),
2365            };
2366            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2367                .await
2368                .expect("Failed to initialize journal");
2369
2370            // Create an item exactly PAGE_SIZE bytes
2371            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
2372            for (i, byte) in exact_data.iter_mut().enumerate() {
2373                *byte = (i % 256) as u8;
2374            }
2375
2376            // Append the exact-size item
2377            let (offset, size) = journal
2378                .append(1, &exact_data)
2379                .await
2380                .expect("Failed to append exact item");
2381            assert_eq!(size as usize, ITEM_SIZE);
2382            journal.sync(1).await.expect("Failed to sync");
2383
2384            // Read the item back via random access
2385            let retrieved: ExactItem = journal
2386                .get(1, offset)
2387                .await
2388                .expect("Failed to get exact item");
2389            assert_eq!(retrieved, exact_data, "Random access read mismatch");
2390
2391            // Drop and reopen to test replay
2392            drop(journal);
2393            let journal = Journal::<_, ExactItem>::init(context.with_label("second"), cfg.clone())
2394                .await
2395                .expect("Failed to re-initialize journal");
2396
2397            // Replay and verify
2398            {
2399                let stream = journal
2400                    .replay(0, 0, NZUsize!(1024))
2401                    .await
2402                    .expect("Failed to setup replay");
2403                pin_mut!(stream);
2404
2405                let mut items = Vec::new();
2406                while let Some(result) = stream.next().await {
2407                    let (section, off, sz, item) = result.expect("Failed to replay item");
2408                    items.push((section, off, sz, item));
2409                }
2410
2411                assert_eq!(items.len(), 1, "Should have exactly one item");
2412                let (section, off, sz, item) = &items[0];
2413                assert_eq!(*section, 1);
2414                assert_eq!(*off, offset);
2415                assert_eq!(*sz as usize, ITEM_SIZE);
2416                assert_eq!(*item, exact_data, "Replay read mismatch");
2417            }
2418
2419            journal.destroy().await.unwrap();
2420        });
2421    }
2422
2423    #[test_traced]
2424    fn test_journal_varint_spanning_page_boundary() {
2425        // Test that items with data spanning page boundaries work correctly
2426        // when using a small page size.
2427        //
2428        // With PAGE_SIZE=16:
2429        // - Physical page = 16 + 12 = 28 bytes
2430        // - Each [u8; 128] item = 2-byte varint + 128 bytes data = 130 bytes
2431        // - This spans multiple 16-byte pages, testing cross-page reading
2432        const SMALL_PAGE: NonZeroU16 = NZU16!(16);
2433
2434        let executor = deterministic::Runner::default();
2435        executor.start(|context| async move {
2436            let cfg = Config {
2437                partition: "test-partition".into(),
2438                compression: None,
2439                codec_config: (),
2440                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE, PAGE_CACHE_SIZE),
2441                write_buffer: NZUsize!(1024),
2442            };
2443            let mut journal: Journal<_, [u8; 128]> =
2444                Journal::init(context.with_label("first"), cfg.clone())
2445                    .await
2446                    .expect("Failed to initialize journal");
2447
2448            // Create items that will span many 16-byte pages
2449            let item1: [u8; 128] = [1u8; 128];
2450            let item2: [u8; 128] = [2u8; 128];
2451            let item3: [u8; 128] = [3u8; 128];
2452
2453            // Append items - each is 130 bytes (2-byte varint + 128 data)
2454            // spanning ceil(130/16) = 9 pages worth of logical data
2455            let (offset1, _) = journal.append(1, &item1).await.expect("Failed to append");
2456            let (offset2, _) = journal.append(1, &item2).await.expect("Failed to append");
2457            let (offset3, _) = journal.append(1, &item3).await.expect("Failed to append");
2458
2459            journal.sync(1).await.expect("Failed to sync");
2460
2461            // Read items back via random access
2462            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
2463            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
2464            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
2465            assert_eq!(retrieved1, item1);
2466            assert_eq!(retrieved2, item2);
2467            assert_eq!(retrieved3, item3);
2468
2469            // Drop and reopen to test replay
2470            drop(journal);
2471            let journal: Journal<_, [u8; 128]> =
2472                Journal::init(context.with_label("second"), cfg.clone())
2473                    .await
2474                    .expect("Failed to re-initialize journal");
2475
2476            // Replay and verify all items
2477            {
2478                let stream = journal
2479                    .replay(0, 0, NZUsize!(64))
2480                    .await
2481                    .expect("Failed to setup replay");
2482                pin_mut!(stream);
2483
2484                let mut items = Vec::new();
2485                while let Some(result) = stream.next().await {
2486                    let (section, off, _, item) = result.expect("Failed to replay item");
2487                    items.push((section, off, item));
2488                }
2489
2490                assert_eq!(items.len(), 3, "Should have 3 items");
2491                assert_eq!(items[0], (1, offset1, item1));
2492                assert_eq!(items[1], (1, offset2, item2));
2493                assert_eq!(items[2], (1, offset3, item3));
2494            }
2495
2496            journal.destroy().await.unwrap();
2497        });
2498    }
2499
2500    #[test_traced]
2501    fn test_journal_clear() {
2502        let executor = deterministic::Runner::default();
2503        executor.start(|context| async move {
2504            let cfg = Config {
2505                partition: "clear-test".into(),
2506                compression: None,
2507                codec_config: (),
2508                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2509                write_buffer: NZUsize!(1024),
2510            };
2511
2512            let mut journal: Journal<_, u64> =
2513                Journal::init(context.with_label("journal"), cfg.clone())
2514                    .await
2515                    .expect("Failed to initialize journal");
2516
2517            // Append items across multiple sections
2518            for section in 0..5u64 {
2519                for i in 0..10u64 {
2520                    journal
2521                        .append(section, &(section * 1000 + i))
2522                        .await
2523                        .expect("Failed to append");
2524                }
2525                journal.sync(section).await.expect("Failed to sync");
2526            }
2527
2528            // Verify we have data
2529            assert_eq!(journal.get(0, 0).await.unwrap(), 0);
2530            assert_eq!(journal.get(4, 0).await.unwrap(), 4000);
2531
2532            // Clear the journal
2533            journal.clear().await.expect("Failed to clear");
2534
2535            // After clear, all reads should fail
2536            for section in 0..5u64 {
2537                assert!(matches!(
2538                    journal.get(section, 0).await,
2539                    Err(Error::SectionOutOfRange(s)) if s == section
2540                ));
2541            }
2542
2543            // Append new data after clear
2544            for i in 0..5u64 {
2545                journal
2546                    .append(10, &(i * 100))
2547                    .await
2548                    .expect("Failed to append after clear");
2549            }
2550            journal.sync(10).await.expect("Failed to sync after clear");
2551
2552            // New data should be readable
2553            assert_eq!(journal.get(10, 0).await.unwrap(), 0);
2554
2555            // Old sections should still be missing
2556            assert!(matches!(
2557                journal.get(0, 0).await,
2558                Err(Error::SectionOutOfRange(0))
2559            ));
2560
2561            journal.destroy().await.unwrap();
2562        });
2563    }
2564}