commonware_storage/journal/segmented/variable.rs

//! An append-only log for storing arbitrary variable-length items.
//!
//! `segmented::Journal` is an append-only log for storing arbitrary variable-length data on disk.
//! In addition to replay, stored items can be directly retrieved given their section number and
//! offset within the section.
//!
//! # Format
//!
//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
//! The particular [Blob] in which data is stored is identified by a `section` number (`u64`).
//! Within a `section`, data is appended as an `item` with the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+
//! |       0 ~ 4       |    ...    |
//! +---+---+---+---+---+---+---+---+
//! | Size (varint u32) |   Data    |
//! +---+---+---+---+---+---+---+---+
//! ```
//!
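//! For example (illustrative values), a 3-byte item `[0xAA, 0xBB, 0xCC]` is stored as a single
//! varint length byte followed by the raw item bytes (longer items need up to 5 varint bytes for
//! a `u32` length):
//!
//! ```text
//! 0x03 0xAA 0xBB 0xCC
//! ```
//!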
//! # Open Blobs
//!
//! `Journal` uses one `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a
//! given `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound
//! the number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
//!
//! # Pruning
//!
//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
//! could be used, for example, by a blockchain application to prune old blocks, as sketched below.
//!
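//! A minimal sketch of pruning (the partition name and appended values are illustrative):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::{NZUsize, NZU16};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "prune_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZU16!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append to two sections, then drop everything below section 2.
//!     journal.append(1, 10).await.unwrap();
//!     journal.append(2, 20).await.unwrap();
//!     journal.sync_all().await.unwrap();
//!     journal.prune(2).await.unwrap();
//! });
//! ```
//!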
//! # Replay
//!
//! During application initialization, it is very common to replay data from `Journal` to recover
//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
//! method to produce a stream of all items in the `Journal` in order of their `section` and
//! `offset`.
//!
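//! A minimal sketch of replay (the partition name and appended value are illustrative):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::{NZUsize, NZU16};
//! use futures::{pin_mut, StreamExt};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "replay_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZU16!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!     journal.append(1, 42).await.unwrap();
//!     journal.sync_all().await.unwrap();
//!
//!     // Stream every item from the start of the journal.
//!     let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
//!     pin_mut!(stream);
//!     while let Some(result) = stream.next().await {
//!         let (section, _offset, _size, item) = result.unwrap();
//!         assert_eq!((section, item), (1, 42));
//!     }
//! });
//! ```
//!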
//! # Compression
//!
//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
//! `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of `Journal`; however, it must remain populated if any
//! data was written with compression enabled.
//!
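//! For example (a sketch; only the `compression` field differs from the configuration shown in
//! the example below):
//!
//! ```rust
//! use commonware_runtime::buffer::PoolRef;
//! use commonware_storage::journal::segmented::variable::Config;
//! use commonware_utils::{NZUsize, NZU16};
//!
//! // Store items compressed with zstd level 3.
//! let _cfg: Config<()> = Config {
//!     partition: "compressed_partition".to_string(),
//!     compression: Some(3),
//!     codec_config: (),
//!     buffer_pool: PoolRef::new(NZU16!(1024), NZUsize!(10)),
//!     write_buffer: NZUsize!(1024 * 1024),
//! };
//! ```
//!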
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::segmented::variable::{Journal, Config};
//! use commonware_utils::{NZUsize, NZU16};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a journal
//!     let mut journal = Journal::init(context, Config{
//!         partition: "partition".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZU16!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append data to the journal
//!     journal.append(1, 128).await.unwrap();
//!
//!     // Sync the journal
//!     journal.sync_all().await.unwrap();
//! });
//! ```

use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use bytes::{Buf, BufMut, Bytes};
use commonware_codec::{
    varint::UInt, Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
};
use commonware_runtime::{
    buffer::pool::{Append, PoolRef, Replay},
    Blob, Metrics, Storage,
};
use futures::stream::{self, Stream, StreamExt};
use std::{io::Cursor, num::NonZeroUsize};
use tracing::{trace, warn};
use zstd::{bulk::compress, decode_all};

/// Maximum size of a varint for u32 (also the minimum useful read size for parsing item headers).
const MAX_VARINT_SIZE: usize = 5;

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config<C> {
    /// The `commonware-runtime::Storage` partition to use
    /// for storing journal blobs.
    pub partition: String,

    /// Optional compression level (using `zstd`) to apply to data before storing.
    pub compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding items.
    pub codec_config: C,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// Decodes a varint length prefix from a buffer.
/// Returns (item_size, varint_len).
#[inline]
fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
    let initial = buf.remaining();
    let size = UInt::<u32>::read(buf)?.0 as usize;
    let varint_len = initial - buf.remaining();
    Ok((size, varint_len))
}

/// Result of finding an item in a buffer (offsets/lengths, not slices).
enum ItemInfo {
    /// All item data is available in the buffer.
    Complete {
        /// Length of the varint prefix.
        varint_len: usize,
        /// Length of the item data.
        data_len: usize,
    },
    /// Only some item data is available.
    Incomplete {
        /// Length of the varint prefix.
        varint_len: usize,
        /// Bytes of item data available in buffer.
        prefix_len: usize,
        /// Full size of the item.
        total_len: usize,
    },
}

/// Find an item in a buffer by decoding its length prefix.
///
/// Returns (next_offset, item_info). The buffer is advanced past the varint.
fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
    let available = buf.remaining();
    let (size, varint_len) = decode_length_prefix(buf)?;
    let next_offset = offset
        .checked_add(varint_len as u64)
        .ok_or(Error::OffsetOverflow)?
        .checked_add(size as u64)
        .ok_or(Error::OffsetOverflow)?;
    let buffered = available.saturating_sub(varint_len);

    let item = if buffered >= size {
        ItemInfo::Complete {
            varint_len,
            data_len: size,
        }
    } else {
        ItemInfo::Incomplete {
            varint_len,
            prefix_len: buffered,
            total_len: size,
        }
    };

    Ok((next_offset, item))
}

/// State for replaying a single section's blob.
struct ReplayState<B: Blob, C> {
    section: u64,
    blob: Append<B>,
    replay: Replay<B>,
    skip_bytes: u64,
    offset: u64,
    valid_offset: u64,
    codec_config: C,
    compressed: bool,
    done: bool,
}

/// Decode item data with optional decompression.
fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
    if compressed {
        let decompressed =
            decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
        V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
    } else {
        V::decode_cfg(item_data, cfg).map_err(Error::Codec)
    }
}

/// A segmented journal with variable-size entries.
///
/// Each section is stored in a separate blob. Items are length-prefixed with a varint.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
/// replay (not init) because any blob could have trailing bytes.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    manager: Manager<E, AppendFactory>,

    /// Compression level (if enabled).
    compression: Option<u8>,

    /// Codec configuration.
    codec_config: V::Cfg,
}

impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during
    /// initialization. The `replay` method can be used
    /// to iterate over all items in the `Journal`.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let manager_cfg = ManagerConfig {
            partition: cfg.partition,
            factory: AppendFactory {
                write_buffer: cfg.write_buffer,
                pool_ref: cfg.buffer_pool,
            },
        };
        let manager = Manager::init(context, manager_cfg).await?;

        Ok(Self {
            manager,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        })
    }

    /// Reads an item from the blob at the given offset.
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u64,
    ) -> Result<(u64, u32, V), Error> {
        // Read varint header (max 5 bytes for u32)
        let buf = vec![0u8; MAX_VARINT_SIZE];
        let (stable_buf, available) = blob.read_up_to(buf, offset).await?;
        let buf = Bytes::from(stable_buf);
        let mut cursor = Cursor::new(buf.slice(..available));
        let (next_offset, item_info) = find_item(&mut cursor, offset)?;

        // Decode item - either directly from buffer or by chaining prefix with remainder
        let (item_size, decoded) = match item_info {
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => {
                // Data follows varint in buffer
                let data = buf.slice(varint_len..varint_len + data_len);
                let decoded = decode_item::<V>(data, cfg, compressed)?;
                (data_len as u32, decoded)
            }
            ItemInfo::Incomplete {
                varint_len,
                prefix_len,
                total_len,
            } => {
                // Read remainder and chain with prefix to avoid copying
                let prefix = buf.slice(varint_len..varint_len + prefix_len);
                let read_offset = offset + varint_len as u64 + prefix_len as u64;
                let remainder_len = total_len - prefix_len;
                let mut remainder = vec![0u8; remainder_len];
                blob.read_into(&mut remainder, read_offset).await?;
                let chained = prefix.chain(Bytes::from(remainder));
                let decoded = decode_item::<V>(chained, cfg, compressed)?;
                (total_len as u32, decoded)
            }
        };

        Ok((next_offset, item_size, decoded))
    }

    /// Returns an ordered stream of all items in the journal starting with the item at the given
    /// `start_section` and `start_offset` into that section. Each item is returned as a tuple of
    /// (section, offset, size, item).
    pub async fn replay(
        &self,
        start_section: u64,
        mut start_offset: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
        // Collect all blobs to replay (keeping blob reference for potential resize)
        let codec_config = self.codec_config.clone();
        let compressed = self.compression.is_some();
        let mut blobs = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            blobs.push((
                section,
                blob.clone(),
                blob.replay(buffer).await?,
                codec_config.clone(),
                compressed,
            ));
        }

        // Stream items as they are read to avoid occupying too much memory
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, replay, codec_config, compressed)| {
                // Calculate initial skip bytes for first blob
                let skip_bytes = if section == start_section {
                    start_offset
                } else {
                    start_offset = 0;
                    0
                };

                stream::unfold(
                    ReplayState {
                        section,
                        blob,
                        replay,
                        skip_bytes,
                        offset: 0,
                        valid_offset: skip_bytes,
                        codec_config,
                        compressed,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let blob_size = state.replay.blob_size();
                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
                        loop {
                            // Ensure we have enough data for varint header.
                            // ensure() returns Ok(false) if exhausted with fewer bytes,
                            // but we still try to decode from remaining bytes.
                            match state.replay.ensure(MAX_VARINT_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Reader exhausted - check if buffer is empty
                                    if state.replay.remaining() == 0 {
                                        state.done = true;
                                        return if batch.is_empty() {
                                            None
                                        } else {
                                            Some((batch, state))
                                        };
                                    }
                                    // Buffer still has data - continue to try decoding
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Skip bytes if needed (for start_offset)
                            if state.skip_bytes > 0 {
                                let to_skip =
                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
                                state.replay.advance(to_skip);
                                state.skip_bytes -= to_skip as u64;
                                state.offset += to_skip as u64;
                                continue;
                            }

                            // Try to decode length prefix
                            let before_remaining = state.replay.remaining();
                            let (item_size, varint_len) =
                                match decode_length_prefix(&mut state.replay) {
                                    Ok(result) => result,
                                    Err(err) => {
                                        // Could be incomplete varint - check if reader exhausted
                                        if state.replay.is_exhausted()
                                            || before_remaining < MAX_VARINT_SIZE
                                        {
                                            // Treat as trailing bytes
                                            if state.valid_offset < blob_size
                                                && state.offset < blob_size
                                            {
                                                warn!(
                                                    blob = state.section,
                                                    bad_offset = state.offset,
                                                    new_size = state.valid_offset,
                                                    "trailing bytes detected: truncating"
                                                );
                                                state.blob.resize(state.valid_offset).await.ok()?;
                                            }
                                            state.done = true;
                                            return if batch.is_empty() {
                                                None
                                            } else {
                                                Some((batch, state))
                                            };
                                        }
                                        batch.push(Err(err));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                };

                            // Ensure we have enough data for item body
                            match state.replay.ensure(item_size).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Incomplete item at end - truncate
                                    warn!(
                                        blob = state.section,
                                        bad_offset = state.offset,
                                        new_size = state.valid_offset,
                                        "incomplete item at end: truncating"
                                    );
                                    state.blob.resize(state.valid_offset).await.ok()?;
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode item - use take() to limit bytes read
                            let item_offset = state.offset;
                            let next_offset = match state
                                .offset
                                .checked_add(varint_len as u64)
                                .and_then(|o| o.checked_add(item_size as u64))
                            {
                                Some(o) => o,
                                None => {
                                    batch.push(Err(Error::OffsetOverflow));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            };
                            match decode_item::<V>(
                                (&mut state.replay).take(item_size),
                                &state.codec_config,
                                state.compressed,
                            ) {
                                Ok(decoded) => {
                                    batch.push(Ok((
                                        state.section,
                                        item_offset,
                                        item_size as u32,
                                        decoded,
                                    )));
                                    state.valid_offset = next_offset;
                                    state.offset = next_offset;
                                }
                                Err(err) => {
                                    batch.push(Err(err));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Return batch if we have items and buffer is low
                            if !batch.is_empty() && state.replay.remaining() < MAX_VARINT_SIZE {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            },
        ))
    }

    /// Appends an item to `Journal` in a given `section`, returning the offset
    /// where the item was written and the size of the item (which may differ from
    /// the encoded size produced by the codec when compression is enabled).
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u64, u32), Error> {
        // Create buffer with item data (no checksum, no alignment)
        let (buf, item_len) = if let Some(compression) = self.compression {
            // Compressed: encode first, then compress
            let encoded = item.encode();
            let compressed =
                compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
            let item_len = compressed.len();
            let item_len_u32: u32 = match item_len.try_into() {
                Ok(len) => len,
                Err(_) => return Err(Error::ItemTooLarge(item_len)),
            };
            let size_len = UInt(item_len_u32).encode_size();
            let entry_len = size_len
                .checked_add(item_len)
                .ok_or(Error::OffsetOverflow)?;

            let mut buf = Vec::with_capacity(entry_len);
            UInt(item_len_u32).write(&mut buf);
            buf.put_slice(&compressed);

            (buf, item_len)
        } else {
            // Uncompressed: pre-allocate exact size to avoid copying
            let item_len = item.encode_size();
            let item_len_u32: u32 = match item_len.try_into() {
                Ok(len) => len,
                Err(_) => return Err(Error::ItemTooLarge(item_len)),
            };
            let size_len = UInt(item_len_u32).encode_size();
            let entry_len = size_len
                .checked_add(item_len)
                .ok_or(Error::OffsetOverflow)?;

            let mut buf = Vec::with_capacity(entry_len);
            UInt(item_len_u32).write(&mut buf);
            item.write(&mut buf);

            (buf, item_len)
        };

        // Get or create blob
        let blob = self.manager.get_or_create(section).await?;

        // Get current position - this is where we'll write (no alignment)
        let offset = blob.size().await;

        // Append item to blob
        blob.append(&buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len as u32))
    }

    /// Retrieves an item from `Journal` at a given `section` and `offset`.
    ///
    /// # Errors
    ///  - [Error::AlreadyPrunedToSection] if the requested `section` has been pruned during the
    ///    current execution.
    ///  - [Error::SectionOutOfRange] if the requested `section` is empty (i.e. has never had any
    ///    data appended to it, or has been pruned in a previous execution).
    ///  - An invalid `offset` for a given section (that is, an offset that doesn't correspond to a
    ///    previously appended item) will result in an error, with the specific type being
    ///    undefined.
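    ///
    /// # Example
    ///
    /// A round-trip sketch (the partition name and values are illustrative):
    ///
    /// ```rust
    /// use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
    /// use commonware_storage::journal::segmented::variable::{Journal, Config};
    /// use commonware_utils::{NZUsize, NZU16};
    ///
    /// let executor = deterministic::Runner::default();
    /// executor.start(|context| async move {
    ///     let mut journal = Journal::init(context, Config{
    ///         partition: "get_example".to_string(),
    ///         compression: None,
    ///         codec_config: (),
    ///         buffer_pool: PoolRef::new(NZU16!(1024), NZUsize!(10)),
    ///         write_buffer: NZUsize!(1024 * 1024),
    ///     }).await.unwrap();
    ///
    ///     // `append` returns the offset needed to `get` the same item back.
    ///     let (offset, _len) = journal.append(7, 42).await.unwrap();
    ///     journal.sync(7).await.unwrap();
    ///     assert_eq!(journal.get(7, offset).await.unwrap(), 42);
    /// });
    /// ```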
    pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        // Perform a multi-op read.
        let (_, _, item) =
            Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
        Ok(item)
    }

    /// Gets the size of the journal for a specific section.
    ///
    /// Returns 0 if the section does not exist.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed in reverse order of section.
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    /// Rewinds the journal to the given `section` and `size`.
    ///
    /// This removes any data beyond the specified `section` and `size`.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed in reverse order of section.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind(section, size).await
    }

    /// Rewinds the `section` to the given `size`.
    ///
    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
    ///
    /// # Warning
    ///
    /// This operation is not guaranteed to survive restarts until sync is called.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Ensures that all data in a given `section` is synced to the underlying store.
    ///
    /// If the `section` does not exist, no error will be returned.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Syncs all open sections.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    /// Prunes all `sections` less than `min`. Returns true if any sections were pruned.
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// Returns the number of the oldest section in the journal.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// Returns the number of the newest section in the journal.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Returns true if no sections exist.
    pub fn is_empty(&self) -> bool {
        self.manager.is_empty()
    }

    /// Returns the number of sections.
    pub fn num_sections(&self) -> usize {
        self.manager.num_sections()
    }

    /// Removes any underlying blobs created by the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Runner, Storage};
    use commonware_utils::{NZUsize, NZU16};
    use futures::{pin_mut, StreamExt};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Drop and re-open the journal to simulate a restart
            journal.sync(index).await.expect("Failed to sync journal");
            drop(journal);
            let journal = Journal::<_, i32>::init(context.clone(), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify that the item was replayed correctly
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append multiple items to different blobs
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            // Drop and re-open the journal to simulate a restart
            drop(journal);
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that all items were replayed correctly
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Cleanup
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to multiple blobs
            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one item out-of-order
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune blobs with indices less than 3
            journal.prune(3).await.expect("Failed to prune blobs");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Prune again with a section less than the previous one, should be a no-op
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Drop and re-open the journal to simulate a restart
            drop(journal);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that items from blobs 1 and 2 are not present
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune all blobs
            journal.prune(6).await.expect("Failed to prune blobs");

            // Drop the journal
            drop(journal);

            // Ensure no remaining blobs exist
            //
            // Note: We don't remove the partition, so this does not error
            // and instead returns an empty list of blobs.
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to sections 1-5
            for section in 1u64..=5u64 {
                journal
                    .append(section, section as i32)
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            // Prune sections < 3
            journal.prune(3).await.expect("Failed to prune");

            // Test that accessing pruned sections returns the correct error

            // Test append on pruned section
            match journal.append(1, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test get on pruned section
            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test size on pruned section
            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test rewind on pruned section
            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test rewind_section on pruned section
            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test sync on pruned section
            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Test that accessing sections at or after the threshold works
            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            // Append to section at threshold should work
            journal
                .append(3, 999)
                .await
                .expect("Should be able to append to section 3");

            // Prune more sections
            journal.prune(5).await.expect("Failed to prune");

            // Verify sections 3 and 4 are now pruned
            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            // Section 5 should still be accessible
            assert!(journal.get(5, 0).await.is_ok());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // First session: create and prune
            {
                let mut journal = Journal::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, section as i32)
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
            }

            // Second session: verify oldest_retained_section is reset
            {
                let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                // But the actual sections 1 and 2 should be gone from storage
                // so get should return SectionOutOfRange, not AlreadyPrunedToSection
                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                // Sections 3-5 should still be accessible
                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());
            }
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with an invalid name (not 8 bytes)
            let invalid_blob_name = b"invalid"; // Less than 8 bytes
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            // Attempt to initialize the journal
            let result = Journal::<_, u64>::init(context, cfg).await;

            // Expect an error
            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with incomplete size data
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write incomplete varint by encoding u32::MAX (5 bytes) and truncating to 1 byte
            let mut incomplete_data = Vec::new();
            UInt(u32::MAX).write(&mut incomplete_data);
            incomplete_data.truncate(1);
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with missing item data
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write size but incomplete item data
            let item_size: u32 = 10; // Size indicates 10 bytes of data
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf); // Varint encoding
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write incomplete item");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with missing checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Prepare item data
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Write size (varint) and data, but no checksum
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item without checksum");

            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            //
            // This will truncate the leftover bytes from our manual write.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with incorrect checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Prepare item data
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            // Write size (varint), data, and incorrect checksum
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            buf.put_u32(incorrect_checksum);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item with bad checksum");

            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            drop(journal);

            // Confirm blob is expected length
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
1336    fn test_journal_truncation_recovery() {
1337        // Initialize the deterministic context
1338        let executor = deterministic::Runner::default();
1339
1340        // Start the test within the executor
1341        executor.start(|context| async move {
1342            // Create a journal configuration
1343            let cfg = Config {
1344                partition: "test_partition".into(),
1345                compression: None,
1346                codec_config: (),
1347                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1348                write_buffer: NZUsize!(1024),
1349            };
1350
1351            // Initialize the journal
1352            let mut journal = Journal::init(context.clone(), cfg.clone())
1353                .await
1354                .expect("Failed to initialize journal");
1355
1356            // Append 1 item to the first section
1357            journal.append(1, 1).await.expect("Failed to append data");
1358
1359            // Append multiple items to the second section
1360            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1361            for (index, data) in &data_items {
1362                journal
1363                    .append(*index, *data)
1364                    .await
1365                    .expect("Failed to append data");
1366                journal.sync(*index).await.expect("Failed to sync blob");
1367            }
1368
1369            // Sync all sections and drop the journal
1370            journal.sync_all().await.expect("Failed to sync");
1371            drop(journal);
1372
1373            // Manually corrupt the end of the second blob
1374            let (blob, blob_size) = context
1375                .open(&cfg.partition, &2u64.to_be_bytes())
1376                .await
1377                .expect("Failed to open blob");
1378            blob.resize(blob_size - 4)
1379                .await
1380                .expect("Failed to corrupt blob");
1381            blob.sync().await.expect("Failed to sync blob");
1382
1383            // Re-initialize the journal to simulate a restart
1384            let journal = Journal::init(context.clone(), cfg.clone())
1385                .await
1386                .expect("Failed to re-initialize journal");
1387
1388            // Attempt to replay the journal
1389            let mut items = Vec::<(u64, u32)>::new();
1390            {
1391                let stream = journal
1392                    .replay(0, 0, NZUsize!(1024))
1393                    .await
1394                    .expect("unable to setup replay");
1395                pin_mut!(stream);
1396                while let Some(result) = stream.next().await {
1397                    match result {
1398                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1399                        Err(err) => panic!("Failed to read item: {err}"),
1400                    }
1401                }
1402            }
1403            drop(journal);
1404
1405            // Verify that replay discarded the corrupted second blob's contents; only the first section's item survives.
1406            assert_eq!(items.len(), 1);
1407            assert_eq!(items[0].0, 1);
1408            assert_eq!(items[0].1, 1);
1409
1410            // Confirm second blob was truncated.
1411            let (_, blob_size) = context
1412                .open(&cfg.partition, &2u64.to_be_bytes())
1413                .await
1414                .expect("Failed to open blob");
1415            assert_eq!(blob_size, 0);
1416
1417            // Re-initialize the journal after truncation
1418            let mut journal = Journal::init(context.clone(), cfg.clone())
1419                .await
1420                .expect("Failed to re-initialize journal");
1421
1422            // Attempt to replay the journal
1423            let mut items = Vec::<(u64, u32)>::new();
1424            {
1425                let stream = journal
1426                    .replay(0, 0, NZUsize!(1024))
1427                    .await
1428                    .expect("unable to setup replay");
1429                pin_mut!(stream);
1430                while let Some(result) = stream.next().await {
1431                    match result {
1432                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1433                        Err(err) => panic!("Failed to read item: {err}"),
1434                    }
1435                }
1436            }
1437
1438            // Verify that only non-corrupted items were replayed
1439            assert_eq!(items.len(), 1);
1440            assert_eq!(items[0].0, 1);
1441            assert_eq!(items[0].1, 1);
1442
1443            // Append a new item to the truncated section
1444            let (_offset, _) = journal.append(2, 5).await.expect("Failed to append data");
1445            journal.sync(2).await.expect("Failed to sync blob");
1446
1447            // Get the new item (offset is 0 since blob was truncated)
1448            let item = journal.get(2, 0).await.expect("Failed to get item");
1449            assert_eq!(item, 5);
1450
1451            // Drop the journal (data already synced)
1452            drop(journal);
1453
1454            // Re-initialize the journal to simulate a restart
1455            let journal = Journal::init(context.clone(), cfg.clone())
1456                .await
1457                .expect("Failed to re-initialize journal");
1458
1459            // Attempt to replay the journal
1460            let mut items = Vec::<(u64, u32)>::new();
1461            {
1462                let stream = journal
1463                    .replay(0, 0, NZUsize!(1024))
1464                    .await
1465                    .expect("unable to setup replay");
1466                pin_mut!(stream);
1467                while let Some(result) = stream.next().await {
1468                    match result {
1469                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1470                        Err(err) => panic!("Failed to read item: {err}"),
1471                    }
1472                }
1473            }
1474
1475            // Verify that only non-corrupted items were replayed
1476            assert_eq!(items.len(), 2);
1477            assert_eq!(items[0].0, 1);
1478            assert_eq!(items[0].1, 1);
1479            assert_eq!(items[1].0, 2);
1480            assert_eq!(items[1].1, 5);
1481        });
1482    }
1483
1484    #[test_traced]
1485    fn test_journal_handling_extra_data() {
1486        // Initialize the deterministic context
1487        let executor = deterministic::Runner::default();
1488
1489        // Start the test within the executor
1490        executor.start(|context| async move {
1491            // Create a journal configuration
1492            let cfg = Config {
1493                partition: "test_partition".into(),
1494                compression: None,
1495                codec_config: (),
1496                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1497                write_buffer: NZUsize!(1024),
1498            };
1499
1500            // Initialize the journal
1501            let mut journal = Journal::init(context.clone(), cfg.clone())
1502                .await
1503                .expect("Failed to initialize journal");
1504
1505            // Append 1 item to the first section
1506            journal.append(1, 1).await.expect("Failed to append data");
1507
1508            // Append multiple items to the second section
1509            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1510            for (index, data) in &data_items {
1511                journal
1512                    .append(*index, *data)
1513                    .await
1514                    .expect("Failed to append data");
1515                journal.sync(*index).await.expect("Failed to sync blob");
1516            }
1517
1518            // Sync all sections and drop the journal
1519            journal.sync_all().await.expect("Failed to sync");
1520            drop(journal);
1521
1522            // Manually add extra data to the end of the second blob
1523            let (blob, blob_size) = context
1524                .open(&cfg.partition, &2u64.to_be_bytes())
1525                .await
1526                .expect("Failed to open blob");
1527            blob.write_at(vec![0u8; 16], blob_size)
1528                .await
1529                .expect("Failed to add extra data");
1530            blob.sync().await.expect("Failed to sync blob");
1531
1532            // Re-initialize the journal to simulate a restart
1533            let journal = Journal::init(context, cfg)
1534                .await
1535                .expect("Failed to re-initialize journal");
1536
1537            // Attempt to replay the journal
1538            let mut items = Vec::<(u64, i32)>::new();
1539            let stream = journal
1540                .replay(0, 0, NZUsize!(1024))
1541                .await
1542                .expect("unable to setup replay");
1543            pin_mut!(stream);
1544            while let Some(result) = stream.next().await {
1545                match result {
1546                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1547                    Err(err) => panic!("Failed to read item: {err}"),
1548                }
1549            }
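            // The untouched first section must still replay; the trailing zero bytes in
            // section 2 should be discarded as corruption rather than surfaced as an
            // error (mirrors the truncation-recovery behavior above).
            assert!(!items.is_empty(), "expected at least the first section's item");
            assert_eq!(items[0], (1, 1));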
1550        });
1551    }
1552
1553    #[test_traced]
1554    fn test_journal_rewind() {
1555        // Initialize the deterministic context
1556        let executor = deterministic::Runner::default();
1557        executor.start(|context| async move {
1558            // Create journal
1559            let cfg = Config {
1560                partition: "test_partition".to_string(),
1561                compression: None,
1562                codec_config: (),
1563                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1564                write_buffer: NZUsize!(1024),
1565            };
1566            let mut journal = Journal::init(context, cfg).await.unwrap();
1567
1568            // Check size of non-existent section
1569            let size = journal.size(1).await.unwrap();
1570            assert_eq!(size, 0);
1571
1572            // Append data to section 1
1573            journal.append(1, 42i32).await.unwrap();
1574
1575            // Check size of section 1 - should be greater than 0
1576            let size = journal.size(1).await.unwrap();
1577            assert!(size > 0);
1578
1579            // Append more data and verify size increases
1580            journal.append(1, 43i32).await.unwrap();
1581            let new_size = journal.size(1).await.unwrap();
1582            assert!(new_size > size);
1583
1584            // Check size of different section - should still be 0
1585            let size = journal.size(2).await.unwrap();
1586            assert_eq!(size, 0);
1587
1588            // Append data to section 2
1589            journal.append(2, 44i32).await.unwrap();
1590
1591            // Check size of section 2 - should be greater than 0
1592            let size = journal.size(2).await.unwrap();
1593            assert!(size > 0);
1594
1595            // Rewind section 1 to size 0, which also removes the later section 2
1596            journal.rewind(1, 0).await.unwrap();
1597
1598            // Check size of section 1 - should be 0
1599            let size = journal.size(1).await.unwrap();
1600            assert_eq!(size, 0);
1601
1602            // Check size of section 2 - should be 0
1603            let size = journal.size(2).await.unwrap();
1604            assert_eq!(size, 0);
1605        });
1606    }
1607
1608    #[test_traced]
1609    fn test_journal_rewind_section() {
1610        // Initialize the deterministic context
1611        let executor = deterministic::Runner::default();
1612        executor.start(|context| async move {
1613            // Create journal
1614            let cfg = Config {
1615                partition: "test_partition".to_string(),
1616                compression: None,
1617                codec_config: (),
1618                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1619                write_buffer: NZUsize!(1024),
1620            };
1621            let mut journal = Journal::init(context, cfg).await.unwrap();
1622
1623            // Check size of non-existent section
1624            let size = journal.size(1).await.unwrap();
1625            assert_eq!(size, 0);
1626
1627            // Append data to section 1
1628            journal.append(1, 42i32).await.unwrap();
1629
1630            // Check size of section 1 - should be greater than 0
1631            let size = journal.size(1).await.unwrap();
1632            assert!(size > 0);
1633
1634            // Append more data and verify size increases
1635            journal.append(1, 43i32).await.unwrap();
1636            let new_size = journal.size(1).await.unwrap();
1637            assert!(new_size > size);
1638
1639            // Check size of different section - should still be 0
1640            let size = journal.size(2).await.unwrap();
1641            assert_eq!(size, 0);
1642
1643            // Append data to section 2
1644            journal.append(2, 44i32).await.unwrap();
1645
1646            // Check size of section 2 - should be greater than 0
1647            let size = journal.size(2).await.unwrap();
1648            assert!(size > 0);
1649
1650            // Rewind only section 1 to size 0; rewind_section leaves other sections untouched
1651            journal.rewind_section(1, 0).await.unwrap();
1652
1653            // Check size of section 1 - should be 0
1654            let size = journal.size(1).await.unwrap();
1655            assert_eq!(size, 0);
1656
1657            // Check size of section 2 - should be greater than 0
1658            let size = journal.size(2).await.unwrap();
1659            assert!(size > 0);
1660        });
1661    }
1662
1663    #[test_traced]
1664    fn test_journal_small_items() {
1665        let executor = deterministic::Runner::default();
1666        executor.start(|context| async move {
1667            let cfg = Config {
1668                partition: "test_partition".into(),
1669                compression: None,
1670                codec_config: (),
1671                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1672                write_buffer: NZUsize!(1024),
1673            };
1674
1675            let mut journal = Journal::init(context.clone(), cfg.clone())
1676                .await
1677                .expect("Failed to initialize journal");
1678
1679            // Append many small (1-byte) items to the same section
1680            let num_items = 100;
1681            let mut offsets = Vec::new();
1682            for i in 0..num_items {
1683                let (offset, size) = journal
1684                    .append(1, i as u8)
1685                    .await
1686                    .expect("Failed to append data");
1687                assert_eq!(size, 1, "u8 should encode to 1 byte");
1688                offsets.push(offset);
1689            }
1690            journal.sync(1).await.expect("Failed to sync");
1691
1692            // Read each item back via random access
1693            for (i, &offset) in offsets.iter().enumerate() {
1694                let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
1695                assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
1696            }
1697
1698            // Drop and reopen to test replay
1699            drop(journal);
1700            let journal = Journal::<_, u8>::init(context, cfg)
1701                .await
1702                .expect("Failed to re-initialize journal");
1703
1704            // Replay and verify all items
1705            let stream = journal
1706                .replay(0, 0, NZUsize!(1024))
1707                .await
1708                .expect("Failed to setup replay");
1709            pin_mut!(stream);
1710
1711            let mut count = 0;
1712            while let Some(result) = stream.next().await {
1713                let (section, offset, size, item) = result.expect("Failed to replay item");
1714                assert_eq!(section, 1);
1715                assert_eq!(offset, offsets[count]);
1716                assert_eq!(size, 1);
1717                assert_eq!(item, count as u8);
1718                count += 1;
1719            }
1720            assert_eq!(count, num_items, "Should replay all items");
1721        });
1722    }
1723
1724    #[test_traced]
1725    fn test_journal_rewind_many_sections() {
1726        let executor = deterministic::Runner::default();
1727        executor.start(|context| async move {
1728            let cfg = Config {
1729                partition: "test_partition".to_string(),
1730                compression: None,
1731                codec_config: (),
1732                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1733                write_buffer: NZUsize!(1024),
1734            };
1735            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1736
1737            // Create sections 1-10 with data
1738            for section in 1u64..=10 {
1739                journal.append(section, section as i32).await.unwrap();
1740            }
1741            journal.sync_all().await.unwrap();
1742
1743            // Verify all sections exist
1744            for section in 1u64..=10 {
1745                let size = journal.size(section).await.unwrap();
1746                assert!(size > 0, "section {section} should have data");
1747            }
1748
1749            // Rewind to section 5 (should remove sections 6-10)
1750            journal
1751                .rewind(5, journal.size(5).await.unwrap())
1752                .await
1753                .unwrap();
1754
1755            // Verify sections 1-5 still exist with correct data
1756            for section in 1u64..=5 {
1757                let size = journal.size(section).await.unwrap();
1758                assert!(size > 0, "section {section} should still have data");
1759            }
1760
1761            // Verify sections 6-10 are removed (size should be 0)
1762            for section in 6u64..=10 {
1763                let size = journal.size(section).await.unwrap();
1764                assert_eq!(size, 0, "section {section} should be removed");
1765            }
1766
1767            // Verify data integrity via replay
1768            {
1769                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1770                pin_mut!(stream);
1771                let mut items = Vec::new();
1772                while let Some(result) = stream.next().await {
1773                    let (section, _, _, item) = result.unwrap();
1774                    items.push((section, item));
1775                }
1776                assert_eq!(items.len(), 5);
1777                for (i, (section, item)) in items.iter().enumerate() {
1778                    assert_eq!(*section, (i + 1) as u64);
1779                    assert_eq!(*item, (i + 1) as i32);
1780                }
1781            }
1782
1783            journal.destroy().await.unwrap();
1784        });
1785    }
1786
1787    #[test_traced]
1788    fn test_journal_rewind_partial_truncation() {
1789        let executor = deterministic::Runner::default();
1790        executor.start(|context| async move {
1791            let cfg = Config {
1792                partition: "test_partition".to_string(),
1793                compression: None,
1794                codec_config: (),
1795                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1796                write_buffer: NZUsize!(1024),
1797            };
1798            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1799
1800            // Append 5 items and record sizes after each
1801            let mut sizes = Vec::new();
1802            for i in 0..5 {
1803                journal.append(1, i).await.unwrap();
1804                journal.sync(1).await.unwrap();
1805                sizes.push(journal.size(1).await.unwrap());
1806            }
1807
1808            // Rewind to keep only first 3 items
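            // (sizes[2] is the section size recorded right after the third append)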
1809            let target_size = sizes[2];
1810            journal.rewind(1, target_size).await.unwrap();
1811
1812            // Verify size is correct
1813            let new_size = journal.size(1).await.unwrap();
1814            assert_eq!(new_size, target_size);
1815
1816            // Verify first 3 items via replay
1817            {
1818                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1819                pin_mut!(stream);
1820                let mut items = Vec::new();
1821                while let Some(result) = stream.next().await {
1822                    let (_, _, _, item) = result.unwrap();
1823                    items.push(item);
1824                }
1825                assert_eq!(items.len(), 3);
1826                for (i, item) in items.iter().enumerate() {
1827                    assert_eq!(*item, i as i32);
1828                }
1829            }
1830
1831            journal.destroy().await.unwrap();
1832        });
1833    }
1834
1835    #[test_traced]
1836    fn test_journal_rewind_nonexistent_target() {
1837        let executor = deterministic::Runner::default();
1838        executor.start(|context| async move {
1839            let cfg = Config {
1840                partition: "test_partition".to_string(),
1841                compression: None,
1842                codec_config: (),
1843                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1844                write_buffer: NZUsize!(1024),
1845            };
1846            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1847
1848            // Create sections 5, 6, 7 (skip 1-4)
1849            for section in 5u64..=7 {
1850                journal.append(section, section as i32).await.unwrap();
1851            }
1852            journal.sync_all().await.unwrap();
1853
1854            // Rewind to section 3 (doesn't exist)
1855            journal.rewind(3, 0).await.unwrap();
1856
1857            // Verify sections 5, 6, 7 are removed
1858            for section in 5u64..=7 {
1859                let size = journal.size(section).await.unwrap();
1860                assert_eq!(size, 0, "section {section} should be removed");
1861            }
1862
1863            // Verify replay returns nothing
1864            {
1865                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1866                pin_mut!(stream);
1867                let items: Vec<_> = stream.collect().await;
1868                assert!(items.is_empty());
1869            }
1870
1871            journal.destroy().await.unwrap();
1872        });
1873    }
1874
1875    #[test_traced]
1876    fn test_journal_rewind_persistence() {
1877        let executor = deterministic::Runner::default();
1878        executor.start(|context| async move {
1879            let cfg = Config {
1880                partition: "test_partition".to_string(),
1881                compression: None,
1882                codec_config: (),
1883                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1884                write_buffer: NZUsize!(1024),
1885            };
1886
1887            // Create sections 1-5 with data
1888            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1889            for section in 1u64..=5 {
1890                journal.append(section, section as i32).await.unwrap();
1891            }
1892            journal.sync_all().await.unwrap();
1893
1894            // Rewind to section 2
1895            let size = journal.size(2).await.unwrap();
1896            journal.rewind(2, size).await.unwrap();
1897            journal.sync_all().await.unwrap();
1898            drop(journal);
1899
1900            // Re-init and verify only sections 1-2 exist
1901            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
1902                .await
1903                .unwrap();
1904
1905            // Verify sections 1-2 have data
1906            for section in 1u64..=2 {
1907                let size = journal.size(section).await.unwrap();
1908                assert!(size > 0, "section {section} should have data after restart");
1909            }
1910
1911            // Verify sections 3-5 are gone
1912            for section in 3u64..=5 {
1913                let size = journal.size(section).await.unwrap();
1914                assert_eq!(size, 0, "section {section} should be gone after restart");
1915            }
1916
1917            // Verify data integrity via replay
1918            {
1919                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1920                pin_mut!(stream);
1921                let mut items = Vec::new();
1922                while let Some(result) = stream.next().await {
1923                    let (section, _, _, item) = result.unwrap();
1924                    items.push((section, item));
1925                }
1926                assert_eq!(items.len(), 2);
1927                assert_eq!(items[0], (1, 1));
1928                assert_eq!(items[1], (2, 2));
1929            }
1930
1931            journal.destroy().await.unwrap();
1932        });
1933    }
1934
1935    #[test_traced]
1936    fn test_journal_rewind_to_zero_removes_all_newer() {
1937        let executor = deterministic::Runner::default();
1938        executor.start(|context| async move {
1939            let cfg = Config {
1940                partition: "test_partition".to_string(),
1941                compression: None,
1942                codec_config: (),
1943                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1944                write_buffer: NZUsize!(1024),
1945            };
1946            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1947
1948            // Create sections 1, 2, 3
1949            for section in 1u64..=3 {
1950                journal.append(section, section as i32).await.unwrap();
1951            }
1952            journal.sync_all().await.unwrap();
1953
1954            // Rewind section 1 to size 0
1955            journal.rewind(1, 0).await.unwrap();
1956
1957            // Verify section 1 exists but is empty
1958            let size = journal.size(1).await.unwrap();
1959            assert_eq!(size, 0, "section 1 should be empty");
1960
1961            // Verify sections 2, 3 are completely removed
1962            for section in 2u64..=3 {
1963                let size = journal.size(section).await.unwrap();
1964                assert_eq!(size, 0, "section {section} should be removed");
1965            }
1966
1967            // Verify replay returns nothing
1968            {
1969                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1970                pin_mut!(stream);
1971                let items: Vec<_> = stream.collect().await;
1972                assert!(items.is_empty());
1973            }
1974
1975            journal.destroy().await.unwrap();
1976        });
1977    }
1978
1979    #[test_traced]
1980    fn test_journal_replay_start_offset_with_trailing_bytes() {
1981        // Regression: valid_offset must be initialized to start_offset, not 0.
1982        let executor = deterministic::Runner::default();
1983        executor.start(|context| async move {
1984            let cfg = Config {
1985                partition: "test_partition".into(),
1986                compression: None,
1987                codec_config: (),
1988                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1989                write_buffer: NZUsize!(1024),
1990            };
1991            let mut journal = Journal::init(context.clone(), cfg.clone())
1992                .await
1993                .expect("Failed to initialize journal");
1994
1995            // Append several items to build up valid data
1996            for i in 0..5i32 {
1997                journal.append(1, i).await.unwrap();
1998            }
1999            journal.sync(1).await.unwrap();
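            // Record the logical size of the valid data; the replay below will start at
            // this offset, skipping everything that was appended successfully.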
2000            let valid_logical_size = journal.size(1).await.unwrap();
2001            drop(journal);
2002
2003            // Get the physical blob size before corruption
2004            let (blob, physical_size_before) = context
2005                .open(&cfg.partition, &1u64.to_be_bytes())
2006                .await
2007                .unwrap();
2008
2009            // Write incomplete varint: 0xFF has continuation bit set, needs more bytes
2010            // This creates 2 trailing bytes that cannot form a valid item
2011            blob.write_at(vec![0xFF, 0xFF], physical_size_before)
2012                .await
2013                .unwrap();
2014            blob.sync().await.unwrap();
2015
2016            // Reopen journal and replay starting PAST all valid items
2017            // (start_offset = valid_logical_size means we skip all valid data)
2018            // The first thing encountered will be the trailing corrupt bytes
2019            let start_offset = valid_logical_size;
2020            {
2021                let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
2022                    .await
2023                    .unwrap();
2024
2025                let stream = journal
2026                    .replay(1, start_offset, NZUsize!(1024))
2027                    .await
2028                    .unwrap();
2029                pin_mut!(stream);
2030
2031                // Consume the stream - should detect trailing bytes and truncate
2032                while let Some(_result) = stream.next().await {}
2033            }
2034
2035            // Verify that valid data before start_offset was NOT lost
2036            let (_, physical_size_after) = context
2037                .open(&cfg.partition, &1u64.to_be_bytes())
2038                .await
2039                .unwrap();
2040
2041            // The blob should have been truncated back to the valid physical size
2042            // (removing the trailing corrupt bytes) but NOT to 0
2043            assert!(
2044                physical_size_after >= physical_size_before,
2045                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
2046                 {physical_size_after}. Logical valid size was {valid_logical_size}. \
2047                 This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
2048            );
2049        });
2050    }
2051
2052    #[test_traced]
2053    fn test_journal_large_item_spanning_pages() {
2054        // 2048 bytes spans 2 full pages (PAGE_SIZE = 1024).
2055        const LARGE_SIZE: usize = 2048;
2056        type LargeItem = [u8; LARGE_SIZE];
2057
2058        let executor = deterministic::Runner::default();
2059        executor.start(|context| async move {
2060            let cfg = Config {
2061                partition: "test_partition".into(),
2062                compression: None,
2063                codec_config: (),
2064                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2065                write_buffer: NZUsize!(4096),
2066            };
2067            let mut journal = Journal::init(context.clone(), cfg.clone())
2068                .await
2069                .expect("Failed to initialize journal");
2070
2071            // Create a large item that spans multiple pages.
2072            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
2073            for (i, byte) in large_data.iter_mut().enumerate() {
2074                *byte = (i % 256) as u8;
2075            }
2076            assert!(
2077                LARGE_SIZE > PAGE_SIZE.get() as usize,
2078                "Item must be larger than page size"
2079            );
2080
2081            // Append the large item
2082            let (offset, size) = journal
2083                .append(1, large_data)
2084                .await
2085                .expect("Failed to append large item");
2086            assert_eq!(size as usize, LARGE_SIZE);
2087            journal.sync(1).await.expect("Failed to sync");
2088
2089            // Read the item back via random access
2090            let retrieved: LargeItem = journal
2091                .get(1, offset)
2092                .await
2093                .expect("Failed to get large item");
2094            assert_eq!(retrieved, large_data, "Random access read mismatch");
2095
2096            // Drop and reopen to test replay
2097            drop(journal);
2098            let journal = Journal::<_, LargeItem>::init(context.clone(), cfg.clone())
2099                .await
2100                .expect("Failed to re-initialize journal");
2101
2102            // Replay and verify the large item
2103            {
2104                let stream = journal
2105                    .replay(0, 0, NZUsize!(1024))
2106                    .await
2107                    .expect("Failed to setup replay");
2108                pin_mut!(stream);
2109
2110                let mut items = Vec::new();
2111                while let Some(result) = stream.next().await {
2112                    let (section, off, sz, item) = result.expect("Failed to replay item");
2113                    items.push((section, off, sz, item));
2114                }
2115
2116                assert_eq!(items.len(), 1, "Should have exactly one item");
2117                let (section, off, sz, item) = &items[0];
2118                assert_eq!(*section, 1);
2119                assert_eq!(*off, offset);
2120                assert_eq!(*sz as usize, LARGE_SIZE);
2121                assert_eq!(*item, large_data, "Replay read mismatch");
2122            }
2123
2124            journal.destroy().await.unwrap();
2125        });
2126    }
2127
2128    #[test_traced]
2129    fn test_journal_non_contiguous_sections() {
2130        // Test that sections with gaps in numbering work correctly.
2131        // Sections 1, 5, 10 should all be independent and accessible.
2132        let executor = deterministic::Runner::default();
2133        executor.start(|context| async move {
2134            let cfg = Config {
2135                partition: "test_partition".into(),
2136                compression: None,
2137                codec_config: (),
2138                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2139                write_buffer: NZUsize!(1024),
2140            };
2141            let mut journal = Journal::init(context.clone(), cfg.clone())
2142                .await
2143                .expect("Failed to initialize journal");
2144
2145            // Create sections with gaps: 1, 5, 10
2146            let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
2147            let mut offsets = Vec::new();
2148
2149            for (section, data) in &sections_and_data {
2150                let (offset, _) = journal
2151                    .append(*section, *data)
2152                    .await
2153                    .expect("Failed to append");
2154                offsets.push(offset);
2155            }
2156            journal.sync_all().await.expect("Failed to sync");
2157
2158            // Verify random access to each section
2159            for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
2160                let retrieved: i32 = journal
2161                    .get(*section, offsets[i])
2162                    .await
2163                    .expect("Failed to get item");
2164                assert_eq!(retrieved, *expected_data);
2165            }
2166
2167            // Verify non-existent sections return appropriate errors
2168            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
2169                let result = journal.get(missing_section, 0).await;
2170                assert!(
2171                    matches!(result, Err(Error::SectionOutOfRange(_))),
2172                    "Expected SectionOutOfRange for section {}, got {:?}",
2173                    missing_section,
2174                    result
2175                );
2176            }
2177
2178            // Drop and reopen to test replay
2179            drop(journal);
2180            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
2181                .await
2182                .expect("Failed to re-initialize journal");
2183
2184            // Replay and verify all items in order
2185            {
2186                let stream = journal
2187                    .replay(0, 0, NZUsize!(1024))
2188                    .await
2189                    .expect("Failed to setup replay");
2190                pin_mut!(stream);
2191
2192                let mut items = Vec::new();
2193                while let Some(result) = stream.next().await {
2194                    let (section, _, _, item) = result.expect("Failed to replay item");
2195                    items.push((section, item));
2196                }
2197
2198                assert_eq!(items.len(), 3, "Should have 3 items");
2199                assert_eq!(items[0], (1, 100));
2200                assert_eq!(items[1], (5, 500));
2201                assert_eq!(items[2], (10, 1000));
2202            }
2203
2204            // Test replay starting from middle section (5)
2205            {
2206                let stream = journal
2207                    .replay(5, 0, NZUsize!(1024))
2208                    .await
2209                    .expect("Failed to setup replay from section 5");
2210                pin_mut!(stream);
2211
2212                let mut items = Vec::new();
2213                while let Some(result) = stream.next().await {
2214                    let (section, _, _, item) = result.expect("Failed to replay item");
2215                    items.push((section, item));
2216                }
2217
2218                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
2219                assert_eq!(items[0], (5, 500));
2220                assert_eq!(items[1], (10, 1000));
2221            }
2222
2223            // Test replay starting from non-existent section (should skip to next)
2224            {
2225                let stream = journal
2226                    .replay(3, 0, NZUsize!(1024))
2227                    .await
2228                    .expect("Failed to setup replay from section 3");
2229                pin_mut!(stream);
2230
2231                let mut items = Vec::new();
2232                while let Some(result) = stream.next().await {
2233                    let (section, _, _, item) = result.expect("Failed to replay item");
2234                    items.push((section, item));
2235                }
2236
2237                // Should get sections 5 and 10 (skipping non-existent 3, 4)
2238                assert_eq!(items.len(), 2);
2239                assert_eq!(items[0], (5, 500));
2240                assert_eq!(items[1], (10, 1000));
2241            }
2242
2243            journal.destroy().await.unwrap();
2244        });
2245    }
2246
2247    #[test_traced]
2248    fn test_journal_empty_section_in_middle() {
2249        // Test that replay correctly handles an empty section between sections with data.
2250        // Section 1 has data, section 2 is empty, section 3 has data.
2251        let executor = deterministic::Runner::default();
2252        executor.start(|context| async move {
2253            let cfg = Config {
2254                partition: "test_partition".into(),
2255                compression: None,
2256                codec_config: (),
2257                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2258                write_buffer: NZUsize!(1024),
2259            };
2260            let mut journal = Journal::init(context.clone(), cfg.clone())
2261                .await
2262                .expect("Failed to initialize journal");
2263
2264            // Append to section 1
2265            journal.append(1, 100i32).await.expect("Failed to append");
2266
2267            // Create an empty section 2 by appending an item (which creates the blob) and
2268            // then rewinding the section back to size 0.
2269            journal.append(2, 200i32).await.expect("Failed to append");
2270            journal.sync(2).await.expect("Failed to sync");
2271            journal
2272                .rewind_section(2, 0)
2273                .await
2274                .expect("Failed to rewind");
2275
2276            // Append to section 3
2277            journal.append(3, 300i32).await.expect("Failed to append");
2278
2279            journal.sync_all().await.expect("Failed to sync");
2280
2281            // Verify section sizes
2282            assert!(journal.size(1).await.unwrap() > 0);
2283            assert_eq!(journal.size(2).await.unwrap(), 0);
2284            assert!(journal.size(3).await.unwrap() > 0);
2285
2286            // Drop and reopen to test replay
2287            drop(journal);
2288            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
2289                .await
2290                .expect("Failed to re-initialize journal");
2291
2292            // Replay all - should get items from sections 1 and 3, skipping empty section 2
2293            {
2294                let stream = journal
2295                    .replay(0, 0, NZUsize!(1024))
2296                    .await
2297                    .expect("Failed to setup replay");
2298                pin_mut!(stream);
2299
2300                let mut items = Vec::new();
2301                while let Some(result) = stream.next().await {
2302                    let (section, _, _, item) = result.expect("Failed to replay item");
2303                    items.push((section, item));
2304                }
2305
2306                assert_eq!(
2307                    items.len(),
2308                    2,
2309                    "Should have 2 items (skipping empty section)"
2310                );
2311                assert_eq!(items[0], (1, 100));
2312                assert_eq!(items[1], (3, 300));
2313            }
2314
2315            // Replay starting from empty section 2 - should get only section 3
2316            {
2317                let stream = journal
2318                    .replay(2, 0, NZUsize!(1024))
2319                    .await
2320                    .expect("Failed to setup replay from section 2");
2321                pin_mut!(stream);
2322
2323                let mut items = Vec::new();
2324                while let Some(result) = stream.next().await {
2325                    let (section, _, _, item) = result.expect("Failed to replay item");
2326                    items.push((section, item));
2327                }
2328
2329                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
2330                assert_eq!(items[0], (3, 300));
2331            }
2332
2333            journal.destroy().await.unwrap();
2334        });
2335    }
2336
2337    #[test_traced]
2338    fn test_journal_item_exactly_page_size() {
2339        // Test that items exactly equal to PAGE_SIZE work correctly.
2340            // This is a boundary condition where the item exactly fills one page.
2341        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
2342        type ExactItem = [u8; ITEM_SIZE];
2343
2344        let executor = deterministic::Runner::default();
2345        executor.start(|context| async move {
2346            let cfg = Config {
2347                partition: "test_partition".into(),
2348                compression: None,
2349                codec_config: (),
2350                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2351                write_buffer: NZUsize!(4096),
2352            };
2353            let mut journal = Journal::init(context.clone(), cfg.clone())
2354                .await
2355                .expect("Failed to initialize journal");
2356
2357            // Create an item exactly PAGE_SIZE bytes
2358            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
2359            for (i, byte) in exact_data.iter_mut().enumerate() {
2360                *byte = (i % 256) as u8;
2361            }
2362
2363            // Append the exact-size item
2364            let (offset, size) = journal
2365                .append(1, exact_data)
2366                .await
2367                .expect("Failed to append exact item");
2368            assert_eq!(size as usize, ITEM_SIZE);
2369            journal.sync(1).await.expect("Failed to sync");
2370
2371            // Read the item back via random access
2372            let retrieved: ExactItem = journal
2373                .get(1, offset)
2374                .await
2375                .expect("Failed to get exact item");
2376            assert_eq!(retrieved, exact_data, "Random access read mismatch");
2377
2378            // Drop and reopen to test replay
2379            drop(journal);
2380            let journal = Journal::<_, ExactItem>::init(context.clone(), cfg.clone())
2381                .await
2382                .expect("Failed to re-initialize journal");
2383
2384            // Replay and verify
2385            {
2386                let stream = journal
2387                    .replay(0, 0, NZUsize!(1024))
2388                    .await
2389                    .expect("Failed to setup replay");
2390                pin_mut!(stream);
2391
2392                let mut items = Vec::new();
2393                while let Some(result) = stream.next().await {
2394                    let (section, off, sz, item) = result.expect("Failed to replay item");
2395                    items.push((section, off, sz, item));
2396                }
2397
2398                assert_eq!(items.len(), 1, "Should have exactly one item");
2399                let (section, off, sz, item) = &items[0];
2400                assert_eq!(*section, 1);
2401                assert_eq!(*off, offset);
2402                assert_eq!(*sz as usize, ITEM_SIZE);
2403                assert_eq!(*item, exact_data, "Replay read mismatch");
2404            }
2405
2406            journal.destroy().await.unwrap();
2407        });
2408    }
2409
2410    #[test_traced]
2411    fn test_journal_varint_spanning_page_boundary() {
2412        // Test that items with data spanning page boundaries work correctly
2413        // when using a small page size.
2414        //
2415        // With PAGE_SIZE=16:
2416        // - Physical page = 16 + 12 = 28 bytes
2417        // - Each [u8; 128] item = 2-byte varint + 128 bytes data = 130 bytes
2418        // - This spans multiple 16-byte pages, testing cross-page reading
2419        const SMALL_PAGE: NonZeroU16 = NZU16!(16);
2420
2421        let executor = deterministic::Runner::default();
2422        executor.start(|context| async move {
2423            let cfg = Config {
2424                partition: "test_partition".into(),
2425                compression: None,
2426                codec_config: (),
2427                buffer_pool: PoolRef::new(SMALL_PAGE, PAGE_CACHE_SIZE),
2428                write_buffer: NZUsize!(1024),
2429            };
2430            let mut journal: Journal<_, [u8; 128]> = Journal::init(context.clone(), cfg.clone())
2431                .await
2432                .expect("Failed to initialize journal");
2433
2434            // Create items that will span many 16-byte pages
2435            let item1: [u8; 128] = [1u8; 128];
2436            let item2: [u8; 128] = [2u8; 128];
2437            let item3: [u8; 128] = [3u8; 128];
2438
2439            // Append items - each is 130 bytes (2-byte varint + 128 data)
2440            // spanning ceil(130/16) = 9 pages worth of logical data
2441            let (offset1, _) = journal.append(1, item1).await.expect("Failed to append");
2442            let (offset2, _) = journal.append(1, item2).await.expect("Failed to append");
2443            let (offset3, _) = journal.append(1, item3).await.expect("Failed to append");
2444
2445            journal.sync(1).await.expect("Failed to sync");
2446
2447            // Read items back via random access
2448            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
2449            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
2450            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
2451            assert_eq!(retrieved1, item1);
2452            assert_eq!(retrieved2, item2);
2453            assert_eq!(retrieved3, item3);
2454
2455            // Drop and reopen to test replay
2456            drop(journal);
2457            let journal: Journal<_, [u8; 128]> = Journal::init(context.clone(), cfg.clone())
2458                .await
2459                .expect("Failed to re-initialize journal");
2460
2461            // Replay and verify all items
2462            {
2463                let stream = journal
2464                    .replay(0, 0, NZUsize!(64))
2465                    .await
2466                    .expect("Failed to setup replay");
2467                pin_mut!(stream);
2468
2469                let mut items = Vec::new();
2470                while let Some(result) = stream.next().await {
2471                    let (section, off, _, item) = result.expect("Failed to replay item");
2472                    items.push((section, off, item));
2473                }
2474
2475                assert_eq!(items.len(), 3, "Should have 3 items");
2476                assert_eq!(items[0], (1, offset1, item1));
2477                assert_eq!(items[1], (1, offset2, item2));
2478                assert_eq!(items[2], (1, offset3, item3));
2479            }
2480
2481            journal.destroy().await.unwrap();
2482        });
2483    }
2484}