// commonware_storage/journal/contiguous/variable.rs

1//! Position-based journal for variable-length items.
2//!
3//! This journal enforces section fullness: all non-final sections are full and synced.
4//! On init, only the last section needs to be replayed to determine the exact size.
5
6use crate::{
7    journal::{
8        contiguous::{fixed, Contiguous, MutableContiguous},
9        segmented::variable,
10        Error,
11    },
12    Persistable,
13};
14use commonware_codec::{Codec, CodecShared};
15use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
16use commonware_utils::NZUsize;
17#[commonware_macros::stability(ALPHA)]
18use core::ops::Range;
19use futures::{future::Either, stream, Stream, StreamExt as _};
20use std::num::{NonZeroU64, NonZeroUsize};
21#[commonware_macros::stability(ALPHA)]
22use tracing::debug;
23use tracing::warn;
24
25const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);
26
27/// Suffix appended to the base partition name for the data journal.
28const DATA_SUFFIX: &str = "_data";
29
30/// Suffix appended to the base partition name for the offsets journal.
31const OFFSETS_SUFFIX: &str = "_offsets";
32
/// Calculate the section number for a given position.
///
/// Integer division truncates, so each contiguous run of `items_per_section`
/// positions maps onto a single section index.
///
/// # Arguments
///
/// * `position` - The absolute position in the journal
/// * `items_per_section` - The number of items stored in each section
///
/// # Returns
///
/// The section number where the item at `position` should be stored.
///
/// # Examples
///
/// ```ignore
/// // With 10 items per section:
/// assert_eq!(position_to_section(0, 10), 0);   // first section
/// assert_eq!(position_to_section(9, 10), 0);   // last slot of section 0
/// assert_eq!(position_to_section(10, 10), 1);  // first slot of section 1
/// assert_eq!(position_to_section(25, 10), 2);
/// assert_eq!(position_to_section(30, 10), 3);
/// ```
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    let section = position / items_per_section;
    section
}
57
/// Configuration for a [Journal].
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name. Sub-partitions will be created by appending DATA_SUFFIX and OFFSETS_SUFFIX.
    pub partition: String,

    /// The number of items to store in each section.
    ///
    /// # Invariant
    ///
    /// Once set, this value cannot be changed across restarts.
    /// All non-final sections will be full and persisted.
    pub items_per_section: NonZeroU64,

    /// Optional compression level for stored items.
    ///
    /// `None` disables compression; the level semantics are defined by the
    /// underlying segmented variable journal.
    pub compression: Option<u8>,

    /// [Codec] configuration for encoding/decoding items.
    pub codec_config: C,

    /// Page cache for caching data. Shared by both the data and offsets journals.
    pub page_cache: CacheRef,

    /// Write buffer size for each section.
    pub write_buffer: NonZeroUsize,
}
82
83impl<C> Config<C> {
84    /// Returns the partition name for the data journal.
85    fn data_partition(&self) -> String {
86        format!("{}{}", self.partition, DATA_SUFFIX)
87    }
88
89    /// Returns the partition name for the offsets journal.
90    fn offsets_partition(&self) -> String {
91        format!("{}{}", self.partition, OFFSETS_SUFFIX)
92    }
93}
94
/// A contiguous journal with variable-size entries.
///
/// This journal manages section assignment automatically, allowing callers to append items
/// sequentially without manually tracking section numbers.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob](commonware_runtime::Blob) will be truncated to the last valid item). Repair occurs during
/// init via the underlying segmented journals.
///
/// # Invariants
///
/// ## 1. Section Fullness
///
/// All non-final sections are full (`items_per_section` items) and persisted. This ensures
/// that on `init()`, we only need to replay the last section to determine the exact size.
///
/// ## 2. Data Journal is Source of Truth
///
/// The data journal is always the source of truth. The offsets journal is an index
/// that may temporarily diverge during crashes. Divergences are automatically
/// aligned during init():
/// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data.
///   (This can happen if we crash after writing data journal but before writing offsets journal)
/// * If offsets.size() > data.size(): Rewind offsets to match data size.
///   (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
/// * If offsets.bounds().start < data.bounds().start: Prune offsets to match
///   (This can happen if we crash after pruning data journal but before pruning offsets journal)
///
/// Note that we don't recover from the case where offsets.bounds().start >
/// data.bounds().start. This should never occur because we always prune the data journal
/// before the offsets journal.
pub struct Journal<E: Clock + Storage + Metrics, V: Codec> {
    /// The underlying variable-length data journal.
    data: variable::Journal<E, V>,

    /// Index mapping positions to byte offsets within the data journal.
    /// Each entry is the byte offset (within its section) returned by the data
    /// journal on append. The section can be calculated from the position using
    /// items_per_section.
    offsets: fixed::Journal<E, u64>,

    /// The number of items per section.
    ///
    /// # Invariant
    ///
    /// This value is immutable after initialization and must remain consistent
    /// across restarts. Changing this value will result in data loss or corruption.
    items_per_section: u64,

    /// The next position to be assigned on append (total items appended).
    ///
    /// # Invariant
    ///
    /// Always >= `oldest_retained_pos`. Equal when data journal is empty or fully pruned.
    size: u64,

    /// The position of the first item that remains after pruning.
    ///
    /// After normal operation and pruning, `oldest_retained_pos` is section-aligned.
    /// After `init_at_size(N)`, `oldest_retained_pos` may be mid-section.
    ///
    /// # Invariant
    ///
    /// Never decreases (pruning only moves forward).
    oldest_retained_pos: u64,
}
165
166impl<E: Clock + Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Initialize a contiguous variable journal.
    ///
    /// # Crash Recovery
    ///
    /// The data journal is the source of truth. If the offsets journal is inconsistent
    /// it will be updated to match the data journal (see [Self::align_journals]).
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let items_per_section = cfg.items_per_section.get();
        let data_partition = cfg.data_partition();
        let offsets_partition = cfg.offsets_partition();

        // Initialize underlying variable data journal
        let mut data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: data_partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Initialize offsets journal. Using `items_per_section` as `items_per_blob`
        // keeps the offsets journal's blob boundaries aligned with the data journal's
        // section boundaries.
        let mut offsets = fixed::Journal::init(
            context.with_label("offsets"),
            fixed::Config {
                partition: offsets_partition,
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Validate and align offsets journal to match data journal
        let (oldest_retained_pos, size) =
            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;

        Ok(Self {
            data,
            offsets,
            items_per_section,
            size,
            oldest_retained_pos,
        })
    }
215
    /// Initialize an empty [Journal] at the given logical `size`.
    ///
    /// Returns a journal with journal.bounds() == Range{start: size, end: size}
    /// and next append at position `size`.
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
        // Initialize empty data journal
        let data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: cfg.data_partition(),
                compression: cfg.compression,
                codec_config: cfg.codec_config.clone(),
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Initialize offsets journal at the target size (it holds no readable
        // entries yet; the next append will land at position `size`).
        let offsets = fixed::Journal::init_at_size(
            context.with_label("offsets"),
            fixed::Config {
                partition: cfg.offsets_partition(),
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
            size,
        )
        .await?;

        Ok(Self {
            data,
            offsets,
            items_per_section: cfg.items_per_section.get(),
            size,
            // Starting empty at `size` means nothing earlier is retained.
            oldest_retained_pos: size,
        })
    }
256
    /// Initialize a [Journal] for use in state sync.
    ///
    /// The bounds are item locations (not section numbers). This function prepares the
    /// on-disk journal so that subsequent appends go to the correct physical location for the
    /// requested range.
    ///
    /// Behavior by existing on-disk state:
    /// - Fresh (no data): returns an empty journal.
    /// - Stale (all data strictly before `range.start`): destroys existing data and returns an
    ///   empty journal.
    /// - Overlap within [`range.start`, `range.end`]:
    ///   - Prunes toward `range.start` (section-aligned, so some items before
    ///     `range.start` may be retained)
    /// - Unexpected data beyond `range.end`: returns [Error::ItemOutOfRange].
    ///
    /// # Arguments
    /// - `context`: storage context
    /// - `cfg`: journal configuration
    /// - `range`: range of item locations to retain
    ///
    /// # Returns
    /// A contiguous journal ready for sync operations. The journal's size will be within the range.
    ///
    /// # Errors
    /// Returns [Error::ItemOutOfRange] if existing data extends beyond `range.end`.
    ///
    /// # Panics
    /// Panics if `range` is empty.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Self, Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        // Initialize contiguous journal (this also performs crash repair).
        let mut journal = Self::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size();

        // No existing data - initialize at the start of the sync range if needed
        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Self::init_at_size(context, cfg, range.start).await;
            }
        }

        // Check if data exceeds the sync range
        if size > range.end {
            return Err(Error::ItemOutOfRange(size));
        }

        // If all existing data is before our sync range, destroy and recreate fresh
        if size <= range.start {
            // All data is stale (ends at or before range.start)
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Self::init_at_size(context, cfg, range.start).await;
        }

        // Prune to lower bound if needed (pruning is section-aligned, so items just
        // before range.start may survive).
        let bounds = journal.bounds();
        if !bounds.is_empty() && bounds.start < range.start {
            debug!(
                oldest_pos = bounds.start,
                range.start, "pruning journal to sync range start"
            );
            journal.prune(range.start).await?;
        }

        Ok(journal)
    }
345
    /// Rewind the journal to the given size, discarding items from the end.
    ///
    /// After rewinding to size N, the journal will contain exactly N items, and the next append
    /// will receive position N.
    ///
    /// # Errors
    ///
    /// - Returns [Error::InvalidRewind] if `size` is larger than the current size.
    /// - Returns [Error::ItemPruned] if `size` points to already-pruned data.
    ///
    /// # Warning
    ///
    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        // Validate rewind target
        match size.cmp(&self.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()), // No-op
            std::cmp::Ordering::Less => {}
        }

        // Rewind never updates oldest_retained_pos.
        if size < self.oldest_retained_pos {
            return Err(Error::ItemPruned(size));
        }

        // Read the offset of the first item to discard (at position 'size').
        let discard_offset = self.offsets.read(size).await?;
        let discard_section = position_to_section(size, self.items_per_section);

        // Rewind data before offsets: init() can repair an offsets journal that is
        // ahead of the data journal (see Invariant #2), but not the reverse.
        self.data
            .rewind_to_offset(discard_section, discard_offset)
            .await?;
        self.offsets.rewind(size).await?;

        // Update our size
        self.size = size;

        Ok(())
    }
