commonware_storage/journal/segmented/variable.rs

//! An append-only log for storing arbitrary variable-length items.
//!
//! `segmented::Journal` is an append-only log for storing arbitrary variable-length data on disk.
//! In addition to replay, stored items can be retrieved directly given their section number and
//! offset within the section.
//!
//! # Format
//!
//! Data stored in `Journal` is persisted in one of many [Blob]s within a caller-provided `partition`.
//! The particular [Blob] in which data is stored is identified by a `section` number (`u64`).
//! Within a `section`, data is appended as an `item` with the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+
//! |       0 ~ 4       |    ...    |
//! +---+---+---+---+---+---+---+---+
//! | Size (varint u32) |   Data    |
//! +---+---+---+---+---+---+---+---+
//! ```
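//!
//! For example, a 3-byte item `[0x61, 0x62, 0x63]` is stored as the 4 bytes
//! `[0x03, 0x61, 0x62, 0x63]`: a 1-byte varint length prefix followed by the item data.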
//!
//! # Open Blobs
//!
//! `Journal` uses one [Blob] per `section` to store data. All blobs in a given `partition` are
//! kept open during the lifetime of `Journal`. If the caller wishes to bound the number of open
//! blobs, they can group data into fewer `sections` and/or prune unused `sections`.
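//!
//! For example, a caller might derive the `section` from a monotonically increasing height so
//! that each blob holds a bounded number of items (an illustrative sketch; `ITEMS_PER_SECTION`,
//! `height`, and `item` are placeholders, not part of this crate):
//!
//! ```rust,ignore
//! const ITEMS_PER_SECTION: u64 = 1_000;
//! let section = height / ITEMS_PER_SECTION;
//! journal.append(section, item).await?;
//! ```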
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` (single
//! section) or `sync_all` (all open sections) methods.
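//!
//! A minimal sketch of explicit syncing (assuming `journal` was initialized as in the example
//! below):
//!
//! ```rust,ignore
//! // Persist section 1, then flush everything still pending across all open sections.
//! journal.sync(1).await?;
//! journal.sync_all().await?;
//! ```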
//!
//! # Pruning
//!
//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
//! could be used, for example, by a blockchain application to prune old blocks.
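//!
//! A minimal pruning sketch (assuming blocks were appended with their height as the `section`):
//!
//! ```rust,ignore
//! // Remove every section strictly below 100; returns true if anything was pruned.
//! let pruned = journal.prune(100).await?;
//! ```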
//!
//! # Replay
//!
//! During application initialization, it is very common to replay data from `Journal` to recover
//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
//! method to produce a stream of all items in the `Journal` in order of their `section` and
//! `offset`.
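//!
//! A minimal replay sketch (assuming `journal` stores `u64` items and `futures::{pin_mut, StreamExt}`
//! are in scope):
//!
//! ```rust,ignore
//! let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
//! pin_mut!(stream);
//! while let Some(result) = stream.next().await {
//!     let (section, offset, size, item) = result?;
//!     // Rebuild in-memory state from `item` here.
//! }
//! ```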
//!
//! # Compression
//!
//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
//! `compression` field in the `Config` struct to a valid `zstd` compression level. The level may
//! change between initializations of `Journal`; however, the field must remain set if any existing
//! data was written with compression enabled.
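//!
//! A minimal configuration sketch with compression enabled (level 3 is an arbitrary choice):
//!
//! ```rust,ignore
//! let cfg = Config {
//!     partition: "partition".to_string(),
//!     compression: Some(3),
//!     codec_config: (),
//!     page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
//!     write_buffer: NZUsize!(1024 * 1024),
//! };
//! ```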
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::{NZUsize, NZU16};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a journal
//!     let mut journal = Journal::init(context, Config{
//!         partition: "partition".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append data to the journal
//!     journal.append(1, 128).await.unwrap();
//!
//!     // Sync the journal
//!     journal.sync_all().await.unwrap();
//! });
//! ```

80use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
81use crate::journal::Error;
82use commonware_codec::{
83    varint::{UInt, MAX_U32_VARINT_SIZE},
84    Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
85};
86use commonware_runtime::{
87    buffer::paged::{Append, CacheRef, Replay},
88    Blob, Buf, BufMut, IoBuf, IoBufMut, Metrics, Storage,
89};
90use futures::stream::{self, Stream, StreamExt};
91use std::{io::Cursor, num::NonZeroUsize};
92use tracing::{trace, warn};
93use zstd::{bulk::compress, decode_all};
94
95/// Configuration for `Journal` storage.
96#[derive(Clone)]
97pub struct Config<C> {
98    /// The `commonware-runtime::Storage` partition to use
99    /// for storing journal blobs.
100    pub partition: String,
101
102    /// Optional compression level (using `zstd`) to apply to data before storing.
103    pub compression: Option<u8>,
104
105    /// The codec configuration to use for encoding and decoding items.
106    pub codec_config: C,
107
108    /// The page cache to use for caching data.
109    pub page_cache: CacheRef,
110
111    /// The size of the write buffer to use for each blob.
112    pub write_buffer: NonZeroUsize,
113}
114
115/// Decodes a varint length prefix from a buffer.
116/// Returns (item_size, varint_len).
117#[inline]
118fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
119    let initial = buf.remaining();
120    let size = UInt::<u32>::read(buf)?.0 as usize;
121    let varint_len = initial - buf.remaining();
122    Ok((size, varint_len))
123}
124
125/// Result of finding an item in a buffer (offsets/lengths, not slices).
126enum ItemInfo {
127    /// All item data is available in the buffer.
128    Complete {
129        /// Length of the varint prefix.
130        varint_len: usize,
131        /// Length of the item data.
132        data_len: usize,
133    },
134    /// Only some item data is available.
135    Incomplete {
136        /// Length of the varint prefix.
137        varint_len: usize,
138        /// Bytes of item data available in buffer.
139        prefix_len: usize,
140        /// Full size of the item.
141        total_len: usize,
142    },
143}
144
145/// Find an item in a buffer by decoding its length prefix.
146///
147/// Returns (next_offset, item_info). The buffer is advanced past the varint.
148fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
149    let available = buf.remaining();
150    let (size, varint_len) = decode_length_prefix(buf)?;
151    let next_offset = offset
152        .checked_add(varint_len as u64)
153        .ok_or(Error::OffsetOverflow)?
154        .checked_add(size as u64)
155        .ok_or(Error::OffsetOverflow)?;
156    let buffered = available.saturating_sub(varint_len);
157
158    let item = if buffered >= size {
159        ItemInfo::Complete {
160            varint_len,
161            data_len: size,
162        }
163    } else {
164        ItemInfo::Incomplete {
165            varint_len,
166            prefix_len: buffered,
167            total_len: size,
168        }
169    };
170
171    Ok((next_offset, item))
172}
173
174/// State for replaying a single section's blob.
175struct ReplayState<B: Blob, C> {
176    section: u64,
177    blob: Append<B>,
178    replay: Replay<B>,
179    skip_bytes: u64,
180    offset: u64,
181    valid_offset: u64,
182    codec_config: C,
183    compressed: bool,
184    done: bool,
185}
186
187/// Decode item data with optional decompression.
188fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
189    if compressed {
190        let decompressed =
191            decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
192        V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
193    } else {
194        V::decode_cfg(item_data, cfg).map_err(Error::Codec)
195    }
196}
197
198/// A segmented journal with variable-size entries.
199///
200/// Each section is stored in a separate blob. Items are length-prefixed with a varint.
201///
202/// # Repair
203///
204/// Like
205/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
206/// and
207/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
208/// the first invalid data read will be considered the new end of the journal (and the
209/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
210/// replay (not init) because any blob could have trailing bytes.
211pub struct Journal<E: Storage + Metrics, V: Codec> {
212    manager: Manager<E, AppendFactory>,
213
214    /// Compression level (if enabled).
215    compression: Option<u8>,
216
217    /// Codec configuration.
218    codec_config: V::Cfg,
219}
220
221impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
222    /// Initialize a new `Journal` instance.
223    ///
224    /// All backing blobs are opened but not read during
225    /// initialization. The `replay` method can be used
226    /// to iterate over all items in the `Journal`.
227    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
228        let manager_cfg = ManagerConfig {
229            partition: cfg.partition,
230            factory: AppendFactory {
231                write_buffer: cfg.write_buffer,
232                page_cache_ref: cfg.page_cache,
233            },
234        };
235        let manager = Manager::init(context, manager_cfg).await?;
236
237        Ok(Self {
238            manager,
239            compression: cfg.compression,
240            codec_config: cfg.codec_config,
241        })
242    }
243
244    /// Reads an item from the blob at the given offset.
245    async fn read(
246        compressed: bool,
247        cfg: &V::Cfg,
248        blob: &Append<E::Blob>,
249        offset: u64,
250    ) -> Result<(u64, u32, V), Error> {
251        // Read varint header (max 5 bytes for u32)
252        let (buf, available) = blob
253            .read_up_to(IoBufMut::zeroed(MAX_U32_VARINT_SIZE), offset)
254            .await?;
255        let buf = buf.freeze();
256        let mut cursor = Cursor::new(buf.slice(..available));
257        let (next_offset, item_info) = find_item(&mut cursor, offset)?;
258
259        // Decode item - either directly from buffer or by chaining prefix with remainder
260        let (item_size, decoded) = match item_info {
261            ItemInfo::Complete {
262                varint_len,
263                data_len,
264            } => {
265                // Data follows varint in buffer
266                let data = buf.slice(varint_len..varint_len + data_len);
267                let decoded = decode_item::<V>(data, cfg, compressed)?;
268                (data_len as u32, decoded)
269            }
270            ItemInfo::Incomplete {
271                varint_len,
272                prefix_len,
273                total_len,
274            } => {
275                // Read remainder and chain with prefix to avoid copying
276                let prefix = buf.slice(varint_len..varint_len + prefix_len);
277                let read_offset = offset + varint_len as u64 + prefix_len as u64;
278                let remainder_len = total_len - prefix_len;
279                let mut remainder = vec![0u8; remainder_len];
280                blob.read_into(&mut remainder, read_offset).await?;
281                let chained = prefix.chain(IoBuf::from(remainder));
282                let decoded = decode_item::<V>(chained, cfg, compressed)?;
283                (total_len as u32, decoded)
284            }
285        };
286
287        Ok((next_offset, item_size, decoded))
288    }
289
    /// Returns an ordered stream of all items in the journal starting with the item at the given
    /// `start_section` and `start_offset` into that section. Each item is returned as a tuple of
    /// (section, offset, size, item).
293    pub async fn replay(
294        &self,
295        start_section: u64,
296        mut start_offset: u64,
297        buffer: NonZeroUsize,
298    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
299        // Collect all blobs to replay (keeping blob reference for potential resize)
300        let codec_config = self.codec_config.clone();
301        let compressed = self.compression.is_some();
302        let mut blobs = Vec::new();
303        for (&section, blob) in self.manager.sections_from(start_section) {
304            blobs.push((
305                section,
306                blob.clone(),
307                blob.replay(buffer).await?,
308                codec_config.clone(),
309                compressed,
310            ));
311        }
312
313        // Stream items as they are read to avoid occupying too much memory
314        Ok(stream::iter(blobs).flat_map(
315            move |(section, blob, replay, codec_config, compressed)| {
316                // Calculate initial skip bytes for first blob
317                let skip_bytes = if section == start_section {
318                    start_offset
319                } else {
320                    start_offset = 0;
321                    0
322                };
323
324                stream::unfold(
325                    ReplayState {
326                        section,
327                        blob,
328                        replay,
329                        skip_bytes,
330                        offset: 0,
331                        valid_offset: skip_bytes,
332                        codec_config,
333                        compressed,
334                        done: false,
335                    },
336                    move |mut state| async move {
337                        if state.done {
338                            return None;
339                        }
340
341                        let blob_size = state.replay.blob_size();
342                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
343                        loop {
344                            // Ensure we have enough data for varint header.
345                            // ensure() returns Ok(false) if exhausted with fewer bytes,
346                            // but we still try to decode from remaining bytes.
347                            match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
348                                Ok(true) => {}
349                                Ok(false) => {
350                                    // Reader exhausted - check if buffer is empty
351                                    if state.replay.remaining() == 0 {
352                                        state.done = true;
353                                        return if batch.is_empty() {
354                                            None
355                                        } else {
356                                            Some((batch, state))
357                                        };
358                                    }
359                                    // Buffer still has data - continue to try decoding
360                                }
361                                Err(err) => {
362                                    batch.push(Err(err.into()));
363                                    state.done = true;
364                                    return Some((batch, state));
365                                }
366                            }
367
368                            // Skip bytes if needed (for start_offset)
369                            if state.skip_bytes > 0 {
370                                let to_skip =
371                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
372                                state.replay.advance(to_skip);
373                                state.skip_bytes -= to_skip as u64;
374                                state.offset += to_skip as u64;
375                                continue;
376                            }
377
378                            // Try to decode length prefix
379                            let before_remaining = state.replay.remaining();
380                            let (item_size, varint_len) =
381                                match decode_length_prefix(&mut state.replay) {
382                                    Ok(result) => result,
383                                    Err(err) => {
384                                        // Could be incomplete varint - check if reader exhausted
385                                        if state.replay.is_exhausted()
386                                            || before_remaining < MAX_U32_VARINT_SIZE
387                                        {
388                                            // Treat as trailing bytes
389                                            if state.valid_offset < blob_size
390                                                && state.offset < blob_size
391                                            {
392                                                warn!(
393                                                    blob = state.section,
394                                                    bad_offset = state.offset,
395                                                    new_size = state.valid_offset,
396                                                    "trailing bytes detected: truncating"
397                                                );
398                                                state.blob.resize(state.valid_offset).await.ok()?;
399                                            }
400                                            state.done = true;
401                                            return if batch.is_empty() {
402                                                None
403                                            } else {
404                                                Some((batch, state))
405                                            };
406                                        }
407                                        batch.push(Err(err));
408                                        state.done = true;
409                                        return Some((batch, state));
410                                    }
411                                };
412
413                            // Ensure we have enough data for item body
414                            match state.replay.ensure(item_size).await {
415                                Ok(true) => {}
416                                Ok(false) => {
417                                    // Incomplete item at end - truncate
418                                    warn!(
419                                        blob = state.section,
420                                        bad_offset = state.offset,
421                                        new_size = state.valid_offset,
422                                        "incomplete item at end: truncating"
423                                    );
424                                    state.blob.resize(state.valid_offset).await.ok()?;
425                                    state.done = true;
426                                    return if batch.is_empty() {
427                                        None
428                                    } else {
429                                        Some((batch, state))
430                                    };
431                                }
432                                Err(err) => {
433                                    batch.push(Err(err.into()));
434                                    state.done = true;
435                                    return Some((batch, state));
436                                }
437                            }
438
439                            // Decode item - use take() to limit bytes read
440                            let item_offset = state.offset;
441                            let next_offset = match state
442                                .offset
443                                .checked_add(varint_len as u64)
444                                .and_then(|o| o.checked_add(item_size as u64))
445                            {
446                                Some(o) => o,
447                                None => {
448                                    batch.push(Err(Error::OffsetOverflow));
449                                    state.done = true;
450                                    return Some((batch, state));
451                                }
452                            };
453                            match decode_item::<V>(
454                                (&mut state.replay).take(item_size),
455                                &state.codec_config,
456                                state.compressed,
457                            ) {
458                                Ok(decoded) => {
459                                    batch.push(Ok((
460                                        state.section,
461                                        item_offset,
462                                        item_size as u32,
463                                        decoded,
464                                    )));
465                                    state.valid_offset = next_offset;
466                                    state.offset = next_offset;
467                                }
468                                Err(err) => {
469                                    batch.push(Err(err));
470                                    state.done = true;
471                                    return Some((batch, state));
472                                }
473                            }
474
475                            // Return batch if we have items and buffer is low
476                            if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
477                                return Some((batch, state));
478                            }
479                        }
480                    },
481                )
482                .flat_map(stream::iter)
483            },
484        ))
485    }
486
    /// Appends an item to `Journal` in a given `section`, returning the offset where the item was
    /// written and the size of the stored item (which may differ from the codec's encoded size
    /// when compression is enabled).
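    ///
    /// A minimal sketch (assuming a `journal` storing `u64` items):
    ///
    /// ```rust,ignore
    /// let (offset, _size) = journal.append(7, 42u64).await?;
    /// journal.sync(7).await?;
    /// let item: u64 = journal.get(7, offset).await?;
    /// assert_eq!(item, 42);
    /// ```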
490    pub async fn append(&mut self, section: u64, item: V) -> Result<(u64, u32), Error> {
491        // Create buffer with item data (no checksum, no alignment)
492        let (buf, item_len) = if let Some(compression) = self.compression {
493            // Compressed: encode first, then compress
494            let encoded = item.encode();
495            let compressed =
496                compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
497            let item_len = compressed.len();
498            let item_len_u32: u32 = match item_len.try_into() {
499                Ok(len) => len,
500                Err(_) => return Err(Error::ItemTooLarge(item_len)),
501            };
502            let size_len = UInt(item_len_u32).encode_size();
503            let entry_len = size_len
504                .checked_add(item_len)
505                .ok_or(Error::OffsetOverflow)?;
506
507            let mut buf = Vec::with_capacity(entry_len);
508            UInt(item_len_u32).write(&mut buf);
509            buf.put_slice(&compressed);
510
511            (buf, item_len)
512        } else {
513            // Uncompressed: pre-allocate exact size to avoid copying
514            let item_len = item.encode_size();
515            let item_len_u32: u32 = match item_len.try_into() {
516                Ok(len) => len,
517                Err(_) => return Err(Error::ItemTooLarge(item_len)),
518            };
519            let size_len = UInt(item_len_u32).encode_size();
520            let entry_len = size_len
521                .checked_add(item_len)
522                .ok_or(Error::OffsetOverflow)?;
523
524            let mut buf = Vec::with_capacity(entry_len);
525            UInt(item_len_u32).write(&mut buf);
526            item.write(&mut buf);
527
528            (buf, item_len)
529        };
530
531        // Get or create blob
532        let blob = self.manager.get_or_create(section).await?;
533
534        // Get current position - this is where we'll write (no alignment)
535        let offset = blob.size().await;
536
537        // Append item to blob
538        blob.append(&buf).await?;
539        trace!(blob = section, offset, "appended item");
540        Ok((offset, item_len as u32))
541    }
542
543    /// Retrieves an item from `Journal` at a given `section` and `offset`.
544    ///
545    /// # Errors
546    ///  - [Error::AlreadyPrunedToSection] if the requested `section` has been pruned during the
547    ///    current execution.
548    ///  - [Error::SectionOutOfRange] if the requested `section` is empty (i.e. has never had any
549    ///    data appended to it, or has been pruned in a previous execution).
550    ///  - An invalid `offset` for a given section (that is, an offset that doesn't correspond to a
551    ///    previously appended item) will result in an error, with the specific type being
552    ///    undefined.
553    pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
554        let blob = self
555            .manager
556            .get(section)?
557            .ok_or(Error::SectionOutOfRange(section))?;
558
559        // Perform a multi-op read.
560        let (_, _, item) =
561            Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
562        Ok(item)
563    }
564
565    /// Gets the size of the journal for a specific section.
566    ///
567    /// Returns 0 if the section does not exist.
568    pub async fn size(&self, section: u64) -> Result<u64, Error> {
569        self.manager.size(section).await
570    }
571
572    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
573    ///
574    /// # Warnings
575    ///
576    /// * This operation is not guaranteed to survive restarts until sync is called.
577    /// * This operation is not atomic, but it will always leave the journal in a consistent state
578    ///   in the event of failure since blobs are always removed in reverse order of section.
579    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
580        self.manager.rewind(section, offset).await
581    }
582
583    /// Rewinds the journal to the given `section` and `size`.
584    ///
585    /// This removes any data beyond the specified `section` and `size`.
586    ///
587    /// # Warnings
588    ///
589    /// * This operation is not guaranteed to survive restarts until sync is called.
590    /// * This operation is not atomic, but it will always leave the journal in a consistent state
591    ///   in the event of failure since blobs are always removed in reverse order of section.
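    ///
    /// A minimal sketch (the size passed here is hypothetical and would normally come from a
    /// previously recorded `size` or append offset):
    ///
    /// ```rust,ignore
    /// // Drop all sections after 4 and truncate section 4 back to 128 bytes.
    /// journal.rewind(4, 128).await?;
    /// journal.sync_all().await?;
    /// ```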
592    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
593        self.manager.rewind(section, size).await
594    }
595
596    /// Rewinds the `section` to the given `size`.
597    ///
598    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
599    ///
600    /// # Warning
601    ///
602    /// This operation is not guaranteed to survive restarts until sync is called.
603    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
604        self.manager.rewind_section(section, size).await
605    }
606
607    /// Ensures that all data in a given `section` is synced to the underlying store.
608    ///
609    /// If the `section` does not exist, no error will be returned.
610    pub async fn sync(&self, section: u64) -> Result<(), Error> {
611        self.manager.sync(section).await
612    }
613
614    /// Syncs all open sections.
615    pub async fn sync_all(&self) -> Result<(), Error> {
616        self.manager.sync_all().await
617    }
618
619    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
620    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
621        self.manager.prune(min).await
622    }
623
624    /// Returns the number of the oldest section in the journal.
625    pub fn oldest_section(&self) -> Option<u64> {
626        self.manager.oldest_section()
627    }
628
629    /// Returns the number of the newest section in the journal.
630    pub fn newest_section(&self) -> Option<u64> {
631        self.manager.newest_section()
632    }
633
634    /// Returns true if no sections exist.
635    pub fn is_empty(&self) -> bool {
636        self.manager.is_empty()
637    }
638
639    /// Returns the number of sections.
640    pub fn num_sections(&self) -> usize {
641        self.manager.num_sections()
642    }
643
644    /// Removes any underlying blobs created by the journal.
645    pub async fn destroy(self) -> Result<(), Error> {
646        self.manager.destroy().await
647    }
648
649    /// Clear all data, resetting the journal to an empty state.
650    ///
651    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
652    pub async fn clear(&mut self) -> Result<(), Error> {
653        self.manager.clear().await
654    }
655}
656
657#[cfg(test)]
658mod tests {
659    use super::*;
660    use commonware_macros::test_traced;
661    use commonware_runtime::{deterministic, Blob, BufMut, Metrics, Runner, Storage};
662    use commonware_utils::{NZUsize, NZU16};
663    use futures::{pin_mut, StreamExt};
664    use std::num::NonZeroU16;
665
666    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
667    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
668
669    #[test_traced]
670    fn test_journal_append_and_read() {
671        // Initialize the deterministic context
672        let executor = deterministic::Runner::default();
673
674        // Start the test within the executor
675        executor.start(|context| async move {
676            // Initialize the journal
677            let cfg = Config {
678                partition: "test_partition".into(),
679                compression: None,
680                codec_config: (),
681                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
682                write_buffer: NZUsize!(1024),
683            };
684            let index = 1u64;
685            let data = 10;
686            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
687                .await
688                .expect("Failed to initialize journal");
689
690            // Append an item to the journal
691            journal
692                .append(index, data)
693                .await
694                .expect("Failed to append data");
695
696            // Check metrics
697            let buffer = context.encode();
698            assert!(buffer.contains("first_tracked 1"));
699
700            // Drop and re-open the journal to simulate a restart
701            journal.sync(index).await.expect("Failed to sync journal");
702            drop(journal);
703            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg)
704                .await
705                .expect("Failed to re-initialize journal");
706
707            // Replay the journal and collect items
708            let mut items = Vec::new();
709            let stream = journal
710                .replay(0, 0, NZUsize!(1024))
711                .await
712                .expect("unable to setup replay");
713            pin_mut!(stream);
714            while let Some(result) = stream.next().await {
715                match result {
716                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
717                    Err(err) => panic!("Failed to read item: {err}"),
718                }
719            }
720
721            // Verify that the item was replayed correctly
722            assert_eq!(items.len(), 1);
723            assert_eq!(items[0].0, index);
724            assert_eq!(items[0].1, data);
725
726            // Check metrics
727            let buffer = context.encode();
728            assert!(buffer.contains("second_tracked 1"));
729        });
730    }
731
732    #[test_traced]
733    fn test_journal_multiple_appends_and_reads() {
734        // Initialize the deterministic context
735        let executor = deterministic::Runner::default();
736
737        // Start the test within the executor
738        executor.start(|context| async move {
739            // Create a journal configuration
740            let cfg = Config {
741                partition: "test_partition".into(),
742                compression: None,
743                codec_config: (),
744                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
745                write_buffer: NZUsize!(1024),
746            };
747
748            // Initialize the journal
749            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
750                .await
751                .expect("Failed to initialize journal");
752
753            // Append multiple items to different blobs
754            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
755            for (index, data) in &data_items {
756                journal
757                    .append(*index, *data)
758                    .await
759                    .expect("Failed to append data");
760                journal.sync(*index).await.expect("Failed to sync blob");
761            }
762
763            // Check metrics
764            let buffer = context.encode();
765            assert!(buffer.contains("first_tracked 3"));
766            assert!(buffer.contains("first_synced_total 4"));
767
768            // Drop and re-open the journal to simulate a restart
769            drop(journal);
770            let journal = Journal::init(context.with_label("second"), cfg)
771                .await
772                .expect("Failed to re-initialize journal");
773
774            // Replay the journal and collect items
775            let mut items = Vec::<(u64, u32)>::new();
776            {
777                let stream = journal
778                    .replay(0, 0, NZUsize!(1024))
779                    .await
780                    .expect("unable to setup replay");
781                pin_mut!(stream);
782                while let Some(result) = stream.next().await {
783                    match result {
784                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
785                        Err(err) => panic!("Failed to read item: {err}"),
786                    }
787                }
788            }
789
790            // Verify that all items were replayed correctly
791            assert_eq!(items.len(), data_items.len());
792            for ((expected_index, expected_data), (actual_index, actual_data)) in
793                data_items.iter().zip(items.iter())
794            {
795                assert_eq!(actual_index, expected_index);
796                assert_eq!(actual_data, expected_data);
797            }
798
799            // Cleanup
800            journal.destroy().await.expect("Failed to destroy journal");
801        });
802    }
803
804    #[test_traced]
805    fn test_journal_prune_blobs() {
806        // Initialize the deterministic context
807        let executor = deterministic::Runner::default();
808
809        // Start the test within the executor
810        executor.start(|context| async move {
811            // Create a journal configuration
812            let cfg = Config {
813                partition: "test_partition".into(),
814                compression: None,
815                codec_config: (),
816                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
817                write_buffer: NZUsize!(1024),
818            };
819
820            // Initialize the journal
821            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
822                .await
823                .expect("Failed to initialize journal");
824
825            // Append items to multiple blobs
826            for index in 1u64..=5u64 {
827                journal
828                    .append(index, index)
829                    .await
830                    .expect("Failed to append data");
831                journal.sync(index).await.expect("Failed to sync blob");
832            }
833
834            // Add one item out-of-order
835            let data = 99;
836            journal
837                .append(2u64, data)
838                .await
839                .expect("Failed to append data");
840            journal.sync(2u64).await.expect("Failed to sync blob");
841
842            // Prune blobs with indices less than 3
843            journal.prune(3).await.expect("Failed to prune blobs");
844
845            // Check metrics
846            let buffer = context.encode();
847            assert!(buffer.contains("first_pruned_total 2"));
848
849            // Prune again with a section less than the previous one, should be a no-op
850            journal.prune(2).await.expect("Failed to no-op prune");
851            let buffer = context.encode();
852            assert!(buffer.contains("first_pruned_total 2"));
853
854            // Drop and re-open the journal to simulate a restart
855            drop(journal);
856            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
857                .await
858                .expect("Failed to re-initialize journal");
859
860            // Replay the journal and collect items
861            let mut items = Vec::<(u64, u64)>::new();
862            {
863                let stream = journal
864                    .replay(0, 0, NZUsize!(1024))
865                    .await
866                    .expect("unable to setup replay");
867                pin_mut!(stream);
868                while let Some(result) = stream.next().await {
869                    match result {
870                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
871                        Err(err) => panic!("Failed to read item: {err}"),
872                    }
873                }
874            }
875
876            // Verify that items from blobs 1 and 2 are not present
877            assert_eq!(items.len(), 3);
878            let expected_indices = [3u64, 4u64, 5u64];
879            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
880                assert_eq!(item.0, *expected_index);
881            }
882
883            // Prune all blobs
884            journal.prune(6).await.expect("Failed to prune blobs");
885
886            // Drop the journal
887            drop(journal);
888
889            // Ensure no remaining blobs exist
890            //
891            // Note: We don't remove the partition, so this does not error
892            // and instead returns an empty list of blobs.
893            assert!(context
894                .scan(&cfg.partition)
895                .await
896                .expect("Failed to list blobs")
897                .is_empty());
898        });
899    }
900
901    #[test_traced]
902    fn test_journal_prune_guard() {
903        let executor = deterministic::Runner::default();
904
905        executor.start(|context| async move {
906            let cfg = Config {
907                partition: "test_partition".into(),
908                compression: None,
909                codec_config: (),
910                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
911                write_buffer: NZUsize!(1024),
912            };
913
914            let mut journal = Journal::init(context.clone(), cfg.clone())
915                .await
916                .expect("Failed to initialize journal");
917
918            // Append items to sections 1-5
919            for section in 1u64..=5u64 {
920                journal
921                    .append(section, section as i32)
922                    .await
923                    .expect("Failed to append data");
924                journal.sync(section).await.expect("Failed to sync");
925            }
926
927            // Prune sections < 3
928            journal.prune(3).await.expect("Failed to prune");
929
930            // Test that accessing pruned sections returns the correct error
931
932            // Test append on pruned section
933            match journal.append(1, 100).await {
934                Err(Error::AlreadyPrunedToSection(3)) => {}
935                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
936            }
937
938            match journal.append(2, 100).await {
939                Err(Error::AlreadyPrunedToSection(3)) => {}
940                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
941            }
942
943            // Test get on pruned section
944            match journal.get(1, 0).await {
945                Err(Error::AlreadyPrunedToSection(3)) => {}
946                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
947            }
948
949            // Test size on pruned section
950            match journal.size(1).await {
951                Err(Error::AlreadyPrunedToSection(3)) => {}
952                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
953            }
954
955            // Test rewind on pruned section
956            match journal.rewind(2, 0).await {
957                Err(Error::AlreadyPrunedToSection(3)) => {}
958                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
959            }
960
961            // Test rewind_section on pruned section
962            match journal.rewind_section(1, 0).await {
963                Err(Error::AlreadyPrunedToSection(3)) => {}
964                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
965            }
966
967            // Test sync on pruned section
968            match journal.sync(2).await {
969                Err(Error::AlreadyPrunedToSection(3)) => {}
970                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
971            }
972
973            // Test that accessing sections at or after the threshold works
974            assert!(journal.get(3, 0).await.is_ok());
975            assert!(journal.get(4, 0).await.is_ok());
976            assert!(journal.get(5, 0).await.is_ok());
977            assert!(journal.size(3).await.is_ok());
978            assert!(journal.sync(4).await.is_ok());
979
980            // Append to section at threshold should work
981            journal
982                .append(3, 999)
983                .await
984                .expect("Should be able to append to section 3");
985
986            // Prune more sections
987            journal.prune(5).await.expect("Failed to prune");
988
989            // Verify sections 3 and 4 are now pruned
990            match journal.get(3, 0).await {
991                Err(Error::AlreadyPrunedToSection(5)) => {}
992                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
993            }
994
995            match journal.get(4, 0).await {
996                Err(Error::AlreadyPrunedToSection(5)) => {}
997                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
998            }
999
1000            // Section 5 should still be accessible
1001            assert!(journal.get(5, 0).await.is_ok());
1002        });
1003    }
1004
1005    #[test_traced]
1006    fn test_journal_prune_guard_across_restart() {
1007        let executor = deterministic::Runner::default();
1008
1009        executor.start(|context| async move {
1010            let cfg = Config {
1011                partition: "test_partition".into(),
1012                compression: None,
1013                codec_config: (),
1014                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1015                write_buffer: NZUsize!(1024),
1016            };
1017
1018            // First session: create and prune
1019            {
1020                let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1021                    .await
1022                    .expect("Failed to initialize journal");
1023
1024                for section in 1u64..=5u64 {
1025                    journal
1026                        .append(section, section as i32)
1027                        .await
1028                        .expect("Failed to append data");
1029                    journal.sync(section).await.expect("Failed to sync");
1030                }
1031
1032                journal.prune(3).await.expect("Failed to prune");
1033            }
1034
1035            // Second session: verify oldest_retained_section is reset
1036            {
1037                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
1038                    .await
1039                    .expect("Failed to re-initialize journal");
1040
1041                // But the actual sections 1 and 2 should be gone from storage
1042                // so get should return SectionOutOfRange, not AlreadyPrunedToSection
1043                match journal.get(1, 0).await {
1044                    Err(Error::SectionOutOfRange(1)) => {}
1045                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
1046                }
1047
1048                match journal.get(2, 0).await {
1049                    Err(Error::SectionOutOfRange(2)) => {}
1050                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
1051                }
1052
1053                // Sections 3-5 should still be accessible
1054                assert!(journal.get(3, 0).await.is_ok());
1055                assert!(journal.get(4, 0).await.is_ok());
1056                assert!(journal.get(5, 0).await.is_ok());
1057            }
1058        });
1059    }
1060
1061    #[test_traced]
1062    fn test_journal_with_invalid_blob_name() {
1063        // Initialize the deterministic context
1064        let executor = deterministic::Runner::default();
1065
1066        // Start the test within the executor
1067        executor.start(|context| async move {
1068            // Create a journal configuration
1069            let cfg = Config {
1070                partition: "test_partition".into(),
1071                compression: None,
1072                codec_config: (),
1073                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1074                write_buffer: NZUsize!(1024),
1075            };
1076
1077            // Manually create a blob with an invalid name (not 8 bytes)
1078            let invalid_blob_name = b"invalid"; // Less than 8 bytes
1079            let (blob, _) = context
1080                .open(&cfg.partition, invalid_blob_name)
1081                .await
1082                .expect("Failed to create blob with invalid name");
1083            blob.sync().await.expect("Failed to sync blob");
1084
1085            // Attempt to initialize the journal
1086            let result = Journal::<_, u64>::init(context, cfg).await;
1087
1088            // Expect an error
1089            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1090        });
1091    }
1092
1093    #[test_traced]
1094    fn test_journal_read_size_missing() {
1095        // Initialize the deterministic context
1096        let executor = deterministic::Runner::default();
1097
1098        // Start the test within the executor
1099        executor.start(|context| async move {
1100            // Create a journal configuration
1101            let cfg = Config {
1102                partition: "test_partition".into(),
1103                compression: None,
1104                codec_config: (),
1105                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1106                write_buffer: NZUsize!(1024),
1107            };
1108
1109            // Manually create a blob with incomplete size data
1110            let section = 1u64;
1111            let blob_name = section.to_be_bytes();
1112            let (blob, _) = context
1113                .open(&cfg.partition, &blob_name)
1114                .await
1115                .expect("Failed to create blob");
1116
1117            // Write incomplete varint by encoding u32::MAX (5 bytes) and truncating to 1 byte
1118            let mut incomplete_data = Vec::new();
1119            UInt(u32::MAX).write(&mut incomplete_data);
1120            incomplete_data.truncate(1);
1121            blob.write_at(0, incomplete_data)
1122                .await
1123                .expect("Failed to write incomplete data");
1124            blob.sync().await.expect("Failed to sync blob");
1125
1126            // Initialize the journal
1127            let journal = Journal::init(context, cfg)
1128                .await
1129                .expect("Failed to initialize journal");
1130
1131            // Attempt to replay the journal
1132            let stream = journal
1133                .replay(0, 0, NZUsize!(1024))
1134                .await
1135                .expect("unable to setup replay");
1136            pin_mut!(stream);
1137            let mut items = Vec::<(u64, u64)>::new();
1138            while let Some(result) = stream.next().await {
1139                match result {
1140                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1141                    Err(err) => panic!("Failed to read item: {err}"),
1142                }
1143            }
1144            assert!(items.is_empty());
1145        });
1146    }
1147
1148    #[test_traced]
1149    fn test_journal_read_item_missing() {
1150        // Initialize the deterministic context
1151        let executor = deterministic::Runner::default();
1152
1153        // Start the test within the executor
1154        executor.start(|context| async move {
1155            // Create a journal configuration
1156            let cfg = Config {
1157                partition: "test_partition".into(),
1158                compression: None,
1159                codec_config: (),
1160                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1161                write_buffer: NZUsize!(1024),
1162            };
1163
1164            // Manually create a blob with missing item data
1165            let section = 1u64;
1166            let blob_name = section.to_be_bytes();
1167            let (blob, _) = context
1168                .open(&cfg.partition, &blob_name)
1169                .await
1170                .expect("Failed to create blob");
1171
1172            // Write size but incomplete item data
1173            let item_size: u32 = 10; // Size indicates 10 bytes of data
1174            let mut buf = Vec::new();
1175            UInt(item_size).write(&mut buf); // Varint encoding
1176            let data = [2u8; 5];
1177            BufMut::put_slice(&mut buf, &data);
1178            blob.write_at(0, buf)
1179                .await
1180                .expect("Failed to write incomplete item");
1181            blob.sync().await.expect("Failed to sync blob");
1182
1183            // Initialize the journal
1184            let journal = Journal::init(context, cfg)
1185                .await
1186                .expect("Failed to initialize journal");
1187
1188            // Attempt to replay the journal
1189            let stream = journal
1190                .replay(0, 0, NZUsize!(1024))
1191                .await
1192                .expect("unable to setup replay");
1193            pin_mut!(stream);
1194            let mut items = Vec::<(u64, u64)>::new();
1195            while let Some(result) = stream.next().await {
1196                match result {
1197                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1198                    Err(err) => panic!("Failed to read item: {err}"),
1199                }
1200            }
1201            assert!(items.is_empty());
1202        });
1203    }
1204
1205    #[test_traced]
1206    fn test_journal_read_checksum_missing() {
1207        // Initialize the deterministic context
1208        let executor = deterministic::Runner::default();
1209
1210        // Start the test within the executor
1211        executor.start(|context| async move {
1212            // Create a journal configuration
1213            let cfg = Config {
1214                partition: "test_partition".into(),
1215                compression: None,
1216                codec_config: (),
1217                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1218                write_buffer: NZUsize!(1024),
1219            };
1220
1221            // Manually create a blob with missing checksum
1222            let section = 1u64;
1223            let blob_name = section.to_be_bytes();
1224            let (blob, _) = context
1225                .open(&cfg.partition, &blob_name)
1226                .await
1227                .expect("Failed to create blob");
1228
1229            // Prepare item data
1230            let item_data = b"Test data";
1231            let item_size = item_data.len() as u32;
1232
1233            // Write size (varint) and data, but no checksum
1234            let mut buf = Vec::new();
1235            UInt(item_size).write(&mut buf);
1236            BufMut::put_slice(&mut buf, item_data);
1237            blob.write_at(0, buf)
1238                .await
1239                .expect("Failed to write item without checksum");
1240
1241            blob.sync().await.expect("Failed to sync blob");
1242
1243            // Initialize the journal
1244            let journal = Journal::init(context, cfg)
1245                .await
1246                .expect("Failed to initialize journal");
1247
1248            // Attempt to replay the journal
1249            //
1250            // This will truncate the leftover bytes from our manual write.
1251            let stream = journal
1252                .replay(0, 0, NZUsize!(1024))
1253                .await
1254                .expect("unable to setup replay");
1255            pin_mut!(stream);
1256            let mut items = Vec::<(u64, u64)>::new();
1257            while let Some(result) = stream.next().await {
1258                match result {
1259                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1260                    Err(err) => panic!("Failed to read item: {err}"),
1261                }
1262            }
1263            assert!(items.is_empty());
1264        });
1265    }
1266
1267    #[test_traced]
1268    fn test_journal_read_checksum_mismatch() {
1269        // Initialize the deterministic context
1270        let executor = deterministic::Runner::default();
1271
1272        // Start the test within the executor
1273        executor.start(|context| async move {
1274            // Create a journal configuration
1275            let cfg = Config {
1276                partition: "test_partition".into(),
1277                compression: None,
1278                codec_config: (),
1279                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1280                write_buffer: NZUsize!(1024),
1281            };
1282
1283            // Manually create a blob with incorrect checksum
1284            let section = 1u64;
1285            let blob_name = section.to_be_bytes();
1286            let (blob, _) = context
1287                .open(&cfg.partition, &blob_name)
1288                .await
1289                .expect("Failed to create blob");
1290
1291            // Prepare item data
1292            let item_data = b"Test data";
1293            let item_size = item_data.len() as u32;
1294            let incorrect_checksum: u32 = 0xDEADBEEF;
1295
1296            // Write size (varint), data, and incorrect checksum
1297            let mut buf = Vec::new();
1298            UInt(item_size).write(&mut buf);
1299            BufMut::put_slice(&mut buf, item_data);
1300            buf.put_u32(incorrect_checksum);
1301            blob.write_at(0, buf)
1302                .await
1303                .expect("Failed to write item with bad checksum");
1304
1305            blob.sync().await.expect("Failed to sync blob");
1306
1307            // Initialize the journal
1308            let journal = Journal::init(context.clone(), cfg.clone())
1309                .await
1310                .expect("Failed to initialize journal");
1311
1312            // Attempt to replay the journal
1313            {
1314                let stream = journal
1315                    .replay(0, 0, NZUsize!(1024))
1316                    .await
1317                    .expect("unable to setup replay");
1318                pin_mut!(stream);
1319                let mut items = Vec::<(u64, u64)>::new();
1320                while let Some(result) = stream.next().await {
1321                    match result {
1322                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1323                        Err(err) => panic!("Failed to read item: {err}"),
1324                    }
1325                }
1326                assert!(items.is_empty());
1327            }
1328            drop(journal);
1329
1330            // Confirm the corrupt item was truncated, leaving an empty blob
1331            let (_, blob_size) = context
1332                .open(&cfg.partition, &section.to_be_bytes())
1333                .await
1334                .expect("Failed to open blob");
1335            assert_eq!(blob_size, 0);
1336        });
1337    }
1338
1339    #[test_traced]
1340    fn test_journal_truncation_recovery() {
1341        // Initialize the deterministic context
1342        let executor = deterministic::Runner::default();
1343
1344        // Start the test within the executor
1345        executor.start(|context| async move {
1346            // Create a journal configuration
1347            let cfg = Config {
1348                partition: "test_partition".into(),
1349                compression: None,
1350                codec_config: (),
1351                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1352                write_buffer: NZUsize!(1024),
1353            };
1354
1355            // Initialize the journal
1356            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1357                .await
1358                .expect("Failed to initialize journal");
1359
1360            // Append 1 item to the first section
1361            journal.append(1, 1).await.expect("Failed to append data");
1362
1363            // Append multiple items to the second section
1364            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1365            for (index, data) in &data_items {
1366                journal
1367                    .append(*index, *data)
1368                    .await
1369                    .expect("Failed to append data");
1370                journal.sync(*index).await.expect("Failed to sync blob");
1371            }
1372
1373            // Sync all sections and drop the journal
1374            journal.sync_all().await.expect("Failed to sync");
1375            drop(journal);
1376
1377            // Manually corrupt the end of the second blob
1378            let (blob, blob_size) = context
1379                .open(&cfg.partition, &2u64.to_be_bytes())
1380                .await
1381                .expect("Failed to open blob");
1382            blob.resize(blob_size - 4)
1383                .await
1384                .expect("Failed to corrupt blob");
1385            blob.sync().await.expect("Failed to sync blob");
1386
1387            // Re-initialize the journal to simulate a restart
1388            let journal = Journal::init(context.with_label("second"), cfg.clone())
1389                .await
1390                .expect("Failed to re-initialize journal");
1391
1392            // Attempt to replay the journal
1393            let mut items = Vec::<(u64, u32)>::new();
1394            {
1395                let stream = journal
1396                    .replay(0, 0, NZUsize!(1024))
1397                    .await
1398                    .expect("unable to setup replay");
1399                pin_mut!(stream);
1400                while let Some(result) = stream.next().await {
1401                    match result {
1402                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1403                        Err(err) => panic!("Failed to read item: {err}"),
1404                    }
1405                }
1406            }
1407            drop(journal);
1408
1409            // Verify that replay stopped once corruption was detected in the second blob.
1410            assert_eq!(items.len(), 1);
1411            assert_eq!(items[0].0, 1);
1412            assert_eq!(items[0].1, 1);
1413
1414            // Confirm the second blob was truncated to zero length.
1415            let (_, blob_size) = context
1416                .open(&cfg.partition, &2u64.to_be_bytes())
1417                .await
1418                .expect("Failed to open blob");
1419            assert_eq!(blob_size, 0);
1420
1421            // Re-initialize the journal after truncation
1422            let mut journal = Journal::init(context.with_label("third"), cfg.clone())
1423                .await
1424                .expect("Failed to re-initialize journal");
1425
1426            // Attempt to replay the journal
1427            let mut items = Vec::<(u64, u32)>::new();
1428            {
1429                let stream = journal
1430                    .replay(0, 0, NZUsize!(1024))
1431                    .await
1432                    .expect("unable to setup replay");
1433                pin_mut!(stream);
1434                while let Some(result) = stream.next().await {
1435                    match result {
1436                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1437                        Err(err) => panic!("Failed to read item: {err}"),
1438                    }
1439                }
1440            }
1441
1442            // Verify that only non-corrupted items were replayed
1443            assert_eq!(items.len(), 1);
1444            assert_eq!(items[0].0, 1);
1445            assert_eq!(items[0].1, 1);
1446
1447            // Append a new item to the truncated section
1448            let (_offset, _) = journal.append(2, 5).await.expect("Failed to append data");
1449            journal.sync(2).await.expect("Failed to sync blob");
1450
1451            // Get the new item (offset is 0 since blob was truncated)
1452            let item = journal.get(2, 0).await.expect("Failed to get item");
1453            assert_eq!(item, 5);
1454
1455            // Drop the journal (data already synced)
1456            drop(journal);
1457
1458            // Re-initialize the journal to simulate a restart
1459            let journal = Journal::init(context.clone(), cfg.clone())
1460                .await
1461                .expect("Failed to re-initialize journal");
1462
1463            // Attempt to replay the journal
1464            let mut items = Vec::<(u64, u32)>::new();
1465            {
1466                let stream = journal
1467                    .replay(0, 0, NZUsize!(1024))
1468                    .await
1469                    .expect("unable to setup replay");
1470                pin_mut!(stream);
1471                while let Some(result) = stream.next().await {
1472                    match result {
1473                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1474                        Err(err) => panic!("Failed to read item: {err}"),
1475                    }
1476                }
1477            }
1478
1479            // Verify that only non-corrupted items were replayed
1480            assert_eq!(items.len(), 2);
1481            assert_eq!(items[0].0, 1);
1482            assert_eq!(items[0].1, 1);
1483            assert_eq!(items[1].0, 2);
1484            assert_eq!(items[1].1, 5);
1485        });
1486    }
1487
1488    #[test_traced]
1489    fn test_journal_handling_extra_data() {
1490        // Initialize the deterministic context
1491        let executor = deterministic::Runner::default();
1492
1493        // Start the test within the executor
1494        executor.start(|context| async move {
1495            // Create a journal configuration
1496            let cfg = Config {
1497                partition: "test_partition".into(),
1498                compression: None,
1499                codec_config: (),
1500                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1501                write_buffer: NZUsize!(1024),
1502            };
1503
1504            // Initialize the journal
1505            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1506                .await
1507                .expect("Failed to initialize journal");
1508
1509            // Append 1 item to the first section
1510            journal.append(1, 1).await.expect("Failed to append data");
1511
1512            // Append multiple items to the second section
1513            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1514            for (index, data) in &data_items {
1515                journal
1516                    .append(*index, *data)
1517                    .await
1518                    .expect("Failed to append data");
1519                journal.sync(*index).await.expect("Failed to sync blob");
1520            }
1521
1522            // Sync all sections and drop the journal
1523            journal.sync_all().await.expect("Failed to sync");
1524            drop(journal);
1525
1526            // Manually add extra data to the end of the second blob
1527            let (blob, blob_size) = context
1528                .open(&cfg.partition, &2u64.to_be_bytes())
1529                .await
1530                .expect("Failed to open blob");
1531            blob.write_at(blob_size, vec![0u8; 16])
1532                .await
1533                .expect("Failed to add extra data");
1534            blob.sync().await.expect("Failed to sync blob");
1535
1536            // Re-initialize the journal to simulate a restart
1537            let journal = Journal::init(context.with_label("second"), cfg)
1538                .await
1539                .expect("Failed to re-initialize journal");
1540
1541            // Attempt to replay the journal
1542            let mut items = Vec::<(u64, i32)>::new();
1543            let stream = journal
1544                .replay(0, 0, NZUsize!(1024))
1545                .await
1546                .expect("unable to setup replay");
1547            pin_mut!(stream);
1548            while let Some(result) = stream.next().await {
1549                match result {
1550                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1551                    Err(err) => panic!("Failed to read item: {err}"),
1552                }
1553            }
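
            // All four appended items should replay; the trailing garbage past the
            // last valid item is expected to be detected and ignored, mirroring the
            // truncation-recovery test above.
            assert_eq!(items, vec![(1, 1), (2, 2), (2, 3), (2, 4)]);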
1554        });
1555    }
1556
1557    #[test_traced]
1558    fn test_journal_rewind() {
1559        // Initialize the deterministic context
1560        let executor = deterministic::Runner::default();
1561        executor.start(|context| async move {
1562            // Create journal
1563            let cfg = Config {
1564                partition: "test_partition".to_string(),
1565                compression: None,
1566                codec_config: (),
1567                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1568                write_buffer: NZUsize!(1024),
1569            };
1570            let mut journal = Journal::init(context, cfg).await.unwrap();
1571
1572            // Check size of non-existent section
1573            let size = journal.size(1).await.unwrap();
1574            assert_eq!(size, 0);
1575
1576            // Append data to section 1
1577            journal.append(1, 42i32).await.unwrap();
1578
1579            // Check size of section 1 - should be greater than 0
1580            let size = journal.size(1).await.unwrap();
1581            assert!(size > 0);
1582
1583            // Append more data and verify size increases
1584            journal.append(1, 43i32).await.unwrap();
1585            let new_size = journal.size(1).await.unwrap();
1586            assert!(new_size > size);
1587
1588            // Check size of different section - should still be 0
1589            let size = journal.size(2).await.unwrap();
1590            assert_eq!(size, 0);
1591
1592            // Append data to section 2
1593            journal.append(2, 44i32).await.unwrap();
1594
1595            // Check size of section 2 - should be greater than 0
1596            let size = journal.size(2).await.unwrap();
1597            assert!(size > 0);
1598
1599            // Roll back everything in sections 1 and 2 (rewinding section 1 to size 0 also removes later sections)
1600            journal.rewind(1, 0).await.unwrap();
1601
1602            // Check size of section 1 - should be 0
1603            let size = journal.size(1).await.unwrap();
1604            assert_eq!(size, 0);
1605
1606            // Check size of section 2 - should be 0
1607            let size = journal.size(2).await.unwrap();
1608            assert_eq!(size, 0);
1609        });
1610    }
1611
1612    #[test_traced]
1613    fn test_journal_rewind_section() {
1614        // Initialize the deterministic context
1615        let executor = deterministic::Runner::default();
1616        executor.start(|context| async move {
1617            // Create journal
1618            let cfg = Config {
1619                partition: "test_partition".to_string(),
1620                compression: None,
1621                codec_config: (),
1622                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1623                write_buffer: NZUsize!(1024),
1624            };
1625            let mut journal = Journal::init(context, cfg).await.unwrap();
1626
1627            // Check size of non-existent section
1628            let size = journal.size(1).await.unwrap();
1629            assert_eq!(size, 0);
1630
1631            // Append data to section 1
1632            journal.append(1, 42i32).await.unwrap();
1633
1634            // Check size of section 1 - should be greater than 0
1635            let size = journal.size(1).await.unwrap();
1636            assert!(size > 0);
1637
1638            // Append more data and verify size increases
1639            journal.append(1, 43i32).await.unwrap();
1640            let new_size = journal.size(1).await.unwrap();
1641            assert!(new_size > size);
1642
1643            // Check size of different section - should still be 0
1644            let size = journal.size(2).await.unwrap();
1645            assert_eq!(size, 0);
1646
1647            // Append data to section 2
1648            journal.append(2, 44i32).await.unwrap();
1649
1650            // Check size of section 2 - should be greater than 0
1651            let size = journal.size(2).await.unwrap();
1652            assert!(size > 0);
1653
1654            // Roll back everything in section 1; unlike `rewind`, `rewind_section` leaves later sections untouched
1655            journal.rewind_section(1, 0).await.unwrap();
1656
1657            // Check size of section 1 - should be 0
1658            let size = journal.size(1).await.unwrap();
1659            assert_eq!(size, 0);
1660
1661            // Check size of section 2 - should be greater than 0
1662            let size = journal.size(2).await.unwrap();
1663            assert!(size > 0);
1664        });
1665    }
1666
1667    #[test_traced]
1668    fn test_journal_small_items() {
1669        let executor = deterministic::Runner::default();
1670        executor.start(|context| async move {
1671            let cfg = Config {
1672                partition: "test_partition".into(),
1673                compression: None,
1674                codec_config: (),
1675                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1676                write_buffer: NZUsize!(1024),
1677            };
1678
1679            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1680                .await
1681                .expect("Failed to initialize journal");
1682
1683            // Append many small (1-byte) items to the same section
1684            let num_items = 100;
1685            let mut offsets = Vec::new();
1686            for i in 0..num_items {
1687                let (offset, size) = journal
1688                    .append(1, i as u8)
1689                    .await
1690                    .expect("Failed to append data");
1691                assert_eq!(size, 1, "u8 should encode to 1 byte");
1692                offsets.push(offset);
1693            }
1694            journal.sync(1).await.expect("Failed to sync");
1695
1696            // Read each item back via random access
1697            for (i, &offset) in offsets.iter().enumerate() {
1698                let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
1699                assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
1700            }
1701
1702            // Drop and reopen to test replay
1703            drop(journal);
1704            let journal = Journal::<_, u8>::init(context.with_label("second"), cfg)
1705                .await
1706                .expect("Failed to re-initialize journal");
1707
1708            // Replay and verify all items
1709            let stream = journal
1710                .replay(0, 0, NZUsize!(1024))
1711                .await
1712                .expect("Failed to setup replay");
1713            pin_mut!(stream);
1714
1715            let mut count = 0;
1716            while let Some(result) = stream.next().await {
1717                let (section, offset, size, item) = result.expect("Failed to replay item");
1718                assert_eq!(section, 1);
1719                assert_eq!(offset, offsets[count]);
1720                assert_eq!(size, 1);
1721                assert_eq!(item, count as u8);
1722                count += 1;
1723            }
1724            assert_eq!(count, num_items, "Should replay all items");
1725        });
1726    }
1727
1728    #[test_traced]
1729    fn test_journal_rewind_many_sections() {
1730        let executor = deterministic::Runner::default();
1731        executor.start(|context| async move {
1732            let cfg = Config {
1733                partition: "test_partition".to_string(),
1734                compression: None,
1735                codec_config: (),
1736                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1737                write_buffer: NZUsize!(1024),
1738            };
1739            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1740
1741            // Create sections 1-10 with data
1742            for section in 1u64..=10 {
1743                journal.append(section, section as i32).await.unwrap();
1744            }
1745            journal.sync_all().await.unwrap();
1746
1747            // Verify all sections exist
1748            for section in 1u64..=10 {
1749                let size = journal.size(section).await.unwrap();
1750                assert!(size > 0, "section {section} should have data");
1751            }
1752
1753            // Rewind to section 5 (should remove sections 6-10)
1754            journal
1755                .rewind(5, journal.size(5).await.unwrap())
1756                .await
1757                .unwrap();
1758
1759            // Verify sections 1-5 still exist with correct data
1760            for section in 1u64..=5 {
1761                let size = journal.size(section).await.unwrap();
1762                assert!(size > 0, "section {section} should still have data");
1763            }
1764
1765            // Verify sections 6-10 are removed (size should be 0)
1766            for section in 6u64..=10 {
1767                let size = journal.size(section).await.unwrap();
1768                assert_eq!(size, 0, "section {section} should be removed");
1769            }
1770
1771            // Verify data integrity via replay
1772            {
1773                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1774                pin_mut!(stream);
1775                let mut items = Vec::new();
1776                while let Some(result) = stream.next().await {
1777                    let (section, _, _, item) = result.unwrap();
1778                    items.push((section, item));
1779                }
1780                assert_eq!(items.len(), 5);
1781                for (i, (section, item)) in items.iter().enumerate() {
1782                    assert_eq!(*section, (i + 1) as u64);
1783                    assert_eq!(*item, (i + 1) as i32);
1784                }
1785            }
1786
1787            journal.destroy().await.unwrap();
1788        });
1789    }
1790
1791    #[test_traced]
1792    fn test_journal_rewind_partial_truncation() {
1793        let executor = deterministic::Runner::default();
1794        executor.start(|context| async move {
1795            let cfg = Config {
1796                partition: "test_partition".to_string(),
1797                compression: None,
1798                codec_config: (),
1799                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1800                write_buffer: NZUsize!(1024),
1801            };
1802            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1803
1804            // Append 5 items and record sizes after each
1805            let mut sizes = Vec::new();
1806            for i in 0..5 {
1807                journal.append(1, i).await.unwrap();
1808                journal.sync(1).await.unwrap();
1809                sizes.push(journal.size(1).await.unwrap());
1810            }
1811
1812            // Rewind to keep only first 3 items
1813            let target_size = sizes[2];
1814            journal.rewind(1, target_size).await.unwrap();
1815
1816            // Verify size is correct
1817            let new_size = journal.size(1).await.unwrap();
1818            assert_eq!(new_size, target_size);
1819
1820            // Verify first 3 items via replay
1821            {
1822                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1823                pin_mut!(stream);
1824                let mut items = Vec::new();
1825                while let Some(result) = stream.next().await {
1826                    let (_, _, _, item) = result.unwrap();
1827                    items.push(item);
1828                }
1829                assert_eq!(items.len(), 3);
1830                for (i, item) in items.iter().enumerate() {
1831                    assert_eq!(*item, i as i32);
1832                }
1833            }
1834
1835            journal.destroy().await.unwrap();
1836        });
1837    }
1838
1839    #[test_traced]
1840    fn test_journal_rewind_nonexistent_target() {
1841        let executor = deterministic::Runner::default();
1842        executor.start(|context| async move {
1843            let cfg = Config {
1844                partition: "test_partition".to_string(),
1845                compression: None,
1846                codec_config: (),
1847                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1848                write_buffer: NZUsize!(1024),
1849            };
1850            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1851
1852            // Create sections 5, 6, 7 (skip 1-4)
1853            for section in 5u64..=7 {
1854                journal.append(section, section as i32).await.unwrap();
1855            }
1856            journal.sync_all().await.unwrap();
1857
1858            // Rewind to section 3 (doesn't exist)
1859            journal.rewind(3, 0).await.unwrap();
1860
1861            // Verify sections 5, 6, 7 are removed
1862            for section in 5u64..=7 {
1863                let size = journal.size(section).await.unwrap();
1864                assert_eq!(size, 0, "section {section} should be removed");
1865            }
1866
1867            // Verify replay returns nothing
1868            {
1869                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1870                pin_mut!(stream);
1871                let items: Vec<_> = stream.collect().await;
1872                assert!(items.is_empty());
1873            }
1874
1875            journal.destroy().await.unwrap();
1876        });
1877    }
1878
1879    #[test_traced]
1880    fn test_journal_rewind_persistence() {
1881        let executor = deterministic::Runner::default();
1882        executor.start(|context| async move {
1883            let cfg = Config {
1884                partition: "test_partition".to_string(),
1885                compression: None,
1886                codec_config: (),
1887                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1888                write_buffer: NZUsize!(1024),
1889            };
1890
1891            // Create sections 1-5 with data
1892            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1893                .await
1894                .unwrap();
1895            for section in 1u64..=5 {
1896                journal.append(section, section as i32).await.unwrap();
1897            }
1898            journal.sync_all().await.unwrap();
1899
1900            // Rewind to section 2
1901            let size = journal.size(2).await.unwrap();
1902            journal.rewind(2, size).await.unwrap();
1903            journal.sync_all().await.unwrap();
1904            drop(journal);
1905
1906            // Re-init and verify only sections 1-2 exist
1907            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
1908                .await
1909                .unwrap();
1910
1911            // Verify sections 1-2 have data
1912            for section in 1u64..=2 {
1913                let size = journal.size(section).await.unwrap();
1914                assert!(size > 0, "section {section} should have data after restart");
1915            }
1916
1917            // Verify sections 3-5 are gone
1918            for section in 3u64..=5 {
1919                let size = journal.size(section).await.unwrap();
1920                assert_eq!(size, 0, "section {section} should be gone after restart");
1921            }
1922
1923            // Verify data integrity via replay
1924            {
1925                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1926                pin_mut!(stream);
1927                let mut items = Vec::new();
1928                while let Some(result) = stream.next().await {
1929                    let (section, _, _, item) = result.unwrap();
1930                    items.push((section, item));
1931                }
1932                assert_eq!(items.len(), 2);
1933                assert_eq!(items[0], (1, 1));
1934                assert_eq!(items[1], (2, 2));
1935            }
1936
1937            journal.destroy().await.unwrap();
1938        });
1939    }
1940
1941    #[test_traced]
1942    fn test_journal_rewind_to_zero_removes_all_newer() {
1943        let executor = deterministic::Runner::default();
1944        executor.start(|context| async move {
1945            let cfg = Config {
1946                partition: "test_partition".to_string(),
1947                compression: None,
1948                codec_config: (),
1949                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1950                write_buffer: NZUsize!(1024),
1951            };
1952            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1953
1954            // Create sections 1, 2, 3
1955            for section in 1u64..=3 {
1956                journal.append(section, section as i32).await.unwrap();
1957            }
1958            journal.sync_all().await.unwrap();
1959
1960            // Rewind section 1 to size 0
1961            journal.rewind(1, 0).await.unwrap();
1962
1963            // Verify section 1 exists but is empty
1964            let size = journal.size(1).await.unwrap();
1965            assert_eq!(size, 0, "section 1 should be empty");
1966
1967            // Verify sections 2, 3 are completely removed
1968            for section in 2u64..=3 {
1969                let size = journal.size(section).await.unwrap();
1970                assert_eq!(size, 0, "section {section} should be removed");
1971            }
1972
1973            // Verify replay returns nothing
1974            {
1975                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1976                pin_mut!(stream);
1977                let items: Vec<_> = stream.collect().await;
1978                assert!(items.is_empty());
1979            }
1980
1981            journal.destroy().await.unwrap();
1982        });
1983    }
1984
1985    #[test_traced]
1986    fn test_journal_replay_start_offset_with_trailing_bytes() {
1987        // Regression: valid_offset must be initialized to start_offset, not 0.
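        // If it started at 0, detecting the trailing garbage past `start_offset`
        // would rewind the blob back to offset 0 and silently discard the valid
        // items appended below (see the assertion message at the end of this test).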
1988        let executor = deterministic::Runner::default();
1989        executor.start(|context| async move {
1990            let cfg = Config {
1991                partition: "test_partition".into(),
1992                compression: None,
1993                codec_config: (),
1994                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1995                write_buffer: NZUsize!(1024),
1996            };
1997            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1998                .await
1999                .expect("Failed to initialize journal");
2000
2001            // Append several items to build up valid data
2002            for i in 0..5i32 {
2003                journal.append(1, i).await.unwrap();
2004            }
2005            journal.sync(1).await.unwrap();
2006            let valid_logical_size = journal.size(1).await.unwrap();
2007            drop(journal);
2008
2009            // Get the physical blob size before corruption
2010            let (blob, physical_size_before) = context
2011                .open(&cfg.partition, &1u64.to_be_bytes())
2012                .await
2013                .unwrap();
2014
2015            // Write incomplete varint: 0xFF has continuation bit set, needs more bytes
2016            // This creates 2 trailing bytes that cannot form a valid item
2017            blob.write_at(physical_size_before, vec![0xFF, 0xFF])
2018                .await
2019                .unwrap();
2020            blob.sync().await.unwrap();
2021
2022            // Reopen journal and replay starting PAST all valid items
2023            // (start_offset = valid_logical_size means we skip all valid data)
2024            // The first thing encountered will be the trailing corrupt bytes
2025            let start_offset = valid_logical_size;
2026            {
2027                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
2028                    .await
2029                    .unwrap();
2030
2031                let stream = journal
2032                    .replay(1, start_offset, NZUsize!(1024))
2033                    .await
2034                    .unwrap();
2035                pin_mut!(stream);
2036
2037                // Consume the stream - should detect trailing bytes and truncate
2038                while let Some(_result) = stream.next().await {}
2039            }
2040
2041            // Verify that valid data before start_offset was NOT lost
2042            let (_, physical_size_after) = context
2043                .open(&cfg.partition, &1u64.to_be_bytes())
2044                .await
2045                .unwrap();
2046
2047            // The blob should have been truncated back to the valid physical size
2048            // (removing the trailing corrupt bytes) but NOT to 0
2049            assert!(
2050                physical_size_after >= physical_size_before,
2051                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
2052                 {physical_size_after}. Logical valid size was {valid_logical_size}. \
2053                 This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
2054            );
2055        });
2056    }
2057
2058    #[test_traced]
2059    fn test_journal_large_item_spanning_pages() {
2060        // 2048 bytes spans 2 full pages (PAGE_SIZE = 1024).
2061        const LARGE_SIZE: usize = 2048;
2062        type LargeItem = [u8; LARGE_SIZE];
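
        // Illustrative compile-time check of the claim above (assumes PAGE_SIZE is
        // the module-level 1024-byte test constant).
        const _: () = assert!(LARGE_SIZE == 2 * PAGE_SIZE.get() as usize);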
2063
2064        let executor = deterministic::Runner::default();
2065        executor.start(|context| async move {
2066            let cfg = Config {
2067                partition: "test_partition".into(),
2068                compression: None,
2069                codec_config: (),
2070                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2071                write_buffer: NZUsize!(4096),
2072            };
2073            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2074                .await
2075                .expect("Failed to initialize journal");
2076
2077            // Create a large item that spans multiple pages.
2078            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
2079            for (i, byte) in large_data.iter_mut().enumerate() {
2080                *byte = (i % 256) as u8;
2081            }
2082            assert!(
2083                LARGE_SIZE > PAGE_SIZE.get() as usize,
2084                "Item must be larger than page size"
2085            );
2086
2087            // Append the large item
2088            let (offset, size) = journal
2089                .append(1, large_data)
2090                .await
2091                .expect("Failed to append large item");
2092            assert_eq!(size as usize, LARGE_SIZE);
2093            journal.sync(1).await.expect("Failed to sync");
2094
2095            // Read the item back via random access
2096            let retrieved: LargeItem = journal
2097                .get(1, offset)
2098                .await
2099                .expect("Failed to get large item");
2100            assert_eq!(retrieved, large_data, "Random access read mismatch");
2101
2102            // Drop and reopen to test replay
2103            drop(journal);
2104            let journal = Journal::<_, LargeItem>::init(context.with_label("second"), cfg.clone())
2105                .await
2106                .expect("Failed to re-initialize journal");
2107
2108            // Replay and verify the large item
2109            {
2110                let stream = journal
2111                    .replay(0, 0, NZUsize!(1024))
2112                    .await
2113                    .expect("Failed to setup replay");
2114                pin_mut!(stream);
2115
2116                let mut items = Vec::new();
2117                while let Some(result) = stream.next().await {
2118                    let (section, off, sz, item) = result.expect("Failed to replay item");
2119                    items.push((section, off, sz, item));
2120                }
2121
2122                assert_eq!(items.len(), 1, "Should have exactly one item");
2123                let (section, off, sz, item) = &items[0];
2124                assert_eq!(*section, 1);
2125                assert_eq!(*off, offset);
2126                assert_eq!(*sz as usize, LARGE_SIZE);
2127                assert_eq!(*item, large_data, "Replay read mismatch");
2128            }
2129
2130            journal.destroy().await.unwrap();
2131        });
2132    }
2133
2134    #[test_traced]
2135    fn test_journal_non_contiguous_sections() {
2136        // Test that sections with gaps in numbering work correctly.
2137        // Sections 1, 5, 10 should all be independent and accessible.
2138        let executor = deterministic::Runner::default();
2139        executor.start(|context| async move {
2140            let cfg = Config {
2141                partition: "test_partition".into(),
2142                compression: None,
2143                codec_config: (),
2144                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2145                write_buffer: NZUsize!(1024),
2146            };
2147            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2148                .await
2149                .expect("Failed to initialize journal");
2150
2151            // Create sections with gaps: 1, 5, 10
2152            let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
2153            let mut offsets = Vec::new();
2154
2155            for (section, data) in &sections_and_data {
2156                let (offset, _) = journal
2157                    .append(*section, *data)
2158                    .await
2159                    .expect("Failed to append");
2160                offsets.push(offset);
2161            }
2162            journal.sync_all().await.expect("Failed to sync");
2163
2164            // Verify random access to each section
2165            for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
2166                let retrieved: i32 = journal
2167                    .get(*section, offsets[i])
2168                    .await
2169                    .expect("Failed to get item");
2170                assert_eq!(retrieved, *expected_data);
2171            }
2172
2173            // Verify non-existent sections return appropriate errors
2174            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
2175                let result = journal.get(missing_section, 0).await;
2176                assert!(
2177                    matches!(result, Err(Error::SectionOutOfRange(_))),
2178                    "Expected SectionOutOfRange for section {}, got {:?}",
2179                    missing_section,
2180                    result
2181                );
2182            }
2183
2184            // Drop and reopen to test replay
2185            drop(journal);
2186            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
2187                .await
2188                .expect("Failed to re-initialize journal");
2189
2190            // Replay and verify all items in order
2191            {
2192                let stream = journal
2193                    .replay(0, 0, NZUsize!(1024))
2194                    .await
2195                    .expect("Failed to setup replay");
2196                pin_mut!(stream);
2197
2198                let mut items = Vec::new();
2199                while let Some(result) = stream.next().await {
2200                    let (section, _, _, item) = result.expect("Failed to replay item");
2201                    items.push((section, item));
2202                }
2203
2204                assert_eq!(items.len(), 3, "Should have 3 items");
2205                assert_eq!(items[0], (1, 100));
2206                assert_eq!(items[1], (5, 500));
2207                assert_eq!(items[2], (10, 1000));
2208            }
2209
2210            // Test replay starting from middle section (5)
2211            {
2212                let stream = journal
2213                    .replay(5, 0, NZUsize!(1024))
2214                    .await
2215                    .expect("Failed to setup replay from section 5");
2216                pin_mut!(stream);
2217
2218                let mut items = Vec::new();
2219                while let Some(result) = stream.next().await {
2220                    let (section, _, _, item) = result.expect("Failed to replay item");
2221                    items.push((section, item));
2222                }
2223
2224                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
2225                assert_eq!(items[0], (5, 500));
2226                assert_eq!(items[1], (10, 1000));
2227            }
2228
2229            // Test replay starting from non-existent section (should skip to next)
2230            {
2231                let stream = journal
2232                    .replay(3, 0, NZUsize!(1024))
2233                    .await
2234                    .expect("Failed to setup replay from section 3");
2235                pin_mut!(stream);
2236
2237                let mut items = Vec::new();
2238                while let Some(result) = stream.next().await {
2239                    let (section, _, _, item) = result.expect("Failed to replay item");
2240                    items.push((section, item));
2241                }
2242
2243                // Should get sections 5 and 10 (skipping non-existent 3, 4)
2244                assert_eq!(items.len(), 2);
2245                assert_eq!(items[0], (5, 500));
2246                assert_eq!(items[1], (10, 1000));
2247            }
2248
2249            journal.destroy().await.unwrap();
2250        });
2251    }
2252
2253    #[test_traced]
2254    fn test_journal_empty_section_in_middle() {
2255        // Test that replay correctly handles an empty section between sections with data.
2256        // Section 1 has data, section 2 is empty, section 3 has data.
2257        let executor = deterministic::Runner::default();
2258        executor.start(|context| async move {
2259            let cfg = Config {
2260                partition: "test_partition".into(),
2261                compression: None,
2262                codec_config: (),
2263                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2264                write_buffer: NZUsize!(1024),
2265            };
2266            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2267                .await
2268                .expect("Failed to initialize journal");
2269
2270            // Append to section 1
2271            journal.append(1, 100i32).await.expect("Failed to append");
2272
2273            // Create an empty section 2. The only way to create its blob is to append
2274            // an item, sync, and then rewind the section back to size 0.
2275            journal.append(2, 200i32).await.expect("Failed to append");
2276            journal.sync(2).await.expect("Failed to sync");
2277            journal
2278                .rewind_section(2, 0)
2279                .await
2280                .expect("Failed to rewind");
2281
2282            // Append to section 3
2283            journal.append(3, 300i32).await.expect("Failed to append");
2284
2285            journal.sync_all().await.expect("Failed to sync");
2286
2287            // Verify section sizes
2288            assert!(journal.size(1).await.unwrap() > 0);
2289            assert_eq!(journal.size(2).await.unwrap(), 0);
2290            assert!(journal.size(3).await.unwrap() > 0);
2291
2292            // Drop and reopen to test replay
2293            drop(journal);
2294            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
2295                .await
2296                .expect("Failed to re-initialize journal");
2297
2298            // Replay all - should get items from sections 1 and 3, skipping empty section 2
2299            {
2300                let stream = journal
2301                    .replay(0, 0, NZUsize!(1024))
2302                    .await
2303                    .expect("Failed to setup replay");
2304                pin_mut!(stream);
2305
2306                let mut items = Vec::new();
2307                while let Some(result) = stream.next().await {
2308                    let (section, _, _, item) = result.expect("Failed to replay item");
2309                    items.push((section, item));
2310                }
2311
2312                assert_eq!(
2313                    items.len(),
2314                    2,
2315                    "Should have 2 items (skipping empty section)"
2316                );
2317                assert_eq!(items[0], (1, 100));
2318                assert_eq!(items[1], (3, 300));
2319            }
2320
2321            // Replay starting from empty section 2 - should get only section 3
2322            {
2323                let stream = journal
2324                    .replay(2, 0, NZUsize!(1024))
2325                    .await
2326                    .expect("Failed to setup replay from section 2");
2327                pin_mut!(stream);
2328
2329                let mut items = Vec::new();
2330                while let Some(result) = stream.next().await {
2331                    let (section, _, _, item) = result.expect("Failed to replay item");
2332                    items.push((section, item));
2333                }
2334
2335                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
2336                assert_eq!(items[0], (3, 300));
2337            }
2338
2339            journal.destroy().await.unwrap();
2340        });
2341    }
2342
2343    #[test_traced]
2344    fn test_journal_item_exactly_page_size() {
2345        // Test that items exactly equal to PAGE_SIZE work correctly.
2346        // This is a boundary condition where the item fills exactly one page.
2347        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
2348        type ExactItem = [u8; ITEM_SIZE];
2349
2350        let executor = deterministic::Runner::default();
2351        executor.start(|context| async move {
2352            let cfg = Config {
2353                partition: "test_partition".into(),
2354                compression: None,
2355                codec_config: (),
2356                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2357                write_buffer: NZUsize!(4096),
2358            };
2359            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2360                .await
2361                .expect("Failed to initialize journal");
2362
2363            // Create an item of exactly PAGE_SIZE bytes
2364            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
2365            for (i, byte) in exact_data.iter_mut().enumerate() {
2366                *byte = (i % 256) as u8;
2367            }
2368
2369            // Append the exact-size item
2370            let (offset, size) = journal
2371                .append(1, exact_data)
2372                .await
2373                .expect("Failed to append exact item");
2374            assert_eq!(size as usize, ITEM_SIZE);
2375            journal.sync(1).await.expect("Failed to sync");
2376
2377            // Read the item back via random access
2378            let retrieved: ExactItem = journal
2379                .get(1, offset)
2380                .await
2381                .expect("Failed to get exact item");
2382            assert_eq!(retrieved, exact_data, "Random access read mismatch");
2383
2384            // Drop and reopen to test replay
2385            drop(journal);
2386            let journal = Journal::<_, ExactItem>::init(context.with_label("second"), cfg.clone())
2387                .await
2388                .expect("Failed to re-initialize journal");
2389
2390            // Replay and verify
2391            {
2392                let stream = journal
2393                    .replay(0, 0, NZUsize!(1024))
2394                    .await
2395                    .expect("Failed to setup replay");
2396                pin_mut!(stream);
2397
2398                let mut items = Vec::new();
2399                while let Some(result) = stream.next().await {
2400                    let (section, off, sz, item) = result.expect("Failed to replay item");
2401                    items.push((section, off, sz, item));
2402                }
2403
2404                assert_eq!(items.len(), 1, "Should have exactly one item");
2405                let (section, off, sz, item) = &items[0];
2406                assert_eq!(*section, 1);
2407                assert_eq!(*off, offset);
2408                assert_eq!(*sz as usize, ITEM_SIZE);
2409                assert_eq!(*item, exact_data, "Replay read mismatch");
2410            }
2411
2412            journal.destroy().await.unwrap();
2413        });
2414    }
2415
2416    #[test_traced]
2417    fn test_journal_varint_spanning_page_boundary() {
2418        // Test that items with data spanning page boundaries work correctly
2419        // when using a small page size.
2420        //
2421        // With PAGE_SIZE=16:
2422        // - Physical page = 16 data bytes + 12 bytes of per-page overhead = 28 bytes
2423        // - Each [u8; 128] item = 2-byte varint + 128 bytes data = 130 bytes
2424        // - This spans multiple 16-byte pages, testing cross-page reading
2425        const SMALL_PAGE: NonZeroU16 = NZU16!(16);
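
        // Sanity check of the arithmetic above: 128 >= 0x80, so the length prefix
        // needs a second varint byte and each item occupies 2 + 128 = 130 logical
        // bytes (assumes the codec's UInt varint is LEB128-style, as the comment implies).
        let mut varint = Vec::new();
        UInt(128u32).write(&mut varint);
        assert_eq!(varint.len(), 2, "128 should take a 2-byte varint");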
2426
2427        let executor = deterministic::Runner::default();
2428        executor.start(|context| async move {
2429            let cfg = Config {
2430                partition: "test_partition".into(),
2431                compression: None,
2432                codec_config: (),
2433                page_cache: CacheRef::new(SMALL_PAGE, PAGE_CACHE_SIZE),
2434                write_buffer: NZUsize!(1024),
2435            };
2436            let mut journal: Journal<_, [u8; 128]> =
2437                Journal::init(context.with_label("first"), cfg.clone())
2438                    .await
2439                    .expect("Failed to initialize journal");
2440
2441            // Create items that will span many 16-byte pages
2442            let item1: [u8; 128] = [1u8; 128];
2443            let item2: [u8; 128] = [2u8; 128];
2444            let item3: [u8; 128] = [3u8; 128];
2445
2446            // Append items - each is 130 bytes (2-byte varint + 128 data)
2447            // spanning ceil(130/16) = 9 pages worth of logical data
2448            let (offset1, _) = journal.append(1, item1).await.expect("Failed to append");
2449            let (offset2, _) = journal.append(1, item2).await.expect("Failed to append");
2450            let (offset3, _) = journal.append(1, item3).await.expect("Failed to append");
2451
2452            journal.sync(1).await.expect("Failed to sync");
2453
2454            // Read items back via random access
2455            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
2456            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
2457            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
2458            assert_eq!(retrieved1, item1);
2459            assert_eq!(retrieved2, item2);
2460            assert_eq!(retrieved3, item3);
2461
2462            // Drop and reopen to test replay
2463            drop(journal);
2464            let journal: Journal<_, [u8; 128]> =
2465                Journal::init(context.with_label("second"), cfg.clone())
2466                    .await
2467                    .expect("Failed to re-initialize journal");
2468
2469            // Replay and verify all items
2470            {
2471                let stream = journal
2472                    .replay(0, 0, NZUsize!(64))
2473                    .await
2474                    .expect("Failed to setup replay");
2475                pin_mut!(stream);
2476
2477                let mut items = Vec::new();
2478                while let Some(result) = stream.next().await {
2479                    let (section, off, _, item) = result.expect("Failed to replay item");
2480                    items.push((section, off, item));
2481                }
2482
2483                assert_eq!(items.len(), 3, "Should have 3 items");
2484                assert_eq!(items[0], (1, offset1, item1));
2485                assert_eq!(items[1], (1, offset2, item2));
2486                assert_eq!(items[2], (1, offset3, item3));
2487            }
2488
2489            journal.destroy().await.unwrap();
2490        });
2491    }
2492
2493    #[test_traced]
2494    fn test_journal_clear() {
2495        let executor = deterministic::Runner::default();
2496        executor.start(|context| async move {
2497            let cfg = Config {
2498                partition: "clear_test".into(),
2499                compression: None,
2500                codec_config: (),
2501                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2502                write_buffer: NZUsize!(1024),
2503            };
2504
2505            let mut journal: Journal<_, u64> =
2506                Journal::init(context.with_label("journal"), cfg.clone())
2507                    .await
2508                    .expect("Failed to initialize journal");
2509
2510            // Append items across multiple sections
2511            for section in 0..5u64 {
2512                for i in 0..10u64 {
2513                    journal
2514                        .append(section, section * 1000 + i)
2515                        .await
2516                        .expect("Failed to append");
2517                }
2518                journal.sync(section).await.expect("Failed to sync");
2519            }
2520
2521            // Verify we have data
2522            assert_eq!(journal.get(0, 0).await.unwrap(), 0);
2523            assert_eq!(journal.get(4, 0).await.unwrap(), 4000);
2524
2525            // Clear the journal
2526            journal.clear().await.expect("Failed to clear");
2527
2528            // After clear, all reads should fail
2529            for section in 0..5u64 {
2530                assert!(matches!(
2531                    journal.get(section, 0).await,
2532                    Err(Error::SectionOutOfRange(s)) if s == section
2533                ));
2534            }
2535
2536            // Append new data after clear
2537            for i in 0..5u64 {
2538                journal
2539                    .append(10, i * 100)
2540                    .await
2541                    .expect("Failed to append after clear");
2542            }
2543            journal.sync(10).await.expect("Failed to sync after clear");
2544
2545            // New data should be readable
2546            assert_eq!(journal.get(10, 0).await.unwrap(), 0);
2547
2548            // Old sections should still be missing
2549            assert!(matches!(
2550                journal.get(0, 0).await,
2551                Err(Error::SectionOutOfRange(0))
2552            ));
2553
2554            journal.destroy().await.unwrap();
2555        });
2556    }
2557}