// commonware_storage/journal/contiguous/variable.rs
1//! Position-based journal for variable-length items.
2//!
3//! This journal enforces section fullness: all non-final sections are full and synced.
4//! On init, only the last section needs to be replayed to determine the exact size.
5
6use super::Reader as _;
7use crate::{
8    journal::{
9        contiguous::{fixed, Contiguous, Many, Mutable},
10        segmented::variable,
11        Error,
12    },
13    Context, Persistable,
14};
15use commonware_codec::{Codec, CodecShared};
16use commonware_runtime::buffer::paged::CacheRef;
17use commonware_utils::{
18    sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock},
19    NZUsize,
20};
21#[commonware_macros::stability(ALPHA)]
22use core::ops::Range;
23use futures::{stream, Stream, StreamExt as _};
24use std::num::{NonZeroU64, NonZeroUsize};
25#[commonware_macros::stability(ALPHA)]
26use tracing::debug;
27use tracing::warn;
28
/// Buffer size used when replaying the last data section during init to count
/// its items (units determined by `variable::Journal::replay` — TODO confirm
/// whether this is bytes or items).
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to the base partition name for the data journal.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to the base partition name for the offsets journal.
const OFFSETS_SUFFIX: &str = "_offsets";
36
/// Map an absolute item position to the section that stores it.
///
/// Sections hold `items_per_section` consecutive items, so the section index
/// is simply the integer quotient of the position.
///
/// # Arguments
///
/// * `position` - The absolute position in the journal
/// * `items_per_section` - The number of items stored in each section
///
/// # Returns
///
/// The section number where the item at `position` should be stored.
///
/// # Examples
///
/// ```ignore
/// // With 10 items per section, positions 0..=9 land in section 0,
/// // 10..=19 in section 1, and so on:
/// assert_eq!(position_to_section(9, 10), 0);
/// assert_eq!(position_to_section(10, 10), 1);
/// assert_eq!(position_to_section(25, 10), 2);
/// ```
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    position / items_per_section
}
61
/// Configuration for a [Journal].
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name. Sub-partitions will be created by appending DATA_SUFFIX and OFFSETS_SUFFIX.
    pub partition: String,

    /// The number of items to store in each section.
    ///
    /// Once set, this value cannot be changed across restarts.
    /// All non-final sections will be full and persisted.
    pub items_per_section: NonZeroU64,

    /// Optional compression level for stored items.
    pub compression: Option<u8>,

    /// [Codec] configuration for encoding/decoding items.
    pub codec_config: C,

    /// Page cache for buffering reads from the underlying storage.
    ///
    /// Shared (via clone) between the data journal and the offsets journal.
    pub page_cache: CacheRef,

    /// Write buffer size for each section.
    pub write_buffer: NonZeroUsize,
}
86
87impl<C> Config<C> {
88    /// Returns the partition name for the data journal.
89    fn data_partition(&self) -> String {
90        format!("{}{}", self.partition, DATA_SUFFIX)
91    }
92
93    /// Returns the partition name for the offsets journal.
94    fn offsets_partition(&self) -> String {
95        format!("{}{}", self.partition, OFFSETS_SUFFIX)
96    }
97}
98
/// Inner journal state protected by a lock for interior mutability.
struct Inner<E: Context, V: Codec> {
    /// The underlying variable-length data journal.
    ///
    /// This is the source of truth for the journal's contents (see the
    /// "Data Journal is Source of Truth" invariant on [Journal]).
    data: variable::Journal<E, V>,

    /// The next position to be assigned on append (total items appended).
    ///
    /// # Invariant
    ///
    /// Always >= `pruning_boundary`. Equal when data journal is empty or fully pruned.
    size: u64,

    /// The position before which all items have been pruned.
    ///
    /// After normal operation and pruning, the value is section-aligned.
    /// After `init_at_size(N)`, the value may be mid-section.
    ///
    /// # Invariant
    ///
    /// Never decreases (pruning only moves forward).
    pruning_boundary: u64,
}
121
122impl<E: Context, V: CodecShared> Inner<E, V> {
123    /// Read the item at the given position using the provided offsets reader.
124    ///
125    /// # Errors
126    ///
127    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
128    /// - Returns [Error::ItemOutOfRange] if `position` is beyond the journal size.
129    /// - Returns other errors if storage or decoding fails.
130    async fn read(
131        &self,
132        position: u64,
133        items_per_section: u64,
134        offsets: &impl super::Reader<Item = u64>,
135    ) -> Result<V, Error> {
136        if position >= self.size {
137            return Err(Error::ItemOutOfRange(position));
138        }
139        if position < self.pruning_boundary {
140            return Err(Error::ItemPruned(position));
141        }
142
143        let offset = offsets.read(position).await?;
144        let section = position_to_section(position, items_per_section);
145
146        self.data.get(section, offset).await
147    }
148
149    /// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
150    fn try_read_sync(
151        &self,
152        position: u64,
153        items_per_section: u64,
154        offsets: &impl super::Reader<Item = u64>,
155    ) -> Option<V> {
156        if position >= self.size || position < self.pruning_boundary {
157            return None;
158        }
159        let offset = offsets.try_read_sync(position)?;
160        let section = position_to_section(position, items_per_section);
161        self.data.try_get_sync(section, offset)
162    }
163}
164
/// A contiguous journal with variable-size entries.
///
/// This journal manages section assignment automatically, allowing callers to append items
/// sequentially without manually tracking section numbers.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob](commonware_runtime::Blob) will be truncated to the last valid item). Repair occurs during
/// init via the underlying segmented journals.
///
/// # Invariants
///
/// ## 1. Section Fullness
///
/// All non-final sections are full (`items_per_section` items) and persisted. This ensures
/// that on `init()`, we only need to replay the last section to determine the exact size.
///
/// ## 2. Data Journal is Source of Truth
///
/// The data journal is always the source of truth. The offsets journal is an index
/// that may temporarily diverge during crashes. Divergences are automatically
/// aligned during init():
/// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data.
///   (This can happen if we crash after writing data journal but before writing offsets journal)
/// * If offsets.size() > data.size(): Rewind offsets to match data size.
///   (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
/// * If offsets.bounds().start < data.bounds().start: Prune offsets to match
///   (This can happen if we crash after pruning data journal but before pruning offsets journal)
///
/// Note that we don't recover from the case where offsets.bounds().start >
/// data.bounds().start. This should never occur because we always prune the data journal
/// before the offsets journal.
pub struct Journal<E: Context, V: Codec> {
    /// Inner state for data journal metadata.
    ///
    /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent
    /// race conditions while allowing concurrent reads during sync.
    inner: UpgradableAsyncRwLock<Inner<E, V>>,

    /// Index mapping positions to byte offsets within the data journal.
    /// The section can be calculated from the position using items_per_section.
    offsets: fixed::Journal<E, u64>,

    /// The number of items per section.
    ///
    /// # Invariant
    ///
    /// This value is immutable after initialization and must remain consistent
    /// across restarts. Changing this value will result in data loss or corruption.
    items_per_section: u64,

    /// Optional compression level when encoding items.
    /// Applied during append (see `append_many`'s pre-encoding step).
    compression: Option<u8>,
}
224
/// A reader guard that holds a consistent snapshot of the variable journal's bounds.
pub struct Reader<'a, E: Context, V: Codec> {
    /// Read guard over the journal's inner state; holding it pins `size` and
    /// `pruning_boundary` (mutators require the write half of the lock).
    guard: AsyncRwLockReadGuard<'a, Inner<E, V>>,
    /// Reader over the position -> byte-offset index journal.
    offsets: fixed::Reader<'a, E, u64>,
    /// Copied from the parent [Journal]; immutable after init.
    items_per_section: u64,
}
231
impl<E: Context, V: CodecShared> super::Reader for Reader<'_, E, V> {
    type Item = V;

    /// Half-open range of readable positions: `pruning_boundary..size`.
    fn bounds(&self) -> std::ops::Range<u64> {
        self.guard.pruning_boundary..self.guard.size
    }

    /// Read the item at `position` (see [Inner::read] for error semantics).
    async fn read(&self, position: u64) -> Result<V, Error> {
        self.guard
            .read(position, self.items_per_section, &self.offsets)
            .await
    }

    /// Read the item at `position` without I/O, or `None` if unavailable.
    fn try_read_sync(&self, position: u64) -> Option<V> {
        self.guard
            .try_read_sync(position, self.items_per_section, &self.offsets)
    }

    /// Stream `(position, item)` pairs starting at `start_pos`.
    ///
    /// `start_pos == size` is allowed and yields an empty stream.
    async fn replay(
        &self,
        buffer_size: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + Send, Error> {
        // Validate bounds.
        if start_pos < self.guard.pruning_boundary {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > self.guard.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        // Get the starting offset and section. For empty range (start_pos == size),
        // use a section beyond existing data so data.replay returns empty naturally.
        let (start_section, start_offset) = if start_pos < self.guard.size {
            let offset = self.offsets.read(start_pos).await?;
            let section = position_to_section(start_pos, self.items_per_section);
            (section, offset)
        } else {
            // u64::MAX is past any live section, producing an empty replay.
            (u64::MAX, 0)
        };

        let inner_stream = self
            .guard
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;

        // Map the stream to add positions. Items arrive in order starting at
        // `start_pos`, so zipping with a counter assigns each item's position.
        let stream = inner_stream
            .zip(stream::iter(start_pos..))
            .map(|(result, pos)| result.map(|(_section, _offset, _size, item)| (pos, item)));

        Ok(stream)
    }
}
287
288impl<E: Context, V: CodecShared> Journal<E, V> {
    /// Initialize a contiguous variable journal.
    ///
    /// # Crash Recovery
    ///
    /// The data journal is the source of truth. If the offsets journal is inconsistent
    /// it will be updated to match the data journal.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let items_per_section = cfg.items_per_section.get();
        let data_partition = cfg.data_partition();
        let offsets_partition = cfg.offsets_partition();

        // Initialize underlying variable data journal
        let mut data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: data_partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                // Cloned: the offsets journal below shares the same page cache.
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Initialize offsets journal
        let mut offsets = fixed::Journal::init(
            context.with_label("offsets"),
            fixed::Config {
                partition: offsets_partition,
                // Offsets blobs hold the same item count as data sections, so
                // section/blob boundaries line up across the two journals.
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Validate and align offsets journal to match data journal (crash repair).
        let (pruning_boundary, size) =
            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                data,
                size,
                pruning_boundary,
            }),
            offsets,
            items_per_section,
            compression: cfg.compression,
        })
    }
340
    /// Initialize an empty [Journal] at the given logical `size`.
    ///
    /// Returns a journal with journal.bounds() == Range{start: size, end: size}
    /// and next append at position `size`.
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
        // Initialize empty data journal
        let data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: cfg.data_partition(),
                compression: cfg.compression,
                codec_config: cfg.codec_config.clone(),
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Initialize offsets journal at the target size (the data journal
        // starts empty; only the offsets journal records the logical size).
        let offsets = fixed::Journal::init_at_size(
            context.with_label("offsets"),
            fixed::Config {
                partition: cfg.offsets_partition(),
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
            size,
        )
        .await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                data,
                size,
                // Everything before `size` is treated as pruned; note this
                // boundary may be mid-section (see Inner::pruning_boundary).
                pruning_boundary: size,
            }),
            offsets,
            items_per_section: cfg.items_per_section.get(),
            compression: cfg.compression,
        })
    }