385
    /// Append a new item to the journal, returning its position.
    ///
    /// The position returned is a stable, consecutively increasing value starting from 0.
    /// This position remains constant after pruning.
    ///
    /// When a section becomes full, both the data journal and offsets journal are persisted
    /// to maintain the invariant that all non-final sections are full and consistent.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage operation fails or if the item cannot
    /// be encoded.
    ///
    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
    /// reopened to trigger alignment in [Journal::init].
    pub async fn append(&mut self, item: V) -> Result<u64, Error> {
        // Calculate which section this position belongs to
        let section = self.current_section();

        // Append to data journal first (it is the source of truth), get offset
        let (offset, _size) = self.data.append(section, item).await?;

        // Append offset to offsets journal. Its size must track ours exactly;
        // a mismatch indicates a broken internal invariant.
        let offsets_pos = self.offsets.append(offset).await?;
        assert_eq!(offsets_pos, self.size);

        // Return the current position
        let position = self.size;
        self.size += 1;

        // Maintain invariant that all full sections are persisted (Invariant #1).
        if self.size.is_multiple_of(self.items_per_section) {
            futures::try_join!(self.data.sync(section), self.offsets.sync())?;
        }

        Ok(position)
    }
423
424    /// Returns [start, end) where `start` and `end - 1` are the positions of the oldest and newest
425    /// retained items respectively.
426    pub const fn bounds(&self) -> std::ops::Range<u64> {
427        self.oldest_retained_pos..self.size
428    }
429
    /// Return the total number of items that have been appended to the journal.
    ///
    /// This count is NOT affected by pruning. The next appended item will receive this
    /// position as its value. Equal to `self.bounds().end`.
    pub const fn size(&self) -> u64 {
        self.size
    }
437
    /// Prune items at positions strictly less than `min_position`.
    ///
    /// Pruning is section-aligned: only whole sections below `min_position` are removed,
    /// so some items at positions below `min_position` may remain retained.
    ///
    /// Returns `true` if any data was pruned, `false` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage operation fails.
    ///
    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
    /// reopened to trigger alignment in [Journal::init].
    pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        if min_position <= self.oldest_retained_pos {
            return Ok(false);
        }

        // Cap min_position to size to maintain the invariant oldest_retained_pos <= size
        let min_position = min_position.min(self.size);

        // Calculate section number
        let min_section = position_to_section(min_position, self.items_per_section);

        // Prune the data journal before the offsets journal: init() only repairs an
        // offsets journal that is behind the data journal (see Invariant #2).
        let pruned = self.data.prune(min_section).await?;
        if pruned {
            // max() keeps oldest_retained_pos monotonic when it was mid-section
            // (e.g. after init_at_size) and the section start lies before it.
            let new_oldest = (min_section * self.items_per_section).max(self.oldest_retained_pos);
            self.oldest_retained_pos = new_oldest;
            self.offsets.prune(new_oldest).await?;
        }
        Ok(pruned)
    }
467
    /// Return a stream of all items in the journal starting from `start_pos`.
    ///
    /// Each item is yielded as a tuple `(position, item)` where position is the item's
    /// position in the journal.
    ///
    /// # Errors
    ///
    /// - Returns [Error::ItemPruned] if `start_pos` precedes the oldest retained item.
    /// - Returns [Error::ItemOutOfRange] if `start_pos` exceeds the journal size.
    /// - Storage/decoding errors may also occur during replay.
    pub async fn replay(
        &self,
        start_pos: u64,
        buffer_size: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + '_, Error> {
        // Validate start position is within bounds.
        let bounds = self.bounds();
        if start_pos < bounds.start {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > bounds.end {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        // If replaying at exactly size, return empty stream
        if start_pos == bounds.end {
            return Ok(Either::Left(stream::empty()));
        }

        // Use offsets index to find offset to start from, calculate section from position
        let start_offset = self.offsets.read(start_pos).await?;
        let start_section = position_to_section(start_pos, self.items_per_section);
        let data_stream = self
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;

        // Transform the stream to include position information. Items are yielded in
        // order starting at `start_pos`, so the stream index maps directly to a position.
        let transformed = data_stream.enumerate().map(move |(idx, result)| {
            result.map(|(_section, _offset, _size, item)| {
                // Calculate position: start_pos + items read
                let pos = start_pos + idx as u64;
                (pos, item)
            })
        });

        Ok(Either::Right(transformed))
    }
515
516    /// Read the item at the given position.
517    ///
518    /// # Errors
519    ///
520    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
521    /// - Returns [Error::ItemOutOfRange] if `position` is beyond the journal size.
522    /// - Returns other errors if storage or decoding fails.
523    pub async fn read(&self, position: u64) -> Result<V, Error> {
524        // Check bounds
525        let bounds = self.bounds();
526        if position >= bounds.end {
527            return Err(Error::ItemOutOfRange(position));
528        }
529
530        if position < bounds.start {
531            return Err(Error::ItemPruned(position));
532        }
533
534        // Read offset from journal and calculate section from position
535        let offset = self.offsets.read(position).await?;
536        let section = position_to_section(position, self.items_per_section);
537
538        // Read item from data journal
539        self.data.get(section, offset).await
540    }
541
542    /// Durably persist the journal.
543    ///
544    /// This is faster than `sync()` but recovery will be required on startup if a crash occurs
545    /// before the next call to `sync()`.
546    pub async fn commit(&mut self) -> Result<(), Error> {
547        let section = self.current_section();
548        self.data.sync(section).await
549    }
550
551    /// Durably persist the journal and ensure recovery is not required on startup.
552    ///
553    /// This is slower than `commit()` but ensures the journal doesn't require recovery on startup.
554    pub async fn sync(&mut self) -> Result<(), Error> {
555        // Persist only the current (final) section of the data journal.
556        // All non-final sections are already persisted per Invariant #1.
557        let section = self.current_section();
558
559        // Persist both journals concurrently
560        futures::try_join!(self.data.sync(section), self.offsets.sync())?;
561
562        Ok(())
563    }
564
565    /// Remove any underlying blobs created by the journal.
566    ///
567    /// This destroys both the data journal and the offsets journal.
568    pub async fn destroy(self) -> Result<(), Error> {
569        self.data.destroy().await?;
570        self.offsets.destroy().await
571    }
572
    /// Clear all data and reset the journal to a new starting position.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn clear_to_size(&mut self, new_size: u64) -> Result<(), Error> {
        // Clear the data journal first: init() can repair an offsets journal that
        // disagrees with the data journal, but not the reverse (Invariant #2).
        self.data.clear().await?;

        self.offsets.clear_to_size(new_size).await?;
        self.size = new_size;
        self.oldest_retained_pos = new_size;
        Ok(())
    }
586
    /// Return the section number where the next append will write.
    ///
    /// Derived from `size` (the next position to be assigned) rather than stored state.
    const fn current_section(&self) -> u64 {
        position_to_section(self.size, self.items_per_section)
    }
591
    /// Align the offsets journal and data journal to be consistent in case a crash occurred
    /// on a previous run and left the journals in an inconsistent state.
    ///
    /// The data journal is the source of truth. This function scans it to determine
    /// what SHOULD be in the offsets journal, then fixes any mismatches.
    ///
    /// # Returns
    ///
    /// Returns `(pruning_boundary, size)` for the contiguous journal.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // === Handle empty data journal case ===
        // Count items in the newest data section by replaying it. Per Invariant #1,
        // all earlier sections are full, so this is the only section whose item
        // count is unknown at startup.
        let items_in_last_section = match data.newest_section() {
            Some(last_section) => {
                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; // Propagate replay errors (corruption, etc.)
                    count += 1;
                }
                count
            }
            None => 0,
        };

        // Data journal is empty if there are no sections or if there is one section and it has no items.
        // The latter should only occur if a crash occurred after opening a data journal blob but
        // before writing to it.
        let data_empty =
            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
        if data_empty {
            let size = offsets.size();

            if !data.is_empty() {
                // A section exists but contains 0 items. This can happen in two cases:
                // 1. Rewind crash: we rewound the data journal but crashed before rewinding offsets
                // 2. First append crash: we opened the first section blob but crashed before writing to it
                // In both cases, calculate target position from the first remaining section
                // SAFETY: data is non-empty (checked above)
                let data_first_section = data.oldest_section().unwrap();
                let data_section_start = data_first_section * items_per_section;
                let target_pos = data_section_start.max(offsets.bounds().start);

                warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
                offsets.clear_to_size(target_pos).await?;
                return Ok((target_pos, target_pos));
            }

            // data.blobs is empty. This can happen in two cases:
            // 1. We completely pruned the data journal but crashed before pruning
            //    the offsets journal.
            // 2. The data journal was never opened.
            let offsets_bounds = offsets.bounds();
            if !offsets_bounds.is_empty() && offsets_bounds.start < size {
                // Offsets has unpruned entries but data is gone - clear to match empty state.
                // We use clear_to_size (not prune) to ensure bounds.start == bounds.end,
                // even when size is mid-section.
                warn!("crash repair: clearing offsets to {size} (prune-all crash)");
                offsets.clear_to_size(size).await?;
            }

            return Ok((size, size));
        }

        // === Handle non-empty data journal case ===
        let data_first_section = data.oldest_section().unwrap();
        let data_last_section = data.newest_section().unwrap();

        // data_oldest_pos is ALWAYS section-aligned because it's computed from the section index.
        // This differs from offsets.bounds().start which can be mid-section after init_at_size.
        let data_oldest_pos = data_first_section * items_per_section;

        // Align pruning state
        // We always prune data before offsets, so offsets should never be "ahead" by a section.
        let offsets_bounds = offsets.bounds();
        match (
            offsets_bounds.is_empty(),
            offsets_bounds.start.cmp(&data_oldest_pos),
        ) {
            (true, _) => {
                // Offsets journal is empty but data journal isn't.
                // It should always be in the same section as the data journal, though.
                let offsets_first_section = offsets_bounds.start / items_per_section;
                if offsets_first_section != data_first_section {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
                    )));
                }
                warn!(
                    "crash repair: offsets journal empty at {}, will rebuild from data",
                    offsets_bounds.start
                );
            }
            (false, std::cmp::Ordering::Less) => {
                // Offsets behind on pruning -- prune to catch up
                warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            (false, std::cmp::Ordering::Greater) => {
                // Compare sections: same section = valid, different section = corruption.
                if offsets_bounds.start / items_per_section > data_first_section {
                    return Err(Error::Corruption(format!(
                        "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
                        offsets_bounds.start
                    )));
                }
            }
            (false, std::cmp::Ordering::Equal) => {
                // Both journals are pruned to the same position.
            }
        }

        // Compute the correct logical size
        // Uses bounds.start from offsets as the anchor because it tracks the exact starting
        // position, which may be mid-section after init_at_size.
        //
        // Note: Corruption checks above ensure bounds.start is in data_first_section,
        // so the subtraction in oldest_items cannot underflow.
        // Re-fetch bounds since prune may have been called above.
        let offsets_bounds = offsets.bounds();
        let data_size = if data_first_section == data_last_section {
            offsets_bounds.start + items_in_last_section
        } else {
            let oldest_items = (data_first_section + 1) * items_per_section - offsets_bounds.start;
            let middle_items = (data_last_section - data_first_section - 1) * items_per_section;
            offsets_bounds.start + oldest_items + middle_items + items_in_last_section
        };

        // Align sizes
        let offsets_size = offsets_bounds.end;
        if offsets_size > data_size {
            // Crashed after writing offsets but before writing data.
            warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            // Crashed after writing data but before writing offsets.
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        // Final invariant checks
        assert_eq!(offsets.bounds().end, data_size);

        // After alignment, offsets and data must be in the same section.
        // We return bounds.start from offsets as the true boundary.
        let offsets_bounds = offsets.bounds();
        assert!(
            !offsets_bounds.is_empty(),
            "offsets should have data after alignment"
        );
        assert_eq!(
            offsets_bounds.start / items_per_section,
            data_first_section,
            "offsets and data should be in same oldest section"
        );

        offsets.sync().await?;
        Ok((offsets_bounds.start, data_size))
    }