384
    /// Initialize a [Journal] for use in state sync.
    ///
    /// The bounds are item locations (not section numbers). This function prepares the
    /// on-disk journal so that subsequent appends go to the correct physical location for the
    /// requested range.
    ///
    /// Behavior by existing on-disk state:
    /// - Fresh (no data): returns an empty journal (re-initialized at `range.start` when
    ///   `range.start > 0`).
    /// - Stale (all data strictly before `range.start`): destroys existing data and returns an
    ///   empty journal.
    /// - Overlap within [`range.start`, `range.end`]:
    ///   - Prunes toward `range.start` (section-aligned, so some items before
    ///     `range.start` may be retained)
    /// - Unexpected data beyond `range.end`: returns [Error::ItemOutOfRange].
    ///
    /// # Arguments
    /// - `context`: storage context
    /// - `cfg`: journal configuration
    /// - `range`: range of item locations to retain
    ///
    /// # Returns
    /// A contiguous journal ready for sync operations. The journal's size will be within the range.
    ///
    /// # Errors
    /// Returns [Error::ItemOutOfRange] if existing data extends beyond `range.end`.
    /// NOTE(review): earlier docs said this case returned `crate::qmdb::Error::UnexpectedData`,
    /// but the code below returns `Error::ItemOutOfRange` — confirm whether a caller translates
    /// this error.
    ///
    /// # Panics
    /// Panics if `range` is empty.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Self, Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        // Initialize contiguous journal (runs normal crash repair first).
        let journal = Self::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size().await;

        // No existing data - initialize at the start of the sync range if needed
        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Self::init_at_size(context, cfg, range.start).await;
            }
        }

        // Check if data exceeds the sync range
        if size > range.end {
            return Err(Error::ItemOutOfRange(size));
        }

        // If all existing data is before our sync range, destroy and recreate fresh
        if size <= range.start {
            // All data is stale (ends at or before range.start)
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Self::init_at_size(context, cfg, range.start).await;
        }

        // Prune to lower bound if needed. Pruning is section-aligned, so items
        // before range.start within the first retained section may survive.
        let bounds = journal.reader().await.bounds();
        if !bounds.is_empty() && bounds.start < range.start {
            debug!(
                oldest_pos = bounds.start,
                range.start, "pruning journal to sync range start"
            );
            journal.prune(range.start).await?;
        }

        Ok(journal)
    }
473
    /// Rewind the journal to the given size, discarding items from the end.
    ///
    /// After rewinding to size N, the journal will contain exactly N items, and the next append
    /// will receive position N.
    ///
    /// # Errors
    ///
    /// Returns [Error::InvalidRewind] if `size` is larger than current size.
    /// Returns [Error::ItemPruned] if `size` is smaller than the pruning boundary.
    ///
    /// # Warning
    ///
    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
    pub async fn rewind(&self, size: u64) -> Result<(), Error> {
        let mut inner = self.inner.write().await;

        // Validate rewind target
        match size.cmp(&inner.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()), // No-op
            std::cmp::Ordering::Less => {}
        }

        // Rewind never updates the pruning boundary.
        if size < inner.pruning_boundary {
            return Err(Error::ItemPruned(size));
        }

        // Read the offset of the first item to discard (at position 'size').
        let discard_offset = {
            let offsets_reader = self.offsets.reader().await;
            offsets_reader.read(size).await?
        };
        let discard_section = position_to_section(size, self.items_per_section);

        // Rewind data before offsets: the data journal is the source of truth,
        // so a crash between the two rewinds leaves offsets ahead of data,
        // which init() repairs by rewinding offsets to match.
        inner
            .data
            .rewind_to_offset(discard_section, discard_offset)
            .await?;
        self.offsets.rewind(size).await?;

        // Update our size
        inner.size = size;

        Ok(())
    }
520
    /// Append a new item to the journal, returning its position.
    ///
    /// The position returned is a stable, consecutively increasing value starting from 0.
    /// This position remains constant after pruning.
    ///
    /// When a section becomes full, both the data journal and offsets journal are persisted
    /// to maintain the invariant that all non-final sections are full and consistent.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage operation fails or if the item cannot
    /// be encoded.
    ///
    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
    /// reopened to trigger alignment in [Journal::init].
    pub async fn append(&self, item: &V) -> Result<u64, Error> {
        // Delegate to the batched path with a one-element slice.
        self.append_many(Many::Flat(std::slice::from_ref(item)))
            .await
    }
540
    /// Append items to the journal, returning the position of the last item appended.
    ///
    /// Acquires the write lock once for all items instead of per-item.
    /// Returns [Error::EmptyAppend] if items is empty.
    pub async fn append_many<'a>(&'a self, items: Many<'a, V>) -> Result<u64, Error> {
        if items.is_empty() {
            return Err(Error::EmptyAppend);
        }

        // Encode before grabbing write guard. Encoding needs only the immutable
        // compression setting, so doing it here shortens the critical section.
        let encode = |item: &V| variable::Journal::<E, V>::encode_item(self.compression, item);
        let encoded: Vec<_> = match &items {
            Many::Flat(s) => s.iter().map(encode).collect::<Result<Vec<_>, _>>()?,
            Many::Nested(nested_items) => nested_items
                .iter()
                .flat_map(|items| items.iter())
                .map(encode)
                .collect::<Result<Vec<_>, _>>()?,
        };

        // Mutating operations are serialized by taking the write guard.
        let mut inner = self.inner.write().await;

        let mut last_position = 0;
        for (index, (buf, _item_len)) in encoded.iter().enumerate() {
            // Calculate which section this position belongs to.
            let section = position_to_section(inner.size, self.items_per_section);

            // Append pre-encoded data to the data journal, get offset.
            let offset = inner.data.append_raw(section, buf).await?;

            // Append offset to offsets journal.
            let offsets_pos = self.offsets.append(&offset).await?;
            // Both journals must assign the same position to this item.
            assert_eq!(offsets_pos, inner.size);

            // Return the current position.
            last_position = inner.size;
            inner.size += 1;

            // The section was filled and must be synced (Invariant #1: all non-final
            // sections are full and persisted). Downgrade so readers can continue
            // during the sync while mutators remain blocked.
            if inner.size.is_multiple_of(self.items_per_section) {
                let inner_ref = inner.downgrade_to_upgradable();
                futures::try_join!(inner_ref.data.sync(section), self.offsets.sync())?;
                if index + 1 == encoded.len() {
                    // Last item landed exactly on a section boundary; no need to
                    // re-acquire the write guard.
                    return Ok(last_position);
                }
                inner = inner_ref.upgrade().await;
            }
        }

        Ok(last_position)
    }
594
    /// Acquire a reader guard that holds a consistent view of the journal.
    pub async fn reader(&self) -> Reader<'_, E, V> {
        // Acquire the inner read guard first, then the offsets reader, so both
        // snapshots are taken while mutators are excluded by the read lock.
        Reader {
            guard: self.inner.read().await,
            offsets: self.offsets.reader().await,
            items_per_section: self.items_per_section,
        }
    }
603
604    /// Return the total number of items in the journal, irrespective of pruning. The next value
605    /// appended to the journal will be at this position.
606    pub async fn size(&self) -> u64 {
607        self.inner.read().await.size
608    }
609
    /// Prune items at positions strictly less than `min_position`.
    ///
    /// Pruning is section-aligned: the boundary only advances to the start of
    /// the section containing `min_position`, so some earlier items may remain.
    ///
    /// Returns `true` if any data was pruned, `false` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage operation fails.
    ///
    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
    /// reopened to trigger alignment in [Journal::init].
    pub async fn prune(&self, min_position: u64) -> Result<bool, Error> {
        let mut inner = self.inner.write().await;

        if min_position <= inner.pruning_boundary {
            return Ok(false);
        }

        // Cap min_position to size to maintain the invariant pruning_boundary <= size
        let min_position = min_position.min(inner.size);

        // Calculate section number
        let min_section = position_to_section(min_position, self.items_per_section);

        // Prune data before offsets: the data journal is the source of truth,
        // and init() only repairs offsets lagging (never leading) on pruning.
        let pruned = inner.data.prune(min_section).await?;
        if pruned {
            // Section-aligned boundary; max() keeps it from moving backward.
            let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary);
            inner.pruning_boundary = new_oldest;
            self.offsets.prune(new_oldest).await?;
        }
        Ok(pruned)
    }
641
    /// Durably persist the journal.
    ///
    /// This is faster than `sync()` but recovery will be required on startup if a crash occurs
    /// before the next call to `sync()`. (Only the data journal's current section is synced
    /// here; the offsets journal is not, so init() may need to rebuild offsets.)
    pub async fn commit(&self) -> Result<(), Error> {
        // Serialize with append/prune/rewind so section selection is stable, while still allowing
        // concurrent readers.
        let inner = self.inner.upgradable_read().await;

        let section = position_to_section(inner.size, self.items_per_section);
        inner.data.sync(section).await
    }
654
    /// Durably persist the journal and ensure recovery is not required on startup.
    ///
    /// This is slower than `commit()` but ensures the journal doesn't require recovery on startup.
    pub async fn sync(&self) -> Result<(), Error> {
        // Serialize with append/prune/rewind so section selection is stable, while still allowing
        // concurrent readers.
        let inner = self.inner.upgradable_read().await;

        // Persist only the current (final) section of the data journal.
        // All non-final sections are already persisted per Invariant #1.
        let section = position_to_section(inner.size, self.items_per_section);

        // Persist both journals concurrently. These journals may not exist yet if the
        // previous section was just filled. This is checked internally.
        futures::try_join!(inner.data.sync(section), self.offsets.sync())?;

        Ok(())
    }
673
674    /// Remove any underlying blobs created by the journal.
675    ///
676    /// This destroys both the data journal and the offsets journal.
677    pub async fn destroy(self) -> Result<(), Error> {
678        let inner = self.inner.into_inner();
679        inner.data.destroy().await?;
680        self.offsets.destroy().await
681    }
682
    /// Clear all data and reset the journal to a new starting position.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
        let mut inner = self.inner.write().await;
        // Clear data before offsets, mirroring the data-first ordering used by
        // prune/rewind so crash repair in init() can reconcile the journals.
        inner.data.clear().await?;

        self.offsets.clear_to_size(new_size).await?;
        // Empty journal at new_size: bounds become new_size..new_size.
        inner.size = new_size;
        inner.pruning_boundary = new_size;
        Ok(())
    }