754
755    /// Rebuild missing offset entries by replaying the data journal and
756    /// appending the missing entries to the offsets journal.
757    ///
758    /// The data journal is the source of truth. This function brings the offsets
759    /// journal up to date by replaying data items and indexing their positions.
760    ///
761    /// # Warning
762    ///
763    /// - Panics if data journal is empty
764    /// - Panics if `offsets_size` >= `data.size()`
765    async fn add_missing_offsets(
766        data: &variable::Journal<E, V>,
767        offsets: &mut fixed::Journal<E, u64>,
768        offsets_size: u64,
769        items_per_section: u64,
770    ) -> Result<(), Error> {
771        assert!(
772            !data.is_empty(),
773            "rebuild_offsets called with empty data journal"
774        );
775
776        // Find where to start replaying
777        let offsets_bounds = offsets.bounds();
778        let (start_section, resume_offset, skip_first) = if offsets_bounds.is_empty() {
779            // Offsets empty -- start from first data section
780            // SAFETY: data is non-empty (checked above)
781            let first_section = data.oldest_section().unwrap();
782            (first_section, 0, false)
783        } else if offsets_bounds.start < offsets_size {
784            // Offsets has items -- resume from last indexed position
785            let last_offset = offsets.read(offsets_size - 1).await?;
786            let last_section = position_to_section(offsets_size - 1, items_per_section);
787            (last_section, last_offset, true)
788        } else {
789            // Offsets fully pruned but data has items -- start from first data section
790            // SAFETY: data is non-empty (checked above)
791            let first_section = data.oldest_section().unwrap();
792            (first_section, 0, false)
793        };
794
795        // Replay data journal from start position through the end and index all items.
796        // The data journal is the source of truth, so we consume the entire stream.
797        // (replay streams from start_section onwards through all subsequent sections)
798        let stream = data
799            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
800            .await?;
801        futures::pin_mut!(stream);
802
803        let mut skipped_first = false;
804        while let Some(result) = stream.next().await {
805            let (_section, offset, _size, _item) = result?;
806
807            // Skip first item if resuming from last indexed offset
808            if skip_first && !skipped_first {
809                skipped_first = true;
810                continue;
811            }
812
813            offsets.append(offset).await?;
814        }
815
816        Ok(())
817    }
818}
819
// Implement Contiguous trait for variable-length items.
//
// Every method delegates to the journal's inherent method of the same name.
// The explicit `Self::method(self, ...)` call form is used — presumably to make
// it unambiguous that the inherent implementation (not the trait method being
// defined here) is invoked; NOTE(review): confirm this matches project convention.
impl<E: Clock + Storage + Metrics, V: CodecShared> Contiguous for Journal<E, V> {
    type Item = V;

    // Valid position range: `start` is the oldest retained item, `end` is the
    // next append position (see the test suite below for this contract).
    fn bounds(&self) -> std::ops::Range<u64> {
        Self::bounds(self)
    }

    // Stream items beginning at `start_pos`, using `buffer` as the read-ahead
    // capacity for replay.
    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, start_pos, buffer).await
    }

    // Read the item stored at the absolute `position`.
    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}
840
// Mutable operations delegate to the inherent methods of `Journal`.
impl<E: Clock + Storage + Metrics, V: CodecShared> MutableContiguous for Journal<E, V> {
    // Append `item`, returning the position it was assigned.
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    // Prune items below `min_position`; returns whether anything was pruned.
    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    // Rewind the journal so its size becomes `size` (discarding newer items).
    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
854
// Persistence operations delegate to the inherent methods of `Journal`.
impl<E: Clock + Storage + Metrics, V: CodecShared> Persistable for Journal<E, V> {
    type Error = Error;

    // Durably commit outstanding writes.
    async fn commit(&mut self) -> Result<(), Error> {
        Self::commit(self).await
    }