697
698    /// Align the offsets journal and data journal to be consistent in case a crash occurred
699    /// on a previous run and left the journals in an inconsistent state.
700    ///
701    /// The data journal is the source of truth. This function scans it to determine
702    /// what SHOULD be in the offsets journal, then fixes any mismatches.
703    ///
704    /// # Returns
705    ///
706    /// Returns `(pruning_boundary, size)` for the contiguous journal.
707    async fn align_journals(
708        data: &mut variable::Journal<E, V>,
709        offsets: &mut fixed::Journal<E, u64>,
710        items_per_section: u64,
711    ) -> Result<(u64, u64), Error> {
712        // === Handle empty data journal case ===
713        let items_in_last_section = match data.newest_section() {
714            Some(last_section) => {
715                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
716                futures::pin_mut!(stream);
717                let mut count = 0u64;
718                while let Some(result) = stream.next().await {
719                    result?; // Propagate replay errors (corruption, etc.)
720                    count += 1;
721                }
722                count
723            }
724            None => 0,
725        };
726
727        // Data journal is empty if there are no sections or if there is one section and it has no items.
728        // The latter should only occur if a crash occured after opening a data journal blob but
729        // before writing to it.
730        let data_empty =
731            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
732        if data_empty {
733            let offsets_bounds = {
734                let offsets_reader = offsets.reader().await;
735                offsets_reader.bounds()
736            };
737            let size = offsets_bounds.end;
738
739            if !data.is_empty() {
740                // A section exists but contains 0 items. This can happen in two cases:
741                // 1. Rewind crash: we rewound the data journal but crashed before rewinding offsets
742                // 2. First append crash: we opened the first section blob but crashed before writing to it
743                // In both cases, calculate target position from the first remaining section
744                // SAFETY: data is non-empty (checked above)
745                let data_first_section = data.oldest_section().unwrap();
746                let data_section_start = data_first_section * items_per_section;
747                let target_pos = data_section_start.max(offsets_bounds.start);
748
749                warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
750                offsets.clear_to_size(target_pos).await?;
751                return Ok((target_pos, target_pos));
752            }
753
754            // data.blobs is empty. This can happen in two cases:
755            // 1. We completely pruned the data journal but crashed before pruning
756            //    the offsets journal.
757            // 2. The data journal was never opened.
758            if !offsets_bounds.is_empty() && offsets_bounds.start < size {
759                // Offsets has unpruned entries but data is gone - clear to match empty state.
760                // We use clear_to_size (not prune) to ensure bounds.start == bounds.end,
761                // even when size is mid-section.
762                warn!("crash repair: clearing offsets to {size} (prune-all crash)");
763                offsets.clear_to_size(size).await?;
764            }
765
766            return Ok((size, size));
767        }
768
769        // === Handle non-empty data journal case ===
770        let data_first_section = data.oldest_section().unwrap();
771        let data_last_section = data.newest_section().unwrap();
772
773        // data_oldest_pos is ALWAYS section-aligned because it's computed from the section index.
774        // This differs from offsets bounds start which can be mid-section after init_at_size.
775        let data_oldest_pos = data_first_section * items_per_section;
776
777        // Align pruning state
778        // We always prune data before offsets, so offsets should never be "ahead" by a section.
779        {
780            let offsets_bounds = {
781                let offsets_reader = offsets.reader().await;
782                offsets_reader.bounds()
783            };
784            match (
785                offsets_bounds.is_empty(),
786                offsets_bounds.start.cmp(&data_oldest_pos),
787            ) {
788                (true, _) => {
789                    // Offsets journal is empty but data journal isn't.
790                    // It should always be in the same section as the data journal, though.
791                    let offsets_first_section = offsets_bounds.start / items_per_section;
792                    if offsets_first_section != data_first_section {
793                        return Err(Error::Corruption(format!(
794                            "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
795                        )));
796                    }
797                    warn!(
798                        "crash repair: offsets journal empty at {}, will rebuild from data",
799                        offsets_bounds.start
800                    );
801                }
802                (false, std::cmp::Ordering::Less) => {
803                    // Offsets behind on pruning -- prune to catch up
804                    warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
805                    offsets.prune(data_oldest_pos).await?;
806                }
807                (false, std::cmp::Ordering::Greater) => {
808                    // Compare sections: same section = valid, different section = corruption.
809                    if offsets_bounds.start / items_per_section > data_first_section {
810                        return Err(Error::Corruption(format!(
811                            "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
812                            offsets_bounds.start
813                        )));
814                    }
815                }
816                (false, std::cmp::Ordering::Equal) => {
817                    // Both journals are pruned to the same position.
818                }
819            }
820        }
821
822        // Compute the correct logical size
823        // Uses bounds.start from offsets as the anchor because it tracks the exact starting
824        // position, which may be mid-section after init_at_size.
825        //
826        // Note: Corruption checks above ensure bounds.start is in data_first_section,
827        // so the subtraction in oldest_items cannot underflow.
828        // Re-fetch bounds since prune may have been called above.
829        let (offsets_bounds, data_size) = {
830            let offsets_reader = offsets.reader().await;
831            let offsets_bounds = offsets_reader.bounds();
832            let data_size = if data_first_section == data_last_section {
833                offsets_bounds.start + items_in_last_section
834            } else {
835                let oldest_items =
836                    (data_first_section + 1) * items_per_section - offsets_bounds.start;
837                let middle_items = (data_last_section - data_first_section - 1) * items_per_section;
838                offsets_bounds.start + oldest_items + middle_items + items_in_last_section
839            };
840            (offsets_bounds, data_size)
841        };
842
843        // Align sizes
844        let offsets_size = offsets_bounds.end;
845        if offsets_size > data_size {
846            // Crashed after writing offsets but before writing data.
847            warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
848            offsets.rewind(data_size).await?;
849        } else if offsets_size < data_size {
850            // Crashed after writing data but before writing offsets.
851            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
852        }
853
854        // Final invariant checks
855        let pruning_boundary = {
856            let offsets_reader = offsets.reader().await;
857            let offsets_bounds = offsets_reader.bounds();
858            assert_eq!(offsets_bounds.end, data_size);
859
860            // After alignment, offsets and data must be in the same section.
861            // We return bounds.start from offsets as the true boundary.
862            assert!(
863                !offsets_bounds.is_empty(),
864                "offsets should have data after alignment"
865            );
866            assert_eq!(
867                offsets_bounds.start / items_per_section,
868                data_first_section,
869                "offsets and data should be in same oldest section"
870            );
871            offsets_bounds.start
872        };
873
874        offsets.sync().await?;
875        Ok((pruning_boundary, data_size))
876    }
877
878    /// Rebuild missing offset entries by replaying the data journal and
879    /// appending the missing entries to the offsets journal.
880    ///
881    /// The data journal is the source of truth. This function brings the offsets
882    /// journal up to date by replaying data items and indexing their positions.
883    ///
884    /// # Warning
885    ///
886    /// - Panics if data journal is empty
887    /// - Panics if `offsets_size` >= `data.size()`
888    async fn add_missing_offsets(
889        data: &variable::Journal<E, V>,
890        offsets: &mut fixed::Journal<E, u64>,
891        offsets_size: u64,
892        items_per_section: u64,
893    ) -> Result<(), Error> {
894        assert!(
895            !data.is_empty(),
896            "rebuild_offsets called with empty data journal"
897        );
898
899        // Find where to start replaying
900        let (start_section, resume_offset, skip_first) = {
901            let offsets_reader = offsets.reader().await;
902            let offsets_bounds = offsets_reader.bounds();
903            if offsets_bounds.is_empty() {
904                // Offsets empty -- start from first data section
905                // SAFETY: data is non-empty (checked above)
906                let first_section = data.oldest_section().unwrap();
907                (first_section, 0, false)
908            } else if offsets_bounds.start < offsets_size {
909                // Offsets has items -- resume from last indexed position
910                let last_offset = offsets_reader.read(offsets_size - 1).await?;
911                let last_section = position_to_section(offsets_size - 1, items_per_section);
912                (last_section, last_offset, true)
913            } else {
914                // Offsets fully pruned but data has items -- start from first data section
915                // SAFETY: data is non-empty (checked above)
916                let first_section = data.oldest_section().unwrap();
917                (first_section, 0, false)
918            }
919        };
920
921        // Replay data journal from start position through the end and index all items.
922        // The data journal is the source of truth, so we consume the entire stream.
923        // (replay streams from start_section onwards through all subsequent sections)
924        let stream = data
925            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
926            .await?;
927        futures::pin_mut!(stream);
928
929        let mut skipped_first = false;
930        while let Some(result) = stream.next().await {
931            let (_section, offset, _size, _item) = result?;
932
933            // Skip first item if resuming from last indexed offset
934            if skip_first && !skipped_first {
935                skipped_first = true;
936                continue;
937            }
938
939            offsets.append(&offset).await?;
940        }
941
942        Ok(())
943    }
944}
945
// Implement Contiguous trait for variable-length items
impl<E: Context, V: CodecShared> Contiguous for Journal<E, V> {
    type Item = V;

    /// Return a position-indexed reader; delegates to the inherent `reader`
    /// method (called in qualified form to avoid resolving to this trait method).
    async fn reader(&self) -> impl super::Reader<Item = V> + '_ {
        Self::reader(self).await
    }

    /// Return the journal's logical size; delegates to the inherent `size`
    /// method. Per the tests below, pruning does not reduce this value.
    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}
958
impl<E: Context, V: CodecShared> Mutable for Journal<E, V> {
    /// Append a single item; delegates to the inherent `append` method.
    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    /// Append a batch of items; delegates to the inherent `append_many` method.
    async fn append_many<'a>(&'a mut self, items: Many<'a, Self::Item>) -> Result<u64, Error> {
        Self::append_many(self, items).await
    }

    /// Prune items below `min_position`; delegates to the inherent `prune` method.
    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    /// Rewind the journal to `size` items; delegates to the inherent `rewind` method.
    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
976
impl<E: Context, V: CodecShared> Persistable for Journal<E, V> {
    type Error = Error;

    /// Delegates to the inherent `commit` method (inherent methods take
    /// precedence over trait methods in Rust method resolution, so this does
    /// not recurse).
    async fn commit(&self) -> Result<(), Error> {
        self.commit().await
    }

    /// Delegates to the inherent `sync` method (resolved to the inherent
    /// method, as above).
    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    /// Consume the journal and remove its backing storage; delegates to the
    /// inherent `destroy` method.
    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
992
#[commonware_macros::stability(ALPHA)]
impl<E: Context, V: CodecShared> crate::journal::authenticated::Inner<E> for Journal<E, V> {
    type Config = Config<V::Cfg>;

    /// Construct an authenticated journal backed by this variable-length
    /// contiguous journal; delegates directly to
    /// `authenticated::Journal::new` with the given configs and rewind
    /// predicate.
    async fn init<F: crate::merkle::Family, H: commonware_cryptography::Hasher>(
        context: E,
        merkle_cfg: crate::merkle::journaled::Config,
        journal_cfg: Self::Config,
        rewind_predicate: fn(&V) -> bool,
    ) -> Result<
        crate::journal::authenticated::Journal<F, E, Self, H>,
        crate::journal::authenticated::Error<F>,
    > {
        crate::journal::authenticated::Journal::<F, E, Self, H>::new(
            context,
            merkle_cfg,
            journal_cfg,
            rewind_predicate,
        )
        .await
    }
}
1015
#[cfg(test)]
impl<E: Context, V: CodecShared> Journal<E, V> {
    /// Test helper: Read the item at the given position.
    ///
    /// Acquires a fresh reader per call; convenience wrapper for tests only.
    pub(crate) async fn read(&self, position: u64) -> Result<V, Error> {
        self.reader().await.read(position).await
    }

    /// Test helper: Return the bounds (oldest retained position..size) of the journal.
    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
        self.reader().await.bounds()
    }

    /// Test helper: Prune the internal data journal directly (simulates crash scenario).
    ///
    /// Bypasses the offsets journal entirely, leaving the two journals
    /// intentionally misaligned so init-time repair can be exercised.
    pub(crate) async fn test_prune_data(&self, section: u64) -> Result<bool, Error> {
        let mut inner = self.inner.write().await;
        inner.data.prune(section).await
    }

    /// Test helper: Prune the internal offsets journal directly (simulates crash scenario).
    pub(crate) async fn test_prune_offsets(&self, position: u64) -> Result<bool, Error> {
        self.offsets.prune(position).await
    }

    /// Test helper: Rewind the internal offsets journal directly (simulates crash scenario).
    pub(crate) async fn test_rewind_offsets(&self, position: u64) -> Result<(), Error> {
        self.offsets.rewind(position).await
    }

    /// Test helper: Get the size of the internal offsets journal.
    pub(crate) async fn test_offsets_size(&self) -> u64 {
        self.offsets.size().await
    }

    /// Test helper: Append directly to the internal data journal (simulates crash scenario).
    ///
    /// Writes to the given section without updating the offsets journal.
    pub(crate) async fn test_append_data(
        &self,
        section: u64,
        item: V,
    ) -> Result<(u64, u32), Error> {
        let mut inner = self.inner.write().await;
        inner.data.append(section, &item).await
    }