    // Flush buffered state to underlying storage.
    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    // Consume the journal and remove its underlying storage.
    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}
870#[cfg(test)]
871mod tests {
872    use super::*;
873    use crate::journal::contiguous::tests::run_contiguous_tests;
874    use commonware_macros::test_traced;
875    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
876    use commonware_utils::{NZUsize, NZU16, NZU64};
877    use futures::FutureExt as _;
878    use std::num::NonZeroU16;
879
    // Use some jank sizes to exercise boundary conditions.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    // Page-cache capacity paired with `PAGE_SIZE` — presumably a page count;
    // TODO confirm against usages later in this file.
    const PAGE_CACHE_SIZE: usize = 2;
    // Larger page sizes for tests that need more buffer space.
    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    // Mid-sized page for tests between the two extremes (used later in the file).
    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
886
887    /// Test that complete offsets partition loss after pruning is detected as unrecoverable.
888    ///
889    /// When the offsets partition is completely lost and the data has been pruned, we cannot
890    /// rebuild the index with correct position alignment (would require creating placeholder blobs).
891    /// This is a genuine external failure that should be detected and reported clearly.
892    #[test_traced]
893    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
894        let executor = deterministic::Runner::default();
895        executor.start(|context| async move {
896            let cfg = Config {
897                partition: "offsets_loss_after_prune".to_string(),
898                items_per_section: NZU64!(10),
899                compression: None,
900                codec_config: (),
901                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
902                write_buffer: NZUsize!(1024),
903            };
904
905            // === Phase 1: Create journal with data and prune ===
906            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
907                .await
908                .unwrap();
909
910            // Append 40 items across 4 sections (0-3)
911            for i in 0..40u64 {
912                journal.append(i * 100).await.unwrap();
913            }
914
915            // Prune to position 20 (removes sections 0-1, keeps sections 2-3)
916            journal.prune(20).await.unwrap();
917            assert_eq!(journal.bounds().start, 20);
918            assert_eq!(journal.bounds().end, 40);
919
920            journal.sync().await.unwrap();
921            drop(journal);
922
923            // === Phase 2: Simulate complete offsets partition loss ===
924            // Remove both the offsets data partition and its metadata partition
925            context
926                .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
927                .await
928                .expect("Failed to remove offsets blobs partition");
929            context
930                .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
931                .await
932                .expect("Failed to remove offsets metadata partition");
933
934            // === Phase 3: Verify this is detected as unrecoverable ===
935            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
936            assert!(matches!(result, Err(Error::Corruption(_))));
937        });
938    }
939
940    /// Test that init aligns state when data is pruned/lost but offsets survives.
941    ///
942    /// This handles both:
943    /// 1. Crash during prune-all (data pruned, offsets not yet)
944    /// 2. External data partition loss
945    ///
946    /// In both cases, we align by pruning offsets to match.
947    #[test_traced]
948    fn test_variable_align_data_offsets_mismatch() {
949        let executor = deterministic::Runner::default();
950        executor.start(|context| async move {
951            let cfg = Config {
952                partition: "data_loss_test".to_string(),
953                items_per_section: NZU64!(10),
954                compression: None,
955                codec_config: (),
956                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
957                write_buffer: NZUsize!(1024),
958            };
959
960            // === Setup: Create journal with data ===
961            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
962                .await
963                .unwrap();
964
965            // Append 20 items across 2 sections
966            for i in 0..20u64 {
967                variable.append(i * 100).await.unwrap();
968            }
969
970            variable.sync().await.unwrap();
971            drop(variable);
972
973            // === Simulate data loss: Delete data partition but keep offsets ===
974            context
975                .remove(&cfg.data_partition(), None)
976                .await
977                .expect("Failed to remove data partition");
978
979            // === Verify init aligns the mismatch ===
980            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
981                .await
982                .expect("Should align offsets to match empty data");
983
984            // Size should be preserved
985            assert_eq!(journal.size(), 20);
986
987            // But no items remain (both journals pruned)
988            assert!(journal.bounds().is_empty());
989
990            // All reads should fail with ItemPruned
991            for i in 0..20 {
992                assert!(matches!(
993                    journal.read(i).await,
994                    Err(crate::journal::Error::ItemPruned(_))
995                ));
996            }
997
998            // Can append new data starting at position 20
999            let pos = journal.append(999).await.unwrap();
1000            assert_eq!(pos, 20);
1001            assert_eq!(journal.read(20).await.unwrap(), 999);
1002
1003            journal.destroy().await.unwrap();
1004        });
1005    }
1006
1007    #[test_traced]
1008    fn test_variable_contiguous() {
1009        let executor = deterministic::Runner::default();
1010        executor.start(|context| async move {
1011            run_contiguous_tests(move |test_name: String, idx: usize| {
1012                let context = context.with_label(&format!("{test_name}_{idx}"));
1013                async move {
1014                    Journal::<_, u64>::init(
1015                        context,
1016                        Config {
1017                            partition: format!("generic_test_{test_name}"),
1018                            items_per_section: NZU64!(10),
1019                            compression: None,
1020                            codec_config: (),
1021                            page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1022                            write_buffer: NZUsize!(1024),
1023                        },
1024                    )
1025                    .await
1026                }
1027                .boxed()
1028            })
1029            .await;
1030        });
1031    }
1032
1033    /// Test multiple sequential prunes with Variable-specific guarantees.
1034    #[test_traced]
1035    fn test_variable_multiple_sequential_prunes() {
1036        let executor = deterministic::Runner::default();
1037        executor.start(|context| async move {
1038            let cfg = Config {
1039                partition: "sequential_prunes".to_string(),
1040                items_per_section: NZU64!(10),
1041                compression: None,
1042                codec_config: (),
1043                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1044                write_buffer: NZUsize!(1024),
1045            };
1046
1047            let mut journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1048
1049            // Append items across 4 sections: [0-9], [10-19], [20-29], [30-39]
1050            for i in 0..40u64 {
1051                journal.append(i * 100).await.unwrap();
1052            }
1053
1054            // Initial state: all items accessible
1055            assert_eq!(journal.bounds().start, 0);
1056            assert_eq!(journal.bounds().end, 40);
1057
1058            // First prune: remove section 0 (positions 0-9)
1059            let pruned = journal.prune(10).await.unwrap();
1060            assert!(pruned);
1061
1062            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1063            assert_eq!(journal.bounds().start, 10);
1064
1065            // Items 0-9 should be pruned, 10+ should be accessible
1066            assert!(matches!(
1067                journal.read(0).await,
1068                Err(crate::journal::Error::ItemPruned(_))
1069            ));
1070            assert_eq!(journal.read(10).await.unwrap(), 1000);
1071            assert_eq!(journal.read(19).await.unwrap(), 1900);
1072
1073            // Second prune: remove section 1 (positions 10-19)
1074            let pruned = journal.prune(20).await.unwrap();
1075            assert!(pruned);
1076
1077            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1078            assert_eq!(journal.bounds().start, 20);
1079
1080            // Items 0-19 should be pruned, 20+ should be accessible
1081            assert!(matches!(
1082                journal.read(10).await,
1083                Err(crate::journal::Error::ItemPruned(_))
1084            ));
1085            assert!(matches!(
1086                journal.read(19).await,
1087                Err(crate::journal::Error::ItemPruned(_))
1088            ));
1089            assert_eq!(journal.read(20).await.unwrap(), 2000);
1090            assert_eq!(journal.read(29).await.unwrap(), 2900);
1091
1092            // Third prune: remove section 2 (positions 20-29)
1093            let pruned = journal.prune(30).await.unwrap();
1094            assert!(pruned);
1095
1096            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1097            assert_eq!(journal.bounds().start, 30);
1098
1099            // Items 0-29 should be pruned, 30+ should be accessible
1100            assert!(matches!(
1101                journal.read(20).await,
1102                Err(crate::journal::Error::ItemPruned(_))
1103            ));
1104            assert!(matches!(
1105                journal.read(29).await,
1106                Err(crate::journal::Error::ItemPruned(_))
1107            ));
1108            assert_eq!(journal.read(30).await.unwrap(), 3000);
1109            assert_eq!(journal.read(39).await.unwrap(), 3900);
1110
1111            // Size should still be 40 (pruning doesn't affect size)
1112            assert_eq!(journal.size(), 40);
1113
1114            journal.destroy().await.unwrap();
1115        });
1116    }
1117
1118    /// Test that pruning all data and re-initializing preserves positions.
1119    #[test_traced]
1120    fn test_variable_prune_all_then_reinit() {
1121        let executor = deterministic::Runner::default();
1122        executor.start(|context| async move {
1123            let cfg = Config {
1124                partition: "prune_all_reinit".to_string(),
1125                items_per_section: NZU64!(10),
1126                compression: None,
1127                codec_config: (),
1128                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1129                write_buffer: NZUsize!(1024),
1130            };
1131
1132            // === Phase 1: Create journal and append data ===
1133            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1134                .await
1135                .unwrap();
1136
1137            for i in 0..100u64 {
1138                journal.append(i * 100).await.unwrap();
1139            }
1140
1141            assert_eq!(journal.bounds().end, 100);
1142            assert_eq!(journal.bounds().start, 0);
1143
1144            // === Phase 2: Prune all data ===
1145            let pruned = journal.prune(100).await.unwrap();
1146            assert!(pruned);
1147
1148            // All data is pruned - no items remain
1149            assert_eq!(journal.bounds().end, 100);
1150            assert!(journal.bounds().is_empty());
1151
1152            // All reads should fail with ItemPruned
1153            for i in 0..100 {
1154                assert!(matches!(
1155                    journal.read(i).await,
1156                    Err(crate::journal::Error::ItemPruned(_))
1157                ));
1158            }
1159
1160            journal.sync().await.unwrap();
1161            drop(journal);
1162
1163            // === Phase 3: Re-init and verify position preserved ===
1164            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1165                .await
1166                .unwrap();
1167
1168            // Size should be preserved, but no items remain
1169            assert_eq!(journal.bounds().end, 100);
1170            assert!(journal.bounds().is_empty());
1171
1172            // All reads should still fail
1173            for i in 0..100 {
1174                assert!(matches!(
1175                    journal.read(i).await,
1176                    Err(crate::journal::Error::ItemPruned(_))
1177                ));
1178            }
1179
1180            // === Phase 4: Append new data ===
1181            // Next append should get position 100
1182            journal.append(10000).await.unwrap();
1183            assert_eq!(journal.bounds().end, 101);
1184            // Now we have one item at position 100
1185            assert_eq!(journal.bounds().start, 100);
1186
1187            // Can read the new item
1188            assert_eq!(journal.read(100).await.unwrap(), 10000);
1189
1190            // Old positions still fail
1191            assert!(matches!(
1192                journal.read(99).await,
1193                Err(crate::journal::Error::ItemPruned(_))
1194            ));
1195
1196            journal.destroy().await.unwrap();
1197        });
1198    }
1199
1200    /// Test recovery from crash after data journal pruned but before offsets journal.
1201    #[test_traced]
1202    fn test_variable_recovery_prune_crash_offsets_behind() {
1203        let executor = deterministic::Runner::default();
1204        executor.start(|context| async move {
1205            // === Setup: Create Variable wrapper with data ===
1206            let cfg = Config {
1207                partition: "recovery_prune_crash".to_string(),
1208                items_per_section: NZU64!(10),
1209                compression: None,
1210                codec_config: (),
1211                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1212                write_buffer: NZUsize!(1024),
1213            };
1214
1215            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1216                .await
1217                .unwrap();
1218
1219            // Append 40 items across 4 sections to both journals
1220            for i in 0..40u64 {
1221                variable.append(i * 100).await.unwrap();
1222            }
1223
1224            // Prune to position 10 normally (both data and offsets journals pruned)
1225            variable.prune(10).await.unwrap();
1226            assert_eq!(variable.bounds().start, 10);
1227
1228            // === Simulate crash: Prune data journal but not offsets journal ===
1229            // Manually prune data journal to section 2 (position 20)
1230            variable.data.prune(2).await.unwrap();
1231            // Offsets journal still has data from position 10-19
1232
1233            variable.sync().await.unwrap();
1234            drop(variable);
1235
1236            // === Verify recovery ===
1237            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1238                .await
1239                .unwrap();
1240
1241            // Init should auto-repair: offsets journal pruned to match data journal
1242            assert_eq!(variable.bounds().start, 20);
1243            assert_eq!(variable.bounds().end, 40);
1244
1245            // Reads before position 20 should fail (pruned from both journals)
1246            assert!(matches!(
1247                variable.read(10).await,
1248                Err(crate::journal::Error::ItemPruned(_))
1249            ));
1250
1251            // Reads at position 20+ should succeed
1252            assert_eq!(variable.read(20).await.unwrap(), 2000);
1253            assert_eq!(variable.read(39).await.unwrap(), 3900);
1254
1255            variable.destroy().await.unwrap();
1256        });
1257    }
1258
1259    /// Test recovery detects corruption when offsets journal pruned ahead of data journal.
1260    ///
1261    /// Simulates an impossible state (offsets journal pruned more than data journal) which
1262    /// should never happen due to write ordering. Verifies that init() returns corruption error.
1263    #[test_traced]
1264    fn test_variable_recovery_offsets_ahead_corruption() {
1265        let executor = deterministic::Runner::default();
1266        executor.start(|context| async move {
1267            // === Setup: Create Variable wrapper with data ===
1268            let cfg = Config {
1269                partition: "recovery_offsets_ahead".to_string(),
1270                items_per_section: NZU64!(10),
1271                compression: None,
1272                codec_config: (),
1273                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1274                write_buffer: NZUsize!(1024),
1275            };
1276
1277            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1278                .await
1279                .unwrap();
1280
1281            // Append 40 items across 4 sections to both journals
1282            for i in 0..40u64 {
1283                variable.append(i * 100).await.unwrap();
1284            }
1285
1286            // Prune offsets journal ahead of data journal (impossible state)
1287            variable.offsets.prune(20).await.unwrap(); // Prune to position 20
1288            variable.data.prune(1).await.unwrap(); // Only prune data journal to section 1 (position 10)
1289
1290            variable.sync().await.unwrap();
1291            drop(variable);
1292
1293            // === Verify corruption detected ===
1294            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
1295            assert!(matches!(result, Err(Error::Corruption(_))));
1296        });
1297    }
1298
1299    /// Test recovery from crash after appending to data journal but before appending to offsets journal.
1300    #[test_traced]
1301    fn test_variable_recovery_append_crash_offsets_behind() {
1302        let executor = deterministic::Runner::default();
1303        executor.start(|context| async move {
1304            // === Setup: Create Variable wrapper with partial data ===
1305            let cfg = Config {
1306                partition: "recovery_append_crash".to_string(),
1307                items_per_section: NZU64!(10),
1308                compression: None,
1309                codec_config: (),
1310                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1311                write_buffer: NZUsize!(1024),
1312            };
1313
1314            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1315                .await
1316                .unwrap();
1317
1318            // Append 15 items to both journals (fills section 0, partial section 1)
1319            for i in 0..15u64 {
1320                variable.append(i * 100).await.unwrap();
1321            }
1322
1323            assert_eq!(variable.size(), 15);
1324
1325            // Manually append 5 more items directly to data journal only
1326            for i in 15..20u64 {
1327                variable.data.append(1, i * 100).await.unwrap();
1328            }
1329            // Offsets journal still has only 15 entries
1330
1331            variable.sync().await.unwrap();
1332            drop(variable);
1333
1334            // === Verify recovery ===
1335            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1336                .await
1337                .unwrap();
1338
1339            // Init should rebuild offsets journal from data journal replay
1340            assert_eq!(variable.bounds().end, 20);
1341            assert_eq!(variable.bounds().start, 0);
1342
1343            // All items should be readable from both journals
1344            for i in 0..20u64 {
1345                assert_eq!(variable.read(i).await.unwrap(), i * 100);
1346            }
1347
1348            // Offsets journal should be fully rebuilt to match data journal
1349            assert_eq!(variable.offsets.size(), 20);
1350
1351            variable.destroy().await.unwrap();
1352        });
1353    }
1354
1355    /// Test recovery from multiple prune operations with crash.
1356    #[test_traced]
1357    fn test_variable_recovery_multiple_prunes_crash() {
1358        let executor = deterministic::Runner::default();
1359        executor.start(|context| async move {
1360            // === Setup: Create Variable wrapper with data ===
1361            let cfg = Config {
1362                partition: "recovery_multiple_prunes".to_string(),
1363                items_per_section: NZU64!(10),
1364                compression: None,
1365                codec_config: (),
1366                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1367                write_buffer: NZUsize!(1024),
1368            };
1369
1370            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1371                .await
1372                .unwrap();
1373
1374            // Append 50 items across 5 sections to both journals
1375            for i in 0..50u64 {
1376                variable.append(i * 100).await.unwrap();
1377            }
1378
1379            // Prune to position 10 normally (both data and offsets journals pruned)
1380            variable.prune(10).await.unwrap();
1381            assert_eq!(variable.bounds().start, 10);
1382
1383            // === Simulate crash: Multiple prunes on data journal, not on offsets journal ===
1384            // Manually prune data journal to section 3 (position 30)
1385            variable.data.prune(3).await.unwrap();
1386            // Offsets journal still thinks oldest is position 10
1387
1388            variable.sync().await.unwrap();
1389            drop(variable);
1390
1391            // === Verify recovery ===
1392            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1393                .await
1394                .unwrap();
1395
1396            // Init should auto-repair: offsets journal pruned to match data journal
1397            assert_eq!(variable.bounds().start, 30);
1398            assert_eq!(variable.bounds().end, 50);
1399
1400            // Reads before position 30 should fail (pruned from both journals)
1401            assert!(matches!(
1402                variable.read(10).await,
1403                Err(crate::journal::Error::ItemPruned(_))
1404            ));
1405            assert!(matches!(
1406                variable.read(20).await,
1407                Err(crate::journal::Error::ItemPruned(_))
1408            ));
1409
1410            // Reads at position 30+ should succeed
1411            assert_eq!(variable.read(30).await.unwrap(), 3000);
1412            assert_eq!(variable.read(49).await.unwrap(), 4900);
1413
1414            variable.destroy().await.unwrap();
1415        });
1416    }
1417
1418    /// Test recovery from crash during rewind operation.
1419    ///
1420    /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes.
1421    /// This creates a situation where offsets journal has been rewound but data journal still
1422    /// contains items across multiple sections. Verifies that init() correctly rebuilds the
1423    /// offsets index across all sections to match the data journal.
1424    #[test_traced]
1425    fn test_variable_recovery_rewind_crash_multi_section() {
1426        let executor = deterministic::Runner::default();
1427        executor.start(|context| async move {
1428            // === Setup: Create Variable wrapper with data across multiple sections ===
1429            let cfg = Config {
1430                partition: "recovery_rewind_crash".to_string(),
1431                items_per_section: NZU64!(10),
1432                compression: None,
1433                codec_config: (),
1434                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1435                write_buffer: NZUsize!(1024),
1436            };
1437
1438            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1439                .await
1440                .unwrap();
1441
1442            // Append 25 items across 3 sections (section 0: 0-9, section 1: 10-19, section 2: 20-24)
1443            for i in 0..25u64 {
1444                variable.append(i * 100).await.unwrap();
1445            }
1446
1447            assert_eq!(variable.size(), 25);
1448
1449            // === Simulate crash during rewind(5) ===
1450            // Rewind offsets journal to size 5 (keeps positions 0-4)
1451            variable.offsets.rewind(5).await.unwrap();
1452            // CRASH before data.rewind() completes - data still has all 3 sections
1453
1454            variable.sync().await.unwrap();
1455            drop(variable);
1456
1457            // === Verify recovery ===
1458            let mut variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1459                .await
1460                .unwrap();
1461
1462            // Init should rebuild offsets[5-24] from data journal across all 3 sections
1463            assert_eq!(variable.bounds().end, 25);
1464            assert_eq!(variable.bounds().start, 0);
1465
1466            // All items should be readable - offsets rebuilt correctly across all sections
1467            for i in 0..25u64 {
1468                assert_eq!(variable.read(i).await.unwrap(), i * 100);
1469            }
1470
1471            // Verify offsets journal fully rebuilt
1472            assert_eq!(variable.offsets.size(), 25);
1473
1474            // Verify next append gets position 25
1475            let pos = variable.append(2500).await.unwrap();
1476            assert_eq!(pos, 25);
1477            assert_eq!(variable.read(25).await.unwrap(), 2500);
1478
1479            variable.destroy().await.unwrap();
1480        });
1481    }
1482
    /// Test recovery from crash after data sync but before offsets sync when journal was
    /// previously emptied by pruning.
    ///
    /// Scenario: the journal is filled (one full section), fully pruned (so offsets is
    /// empty), then raw data is appended and synced directly to the inner data journal
    /// without touching the offsets journal — the on-disk state a crash between the two
    /// syncs would leave behind. Init must rebuild offsets from the surviving data.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_empty_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal with one full section ===
            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 10 items (positions 0-9), fills section 0
            for i in 0..10u64 {
                journal.append(i * 100).await.unwrap();
            }
            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 0);

            // === Phase 2: Prune to create empty journal ===
            // Pruning everything below the size leaves bounds 10..10 (no retained items).
            journal.prune(10).await.unwrap();
            assert_eq!(journal.bounds().end, 10);
            assert!(journal.bounds().is_empty()); // Empty!

            // === Phase 3: Append directly to data journal to simulate crash ===
            // Manually append to data journal only (bypassing Variable's append logic)
            // This simulates the case where data was synced but offsets wasn't
            // With items_per_section=10, positions 10-19 belong to section 1.
            for i in 10..20u64 {
                journal.data.append(1, i * 100).await.unwrap();
            }
            // Sync the data journal (section 1)
            journal.data.sync(1).await.unwrap();
            // Do NOT sync offsets journal - simulates crash before offsets.sync()

            // Close without syncing offsets
            drop(journal);

            // === Phase 4: Verify recovery succeeds ===
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            // All data should be recovered
            assert_eq!(journal.bounds().end, 20);
            assert_eq!(journal.bounds().start, 10);

            // All items from position 10-19 should be readable
            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Items 0-9 should be pruned
            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1550
    /// Test that offsets index is rebuilt from data after sync writes data but not offsets.
    #[test_traced]
    fn test_variable_concurrent_sync_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "concurrent_sync_recovery".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append items across a section boundary
            // (15 items with items_per_section=10 spans sections 0 and 1).
            for i in 0..15u64 {
                journal.append(i * 100).await.unwrap();
            }

            // Persist via commit() before simulating a crash.
            // NOTE(review): the previous comment said "manually sync only data", but the
            // code calls commit() — confirm whether commit() leaves offsets unsynced as
            // the test name and doc comment imply.
            journal.commit().await.unwrap();

            // Simulate a crash (offsets not synced)
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Data should be intact and offsets rebuilt
            assert_eq!(journal.size(), 15);
            for i in 0..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();
        });
    }