    /// Test helper: Sync the internal data journal.
    ///
    /// Syncs the newest section; falls back to section 0 when the data
    /// journal has no sections.
    pub(crate) async fn test_sync_data(&self) -> Result<(), Error> {
        let inner = self.inner.read().await;
        inner
            .data
            .sync(inner.data.newest_section().unwrap_or(0))
            .await
    }
}
1068
1069#[cfg(test)]
1070mod tests {
1071    use super::*;
1072    use crate::journal::contiguous::tests::run_contiguous_tests;
1073    use commonware_macros::test_traced;
1074    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner, Storage};
1075    use commonware_utils::{NZUsize, NZU16, NZU64};
1076    use futures::FutureExt as _;
1077    use std::num::NonZeroU16;
1078
1079    // Use some jank sizes to exercise boundary conditions.
1080    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
1081    const PAGE_CACHE_SIZE: usize = 2;
1082    // Larger page sizes for tests that need more buffer space.
1083    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
1084    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
1085
1086    /// Test that complete offsets partition loss after pruning is detected as unrecoverable.
1087    ///
1088    /// When the offsets partition is completely lost and the data has been pruned, we cannot
1089    /// rebuild the index with correct position alignment (would require creating placeholder blobs).
1090    /// This is a genuine external failure that should be detected and reported clearly.
1091    #[test_traced]
1092    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
1093        let executor = deterministic::Runner::default();
1094        executor.start(|context| async move {
1095            let cfg = Config {
1096                partition: "offsets-loss-after-prune".into(),
1097                items_per_section: NZU64!(10),
1098                compression: None,
1099                codec_config: (),
1100                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1101                write_buffer: NZUsize!(1024),
1102            };
1103
1104            // === Phase 1: Create journal with data and prune ===
1105            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1106                .await
1107                .unwrap();
1108
1109            // Append 40 items across 4 sections (0-3)
1110            for i in 0..40u64 {
1111                journal.append(&(i * 100)).await.unwrap();
1112            }
1113
1114            // Prune to position 20 (removes sections 0-1, keeps sections 2-3)
1115            journal.prune(20).await.unwrap();
1116            let bounds = journal.bounds().await;
1117            assert_eq!(bounds.start, 20);
1118            assert_eq!(bounds.end, 40);
1119
1120            journal.sync().await.unwrap();
1121            drop(journal);
1122
1123            // === Phase 2: Simulate complete offsets partition loss ===
1124            // Remove both the offsets data partition and its metadata partition
1125            context
1126                .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
1127                .await
1128                .expect("Failed to remove offsets blobs partition");
1129            context
1130                .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
1131                .await
1132                .expect("Failed to remove offsets metadata partition");
1133
1134            // === Phase 3: Verify this is detected as unrecoverable ===
1135            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
1136            assert!(matches!(result, Err(Error::Corruption(_))));
1137        });
1138    }
1139
1140    /// Test that init aligns state when data is pruned/lost but offsets survives.
1141    ///
1142    /// This handles both:
1143    /// 1. Crash during prune-all (data pruned, offsets not yet)
1144    /// 2. External data partition loss
1145    ///
1146    /// In both cases, we align by pruning offsets to match.
1147    #[test_traced]
1148    fn test_variable_align_data_offsets_mismatch() {
1149        let executor = deterministic::Runner::default();
1150        executor.start(|context| async move {
1151            let cfg = Config {
1152                partition: "data-loss-test".into(),
1153                items_per_section: NZU64!(10),
1154                compression: None,
1155                codec_config: (),
1156                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1157                write_buffer: NZUsize!(1024),
1158            };
1159
1160            // === Setup: Create journal with data ===
1161            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1162                .await
1163                .unwrap();
1164
1165            // Append 20 items across 2 sections
1166            for i in 0..20u64 {
1167                variable.append(&(i * 100)).await.unwrap();
1168            }
1169
1170            variable.sync().await.unwrap();
1171            drop(variable);
1172
1173            // === Simulate data loss: Delete data partition but keep offsets ===
1174            context
1175                .remove(&cfg.data_partition(), None)
1176                .await
1177                .expect("Failed to remove data partition");
1178
1179            // === Verify init aligns the mismatch ===
1180            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1181                .await
1182                .expect("Should align offsets to match empty data");
1183
1184            // Size should be preserved
1185            assert_eq!(journal.size().await, 20);
1186
1187            // But no items remain (both journals pruned)
1188            assert!(journal.bounds().await.is_empty());
1189
1190            // All reads should fail with ItemPruned
1191            for i in 0..20 {
1192                assert!(matches!(
1193                    journal.read(i).await,
1194                    Err(crate::journal::Error::ItemPruned(_))
1195                ));
1196            }
1197
1198            // Can append new data starting at position 20
1199            let pos = journal.append(&999).await.unwrap();
1200            assert_eq!(pos, 20);
1201            assert_eq!(journal.read(20).await.unwrap(), 999);
1202
1203            journal.destroy().await.unwrap();
1204        });
1205    }
1206
1207    /// Test replay behavior for variable-length items.
1208    #[test_traced]
1209    fn test_variable_replay() {
1210        let executor = deterministic::Runner::default();
1211        executor.start(|context| async move {
1212            let cfg = Config {
1213                partition: "replay".into(),
1214                items_per_section: NZU64!(10),
1215                compression: None,
1216                codec_config: (),
1217                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1218                write_buffer: NZUsize!(1024),
1219            };
1220
1221            // Initialize journal
1222            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1223
1224            // Append 40 items across 4 sections (0-3)
1225            for i in 0..40u64 {
1226                journal.append(&(i * 100)).await.unwrap();
1227            }
1228
1229            // Test 1: Full replay
1230            {
1231                let reader = journal.reader().await;
1232                let stream = reader.replay(NZUsize!(20), 0).await.unwrap();
1233                futures::pin_mut!(stream);
1234                for i in 0..40u64 {
1235                    let (pos, item) = stream.next().await.unwrap().unwrap();
1236                    assert_eq!(pos, i);
1237                    assert_eq!(item, i * 100);
1238                }
1239                assert!(stream.next().await.is_none());
1240            }
1241
1242            // Test 2: Partial replay from middle of section
1243            {
1244                let reader = journal.reader().await;
1245                let stream = reader.replay(NZUsize!(20), 15).await.unwrap();
1246                futures::pin_mut!(stream);
1247                for i in 15..40u64 {
1248                    let (pos, item) = stream.next().await.unwrap().unwrap();
1249                    assert_eq!(pos, i);
1250                    assert_eq!(item, i * 100);
1251                }
1252                assert!(stream.next().await.is_none());
1253            }
1254
1255            // Test 3: Partial replay from section boundary
1256            {
1257                let reader = journal.reader().await;
1258                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
1259                futures::pin_mut!(stream);
1260                for i in 20..40u64 {
1261                    let (pos, item) = stream.next().await.unwrap().unwrap();
1262                    assert_eq!(pos, i);
1263                    assert_eq!(item, i * 100);
1264                }
1265                assert!(stream.next().await.is_none());
1266            }
1267
1268            // Test 4: Prune and verify replay from pruned
1269            journal.prune(20).await.unwrap();
1270            {
1271                let reader = journal.reader().await;
1272                let res = reader.replay(NZUsize!(20), 0).await;
1273                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
1274            }
1275            {
1276                let reader = journal.reader().await;
1277                let res = reader.replay(NZUsize!(20), 19).await;
1278                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
1279            }
1280
1281            // Test 5: Replay from exactly at pruning boundary after prune
1282            {
1283                let reader = journal.reader().await;
1284                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
1285                futures::pin_mut!(stream);
1286                for i in 20..40u64 {
1287                    let (pos, item) = stream.next().await.unwrap().unwrap();
1288                    assert_eq!(pos, i);
1289                    assert_eq!(item, i * 100);
1290                }
1291                assert!(stream.next().await.is_none());
1292            }
1293
1294            // Test 6: Replay from the end
1295            {
1296                let reader = journal.reader().await;
1297                let stream = reader.replay(NZUsize!(20), 40).await.unwrap();
1298                futures::pin_mut!(stream);
1299                assert!(stream.next().await.is_none());
1300            }
1301
1302            // Test 7: Replay beyond the end (should error)
1303            {
1304                let reader = journal.reader().await;
1305                let res = reader.replay(NZUsize!(20), 41).await;
1306                assert!(matches!(
1307                    res,
1308                    Err(crate::journal::Error::ItemOutOfRange(41))
1309                ));
1310            }
1311
1312            journal.destroy().await.unwrap();
1313        });
1314    }
1315
    /// Run the shared contiguous-journal test suite against the variable-length
    /// implementation.
    ///
    /// `run_contiguous_tests` calls the supplied factory with a test name and an
    /// index; each invocation gets its own labeled context and partition so the
    /// sub-tests do not share state.
    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            run_contiguous_tests(move |test_name: String, idx: usize| {
                // NOTE(review): '-' is replaced with '_' for the context label,
                // presumably because labels expect snake_case — confirm against
                // the metrics/labeling rules.
                let label = test_name.replace('-', "_");
                let context = context.with_label(&format!("{label}_{idx}"));
                async move {
                    let cfg = Config {
                        partition: format!("generic-test-{test_name}"),
                        items_per_section: NZU64!(10),
                        compression: None,
                        codec_config: (),
                        page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                        write_buffer: NZUsize!(1024),
                    };
                    Journal::<_, u64>::init(context, cfg).await
                }
                .boxed()
            })
            .await;
        });
    }
1339
1340    /// Test multiple sequential prunes with Variable-specific guarantees.
1341    #[test_traced]
1342    fn test_variable_multiple_sequential_prunes() {
1343        let executor = deterministic::Runner::default();
1344        executor.start(|context| async move {
1345            let cfg = Config {
1346                partition: "sequential-prunes".into(),
1347                items_per_section: NZU64!(10),
1348                compression: None,
1349                codec_config: (),
1350                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1351                write_buffer: NZUsize!(1024),
1352            };
1353
1354            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1355
1356            // Append items across 4 sections: [0-9], [10-19], [20-29], [30-39]
1357            for i in 0..40u64 {
1358                journal.append(&(i * 100)).await.unwrap();
1359            }
1360
1361            // Initial state: all items accessible
1362            let bounds = journal.bounds().await;
1363            assert_eq!(bounds.start, 0);
1364            assert_eq!(bounds.end, 40);
1365
1366            // First prune: remove section 0 (positions 0-9)
1367            let pruned = journal.prune(10).await.unwrap();
1368            assert!(pruned);
1369
1370            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1371            assert_eq!(journal.bounds().await.start, 10);
1372
1373            // Items 0-9 should be pruned, 10+ should be accessible
1374            assert!(matches!(
1375                journal.read(0).await,
1376                Err(crate::journal::Error::ItemPruned(_))
1377            ));
1378            assert_eq!(journal.read(10).await.unwrap(), 1000);
1379            assert_eq!(journal.read(19).await.unwrap(), 1900);
1380
1381            // Second prune: remove section 1 (positions 10-19)
1382            let pruned = journal.prune(20).await.unwrap();
1383            assert!(pruned);
1384
1385            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1386            assert_eq!(journal.bounds().await.start, 20);
1387
1388            // Items 0-19 should be pruned, 20+ should be accessible
1389            assert!(matches!(
1390                journal.read(10).await,
1391                Err(crate::journal::Error::ItemPruned(_))
1392            ));
1393            assert!(matches!(
1394                journal.read(19).await,
1395                Err(crate::journal::Error::ItemPruned(_))
1396            ));
1397            assert_eq!(journal.read(20).await.unwrap(), 2000);
1398            assert_eq!(journal.read(29).await.unwrap(), 2900);
1399
1400            // Third prune: remove section 2 (positions 20-29)
1401            let pruned = journal.prune(30).await.unwrap();
1402            assert!(pruned);
1403
1404            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1405            assert_eq!(journal.bounds().await.start, 30);
1406
1407            // Items 0-29 should be pruned, 30+ should be accessible
1408            assert!(matches!(
1409                journal.read(20).await,
1410                Err(crate::journal::Error::ItemPruned(_))
1411            ));
1412            assert!(matches!(
1413                journal.read(29).await,
1414                Err(crate::journal::Error::ItemPruned(_))
1415            ));
1416            assert_eq!(journal.read(30).await.unwrap(), 3000);
1417            assert_eq!(journal.read(39).await.unwrap(), 3900);
1418
1419            // Size should still be 40 (pruning doesn't affect size)
1420            assert_eq!(journal.size().await, 40);
1421
1422            journal.destroy().await.unwrap();
1423        });
1424    }
1425
1426    /// Test that pruning all data and re-initializing preserves positions.
1427    #[test_traced]
1428    fn test_variable_prune_all_then_reinit() {
1429        let executor = deterministic::Runner::default();
1430        executor.start(|context| async move {
1431            let cfg = Config {
1432                partition: "prune-all-reinit".into(),
1433                items_per_section: NZU64!(10),
1434                compression: None,
1435                codec_config: (),
1436                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1437                write_buffer: NZUsize!(1024),
1438            };
1439
1440            // === Phase 1: Create journal and append data ===
1441            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1442                .await
1443                .unwrap();
1444
1445            for i in 0..100u64 {
1446                journal.append(&(i * 100)).await.unwrap();
1447            }
1448
1449            let bounds = journal.bounds().await;
1450            assert_eq!(bounds.end, 100);
1451            assert_eq!(bounds.start, 0);
1452
1453            // === Phase 2: Prune all data ===
1454            let pruned = journal.prune(100).await.unwrap();
1455            assert!(pruned);
1456
1457            // All data is pruned - no items remain
1458            let bounds = journal.bounds().await;
1459            assert_eq!(bounds.end, 100);
1460            assert!(bounds.is_empty());
1461
1462            // All reads should fail with ItemPruned
1463            for i in 0..100 {
1464                assert!(matches!(
1465                    journal.read(i).await,
1466                    Err(crate::journal::Error::ItemPruned(_))
1467                ));
1468            }
1469
1470            journal.sync().await.unwrap();
1471            drop(journal);
1472
1473            // === Phase 3: Re-init and verify position preserved ===
1474            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1475                .await
1476                .unwrap();
1477
1478            // Size should be preserved, but no items remain
1479            let bounds = journal.bounds().await;
1480            assert_eq!(bounds.end, 100);
1481            assert!(bounds.is_empty());
1482
1483            // All reads should still fail
1484            for i in 0..100 {
1485                assert!(matches!(
1486                    journal.read(i).await,
1487                    Err(crate::journal::Error::ItemPruned(_))
1488                ));
1489            }
1490
1491            // === Phase 4: Append new data ===
1492            // Next append should get position 100
1493            journal.append(&10000).await.unwrap();
1494            let bounds = journal.bounds().await;
1495            assert_eq!(bounds.end, 101);
1496            // Now we have one item at position 100
1497            assert_eq!(bounds.start, 100);
1498
1499            // Can read the new item
1500            assert_eq!(journal.read(100).await.unwrap(), 10000);
1501
1502            // Old positions still fail
1503            assert!(matches!(
1504                journal.read(99).await,
1505                Err(crate::journal::Error::ItemPruned(_))
1506            ));
1507
1508            journal.destroy().await.unwrap();
1509        });
1510    }
1511
    /// Test recovery from crash after data journal pruned but before offsets journal.
    ///
    /// Pruning advances the data journal first, so a crash between the two prunes leaves
    /// the offsets journal retaining entries whose backing data is already gone. On init,
    /// the offsets journal must be auto-pruned forward to match the data journal.
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery-prune-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections (10 items/section) to both journals
            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune to position 10 normally (both data and offsets journals pruned)
            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // === Simulate crash: Prune data journal but not offsets journal ===
            // Manually prune data journal to section 2 (position 20) via test hook
            variable.test_prune_data(2).await.unwrap();
            // Offsets journal still has entries for positions 10-19 whose data is gone

            variable.sync().await.unwrap();
            drop(variable);

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should auto-repair: offsets journal pruned to match data journal
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);

            // Reads before position 20 should fail (pruned from both journals)
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Reads at position 20+ should succeed
            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }
1571
1572    /// Test recovery detects corruption when offsets journal pruned ahead of data journal.
1573    ///
1574    /// Simulates an impossible state (offsets journal pruned more than data journal) which
1575    /// should never happen due to write ordering. Verifies that init() returns corruption error.
1576    #[test_traced]
1577    fn test_variable_recovery_offsets_ahead_corruption() {
1578        let executor = deterministic::Runner::default();
1579        executor.start(|context| async move {
1580            // === Setup: Create Variable wrapper with data ===
1581            let cfg = Config {
1582                partition: "recovery-offsets-ahead".into(),
1583                items_per_section: NZU64!(10),
1584                compression: None,
1585                codec_config: (),
1586                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1587                write_buffer: NZUsize!(1024),
1588            };
1589
1590            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1591                .await
1592                .unwrap();
1593
1594            // Append 40 items across 4 sections to both journals
1595            for i in 0..40u64 {
1596                variable.append(&(i * 100)).await.unwrap();
1597            }
1598
1599            // Prune offsets journal ahead of data journal (impossible state)
1600            variable.test_prune_offsets(20).await.unwrap(); // Prune to position 20
1601            variable.test_prune_data(1).await.unwrap(); // Only prune data journal to section 1 (position 10)
1602
1603            variable.sync().await.unwrap();
1604            drop(variable);
1605
1606            // === Verify corruption detected ===
1607            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
1608            assert!(matches!(result, Err(Error::Corruption(_))));
1609        });
1610    }
1611
    /// Test recovery from crash after appending to data journal but before appending to offsets journal.
    ///
    /// Appends land in the data journal before the offsets journal, so a crash in between
    /// leaves the data journal ahead. On init, the missing offset entries must be rebuilt
    /// by replaying the data journal's tail — no appended items may be lost.
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with partial data ===
            let cfg = Config {
                partition: "recovery-append-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 15 items to both journals (fills section 0, partial section 1)
            for i in 0..15u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            assert_eq!(variable.size().await, 15);

            // Manually append 5 more items directly to data journal only (test hook
            // bypasses the offsets journal, simulating a crash mid-append)
            for i in 15..20u64 {
                variable.test_append_data(1, i * 100).await.unwrap();
            }
            // Offsets journal still has only 15 entries

            variable.sync().await.unwrap();
            drop(variable);

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should rebuild offsets journal from data journal replay
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 0);

            // All items should be readable from both journals
            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Offsets journal should be fully rebuilt to match data journal
            assert_eq!(variable.test_offsets_size().await, 20);

            variable.destroy().await.unwrap();
        });
    }
1668
1669    /// Test recovery from multiple prune operations with crash.
1670    #[test_traced]
1671    fn test_variable_recovery_multiple_prunes_crash() {
1672        let executor = deterministic::Runner::default();
1673        executor.start(|context| async move {
1674            // === Setup: Create Variable wrapper with data ===
1675            let cfg = Config {
1676                partition: "recovery-multiple-prunes".into(),
1677                items_per_section: NZU64!(10),
1678                compression: None,
1679                codec_config: (),
1680                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1681                write_buffer: NZUsize!(1024),
1682            };
1683
1684            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1685                .await
1686                .unwrap();
1687
1688            // Append 50 items across 5 sections to both journals
1689            for i in 0..50u64 {
1690                variable.append(&(i * 100)).await.unwrap();
1691            }
1692
1693            // Prune to position 10 normally (both data and offsets journals pruned)
1694            variable.prune(10).await.unwrap();
1695            assert_eq!(variable.bounds().await.start, 10);
1696
1697            // === Simulate crash: Multiple prunes on data journal, not on offsets journal ===
1698            // Manually prune data journal to section 3 (position 30)
1699            variable.test_prune_data(3).await.unwrap();
1700            // Offsets journal still thinks oldest is position 10
1701
1702            variable.sync().await.unwrap();
1703            drop(variable);
1704
1705            // === Verify recovery ===
1706            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1707                .await
1708                .unwrap();
1709
1710            // Init should auto-repair: offsets journal pruned to match data journal
1711            let bounds = variable.bounds().await;
1712            assert_eq!(bounds.start, 30);
1713            assert_eq!(bounds.end, 50);
1714
1715            // Reads before position 30 should fail (pruned from both journals)
1716            assert!(matches!(
1717                variable.read(10).await,
1718                Err(crate::journal::Error::ItemPruned(_))
1719            ));
1720            assert!(matches!(
1721                variable.read(20).await,
1722                Err(crate::journal::Error::ItemPruned(_))
1723            ));
1724
1725            // Reads at position 30+ should succeed
1726            assert_eq!(variable.read(30).await.unwrap(), 3000);
1727            assert_eq!(variable.read(49).await.unwrap(), 4900);
1728
1729            variable.destroy().await.unwrap();
1730        });
1731    }
1732
    /// Test recovery from crash during rewind operation.
    ///
    /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes.
    /// This creates a situation where offsets journal has been rewound but data journal still
    /// contains items across multiple sections. Verifies that init() correctly rebuilds the
    /// offsets index across all sections to match the data journal.
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data across multiple sections ===
            let cfg = Config {
                partition: "recovery-rewind-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 25 items across 3 sections (section 0: 0-9, section 1: 10-19, section 2: 20-24)
            for i in 0..25u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            assert_eq!(variable.size().await, 25);

            // === Simulate crash during rewind(5) ===
            // Rewind offsets journal to size 5 (keeps positions 0-4) via test hook
            variable.test_rewind_offsets(5).await.unwrap();
            // CRASH before data.rewind() completes - data still has all 3 sections

            variable.sync().await.unwrap();
            drop(variable);

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should rebuild offsets[5-24] from data journal across all 3 sections
            // (the data journal is authoritative, so the partial rewind is undone)
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 25);
            assert_eq!(bounds.start, 0);

            // All items should be readable - offsets rebuilt correctly across all sections
            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Verify offsets journal fully rebuilt
            assert_eq!(variable.test_offsets_size().await, 25);

            // Verify next append gets position 25
            let pos = variable.append(&2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }
1798
    /// Test recovery from crash after data sync but before offsets sync when journal was
    /// previously emptied by pruning.
    ///
    /// The tricky part: after pruning the journal to emptiness, the only record of the
    /// appended items is the (synced) data journal; init() must recover them even though
    /// the offsets journal was never synced.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-empty-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal with one full section ===
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 10 items (positions 0-9), fills section 0
            for i in 0..10u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 0);

            // === Phase 2: Prune to create empty journal ===
            journal.prune(10).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert!(bounds.is_empty()); // Empty!

            // === Phase 3: Append directly to data journal to simulate crash ===
            // Manually append to data journal only (bypassing Variable's append logic)
            // This simulates the case where data was synced but offsets wasn't
            for i in 10..20u64 {
                journal.test_append_data(1, i * 100).await.unwrap();
            }
            // Sync the data journal (section 1)
            journal.test_sync_data().await.unwrap();
            // Do NOT sync offsets journal - simulates crash before offsets.sync()

            // Close without syncing offsets
            drop(journal);

            // === Phase 4: Verify recovery succeeds ===
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            // All data should be recovered
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 10);

            // All items from position 10-19 should be readable
            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Items 0-9 should be pruned
            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1869