1593
1594    #[test_traced]
1595    fn test_init_at_size_zero() {
1596        let executor = deterministic::Runner::default();
1597        executor.start(|context| async move {
1598            let cfg = Config {
1599                partition: "init_at_size_zero".to_string(),
1600                items_per_section: NZU64!(5),
1601                compression: None,
1602                codec_config: (),
1603                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1604                write_buffer: NZUsize!(1024),
1605            };
1606
1607            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1608                .await
1609                .unwrap();
1610
1611            // Size should be 0
1612            assert_eq!(journal.size(), 0);
1613
1614            // No oldest retained position (empty journal)
1615            assert!(journal.bounds().is_empty());
1616
1617            // Next append should get position 0
1618            let pos = journal.append(100).await.unwrap();
1619            assert_eq!(pos, 0);
1620            assert_eq!(journal.size(), 1);
1621            assert_eq!(journal.read(0).await.unwrap(), 100);
1622
1623            journal.destroy().await.unwrap();
1624        });
1625    }
1626
1627    #[test_traced]
1628    fn test_init_at_size_section_boundary() {
1629        let executor = deterministic::Runner::default();
1630        executor.start(|context| async move {
1631            let cfg = Config {
1632                partition: "init_at_size_boundary".to_string(),
1633                items_per_section: NZU64!(5),
1634                compression: None,
1635                codec_config: (),
1636                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1637                write_buffer: NZUsize!(1024),
1638            };
1639
1640            // Initialize at position 10 (exactly at section 1 boundary with items_per_section=5)
1641            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1642                .await
1643                .unwrap();
1644
1645            // Size should be 10
1646            assert_eq!(journal.bounds().end, 10);
1647
1648            // No data yet, so no oldest retained position
1649            assert!(journal.bounds().is_empty());
1650
1651            // Next append should get position 10
1652            let pos = journal.append(1000).await.unwrap();
1653            assert_eq!(pos, 10);
1654            assert_eq!(journal.size(), 11);
1655            assert_eq!(journal.read(10).await.unwrap(), 1000);
1656
1657            // Can continue appending
1658            let pos = journal.append(1001).await.unwrap();
1659            assert_eq!(pos, 11);
1660            assert_eq!(journal.read(11).await.unwrap(), 1001);
1661
1662            journal.destroy().await.unwrap();
1663        });
1664    }
1665
1666    #[test_traced]
1667    fn test_init_at_size_mid_section() {
1668        let executor = deterministic::Runner::default();
1669        executor.start(|context| async move {
1670            let cfg = Config {
1671                partition: "init_at_size_mid".to_string(),
1672                items_per_section: NZU64!(5),
1673                compression: None,
1674                codec_config: (),
1675                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1676                write_buffer: NZUsize!(1024),
1677            };
1678
1679            // Initialize at position 7 (middle of section 1 with items_per_section=5)
1680            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1681                .await
1682                .unwrap();
1683
1684            // Size should be 7
1685            assert_eq!(journal.bounds().end, 7);
1686
1687            // No data yet, so no oldest retained position
1688            assert!(journal.bounds().is_empty());
1689
1690            // Next append should get position 7
1691            let pos = journal.append(700).await.unwrap();
1692            assert_eq!(pos, 7);
1693            assert_eq!(journal.size(), 8);
1694            assert_eq!(journal.read(7).await.unwrap(), 700);
1695
1696            journal.destroy().await.unwrap();
1697        });
1698    }
1699
1700    #[test_traced]
1701    fn test_init_at_size_persistence() {
1702        let executor = deterministic::Runner::default();
1703        executor.start(|context| async move {
1704            let cfg = Config {
1705                partition: "init_at_size_persist".to_string(),
1706                items_per_section: NZU64!(5),
1707                compression: None,
1708                codec_config: (),
1709                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1710                write_buffer: NZUsize!(1024),
1711            };
1712
1713            // Initialize at position 15
1714            let mut journal =
1715                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
1716                    .await
1717                    .unwrap();
1718
1719            // Append some items
1720            for i in 0..5u64 {
1721                let pos = journal.append(1500 + i).await.unwrap();
1722                assert_eq!(pos, 15 + i);
1723            }
1724
1725            assert_eq!(journal.size(), 20);
1726
1727            // Sync and reopen
1728            journal.sync().await.unwrap();
1729            drop(journal);
1730
1731            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1732                .await
1733                .unwrap();
1734
1735            // Size and data should be preserved
1736            assert_eq!(journal.bounds().end, 20);
1737            assert_eq!(journal.bounds().start, 15);
1738
1739            // Verify data
1740            for i in 0..5u64 {
1741                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
1742            }
1743
1744            // Can continue appending
1745            let pos = journal.append(9999).await.unwrap();
1746            assert_eq!(pos, 20);
1747            assert_eq!(journal.read(20).await.unwrap(), 9999);
1748
1749            journal.destroy().await.unwrap();
1750        });
1751    }
1752
1753    #[test_traced]
1754    fn test_init_at_size_persistence_without_data() {
1755        let executor = deterministic::Runner::default();
1756        executor.start(|context| async move {
1757            let cfg = Config {
1758                partition: "init_at_size_persist_empty".to_string(),
1759                items_per_section: NZU64!(5),
1760                compression: None,
1761                codec_config: (),
1762                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1763                write_buffer: NZUsize!(1024),
1764            };
1765
1766            // Initialize at position 15
1767            let journal =
1768                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
1769                    .await
1770                    .unwrap();
1771
1772            assert_eq!(journal.bounds().end, 15);
1773            assert!(journal.bounds().is_empty());
1774
1775            // Drop without writing any data
1776            drop(journal);
1777
1778            // Reopen and verify size persisted
1779            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1780                .await
1781                .unwrap();
1782
1783            assert_eq!(journal.bounds().end, 15);
1784            assert!(journal.bounds().is_empty());
1785
1786            // Can append starting at position 15
1787            let pos = journal.append(1500).await.unwrap();
1788            assert_eq!(pos, 15);
1789            assert_eq!(journal.read(15).await.unwrap(), 1500);
1790
1791            journal.destroy().await.unwrap();
1792        });
1793    }
1794
1795    /// Test init_at_size with mid-section value persists correctly across restart.
1796    #[test_traced]
1797    fn test_init_at_size_mid_section_persistence() {
1798        let executor = deterministic::Runner::default();
1799        executor.start(|context| async move {
1800            let cfg = Config {
1801                partition: "init_at_size_mid_section".to_string(),
1802                items_per_section: NZU64!(5),
1803                compression: None,
1804                codec_config: (),
1805                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1806                write_buffer: NZUsize!(1024),
1807            };
1808
1809            // Initialize at position 7 (mid-section, 7 % 5 = 2)
1810            let mut journal =
1811                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
1812                    .await
1813                    .unwrap();
1814
1815            // Append 3 items at positions 7, 8, 9 (fills rest of section 1)
1816            for i in 0..3u64 {
1817                let pos = journal.append(700 + i).await.unwrap();
1818                assert_eq!(pos, 7 + i);
1819            }
1820
1821            assert_eq!(journal.bounds().end, 10);
1822            assert_eq!(journal.bounds().start, 7);
1823
1824            // Sync and reopen
1825            journal.sync().await.unwrap();
1826            drop(journal);
1827
1828            // Reopen
1829            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1830                .await
1831                .unwrap();
1832
1833            // Size and bounds.start should be preserved correctly
1834            assert_eq!(journal.bounds().end, 10);
1835            assert_eq!(journal.bounds().start, 7);
1836
1837            // Verify data
1838            for i in 0..3u64 {
1839                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
1840            }
1841
1842            // Positions before 7 should be pruned
1843            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
1844
1845            journal.destroy().await.unwrap();
1846        });
1847    }
1848
1849    /// Test init_at_size mid-section with data spanning multiple sections.
1850    #[test_traced]
1851    fn test_init_at_size_mid_section_multi_section_persistence() {
1852        let executor = deterministic::Runner::default();
1853        executor.start(|context| async move {
1854            let cfg = Config {
1855                partition: "init_at_size_multi_section".to_string(),
1856                items_per_section: NZU64!(5),
1857                compression: None,
1858                codec_config: (),
1859                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1860                write_buffer: NZUsize!(1024),
1861            };
1862
1863            // Initialize at position 7 (mid-section)
1864            let mut journal =
1865                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
1866                    .await
1867                    .unwrap();
1868
1869            // Append 8 items: positions 7-14 (section 1: 3 items, section 2: 5 items)
1870            for i in 0..8u64 {
1871                let pos = journal.append(700 + i).await.unwrap();
1872                assert_eq!(pos, 7 + i);
1873            }
1874
1875            assert_eq!(journal.bounds().end, 15);
1876            assert_eq!(journal.bounds().start, 7);
1877
1878            // Sync and reopen
1879            journal.sync().await.unwrap();
1880            drop(journal);
1881
1882            // Reopen
1883            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1884                .await
1885                .unwrap();
1886
1887            // Verify state preserved
1888            assert_eq!(journal.bounds().end, 15);
1889            assert_eq!(journal.bounds().start, 7);
1890
1891            // Verify all data
1892            for i in 0..8u64 {
1893                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
1894            }
1895
1896            journal.destroy().await.unwrap();
1897        });
1898    }
1899
    /// Regression test: data-empty crash repair must preserve mid-section pruning boundary.
    ///
    /// Simulates a crash where the data journal was cleared (as pruning would do) but
    /// the offsets journal was left behind; init's repair path must then treat the
    /// journal as fully pruned at its old size, and a subsequent data-only sync crash
    /// must not lose the item appended afterwards.
    #[test_traced]
    fn test_align_journals_data_empty_mid_section_pruning_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "align_journals_mid_section_pruning_boundary".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Phase 1: Create data and offsets, then simulate data-only pruning crash.
            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for i in 0..7u64 {
                journal.append(100 + i).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Simulate crash after data was cleared but before offsets were pruned.
            // (Reaches into the inner data journal directly, bypassing Variable's API.)
            journal.data.clear().await.unwrap();
            drop(journal);

            // Phase 2: Init triggers data-empty repair and should treat journal as fully pruned at size 7.
            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            assert_eq!(journal.bounds().end, 7);
            assert!(journal.bounds().is_empty());

            // Append one item at position 7.
            let pos = journal.append(777).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size(), 8);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            // Sync only the data journal to simulate a crash before offsets are synced.
            // 7 / items_per_section (5) == 1: the section holding position 7.
            let section = 7 / cfg.items_per_section.get();
            journal.data.sync(section).await.unwrap();
            drop(journal);

            // Phase 3: Reopen and verify we did not lose the appended item.
            let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();
            assert_eq!(journal.bounds().end, 8);
            assert_eq!(journal.bounds().start, 7);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            journal.destroy().await.unwrap();
        });
    }