1870    /// Test that offsets index is rebuilt from data after sync writes data but not offsets.
1871    #[test_traced]
1872    fn test_variable_concurrent_sync_recovery() {
1873        let executor = deterministic::Runner::default();
1874        executor.start(|context| async move {
1875            let cfg = Config {
1876                partition: "concurrent-sync-recovery".into(),
1877                items_per_section: NZU64!(10),
1878                compression: None,
1879                codec_config: (),
1880                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1881                write_buffer: NZUsize!(1024),
1882            };
1883
1884            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1885                .await
1886                .unwrap();
1887
1888            // Append items across a section boundary
1889            for i in 0..15u64 {
1890                journal.append(&(i * 100)).await.unwrap();
1891            }
1892
1893            // Manually sync only data to simulate crash during concurrent sync
1894            journal.commit().await.unwrap();
1895
1896            // Simulate a crash (offsets not synced)
1897            drop(journal);
1898
1899            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1900                .await
1901                .unwrap();
1902
1903            // Data should be intact and offsets rebuilt
1904            assert_eq!(journal.size().await, 15);
1905            for i in 0..15u64 {
1906                assert_eq!(journal.read(i).await.unwrap(), i * 100);
1907            }
1908
1909            journal.destroy().await.unwrap();
1910        });
1911    }
1912
1913    #[test_traced]
1914    fn test_init_at_size_zero() {
1915        let executor = deterministic::Runner::default();
1916        executor.start(|context| async move {
1917            let cfg = Config {
1918                partition: "init-at-size-zero".into(),
1919                items_per_section: NZU64!(5),
1920                compression: None,
1921                codec_config: (),
1922                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1923                write_buffer: NZUsize!(1024),
1924            };
1925
1926            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1927                .await
1928                .unwrap();
1929
1930            // Size should be 0
1931            assert_eq!(journal.size().await, 0);
1932
1933            // No oldest retained position (empty journal)
1934            assert!(journal.bounds().await.is_empty());
1935
1936            // Next append should get position 0
1937            let pos = journal.append(&100).await.unwrap();
1938            assert_eq!(pos, 0);
1939            assert_eq!(journal.size().await, 1);
1940            assert_eq!(journal.read(0).await.unwrap(), 100);
1941
1942            journal.destroy().await.unwrap();
1943        });
1944    }
1945
1946    #[test_traced]
1947    fn test_init_at_size_section_boundary() {
1948        let executor = deterministic::Runner::default();
1949        executor.start(|context| async move {
1950            let cfg = Config {
1951                partition: "init-at-size-boundary".into(),
1952                items_per_section: NZU64!(5),
1953                compression: None,
1954                codec_config: (),
1955                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1956                write_buffer: NZUsize!(1024),
1957            };
1958
1959            // Initialize at position 10 (exactly at section 1 boundary with items_per_section=5)
1960            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1961                .await
1962                .unwrap();
1963
1964            // Size should be 10
1965            let bounds = journal.bounds().await;
1966            assert_eq!(bounds.end, 10);
1967
1968            // No data yet, so no oldest retained position
1969            assert!(bounds.is_empty());
1970
1971            // Next append should get position 10
1972            let pos = journal.append(&1000).await.unwrap();
1973            assert_eq!(pos, 10);
1974            assert_eq!(journal.size().await, 11);
1975            assert_eq!(journal.read(10).await.unwrap(), 1000);
1976
1977            // Can continue appending
1978            let pos = journal.append(&1001).await.unwrap();
1979            assert_eq!(pos, 11);
1980            assert_eq!(journal.read(11).await.unwrap(), 1001);
1981
1982            journal.destroy().await.unwrap();
1983        });
1984    }
1985
1986    #[test_traced]
1987    fn test_init_at_size_mid_section() {
1988        let executor = deterministic::Runner::default();
1989        executor.start(|context| async move {
1990            let cfg = Config {
1991                partition: "init-at-size-mid".into(),
1992                items_per_section: NZU64!(5),
1993                compression: None,
1994                codec_config: (),
1995                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1996                write_buffer: NZUsize!(1024),
1997            };
1998
1999            // Initialize at position 7 (middle of section 1 with items_per_section=5)
2000            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
2001                .await
2002                .unwrap();
2003
2004            // Size should be 7
2005            let bounds = journal.bounds().await;
2006            assert_eq!(bounds.end, 7);
2007
2008            // No data yet, so no oldest retained position
2009            assert!(bounds.is_empty());
2010
2011            // Next append should get position 7
2012            let pos = journal.append(&700).await.unwrap();
2013            assert_eq!(pos, 7);
2014            assert_eq!(journal.size().await, 8);
2015            assert_eq!(journal.read(7).await.unwrap(), 700);
2016
2017            journal.destroy().await.unwrap();
2018        });
2019    }
2020
2021    #[test_traced]
2022    fn test_init_at_size_persistence() {
2023        let executor = deterministic::Runner::default();
2024        executor.start(|context| async move {
2025            let cfg = Config {
2026                partition: "init-at-size-persist".into(),
2027                items_per_section: NZU64!(5),
2028                compression: None,
2029                codec_config: (),
2030                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2031                write_buffer: NZUsize!(1024),
2032            };
2033
2034            // Initialize at position 15
2035            let journal =
2036                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2037                    .await
2038                    .unwrap();
2039
2040            // Append some items
2041            for i in 0..5u64 {
2042                let pos = journal.append(&(1500 + i)).await.unwrap();
2043                assert_eq!(pos, 15 + i);
2044            }
2045
2046            assert_eq!(journal.size().await, 20);
2047
2048            // Sync and reopen
2049            journal.sync().await.unwrap();
2050            drop(journal);
2051
2052            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2053                .await
2054                .unwrap();
2055
2056            // Size and data should be preserved
2057            let bounds = journal.bounds().await;
2058            assert_eq!(bounds.end, 20);
2059            assert_eq!(bounds.start, 15);
2060
2061            // Verify data
2062            for i in 0..5u64 {
2063                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
2064            }
2065
2066            // Can continue appending
2067            let pos = journal.append(&9999).await.unwrap();
2068            assert_eq!(pos, 20);
2069            assert_eq!(journal.read(20).await.unwrap(), 9999);
2070
2071            journal.destroy().await.unwrap();
2072        });
2073    }
2074
2075    #[test_traced]
2076    fn test_init_at_size_persistence_without_data() {
2077        let executor = deterministic::Runner::default();
2078        executor.start(|context| async move {
2079            let cfg = Config {
2080                partition: "init-at-size-persist-empty".into(),
2081                items_per_section: NZU64!(5),
2082                compression: None,
2083                codec_config: (),
2084                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2085                write_buffer: NZUsize!(1024),
2086            };
2087
2088            // Initialize at position 15
2089            let journal =
2090                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2091                    .await
2092                    .unwrap();
2093
2094            let bounds = journal.bounds().await;
2095            assert_eq!(bounds.end, 15);
2096            assert!(bounds.is_empty());
2097
2098            // Drop without writing any data
2099            drop(journal);
2100
2101            // Reopen and verify size persisted
2102            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2103                .await
2104                .unwrap();
2105
2106            let bounds = journal.bounds().await;
2107            assert_eq!(bounds.end, 15);
2108            assert!(bounds.is_empty());
2109
2110            // Can append starting at position 15
2111            let pos = journal.append(&1500).await.unwrap();
2112            assert_eq!(pos, 15);
2113            assert_eq!(journal.read(15).await.unwrap(), 1500);
2114
2115            journal.destroy().await.unwrap();
2116        });
2117    }
2118
2119    /// Test init_at_size with mid-section value persists correctly across restart.
2120    #[test_traced]
2121    fn test_init_at_size_mid_section_persistence() {
2122        let executor = deterministic::Runner::default();
2123        executor.start(|context| async move {
2124            let cfg = Config {
2125                partition: "init-at-size-mid-section".into(),
2126                items_per_section: NZU64!(5),
2127                compression: None,
2128                codec_config: (),
2129                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2130                write_buffer: NZUsize!(1024),
2131            };
2132
2133            // Initialize at position 7 (mid-section, 7 % 5 = 2)
2134            let journal =
2135                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2136                    .await
2137                    .unwrap();
2138
2139            // Append 3 items at positions 7, 8, 9 (fills rest of section 1)
2140            for i in 0..3u64 {
2141                let pos = journal.append(&(700 + i)).await.unwrap();
2142                assert_eq!(pos, 7 + i);
2143            }
2144
2145            let bounds = journal.bounds().await;
2146            assert_eq!(bounds.end, 10);
2147            assert_eq!(bounds.start, 7);
2148
2149            // Sync and reopen
2150            journal.sync().await.unwrap();
2151            drop(journal);
2152
2153            // Reopen
2154            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2155                .await
2156                .unwrap();
2157
2158            // Size and bounds.start should be preserved correctly
2159            let bounds = journal.bounds().await;
2160            assert_eq!(bounds.end, 10);
2161            assert_eq!(bounds.start, 7);
2162
2163            // Verify data
2164            for i in 0..3u64 {
2165                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2166            }
2167
2168            // Positions before 7 should be pruned
2169            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2170
2171            journal.destroy().await.unwrap();
2172        });
2173    }
2174
2175    /// Test init_at_size mid-section with data spanning multiple sections.
2176    #[test_traced]
2177    fn test_init_at_size_mid_section_multi_section_persistence() {
2178        let executor = deterministic::Runner::default();
2179        executor.start(|context| async move {
2180            let cfg = Config {
2181                partition: "init-at-size-multi-section".into(),
2182                items_per_section: NZU64!(5),
2183                compression: None,
2184                codec_config: (),
2185                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2186                write_buffer: NZUsize!(1024),
2187            };
2188
2189            // Initialize at position 7 (mid-section)
2190            let journal =
2191                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2192                    .await
2193                    .unwrap();
2194
2195            // Append 8 items: positions 7-14 (section 1: 3 items, section 2: 5 items)
2196            for i in 0..8u64 {
2197                let pos = journal.append(&(700 + i)).await.unwrap();
2198                assert_eq!(pos, 7 + i);
2199            }
2200
2201            let bounds = journal.bounds().await;
2202            assert_eq!(bounds.end, 15);
2203            assert_eq!(bounds.start, 7);
2204
2205            // Sync and reopen
2206            journal.sync().await.unwrap();
2207            drop(journal);
2208
2209            // Reopen
2210            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2211                .await
2212                .unwrap();
2213
2214            // Verify state preserved
2215            let bounds = journal.bounds().await;
2216            assert_eq!(bounds.end, 15);
2217            assert_eq!(bounds.start, 7);
2218
2219            // Verify all data
2220            for i in 0..8u64 {
2221                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2222            }
2223
2224            journal.destroy().await.unwrap();
2225        });
2226    }
2227
    /// Regression test: data-empty crash repair must preserve mid-section pruning boundary.
    ///
    /// If a crash clears the data journal while the offsets journal still reflects size 7
    /// (a mid-section boundary with items_per_section = 5), init() must treat the journal
    /// as fully pruned at size 7 — not round the boundary to a section edge — so that a
    /// later append at position 7 survives a second crash/recovery cycle.
    #[test_traced]
    fn test_align_journals_data_empty_mid_section_pruning_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "align-journals-mid-section-pruning-boundary".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Phase 1: Create data and offsets, then simulate data-only pruning crash.
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for i in 0..7u64 {
                journal.append(&(100 + i)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Simulate crash after data was cleared but before offsets were pruned.
            // (Reaches into the inner state to clear only the data journal.)
            journal.inner.write().await.data.clear().await.unwrap();
            drop(journal);

            // Phase 2: Init triggers data-empty repair and should treat journal as fully pruned at size 7.
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 7);
            assert!(bounds.is_empty());

            // Append one item at position 7.
            let pos = journal.append(&777).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size().await, 8);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            // Sync only the data journal to simulate a crash before offsets are synced.
            // Position 7 lives in section 7 / items_per_section == 1.
            let section = 7 / cfg.items_per_section.get();
            journal
                .inner
                .write()
                .await
                .data
                .sync(section)
                .await
                .unwrap();
            drop(journal);

            // Phase 3: Reopen and verify we did not lose the appended item.
            let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 8);
            assert_eq!(bounds.start, 7);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            journal.destroy().await.unwrap();
        });
    }
2293
    /// Test crash recovery: init_at_size + append + crash with data synced but offsets not.
    #[test_traced]
    fn test_init_at_size_crash_data_synced_offsets_not() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-crash-recovery".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 7 (mid-section)
            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Append 3 items (positions 7, 8, 9 — all in section 1 since 7/5 = 9/5 = 1)
            for i in 0..3u64 {
                journal.append(&(700 + i)).await.unwrap();
            }

            // Sync only the data journal, not offsets (simulate crash)
            journal.inner.write().await.data.sync(1).await.unwrap();
            // Don't sync offsets - simulates crash after data write but before offsets write
            drop(journal);

            // Reopen - should recover by rebuilding offsets from data
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Verify recovery: all three appended items were retained.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            // Verify data is accessible
            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
2342
    /// Pruning to a position inside the oldest retained section must be a no-op
    /// for `bounds().start`: it must never move the retained lower bound backwards
    /// to the section boundary.
    #[test_traced]
    fn test_prune_does_not_move_oldest_retained_backwards() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune-no-backwards".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start mid-section so bounds.start (7) is ahead of the section boundary (5).
            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Append a few items at positions 7..=9
            for i in 0..3u64 {
                let pos = journal.append(&(700 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }
            assert_eq!(journal.bounds().await.start, 7);

            // Prune to a position within the same section should not move bounds.start backwards.
            journal.prune(8).await.unwrap();
            assert_eq!(journal.bounds().await.start, 7);
            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
            assert_eq!(journal.read(7).await.unwrap(), 700);

            journal.destroy().await.unwrap();
        });
    }
2377
2378    #[test_traced]
2379    fn test_init_at_size_large_offset() {
2380        let executor = deterministic::Runner::default();
2381        executor.start(|context| async move {
2382            let cfg = Config {
2383                partition: "init-at-size-large".into(),
2384                items_per_section: NZU64!(5),
2385                compression: None,
2386                codec_config: (),
2387                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2388                write_buffer: NZUsize!(1024),
2389            };
2390
2391            // Initialize at a large position (position 1000)
2392            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
2393                .await
2394                .unwrap();
2395
2396            let bounds = journal.bounds().await;
2397            assert_eq!(bounds.end, 1000);
2398            // No data yet, so no oldest retained position
2399            assert!(bounds.is_empty());
2400
2401            // Next append should get position 1000
2402            let pos = journal.append(&100000).await.unwrap();
2403            assert_eq!(pos, 1000);
2404            assert_eq!(journal.read(1000).await.unwrap(), 100000);
2405
2406            journal.destroy().await.unwrap();
2407        });
2408    }
2409
2410    #[test_traced]
2411    fn test_init_at_size_prune_and_append() {
2412        let executor = deterministic::Runner::default();
2413        executor.start(|context| async move {
2414            let cfg = Config {
2415                partition: "init-at-size-prune".into(),
2416                items_per_section: NZU64!(5),
2417                compression: None,
2418                codec_config: (),
2419                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2420                write_buffer: NZUsize!(1024),
2421            };
2422
2423            // Initialize at position 20
2424            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
2425                .await
2426                .unwrap();
2427
2428            // Append items 20-29
2429            for i in 0..10u64 {
2430                journal.append(&(2000 + i)).await.unwrap();
2431            }
2432
2433            assert_eq!(journal.size().await, 30);
2434
2435            // Prune to position 25
2436            journal.prune(25).await.unwrap();
2437
2438            let bounds = journal.bounds().await;
2439            assert_eq!(bounds.end, 30);
2440            assert_eq!(bounds.start, 25);
2441
2442            // Verify remaining items are readable
2443            for i in 25..30u64 {
2444                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
2445            }
2446
2447            // Continue appending
2448            let pos = journal.append(&3000).await.unwrap();
2449            assert_eq!(pos, 30);
2450
2451            journal.destroy().await.unwrap();
2452        });
2453    }
2454
2455    /// Test `init_sync` when there is no existing data on disk.
2456    #[test_traced]
2457    fn test_init_sync_no_existing_data() {
2458        let executor = deterministic::Runner::default();
2459        executor.start(|context| async move {
2460            let cfg = Config {
2461                partition: "test-fresh-start".into(),
2462                items_per_section: NZU64!(5),
2463                compression: None,
2464                codec_config: (),
2465                write_buffer: NZUsize!(1024),
2466                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2467            };
2468
2469            // Initialize journal with sync boundaries when no existing data exists
2470            let lower_bound = 10;
2471            let upper_bound = 26;
2472            let journal =
2473                Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
2474                    .await
2475                    .expect("Failed to initialize journal with sync boundaries");
2476
2477            let bounds = journal.bounds().await;
2478            assert_eq!(bounds.end, lower_bound);
2479            assert!(bounds.is_empty());
2480
2481            // Append items using the contiguous API
2482            let pos1 = journal.append(&42u64).await.unwrap();
2483            assert_eq!(pos1, lower_bound);
2484            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
2485
2486            let pos2 = journal.append(&43u64).await.unwrap();
2487            assert_eq!(pos2, lower_bound + 1);
2488            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
2489
2490            journal.destroy().await.unwrap();
2491        });
2492    }
2493
2494    /// Test `init_sync` when there is existing data that overlaps with the sync target range.
2495    #[test_traced]
2496    fn test_init_sync_existing_data_overlap() {
2497        let executor = deterministic::Runner::default();
2498        executor.start(|context| async move {
2499            let cfg = Config {
2500                partition: "test-overlap".into(),
2501                items_per_section: NZU64!(5),
2502                compression: None,
2503                codec_config: (),
2504                write_buffer: NZUsize!(1024),
2505                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2506            };
2507
2508            // Create initial journal with data in multiple sections
2509            let journal =
2510                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2511                    .await
2512                    .expect("Failed to create initial journal");
2513
2514            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
2515            for i in 0..20u64 {
2516                journal.append(&(i * 100)).await.unwrap();
2517            }
2518            journal.sync().await.unwrap();
2519            drop(journal);
2520
2521            // Initialize with sync boundaries that overlap with existing data
2522            // lower_bound: 8 (section 1), upper_bound: 31 (last location 30, section 6)
2523            let lower_bound = 8;
2524            let upper_bound = 31;
2525            let journal = Journal::<deterministic::Context, u64>::init_sync(
2526                context.clone(),
2527                cfg.clone(),
2528                lower_bound..upper_bound,
2529            )
2530            .await
2531            .expect("Failed to initialize journal with overlap");
2532
2533            assert_eq!(journal.size().await, 20);
2534
2535            // Verify oldest retained is pruned to lower_bound's section boundary (5)
2536            assert_eq!(journal.bounds().await.start, 5); // Section 1 starts at position 5
2537
2538            // Verify data integrity: positions before 5 are pruned
2539            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2540            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2541
2542            // Positions 5-19 should be accessible
2543            assert_eq!(journal.read(5).await.unwrap(), 500);
2544            assert_eq!(journal.read(8).await.unwrap(), 800);
2545            assert_eq!(journal.read(19).await.unwrap(), 1900);
2546
2547            // Position 20+ should not exist yet
2548            assert!(matches!(
2549                journal.read(20).await,
2550                Err(Error::ItemOutOfRange(_))
2551            ));
2552
2553            // Assert journal can accept new items
2554            let pos = journal.append(&999).await.unwrap();
2555            assert_eq!(pos, 20);
2556            assert_eq!(journal.read(20).await.unwrap(), 999);
2557
2558            journal.destroy().await.unwrap();
2559        });
2560    }
2561
2562    /// Test `init_sync` with invalid parameters.
2563    #[should_panic]
2564    #[test_traced]
2565    fn test_init_sync_invalid_parameters() {
2566        let executor = deterministic::Runner::default();
2567        executor.start(|context| async move {
2568            let cfg = Config {
2569                partition: "test-invalid".into(),
2570                items_per_section: NZU64!(5),
2571                compression: None,
2572                codec_config: (),
2573                write_buffer: NZUsize!(1024),
2574                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2575            };
2576
2577            #[allow(clippy::reversed_empty_ranges)]
2578            let _result = Journal::<deterministic::Context, u64>::init_sync(
2579                context.clone(),
2580                cfg,
2581                10..5, // invalid range: lower > upper
2582            )
2583            .await;
2584        });
2585    }
2586
2587    /// Test `init_sync` when existing data exactly matches the sync range.
2588    #[test_traced]
2589    fn test_init_sync_existing_data_exact_match() {
2590        let executor = deterministic::Runner::default();
2591        executor.start(|context| async move {
2592            let items_per_section = NZU64!(5);
2593            let cfg = Config {
2594                partition: "test-exact-match".into(),
2595                items_per_section,
2596                compression: None,
2597                codec_config: (),
2598                write_buffer: NZUsize!(1024),
2599                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2600            };
2601
2602            // Create initial journal with data exactly matching sync range
2603            let journal =
2604                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2605                    .await
2606                    .expect("Failed to create initial journal");
2607
2608            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
2609            for i in 0..20u64 {
2610                journal.append(&(i * 100)).await.unwrap();
2611            }
2612            journal.sync().await.unwrap();
2613            drop(journal);
2614
2615            // Initialize with sync boundaries that exactly match existing data
2616            let lower_bound = 5; // section 1
2617            let upper_bound = 20; // section 3
2618            let journal = Journal::<deterministic::Context, u64>::init_sync(
2619                context.clone(),
2620                cfg.clone(),
2621                lower_bound..upper_bound,
2622            )
2623            .await
2624            .expect("Failed to initialize journal with exact match");
2625
2626            assert_eq!(journal.size().await, 20);
2627
2628            // Verify pruning to lower bound (section 1 boundary = position 5)
2629            assert_eq!(journal.bounds().await.start, 5); // Section 1 starts at position 5
2630
2631            // Verify positions before 5 are pruned
2632            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2633            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2634
2635            // Positions 5-19 should be accessible
2636            assert_eq!(journal.read(5).await.unwrap(), 500);
2637            assert_eq!(journal.read(10).await.unwrap(), 1000);
2638            assert_eq!(journal.read(19).await.unwrap(), 1900);
2639
2640            // Position 20+ should not exist yet
2641            assert!(matches!(
2642                journal.read(20).await,
2643                Err(Error::ItemOutOfRange(_))
2644            ));
2645
2646            // Assert journal can accept new operations
2647            let pos = journal.append(&999).await.unwrap();
2648            assert_eq!(pos, 20);
2649            assert_eq!(journal.read(20).await.unwrap(), 999);
2650
2651            journal.destroy().await.unwrap();
2652        });
2653    }
2654
    /// Test `init_sync` when existing data exceeds the sync target range.
    /// This tests that an ItemOutOfRange error is returned when existing data goes beyond the upper bound.
    #[test_traced]
    fn test_init_sync_existing_data_exceeds_upper_bound() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-unexpected-data".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Create initial journal with data beyond sync range
            let journal = Journal::<deterministic::Context, u64>::init(
                context.with_label("initial"),
                cfg.clone(),
            )
            .await
            .expect("Failed to create initial journal");

            // Add data at positions 0-29 (sections 0-5 with items_per_section=5)
            for i in 0..30u64 {
                journal.append(&(i * 1000)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Initialize with sync boundaries that are exceeded by existing data.
            // Every upper bound in 9..29 is below the on-disk end (30), so each
            // attempt must fail.
            let lower_bound = 8; // section 1
            for (i, upper_bound) in (9..29).enumerate() {
                let result = Journal::<deterministic::Context, u64>::init_sync(
                    context.with_label(&format!("sync_{i}")),
                    cfg.clone(),
                    lower_bound..upper_bound,
                )
                .await;

                // Should return ItemOutOfRange error since data exists beyond upper_bound
                assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
            }
        });
    }