1956
    /// Test crash recovery: init_at_size + append + crash with data synced but offsets not.
    #[test_traced]
    fn test_init_at_size_crash_data_synced_offsets_not() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_crash_recovery".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 7 (mid-section)
            let mut journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Append 3 items (positions 7-9, which finish section 1 since 7 / 5 == 1)
            for i in 0..3u64 {
                journal.append(700 + i).await.unwrap();
            }

            // Sync only the data journal, not offsets (simulate crash)
            // Reaches into the inner data journal directly; section 1 holds positions 7-9.
            journal.data.sync(1).await.unwrap();
            // Don't sync offsets - simulates crash after data write but before offsets write
            drop(journal);

            // Reopen - should recover by rebuilding offsets from data
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Verify recovery: size 10 and mid-section pruning boundary at 7 both restored
            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 7);

            // Verify data is accessible
            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
2004
2005    #[test_traced]
2006    fn test_prune_does_not_move_oldest_retained_backwards() {
2007        let executor = deterministic::Runner::default();
2008        executor.start(|context| async move {
2009            let cfg = Config {
2010                partition: "prune_no_backwards".to_string(),
2011                items_per_section: NZU64!(5),
2012                compression: None,
2013                codec_config: (),
2014                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
2015                write_buffer: NZUsize!(1024),
2016            };
2017
2018            let mut journal =
2019                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2020                    .await
2021                    .unwrap();
2022
2023            // Append a few items at positions 7..9
2024            for i in 0..3u64 {
2025                let pos = journal.append(700 + i).await.unwrap();
2026                assert_eq!(pos, 7 + i);
2027            }
2028            assert_eq!(journal.bounds().start, 7);
2029
2030            // Prune to a position within the same section should not move bounds.start backwards.
2031            journal.prune(8).await.unwrap();
2032            assert_eq!(journal.bounds().start, 7);
2033            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2034            assert_eq!(journal.read(7).await.unwrap(), 700);
2035
2036            journal.destroy().await.unwrap();
2037        });
2038    }
2039
2040    #[test_traced]
2041    fn test_init_at_size_large_offset() {
2042        let executor = deterministic::Runner::default();
2043        executor.start(|context| async move {
2044            let cfg = Config {
2045                partition: "init_at_size_large".to_string(),
2046                items_per_section: NZU64!(5),
2047                compression: None,
2048                codec_config: (),
2049                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
2050                write_buffer: NZUsize!(1024),
2051            };
2052
2053            // Initialize at a large position (position 1000)
2054            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
2055                .await
2056                .unwrap();
2057
2058            assert_eq!(journal.bounds().end, 1000);
2059            // No data yet, so no oldest retained position
2060            assert!(journal.bounds().is_empty());
2061
2062            // Next append should get position 1000
2063            let pos = journal.append(100000).await.unwrap();
2064            assert_eq!(pos, 1000);
2065            assert_eq!(journal.read(1000).await.unwrap(), 100000);
2066
2067            journal.destroy().await.unwrap();
2068        });
2069    }
2070
2071    #[test_traced]
2072    fn test_init_at_size_prune_and_append() {
2073        let executor = deterministic::Runner::default();
2074        executor.start(|context| async move {
2075            let cfg = Config {
2076                partition: "init_at_size_prune".to_string(),
2077                items_per_section: NZU64!(5),
2078                compression: None,
2079                codec_config: (),
2080                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
2081                write_buffer: NZUsize!(1024),
2082            };
2083
2084            // Initialize at position 20
2085            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
2086                .await
2087                .unwrap();
2088
2089            // Append items 20-29
2090            for i in 0..10u64 {
2091                journal.append(2000 + i).await.unwrap();
2092            }
2093
2094            assert_eq!(journal.size(), 30);
2095
2096            // Prune to position 25
2097            journal.prune(25).await.unwrap();
2098
2099            assert_eq!(journal.bounds().end, 30);
2100            assert_eq!(journal.bounds().start, 25);
2101
2102            // Verify remaining items are readable
2103            for i in 25..30u64 {
2104                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
2105            }
2106
2107            // Continue appending
2108            let pos = journal.append(3000).await.unwrap();
2109            assert_eq!(pos, 30);
2110
2111            journal.destroy().await.unwrap();
2112        });
2113    }
2114
2115    /// Test `init_sync` when there is no existing data on disk.
2116    #[test_traced]
2117    fn test_init_sync_no_existing_data() {
2118        let executor = deterministic::Runner::default();
2119        executor.start(|context| async move {
2120            let cfg = Config {
2121                partition: "test_fresh_start".into(),
2122                items_per_section: NZU64!(5),
2123                compression: None,
2124                codec_config: (),
2125                write_buffer: NZUsize!(1024),
2126                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2127            };
2128
2129            // Initialize journal with sync boundaries when no existing data exists
2130            let lower_bound = 10;
2131            let upper_bound = 26;
2132            let mut journal =
2133                Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
2134                    .await
2135                    .expect("Failed to initialize journal with sync boundaries");
2136
2137            assert_eq!(journal.bounds().end, lower_bound);
2138            assert!(journal.bounds().is_empty());
2139
2140            // Append items using the contiguous API
2141            let pos1 = journal.append(42u64).await.unwrap();
2142            assert_eq!(pos1, lower_bound);
2143            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
2144
2145            let pos2 = journal.append(43u64).await.unwrap();
2146            assert_eq!(pos2, lower_bound + 1);
2147            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
2148
2149            journal.destroy().await.unwrap();
2150        });
2151    }
2152
2153    /// Test `init_sync` when there is existing data that overlaps with the sync target range.
2154    #[test_traced]
2155    fn test_init_sync_existing_data_overlap() {
2156        let executor = deterministic::Runner::default();
2157        executor.start(|context| async move {
2158            let cfg = Config {
2159                partition: "test_overlap".into(),
2160                items_per_section: NZU64!(5),
2161                compression: None,
2162                codec_config: (),
2163                write_buffer: NZUsize!(1024),
2164                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2165            };
2166
2167            // Create initial journal with data in multiple sections
2168            let mut journal =
2169                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2170                    .await
2171                    .expect("Failed to create initial journal");
2172
2173            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
2174            for i in 0..20u64 {
2175                journal.append(i * 100).await.unwrap();
2176            }
2177            journal.sync().await.unwrap();
2178            drop(journal);
2179
2180            // Initialize with sync boundaries that overlap with existing data
2181            // lower_bound: 8 (section 1), upper_bound: 31 (last location 30, section 6)
2182            let lower_bound = 8;
2183            let upper_bound = 31;
2184            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2185                context.clone(),
2186                cfg.clone(),
2187                lower_bound..upper_bound,
2188            )
2189            .await
2190            .expect("Failed to initialize journal with overlap");
2191
2192            assert_eq!(journal.size(), 20);
2193
2194            // Verify oldest retained is pruned to lower_bound's section boundary (5)
2195            assert_eq!(journal.bounds().start, 5); // Section 1 starts at position 5
2196
2197            // Verify data integrity: positions before 5 are pruned
2198            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2199            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2200
2201            // Positions 5-19 should be accessible
2202            assert_eq!(journal.read(5).await.unwrap(), 500);
2203            assert_eq!(journal.read(8).await.unwrap(), 800);
2204            assert_eq!(journal.read(19).await.unwrap(), 1900);
2205
2206            // Position 20+ should not exist yet
2207            assert!(matches!(
2208                journal.read(20).await,
2209                Err(Error::ItemOutOfRange(_))
2210            ));
2211
2212            // Assert journal can accept new items
2213            let pos = journal.append(999).await.unwrap();
2214            assert_eq!(pos, 20);
2215            assert_eq!(journal.read(20).await.unwrap(), 999);
2216
2217            journal.destroy().await.unwrap();
2218        });
2219    }
2220
2221    /// Test `init_sync` with invalid parameters.
2222    #[should_panic]
2223    #[test_traced]
2224    fn test_init_sync_invalid_parameters() {
2225        let executor = deterministic::Runner::default();
2226        executor.start(|context| async move {
2227            let cfg = Config {
2228                partition: "test_invalid".into(),
2229                items_per_section: NZU64!(5),
2230                compression: None,
2231                codec_config: (),
2232                write_buffer: NZUsize!(1024),
2233                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2234            };
2235
2236            #[allow(clippy::reversed_empty_ranges)]
2237            let _result = Journal::<deterministic::Context, u64>::init_sync(
2238                context.clone(),
2239                cfg,
2240                10..5, // invalid range: lower > upper
2241            )
2242            .await;
2243        });
2244    }
2245
2246    /// Test `init_sync` when existing data exactly matches the sync range.
2247    #[test_traced]
2248    fn test_init_sync_existing_data_exact_match() {
2249        let executor = deterministic::Runner::default();
2250        executor.start(|context| async move {
2251            let items_per_section = NZU64!(5);
2252            let cfg = Config {
2253                partition: "test_exact_match".to_string(),
2254                items_per_section,
2255                compression: None,
2256                codec_config: (),
2257                write_buffer: NZUsize!(1024),
2258                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2259            };
2260
2261            // Create initial journal with data exactly matching sync range
2262            let mut journal =
2263                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2264                    .await
2265                    .expect("Failed to create initial journal");
2266
2267            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
2268            for i in 0..20u64 {
2269                journal.append(i * 100).await.unwrap();
2270            }
2271            journal.sync().await.unwrap();
2272            drop(journal);
2273
2274            // Initialize with sync boundaries that exactly match existing data
2275            let lower_bound = 5; // section 1
2276            let upper_bound = 20; // section 3
2277            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2278                context.clone(),
2279                cfg.clone(),
2280                lower_bound..upper_bound,
2281            )
2282            .await
2283            .expect("Failed to initialize journal with exact match");
2284
2285            assert_eq!(journal.size(), 20);
2286
2287            // Verify pruning to lower bound (section 1 boundary = position 5)
2288            assert_eq!(journal.bounds().start, 5); // Section 1 starts at position 5
2289
2290            // Verify positions before 5 are pruned
2291            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2292            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2293
2294            // Positions 5-19 should be accessible
2295            assert_eq!(journal.read(5).await.unwrap(), 500);
2296            assert_eq!(journal.read(10).await.unwrap(), 1000);
2297            assert_eq!(journal.read(19).await.unwrap(), 1900);
2298
2299            // Position 20+ should not exist yet
2300            assert!(matches!(
2301                journal.read(20).await,
2302                Err(Error::ItemOutOfRange(_))
2303            ));
2304
2305            // Assert journal can accept new operations
2306            let pos = journal.append(999).await.unwrap();
2307            assert_eq!(pos, 20);
2308            assert_eq!(journal.read(20).await.unwrap(), 999);
2309
2310            journal.destroy().await.unwrap();
2311        });
2312    }
2313
2314    /// Test `init_sync` when existing data exceeds the sync target range.
2315    /// This tests that UnexpectedData error is returned when existing data goes beyond the upper bound.
2316    #[test_traced]
2317    fn test_init_sync_existing_data_exceeds_upper_bound() {
2318        let executor = deterministic::Runner::default();
2319        executor.start(|context| async move {
2320            let items_per_section = NZU64!(5);
2321            let cfg = Config {
2322                partition: "test_unexpected_data".into(),
2323                items_per_section,
2324                compression: None,
2325                codec_config: (),
2326                write_buffer: NZUsize!(1024),
2327                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2328            };
2329
2330            // Create initial journal with data beyond sync range
2331            let mut journal = Journal::<deterministic::Context, u64>::init(
2332                context.with_label("initial"),
2333                cfg.clone(),
2334            )
2335            .await
2336            .expect("Failed to create initial journal");
2337
2338            // Add data at positions 0-29 (sections 0-5 with items_per_section=5)
2339            for i in 0..30u64 {
2340                journal.append(i * 1000).await.unwrap();
2341            }
2342            journal.sync().await.unwrap();
2343            drop(journal);
2344
2345            // Initialize with sync boundaries that are exceeded by existing data
2346            let lower_bound = 8; // section 1
2347            for (i, upper_bound) in (9..29).enumerate() {
2348                let result = Journal::<deterministic::Context, u64>::init_sync(
2349                    context.with_label(&format!("sync_{i}")),
2350                    cfg.clone(),
2351                    lower_bound..upper_bound,
2352                )
2353                .await;
2354
2355                // Should return ItemOutOfRange error since data exists beyond upper_bound
2356                assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
2357            }
2358        });
2359    }
2360
2361    /// Test `init_sync` when all existing data is stale (before lower bound).
2362    #[test_traced]
2363    fn test_init_sync_existing_data_stale() {
2364        let executor = deterministic::Runner::default();
2365        executor.start(|context| async move {
2366            let items_per_section = NZU64!(5);
2367            let cfg = Config {
2368                partition: "test_stale".into(),
2369                items_per_section,
2370                compression: None,
2371                codec_config: (),
2372                write_buffer: NZUsize!(1024),
2373                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2374            };
2375
2376            // Create initial journal with stale data
2377            let mut journal = Journal::<deterministic::Context, u64>::init(
2378                context.with_label("first"),
2379                cfg.clone(),
2380            )
2381            .await
2382            .expect("Failed to create initial journal");
2383
2384            // Add data at positions 0-9 (sections 0-1 with items_per_section=5)
2385            for i in 0..10u64 {
2386                journal.append(i * 100).await.unwrap();
2387            }
2388            journal.sync().await.unwrap();
2389            drop(journal);
2390
2391            // Initialize with sync boundaries beyond all existing data
2392            let lower_bound = 15; // section 3
2393            let upper_bound = 26; // last element in section 5
2394            let journal = Journal::<deterministic::Context, u64>::init_sync(
2395                context.with_label("second"),
2396                cfg.clone(),
2397                lower_bound..upper_bound,
2398            )
2399            .await
2400            .expect("Failed to initialize journal with stale data");
2401
2402            assert_eq!(journal.size(), 15);
2403
2404            // Verify fresh journal (all old data destroyed, starts at position 15)
2405            assert!(journal.bounds().is_empty());
2406
2407            // Verify old positions don't exist
2408            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2409            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2410            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2411
2412            journal.destroy().await.unwrap();
2413        });
2414    }
2415
2416    /// Test `init_sync` with section boundary edge cases.
2417    #[test_traced]
2418    fn test_init_sync_section_boundaries() {
2419        let executor = deterministic::Runner::default();
2420        executor.start(|context| async move {
2421            let items_per_section = NZU64!(5);
2422            let cfg = Config {
2423                partition: "test_boundaries".into(),
2424                items_per_section,
2425                compression: None,
2426                codec_config: (),
2427                write_buffer: NZUsize!(1024),
2428                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2429            };
2430
2431            // Create journal with data at section boundaries
2432            let mut journal =
2433                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2434                    .await
2435                    .expect("Failed to create initial journal");
2436
2437            // Add data at positions 0-24 (sections 0-4 with items_per_section=5)
2438            for i in 0..25u64 {
2439                journal.append(i * 100).await.unwrap();
2440            }
2441            journal.sync().await.unwrap();
2442            drop(journal);
2443
2444            // Test sync boundaries exactly at section boundaries
2445            let lower_bound = 15; // Exactly at section boundary (15/5 = 3)
2446            let upper_bound = 25; // Last element exactly at section boundary (24/5 = 4)
2447            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2448                context.clone(),
2449                cfg.clone(),
2450                lower_bound..upper_bound,
2451            )
2452            .await
2453            .expect("Failed to initialize journal at boundaries");
2454
2455            assert_eq!(journal.size(), 25);
2456
2457            // Verify oldest retained is at section 3 boundary (position 15)
2458            assert_eq!(journal.bounds().start, 15);
2459
2460            // Verify positions before 15 are pruned
2461            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2462            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2463
2464            // Verify positions 15-24 are accessible
2465            assert_eq!(journal.read(15).await.unwrap(), 1500);
2466            assert_eq!(journal.read(20).await.unwrap(), 2000);
2467            assert_eq!(journal.read(24).await.unwrap(), 2400);
2468
2469            // Position 25+ should not exist yet
2470            assert!(matches!(
2471                journal.read(25).await,
2472                Err(Error::ItemOutOfRange(_))
2473            ));
2474
2475            // Assert journal can accept new operations
2476            let pos = journal.append(999).await.unwrap();
2477            assert_eq!(pos, 25);
2478            assert_eq!(journal.read(25).await.unwrap(), 999);
2479
2480            journal.destroy().await.unwrap();
2481        });
2482    }
2483
2484    /// Test `init_sync` when range.start and range.end-1 are in the same section.
2485    #[test_traced]
2486    fn test_init_sync_same_section_bounds() {
2487        let executor = deterministic::Runner::default();
2488        executor.start(|context| async move {
2489            let items_per_section = NZU64!(5);
2490            let cfg = Config {
2491                partition: "test_same_section".into(),
2492                items_per_section,
2493                compression: None,
2494                codec_config: (),
2495                write_buffer: NZUsize!(1024),
2496                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2497            };
2498
2499            // Create journal with data in multiple sections
2500            let mut journal =
2501                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2502                    .await
2503                    .expect("Failed to create initial journal");
2504
2505            // Add data at positions 0-14 (sections 0-2 with items_per_section=5)
2506            for i in 0..15u64 {
2507                journal.append(i * 100).await.unwrap();
2508            }
2509            journal.sync().await.unwrap();
2510            drop(journal);
2511
2512            // Test sync boundaries within the same section
2513            let lower_bound = 10; // operation 10 (section 2: 10/5 = 2)
2514            let upper_bound = 15; // Last operation 14 (section 2: 14/5 = 2)
2515            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2516                context.clone(),
2517                cfg.clone(),
2518                lower_bound..upper_bound,
2519            )
2520            .await
2521            .expect("Failed to initialize journal with same-section bounds");
2522
2523            assert_eq!(journal.size(), 15);
2524
2525            // Both operations are in section 2, so sections 0, 1 should be pruned, section 2 retained
2526            // Oldest retained position should be at section 2 boundary (position 10)
2527            assert_eq!(journal.bounds().start, 10);
2528
2529            // Verify positions before 10 are pruned
2530            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2531            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2532
2533            // Verify positions 10-14 are accessible
2534            assert_eq!(journal.read(10).await.unwrap(), 1000);
2535            assert_eq!(journal.read(11).await.unwrap(), 1100);
2536            assert_eq!(journal.read(14).await.unwrap(), 1400);
2537
2538            // Position 15+ should not exist yet
2539            assert!(matches!(
2540                journal.read(15).await,
2541                Err(Error::ItemOutOfRange(_))
2542            ));
2543
2544            // Assert journal can accept new operations
2545            let pos = journal.append(999).await.unwrap();
2546            assert_eq!(pos, 15);
2547            assert_eq!(journal.read(15).await.unwrap(), 999);
2548
2549            journal.destroy().await.unwrap();
2550        });
2551    }
2552
    /// Test contiguous variable journal with items_per_section=1.
    ///
    /// This is a regression test for a bug where reading from size()-1 fails
    /// when using items_per_section=1, particularly after pruning and restart.
    ///
    /// With one item per section, every position maps to its own section, so
    /// section-boundary logic is exercised on every append and prune.
    #[test_traced]
    fn test_single_item_per_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single_item_per_section".to_string(),
                items_per_section: NZU64!(1),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Test 1: Basic single item operation ===
            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Verify empty state
            assert_eq!(journal.bounds().end, 0);
            assert!(journal.bounds().is_empty());

            // Append 1 item (value = position * 100, so position 0 has value 0)
            let pos = journal.append(0).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);

            // Sync
            journal.sync().await.unwrap();

            // Read from size() - 1
            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 0);

            // === Test 2: Multiple items with single item per section ===
            // Each append here finalizes the previous section and opens a new one.
            for i in 1..10u64 {
                let pos = journal.append(i * 100).await.unwrap();
                assert_eq!(pos, i);
                assert_eq!(journal.size(), i + 1);

                // Verify we can read the just-appended item at size() - 1
                let value = journal.read(journal.size() - 1).await.unwrap();
                assert_eq!(value, i * 100);
            }

            // Verify all items can be read
            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.sync().await.unwrap();

            // === Test 3: Pruning with single item per section ===
            // Prune to position 5 (removes positions 0-4)
            let pruned = journal.prune(5).await.unwrap();
            assert!(pruned);

            // Size should still be 10
            assert_eq!(journal.size(), 10);

            // bounds.start should be 5
            assert_eq!(journal.bounds().start, 5);

            // Reading from bounds.end - 1 (position 9) should still work
            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 900);

            // Reading from pruned positions should return ItemPruned
            for i in 0..5 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Reading from retained positions should work
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Append more items after pruning
            for i in 10..15u64 {
                let pos = journal.append(i * 100).await.unwrap();
                assert_eq!(pos, i);

                // Verify we can read from size() - 1
                let value = journal.read(journal.size() - 1).await.unwrap();
                assert_eq!(value, i * 100);
            }

            journal.sync().await.unwrap();
            drop(journal);

            // === Test 4: Restart persistence with single item per section ===
            // Re-init over the same partition; state from the previous instance
            // (15 appended, first 5 pruned) must be recovered from disk.
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Verify size is preserved
            assert_eq!(journal.size(), 15);

            // Verify bounds.start is preserved
            assert_eq!(journal.bounds().start, 5);

            // Reading from bounds.end - 1 should work after restart
            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 1400);

            // Reading all retained positions should work
            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();

            // === Test 5: Restart after pruning with non-zero index (KEY SCENARIO) ===
            // Fresh journal for this test
            let mut journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();

            // Append 10 items (positions 0-9)
            for i in 0..10u64 {
                journal.append(i * 1000).await.unwrap();
            }

            // Prune to position 5 (removes positions 0-4)
            journal.prune(5).await.unwrap();
            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 5);

            // Sync and restart
            journal.sync().await.unwrap();
            drop(journal);

            // Re-open journal
            let journal = Journal::<_, u64>::init(context.with_label("fourth"), cfg.clone())
                .await
                .unwrap();

            // Verify state after restart
            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 5);

            // KEY TEST: Reading from bounds.end - 1 (position 9) should work
            // (this was the read that failed before the regression fix)
            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 9000);

            // Verify all retained positions (5-9) work
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 1000);
            }

            journal.destroy().await.unwrap();

            // === Test 6: Prune all items (edge case) ===
            // This tests the scenario where prune removes everything.
            // Callers must check bounds().is_empty() before reading.
            let mut journal = Journal::<_, u64>::init(context.with_label("fifth"), cfg.clone())
                .await
                .unwrap();

            for i in 0..5u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Prune all items
            journal.prune(5).await.unwrap();
            assert_eq!(journal.bounds().end, 5); // Size unchanged
            assert!(journal.bounds().is_empty()); // All pruned

            // bounds.end - 1 = 4, but position 4 is pruned
            let result = journal.read(journal.size() - 1).await;
            assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));

            // After appending, reading works again
            journal.append(500).await.unwrap();
            assert_eq!(journal.bounds().start, 5);
            assert_eq!(journal.read(journal.bounds().end - 1).await.unwrap(), 500);

            journal.destroy().await.unwrap();
        });
    }