2701
2702    /// Test `init_sync` when all existing data is stale (before lower bound).
2703    #[test_traced]
2704    fn test_init_sync_existing_data_stale() {
2705        let executor = deterministic::Runner::default();
2706        executor.start(|context| async move {
2707            let items_per_section = NZU64!(5);
2708            let cfg = Config {
2709                partition: "test-stale".into(),
2710                items_per_section,
2711                compression: None,
2712                codec_config: (),
2713                write_buffer: NZUsize!(1024),
2714                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2715            };
2716
2717            // Create initial journal with stale data
2718            let journal = Journal::<deterministic::Context, u64>::init(
2719                context.with_label("first"),
2720                cfg.clone(),
2721            )
2722            .await
2723            .expect("Failed to create initial journal");
2724
2725            // Add data at positions 0-9 (sections 0-1 with items_per_section=5)
2726            for i in 0..10u64 {
2727                journal.append(&(i * 100)).await.unwrap();
2728            }
2729            journal.sync().await.unwrap();
2730            drop(journal);
2731
2732            // Initialize with sync boundaries beyond all existing data
2733            let lower_bound = 15; // section 3
2734            let upper_bound = 26; // last element in section 5
2735            let journal = Journal::<deterministic::Context, u64>::init_sync(
2736                context.with_label("second"),
2737                cfg.clone(),
2738                lower_bound..upper_bound,
2739            )
2740            .await
2741            .expect("Failed to initialize journal with stale data");
2742
2743            assert_eq!(journal.size().await, 15);
2744
2745            // Verify fresh journal (all old data destroyed, starts at position 15)
2746            assert!(journal.bounds().await.is_empty());
2747
2748            // Verify old positions don't exist
2749            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2750            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2751            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2752
2753            journal.destroy().await.unwrap();
2754        });
2755    }
2756
2757    /// Test `init_sync` with section boundary edge cases.
2758    #[test_traced]
2759    fn test_init_sync_section_boundaries() {
2760        let executor = deterministic::Runner::default();
2761        executor.start(|context| async move {
2762            let items_per_section = NZU64!(5);
2763            let cfg = Config {
2764                partition: "test-boundaries".into(),
2765                items_per_section,
2766                compression: None,
2767                codec_config: (),
2768                write_buffer: NZUsize!(1024),
2769                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2770            };
2771
2772            // Create journal with data at section boundaries
2773            let journal =
2774                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2775                    .await
2776                    .expect("Failed to create initial journal");
2777
2778            // Add data at positions 0-24 (sections 0-4 with items_per_section=5)
2779            for i in 0..25u64 {
2780                journal.append(&(i * 100)).await.unwrap();
2781            }
2782            journal.sync().await.unwrap();
2783            drop(journal);
2784
2785            // Test sync boundaries exactly at section boundaries
2786            let lower_bound = 15; // Exactly at section boundary (15/5 = 3)
2787            let upper_bound = 25; // Last element exactly at section boundary (24/5 = 4)
2788            let journal = Journal::<deterministic::Context, u64>::init_sync(
2789                context.clone(),
2790                cfg.clone(),
2791                lower_bound..upper_bound,
2792            )
2793            .await
2794            .expect("Failed to initialize journal at boundaries");
2795
2796            assert_eq!(journal.size().await, 25);
2797
2798            // Verify oldest retained is at section 3 boundary (position 15)
2799            assert_eq!(journal.bounds().await.start, 15);
2800
2801            // Verify positions before 15 are pruned
2802            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2803            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2804
2805            // Verify positions 15-24 are accessible
2806            assert_eq!(journal.read(15).await.unwrap(), 1500);
2807            assert_eq!(journal.read(20).await.unwrap(), 2000);
2808            assert_eq!(journal.read(24).await.unwrap(), 2400);
2809
2810            // Position 25+ should not exist yet
2811            assert!(matches!(
2812                journal.read(25).await,
2813                Err(Error::ItemOutOfRange(_))
2814            ));
2815
2816            // Assert journal can accept new operations
2817            let pos = journal.append(&999).await.unwrap();
2818            assert_eq!(pos, 25);
2819            assert_eq!(journal.read(25).await.unwrap(), 999);
2820
2821            journal.destroy().await.unwrap();
2822        });
2823    }
2824
2825    /// Test `init_sync` when range.start and range.end-1 are in the same section.
2826    #[test_traced]
2827    fn test_init_sync_same_section_bounds() {
2828        let executor = deterministic::Runner::default();
2829        executor.start(|context| async move {
2830            let items_per_section = NZU64!(5);
2831            let cfg = Config {
2832                partition: "test-same-section".into(),
2833                items_per_section,
2834                compression: None,
2835                codec_config: (),
2836                write_buffer: NZUsize!(1024),
2837                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2838            };
2839
2840            // Create journal with data in multiple sections
2841            let journal =
2842                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2843                    .await
2844                    .expect("Failed to create initial journal");
2845
2846            // Add data at positions 0-14 (sections 0-2 with items_per_section=5)
2847            for i in 0..15u64 {
2848                journal.append(&(i * 100)).await.unwrap();
2849            }
2850            journal.sync().await.unwrap();
2851            drop(journal);
2852
2853            // Test sync boundaries within the same section
2854            let lower_bound = 10; // operation 10 (section 2: 10/5 = 2)
2855            let upper_bound = 15; // Last operation 14 (section 2: 14/5 = 2)
2856            let journal = Journal::<deterministic::Context, u64>::init_sync(
2857                context.clone(),
2858                cfg.clone(),
2859                lower_bound..upper_bound,
2860            )
2861            .await
2862            .expect("Failed to initialize journal with same-section bounds");
2863
2864            assert_eq!(journal.size().await, 15);
2865
2866            // Both operations are in section 2, so sections 0, 1 should be pruned, section 2 retained
2867            // Oldest retained position should be at section 2 boundary (position 10)
2868            assert_eq!(journal.bounds().await.start, 10);
2869
2870            // Verify positions before 10 are pruned
2871            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2872            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2873
2874            // Verify positions 10-14 are accessible
2875            assert_eq!(journal.read(10).await.unwrap(), 1000);
2876            assert_eq!(journal.read(11).await.unwrap(), 1100);
2877            assert_eq!(journal.read(14).await.unwrap(), 1400);
2878
2879            // Position 15+ should not exist yet
2880            assert!(matches!(
2881                journal.read(15).await,
2882                Err(Error::ItemOutOfRange(_))
2883            ));
2884
2885            // Assert journal can accept new operations
2886            let pos = journal.append(&999).await.unwrap();
2887            assert_eq!(pos, 15);
2888            assert_eq!(journal.read(15).await.unwrap(), 999);
2889
2890            journal.destroy().await.unwrap();
2891        });
2892    }
2893
2894    /// Test contiguous variable journal with items_per_section=1.
2895    ///
2896    /// This is a regression test for a bug where reading from size()-1 fails
2897    /// when using items_per_section=1, particularly after pruning and restart.
2898    #[test_traced]
2899    fn test_single_item_per_section() {
2900        let executor = deterministic::Runner::default();
2901        executor.start(|context| async move {
2902            let cfg = Config {
2903                partition: "single-item-per-section".into(),
2904                items_per_section: NZU64!(1),
2905                compression: None,
2906                codec_config: (),
2907                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2908                write_buffer: NZUsize!(1024),
2909            };
2910
2911            // === Test 1: Basic single item operation ===
2912            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
2913                .await
2914                .unwrap();
2915
2916            // Verify empty state
2917            let bounds = journal.bounds().await;
2918            assert_eq!(bounds.end, 0);
2919            assert!(bounds.is_empty());
2920
2921            // Append 1 item (value = position * 100, so position 0 has value 0)
2922            let pos = journal.append(&0).await.unwrap();
2923            assert_eq!(pos, 0);
2924            assert_eq!(journal.size().await, 1);
2925
2926            // Sync
2927            journal.sync().await.unwrap();
2928
2929            // Read from size() - 1
2930            let value = journal.read(journal.size().await - 1).await.unwrap();
2931            assert_eq!(value, 0);
2932
2933            // === Test 2: Multiple items with single item per section ===
2934            for i in 1..10u64 {
2935                let pos = journal.append(&(i * 100)).await.unwrap();
2936                assert_eq!(pos, i);
2937                assert_eq!(journal.size().await, i + 1);
2938
2939                // Verify we can read the just-appended item at size() - 1
2940                let value = journal.read(journal.size().await - 1).await.unwrap();
2941                assert_eq!(value, i * 100);
2942            }
2943
2944            // Verify all items can be read
2945            for i in 0..10u64 {
2946                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2947            }
2948
2949            journal.sync().await.unwrap();
2950
2951            // === Test 3: Pruning with single item per section ===
2952            // Prune to position 5 (removes positions 0-4)
2953            let pruned = journal.prune(5).await.unwrap();
2954            assert!(pruned);
2955
2956            // Size should still be 10
2957            assert_eq!(journal.size().await, 10);
2958
2959            // bounds.start should be 5
2960            assert_eq!(journal.bounds().await.start, 5);
2961
2962            // Reading from bounds.end - 1 (position 9) should still work
2963            let value = journal.read(journal.size().await - 1).await.unwrap();
2964            assert_eq!(value, 900);
2965
2966            // Reading from pruned positions should return ItemPruned
2967            for i in 0..5 {
2968                assert!(matches!(
2969                    journal.read(i).await,
2970                    Err(crate::journal::Error::ItemPruned(_))
2971                ));
2972            }
2973
2974            // Reading from retained positions should work
2975            for i in 5..10u64 {
2976                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2977            }
2978
2979            // Append more items after pruning
2980            for i in 10..15u64 {
2981                let pos = journal.append(&(i * 100)).await.unwrap();
2982                assert_eq!(pos, i);
2983
2984                // Verify we can read from size() - 1
2985                let value = journal.read(journal.size().await - 1).await.unwrap();
2986                assert_eq!(value, i * 100);
2987            }
2988
2989            journal.sync().await.unwrap();
2990            drop(journal);
2991
2992            // === Test 4: Restart persistence with single item per section ===
2993            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2994                .await
2995                .unwrap();
2996
2997            // Verify size is preserved
2998            assert_eq!(journal.size().await, 15);
2999
3000            // Verify bounds.start is preserved
3001            assert_eq!(journal.bounds().await.start, 5);
3002
3003            // Reading from bounds.end - 1 should work after restart
3004            let value = journal.read(journal.size().await - 1).await.unwrap();
3005            assert_eq!(value, 1400);
3006
3007            // Reading all retained positions should work
3008            for i in 5..15u64 {
3009                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3010            }
3011
3012            journal.destroy().await.unwrap();
3013
3014            // === Test 5: Restart after pruning with non-zero index (KEY SCENARIO) ===
3015            // Fresh journal for this test
3016            let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
3017                .await
3018                .unwrap();
3019
3020            // Append 10 items (positions 0-9)
3021            for i in 0..10u64 {
3022                journal.append(&(i * 1000)).await.unwrap();
3023            }
3024
3025            // Prune to position 5 (removes positions 0-4)
3026            journal.prune(5).await.unwrap();
3027            let bounds = journal.bounds().await;
3028            assert_eq!(bounds.end, 10);
3029            assert_eq!(bounds.start, 5);
3030
3031            // Sync and restart
3032            journal.sync().await.unwrap();
3033            drop(journal);
3034
3035            // Re-open journal
3036            let journal = Journal::<_, u64>::init(context.with_label("fourth"), cfg.clone())
3037                .await
3038                .unwrap();
3039
3040            // Verify state after restart
3041            let bounds = journal.bounds().await;
3042            assert_eq!(bounds.end, 10);
3043            assert_eq!(bounds.start, 5);
3044
3045            // KEY TEST: Reading from bounds.end - 1 (position 9) should work
3046            let value = journal.read(journal.size().await - 1).await.unwrap();
3047            assert_eq!(value, 9000);
3048
3049            // Verify all retained positions (5-9) work
3050            for i in 5..10u64 {
3051                assert_eq!(journal.read(i).await.unwrap(), i * 1000);
3052            }
3053
3054            journal.destroy().await.unwrap();
3055
3056            // === Test 6: Prune all items (edge case) ===
3057            // This tests the scenario where prune removes everything.
3058            // Callers must check bounds().is_empty() before reading.
3059            let journal = Journal::<_, u64>::init(context.with_label("fifth"), cfg.clone())
3060                .await
3061                .unwrap();
3062
3063            for i in 0..5u64 {
3064                journal.append(&(i * 100)).await.unwrap();
3065            }
3066            journal.sync().await.unwrap();
3067
3068            // Prune all items
3069            journal.prune(5).await.unwrap();
3070            let bounds = journal.bounds().await;
3071            assert_eq!(bounds.end, 5); // Size unchanged
3072            assert!(bounds.is_empty()); // All pruned
3073
3074            // bounds.end - 1 = 4, but position 4 is pruned
3075            let result = journal.read(journal.size().await - 1).await;
3076            assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));
3077
3078            // After appending, reading works again
3079            journal.append(&500).await.unwrap();
3080            let bounds = journal.bounds().await;
3081            assert_eq!(bounds.start, 5);
3082            assert_eq!(journal.read(bounds.end - 1).await.unwrap(), 500);
3083
3084            journal.destroy().await.unwrap();
3085        });
3086    }
3087
3088    #[test_traced]
3089    fn test_variable_journal_clear_to_size() {
3090        let executor = deterministic::Runner::default();
3091        executor.start(|context| async move {
3092            let cfg = Config {
3093                partition: "clear-test".into(),
3094                items_per_section: NZU64!(10),
3095                compression: None,
3096                codec_config: (),
3097                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
3098                write_buffer: NZUsize!(1024),
3099            };
3100
3101            let journal = Journal::<_, u64>::init(context.with_label("journal"), cfg.clone())
3102                .await
3103                .unwrap();
3104
3105            // Append 25 items (spanning multiple sections)
3106            for i in 0..25u64 {
3107                journal.append(&(i * 100)).await.unwrap();
3108            }
3109            let bounds = journal.bounds().await;
3110            assert_eq!(bounds.end, 25);
3111            assert_eq!(bounds.start, 0);
3112            journal.sync().await.unwrap();
3113
3114            // Clear to position 100, effectively resetting the journal
3115            journal.clear_to_size(100).await.unwrap();
3116            let bounds = journal.bounds().await;
3117            assert_eq!(bounds.end, 100);
3118            assert!(bounds.is_empty());
3119
3120            // Old positions should fail
3121            for i in 0..25 {
3122                assert!(matches!(
3123                    journal.read(i).await,
3124                    Err(crate::journal::Error::ItemPruned(_))
3125                ));
3126            }
3127
3128            // Verify size persists after restart without writing any data
3129            drop(journal);
3130            let journal =
3131                Journal::<_, u64>::init(context.with_label("journal_after_clear"), cfg.clone())
3132                    .await
3133                    .unwrap();
3134            let bounds = journal.bounds().await;
3135            assert_eq!(bounds.end, 100);
3136            assert!(bounds.is_empty());
3137
3138            // Append new data starting at position 100
3139            for i in 100..105u64 {
3140                let pos = journal.append(&(i * 100)).await.unwrap();
3141                assert_eq!(pos, i);
3142            }
3143            let bounds = journal.bounds().await;
3144            assert_eq!(bounds.end, 105);
3145            assert_eq!(bounds.start, 100);
3146
3147            // New positions should be readable
3148            for i in 100..105u64 {
3149                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3150            }
3151
3152            // Sync and re-init to verify persistence
3153            journal.sync().await.unwrap();
3154            drop(journal);
3155
3156            let journal = Journal::<_, u64>::init(context.with_label("journal_reopened"), cfg)
3157                .await
3158                .unwrap();
3159
3160            let bounds = journal.bounds().await;
3161            assert_eq!(bounds.end, 105);
3162            assert_eq!(bounds.start, 100);
3163            for i in 100..105u64 {
3164                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3165            }
3166
3167            journal.destroy().await.unwrap();
3168        });
3169    }
3170}