2741
2742    #[test_traced]
2743    fn test_variable_journal_clear_to_size() {
2744        let executor = deterministic::Runner::default();
2745        executor.start(|context| async move {
2746            let cfg = Config {
2747                partition: "clear_test".to_string(),
2748                items_per_section: NZU64!(10),
2749                compression: None,
2750                codec_config: (),
2751                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
2752                write_buffer: NZUsize!(1024),
2753            };
2754
2755            let mut journal = Journal::<_, u64>::init(context.with_label("journal"), cfg.clone())
2756                .await
2757                .unwrap();
2758
2759            // Append 25 items (spanning multiple sections)
2760            for i in 0..25u64 {
2761                journal.append(i * 100).await.unwrap();
2762            }
2763            assert_eq!(journal.bounds().end, 25);
2764            assert_eq!(journal.bounds().start, 0);
2765            journal.sync().await.unwrap();
2766
2767            // Clear to position 100, effectively resetting the journal
2768            journal.clear_to_size(100).await.unwrap();
2769            assert_eq!(journal.bounds().end, 100);
2770            assert!(journal.bounds().is_empty());
2771
2772            // Old positions should fail
2773            for i in 0..25 {
2774                assert!(matches!(
2775                    journal.read(i).await,
2776                    Err(crate::journal::Error::ItemPruned(_))
2777                ));
2778            }
2779
2780            // Verify size persists after restart without writing any data
2781            drop(journal);
2782            let mut journal =
2783                Journal::<_, u64>::init(context.with_label("journal_after_clear"), cfg.clone())
2784                    .await
2785                    .unwrap();
2786            assert_eq!(journal.bounds().end, 100);
2787            assert!(journal.bounds().is_empty());
2788
2789            // Append new data starting at position 100
2790            for i in 100..105u64 {
2791                let pos = journal.append(i * 100).await.unwrap();
2792                assert_eq!(pos, i);
2793            }
2794            assert_eq!(journal.bounds().end, 105);
2795            assert_eq!(journal.bounds().start, 100);
2796
2797            // New positions should be readable
2798            for i in 100..105u64 {
2799                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2800            }
2801
2802            // Sync and re-init to verify persistence
2803            journal.sync().await.unwrap();
2804            drop(journal);
2805
2806            let journal = Journal::<_, u64>::init(context.with_label("journal_reopened"), cfg)
2807                .await
2808                .unwrap();
2809
2810            assert_eq!(journal.bounds().end, 105);
2811            assert_eq!(journal.bounds().start, 100);
2812            for i in 100..105u64 {
2813                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2814            }
2815
2816            journal.destroy().await.unwrap();
2817        });
2818    }
2819}