// commonware_storage/journal/contiguous/variable.rs
//! Position-based journal for variable-length items.
//!
//! This journal enforces section fullness: all non-final sections are full and synced.
//! On init, only the last section needs to be replayed to determine the exact size.

6use super::Reader as _;
7use crate::{
8    journal::{
9        contiguous::{fixed, Contiguous, Mutable},
10        segmented::variable,
11        Error,
12    },
13    Persistable,
14};
15use commonware_codec::{Codec, CodecShared};
16use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
17use commonware_utils::{
18    sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock},
19    NZUsize,
20};
21#[commonware_macros::stability(ALPHA)]
22use core::ops::Range;
23use futures::{stream, Stream, StreamExt as _};
24use std::num::{NonZeroU64, NonZeroUsize};
25#[commonware_macros::stability(ALPHA)]
26use tracing::debug;
27use tracing::warn;
28
29const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);
30
31/// Suffix appended to the base partition name for the data journal.
32const DATA_SUFFIX: &str = "_data";
33
34/// Suffix appended to the base partition name for the offsets journal.
35const OFFSETS_SUFFIX: &str = "_offsets";
36
/// Map an absolute item `position` to the section that stores it.
///
/// Each section holds exactly `items_per_section` items, so the mapping is
/// plain integer division.
///
/// # Arguments
///
/// * `position` - The absolute position in the journal
/// * `items_per_section` - The number of items stored in each section
///
/// # Returns
///
/// The section number where the item at `position` should be stored.
///
/// # Examples
///
/// ```ignore
/// // With 10 items per section:
/// assert_eq!(position_to_section(0, 10), 0);   // first item of section 0
/// assert_eq!(position_to_section(9, 10), 0);   // last item of section 0
/// assert_eq!(position_to_section(10, 10), 1);  // first item of section 1
/// assert_eq!(position_to_section(25, 10), 2);  // mid-section position
/// ```
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    position / items_per_section
}
61
62/// Configuration for a [Journal].
63#[derive(Clone)]
64pub struct Config<C> {
65    /// Base partition name. Sub-partitions will be created by appending DATA_SUFFIX and OFFSETS_SUFFIX.
66    pub partition: String,
67
68    /// The number of items to store in each section.
69    ///
70    /// Once set, this value cannot be changed across restarts.
71    /// All non-final sections will be full and persisted.
72    pub items_per_section: NonZeroU64,
73
74    /// Optional compression level for stored items.
75    pub compression: Option<u8>,
76
77    /// [Codec] configuration for encoding/decoding items.
78    pub codec_config: C,
79
80    /// Page cache for buffering reads from the underlying storage.
81    pub page_cache: CacheRef,
82
83    /// Write buffer size for each section.
84    pub write_buffer: NonZeroUsize,
85}
86
87impl<C> Config<C> {
88    /// Returns the partition name for the data journal.
89    fn data_partition(&self) -> String {
90        format!("{}{}", self.partition, DATA_SUFFIX)
91    }
92
93    /// Returns the partition name for the offsets journal.
94    fn offsets_partition(&self) -> String {
95        format!("{}{}", self.partition, OFFSETS_SUFFIX)
96    }
97}
98
99/// Inner journal state protected by a lock for interior mutability.
100struct Inner<E: Clock + Storage + Metrics, V: Codec> {
101    /// The underlying variable-length data journal.
102    data: variable::Journal<E, V>,
103
104    /// The next position to be assigned on append (total items appended).
105    ///
106    /// # Invariant
107    ///
108    /// Always >= `pruning_boundary`. Equal when data journal is empty or fully pruned.
109    size: u64,
110
111    /// The position before which all items have been pruned.
112    ///
113    /// After normal operation and pruning, the value is section-aligned.
114    /// After `init_at_size(N)`, the value may be mid-section.
115    ///
116    /// # Invariant
117    ///
118    /// Never decreases (pruning only moves forward).
119    pruning_boundary: u64,
120}
121
122impl<E: Clock + Storage + Metrics, V: CodecShared> Inner<E, V> {
123    /// Read the item at the given position using the provided offsets reader.
124    ///
125    /// # Errors
126    ///
127    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
128    /// - Returns [Error::ItemOutOfRange] if `position` is beyond the journal size.
129    /// - Returns other errors if storage or decoding fails.
130    async fn read(
131        &self,
132        position: u64,
133        items_per_section: u64,
134        offsets: &impl super::Reader<Item = u64>,
135    ) -> Result<V, Error> {
136        if position >= self.size {
137            return Err(Error::ItemOutOfRange(position));
138        }
139        if position < self.pruning_boundary {
140            return Err(Error::ItemPruned(position));
141        }
142
143        let offset = offsets.read(position).await?;
144        let section = position_to_section(position, items_per_section);
145
146        self.data.get(section, offset).await
147    }
148}
149
150/// A contiguous journal with variable-size entries.
151///
152/// This journal manages section assignment automatically, allowing callers to append items
153/// sequentially without manually tracking section numbers.
154///
155/// # Repair
156///
157/// Like
158/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
159/// and
160/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
161/// the first invalid data read will be considered the new end of the journal (and the
162/// underlying [Blob](commonware_runtime::Blob) will be truncated to the last valid item). Repair occurs during
163/// init via the underlying segmented journals.
164///
165/// # Invariants
166///
167/// ## 1. Section Fullness
168///
169/// All non-final sections are full (`items_per_section` items) and persisted. This ensures
170/// that on `init()`, we only need to replay the last section to determine the exact size.
171///
172/// ## 2. Data Journal is Source of Truth
173///
174/// The data journal is always the source of truth. The offsets journal is an index
175/// that may temporarily diverge during crashes. Divergences are automatically
176/// aligned during init():
177/// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data.
178///   (This can happen if we crash after writing data journal but before writing offsets journal)
179/// * If offsets.size() > data.size(): Rewind offsets to match data size.
180///   (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
181/// * If offsets.bounds().start < data.bounds().start: Prune offsets to match
182///   (This can happen if we crash after pruning data journal but before pruning offsets journal)
183///
184/// Note that we don't recover from the case where offsets.bounds().start >
185/// data.bounds().start. This should never occur because we always prune the data journal
186/// before the offsets journal.
187pub struct Journal<E: Clock + Storage + Metrics, V: Codec> {
188    /// Inner state for data journal metadata.
189    ///
190    /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent
191    /// race conditions while allowing concurrent reads during sync.
192    inner: UpgradableAsyncRwLock<Inner<E, V>>,
193
194    /// Index mapping positions to byte offsets within the data journal.
195    /// The section can be calculated from the position using items_per_section.
196    offsets: fixed::Journal<E, u64>,
197
198    /// The number of items per section.
199    ///
200    /// # Invariant
201    ///
202    /// This value is immutable after initialization and must remain consistent
203    /// across restarts. Changing this value will result in data loss or corruption.
204    items_per_section: u64,
205}
206
207/// A reader guard that holds a consistent snapshot of the variable journal's bounds.
208pub struct Reader<'a, E: Clock + Storage + Metrics, V: Codec> {
209    guard: AsyncRwLockReadGuard<'a, Inner<E, V>>,
210    offsets: fixed::Reader<'a, E, u64>,
211    items_per_section: u64,
212}
213
214impl<E: Clock + Storage + Metrics, V: CodecShared> super::Reader for Reader<'_, E, V> {
215    type Item = V;
216
217    fn bounds(&self) -> std::ops::Range<u64> {
218        self.guard.pruning_boundary..self.guard.size
219    }
220
221    async fn read(&self, position: u64) -> Result<V, Error> {
222        self.guard
223            .read(position, self.items_per_section, &self.offsets)
224            .await
225    }
226
227    async fn replay(
228        &self,
229        buffer_size: NonZeroUsize,
230        start_pos: u64,
231    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + Send, Error> {
232        // Validate bounds.
233        if start_pos < self.guard.pruning_boundary {
234            return Err(Error::ItemPruned(start_pos));
235        }
236        if start_pos > self.guard.size {
237            return Err(Error::ItemOutOfRange(start_pos));
238        }
239
240        // Get the starting offset and section. For empty range (start_pos == size),
241        // use a section beyond existing data so data.replay returns empty naturally.
242        let (start_section, start_offset) = if start_pos < self.guard.size {
243            let offset = self.offsets.read(start_pos).await?;
244            let section = position_to_section(start_pos, self.items_per_section);
245            (section, offset)
246        } else {
247            (u64::MAX, 0)
248        };
249
250        let inner_stream = self
251            .guard
252            .data
253            .replay(start_section, start_offset, buffer_size)
254            .await?;
255
256        // Map the stream to add positions.
257        let stream = inner_stream
258            .zip(stream::iter(start_pos..))
259            .map(|(result, pos)| result.map(|(_section, _offset, _size, item)| (pos, item)));
260
261        Ok(stream)
262    }
263}
264
265impl<E: Clock + Storage + Metrics, V: CodecShared> Journal<E, V> {
266    /// Initialize a contiguous variable journal.
267    ///
268    /// # Crash Recovery
269    ///
270    /// The data journal is the source of truth. If the offsets journal is inconsistent
271    /// it will be updated to match the data journal.
272    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
273        let items_per_section = cfg.items_per_section.get();
274        let data_partition = cfg.data_partition();
275        let offsets_partition = cfg.offsets_partition();
276
277        // Initialize underlying variable data journal
278        let mut data = variable::Journal::init(
279            context.with_label("data"),
280            variable::Config {
281                partition: data_partition,
282                compression: cfg.compression,
283                codec_config: cfg.codec_config,
284                page_cache: cfg.page_cache.clone(),
285                write_buffer: cfg.write_buffer,
286            },
287        )
288        .await?;
289
290        // Initialize offsets journal
291        let mut offsets = fixed::Journal::init(
292            context.with_label("offsets"),
293            fixed::Config {
294                partition: offsets_partition,
295                items_per_blob: cfg.items_per_section,
296                page_cache: cfg.page_cache,
297                write_buffer: cfg.write_buffer,
298            },
299        )
300        .await?;
301
302        // Validate and align offsets journal to match data journal
303        let (pruning_boundary, size) =
304            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
305
306        Ok(Self {
307            inner: UpgradableAsyncRwLock::new(Inner {
308                data,
309                size,
310                pruning_boundary,
311            }),
312            offsets,
313            items_per_section,
314        })
315    }
316
317    /// Initialize an empty [Journal] at the given logical `size`.
318    ///
319    /// Returns a journal with journal.bounds() == Range{start: size, end: size}
320    /// and next append at position `size`.
321    #[commonware_macros::stability(ALPHA)]
322    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
323        // Initialize empty data journal
324        let data = variable::Journal::init(
325            context.with_label("data"),
326            variable::Config {
327                partition: cfg.data_partition(),
328                compression: cfg.compression,
329                codec_config: cfg.codec_config.clone(),
330                page_cache: cfg.page_cache.clone(),
331                write_buffer: cfg.write_buffer,
332            },
333        )
334        .await?;
335
336        // Initialize offsets journal at the target size
337        let offsets = fixed::Journal::init_at_size(
338            context.with_label("offsets"),
339            fixed::Config {
340                partition: cfg.offsets_partition(),
341                items_per_blob: cfg.items_per_section,
342                page_cache: cfg.page_cache,
343                write_buffer: cfg.write_buffer,
344            },
345            size,
346        )
347        .await?;
348
349        Ok(Self {
350            inner: UpgradableAsyncRwLock::new(Inner {
351                data,
352                size,
353                pruning_boundary: size,
354            }),
355            offsets,
356            items_per_section: cfg.items_per_section.get(),
357        })
358    }
359
360    /// Initialize a [Journal] for use in state sync.
361    ///
362    /// The bounds are item locations (not section numbers). This function prepares the
363    /// on-disk journal so that subsequent appends go to the correct physical location for the
364    /// requested range.
365    ///
366    /// Behavior by existing on-disk state:
367    /// - Fresh (no data): returns an empty journal.
368    /// - Stale (all data strictly before `range.start`): destroys existing data and returns an
369    ///   empty journal.
370    /// - Overlap within [`range.start`, `range.end`]:
371    ///   - Prunes toward `range.start` (section-aligned, so some items before
372    ///     `range.start` may be retained)
373    /// - Unexpected data beyond `range.end`: returns [crate::qmdb::Error::UnexpectedData].
374    ///
375    /// # Arguments
376    /// - `context`: storage context
377    /// - `cfg`: journal configuration
378    /// - `range`: range of item locations to retain
379    ///
380    /// # Returns
381    /// A contiguous journal ready for sync operations. The journal's size will be within the range.
382    ///
383    /// # Errors
384    /// Returns [crate::qmdb::Error::UnexpectedData] if existing data extends beyond `range.end`.
385    #[commonware_macros::stability(ALPHA)]
386    pub(crate) async fn init_sync(
387        context: E,
388        cfg: Config<V::Cfg>,
389        range: Range<u64>,
390    ) -> Result<Self, Error> {
391        assert!(!range.is_empty(), "range must not be empty");
392
393        debug!(
394            range.start,
395            range.end,
396            items_per_section = cfg.items_per_section.get(),
397            "initializing contiguous variable journal for sync"
398        );
399
400        // Initialize contiguous journal
401        let journal = Self::init(context.with_label("journal"), cfg.clone()).await?;
402
403        let size = journal.size().await;
404
405        // No existing data - initialize at the start of the sync range if needed
406        if size == 0 {
407            if range.start == 0 {
408                debug!("no existing journal data, returning empty journal");
409                return Ok(journal);
410            } else {
411                debug!(
412                    range.start,
413                    "no existing journal data, initializing at sync range start"
414                );
415                journal.destroy().await?;
416                return Self::init_at_size(context, cfg, range.start).await;
417            }
418        }
419
420        // Check if data exceeds the sync range
421        if size > range.end {
422            return Err(Error::ItemOutOfRange(size));
423        }
424
425        // If all existing data is before our sync range, destroy and recreate fresh
426        if size <= range.start {
427            // All data is stale (ends at or before range.start)
428            debug!(
429                size,
430                range.start, "existing journal data is stale, re-initializing at start position"
431            );
432            journal.destroy().await?;
433            return Self::init_at_size(context, cfg, range.start).await;
434        }
435
436        // Prune to lower bound if needed
437        let bounds = journal.reader().await.bounds();
438        if !bounds.is_empty() && bounds.start < range.start {
439            debug!(
440                oldest_pos = bounds.start,
441                range.start, "pruning journal to sync range start"
442            );
443            journal.prune(range.start).await?;
444        }
445
446        Ok(journal)
447    }
448
449    /// Rewind the journal to the given size, discarding items from the end.
450    ///
451    /// After rewinding to size N, the journal will contain exactly N items, and the next append
452    /// will receive position N.
453    ///
454    /// # Errors
455    ///
456    /// Returns [Error::InvalidRewind] if `size` is larger than current size.
457    /// Returns [Error::ItemPruned] if `size` is smaller than the pruning boundary.
458    ///
459    /// # Warning
460    ///
461    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
462    pub async fn rewind(&self, size: u64) -> Result<(), Error> {
463        let mut inner = self.inner.write().await;
464
465        // Validate rewind target
466        match size.cmp(&inner.size) {
467            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
468            std::cmp::Ordering::Equal => return Ok(()), // No-op
469            std::cmp::Ordering::Less => {}
470        }
471
472        // Rewind never updates the pruning boundary.
473        if size < inner.pruning_boundary {
474            return Err(Error::ItemPruned(size));
475        }
476
477        // Read the offset of the first item to discard (at position 'size').
478        let discard_offset = {
479            let offsets_reader = self.offsets.reader().await;
480            offsets_reader.read(size).await?
481        };
482        let discard_section = position_to_section(size, self.items_per_section);
483
484        inner
485            .data
486            .rewind_to_offset(discard_section, discard_offset)
487            .await?;
488        self.offsets.rewind(size).await?;
489
490        // Update our size
491        inner.size = size;
492
493        Ok(())
494    }
495
496    /// Append a new item to the journal, returning its position.
497    ///
498    /// The position returned is a stable, consecutively increasing value starting from 0.
499    /// This position remains constant after pruning.
500    ///
501    /// When a section becomes full, both the data journal and offsets journal are persisted
502    /// to maintain the invariant that all non-final sections are full and consistent.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the underlying storage operation fails or if the item cannot
507    /// be encoded.
508    ///
509    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
510    /// reopened to trigger alignment in [Journal::init].
511    pub async fn append(&self, item: &V) -> Result<u64, Error> {
512        // Mutating operations are serialized by taking the write guard.
513        let mut inner = self.inner.write().await;
514
515        // Calculate which section this position belongs to
516        let section = position_to_section(inner.size, self.items_per_section);
517
518        // Append to data journal, get offset
519        let (offset, _size) = inner.data.append(section, item).await?;
520
521        // Append offset to offsets journal
522        let offsets_pos = self.offsets.append(&offset).await?;
523        assert_eq!(offsets_pos, inner.size);
524
525        // Return the current position
526        let position = inner.size;
527        inner.size += 1;
528
529        // Return early if no sync is needed (section not full).
530        if !inner.size.is_multiple_of(self.items_per_section) {
531            return Ok(position);
532        }
533
534        // The section was filled and must be synced. Downgrade so readers can continue during the
535        // sync while mutators remain blocked.
536        let inner = inner.downgrade_to_upgradable();
537        futures::try_join!(inner.data.sync(section), self.offsets.sync())?;
538
539        Ok(position)
540    }
541
542    /// Acquire a reader guard that holds a consistent view of the journal.
543    pub async fn reader(&self) -> Reader<'_, E, V> {
544        Reader {
545            guard: self.inner.read().await,
546            offsets: self.offsets.reader().await,
547            items_per_section: self.items_per_section,
548        }
549    }
550
551    /// Return the total number of items in the journal, irrespective of pruning. The next value
552    /// appended to the journal will be at this position.
553    pub async fn size(&self) -> u64 {
554        self.inner.read().await.size
555    }
556
557    /// Prune items at positions strictly less than `min_position`.
558    ///
559    /// Returns `true` if any data was pruned, `false` otherwise.
560    ///
561    /// # Errors
562    ///
563    /// Returns an error if the underlying storage operation fails.
564    ///
565    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
566    /// reopened to trigger alignment in [Journal::init].
567    pub async fn prune(&self, min_position: u64) -> Result<bool, Error> {
568        let mut inner = self.inner.write().await;
569
570        if min_position <= inner.pruning_boundary {
571            return Ok(false);
572        }
573
574        // Cap min_position to size to maintain the invariant pruning_boundary <= size
575        let min_position = min_position.min(inner.size);
576
577        // Calculate section number
578        let min_section = position_to_section(min_position, self.items_per_section);
579
580        let pruned = inner.data.prune(min_section).await?;
581        if pruned {
582            let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary);
583            inner.pruning_boundary = new_oldest;
584            self.offsets.prune(new_oldest).await?;
585        }
586        Ok(pruned)
587    }
588
589    /// Durably persist the journal.
590    ///
591    /// This is faster than `sync()` but recovery will be required on startup if a crash occurs
592    /// before the next call to `sync()`.
593    pub async fn commit(&self) -> Result<(), Error> {
594        // Serialize with append/prune/rewind so section selection is stable, while still allowing
595        // concurrent readers.
596        let inner = self.inner.upgradable_read().await;
597
598        let section = position_to_section(inner.size, self.items_per_section);
599        inner.data.sync(section).await
600    }
601
602    /// Durably persist the journal and ensure recovery is not required on startup.
603    ///
604    /// This is slower than `commit()` but ensures the journal doesn't require recovery on startup.
605    pub async fn sync(&self) -> Result<(), Error> {
606        // Serialize with append/prune/rewind so section selection is stable, while still allowing
607        // concurrent readers.
608        let inner = self.inner.upgradable_read().await;
609
610        // Persist only the current (final) section of the data journal.
611        // All non-final sections are already persisted per Invariant #1.
612        let section = position_to_section(inner.size, self.items_per_section);
613
614        // Persist both journals concurrently. These journals may not exist yet if the
615        // previous section was just filled. This is checked internally.
616        futures::try_join!(inner.data.sync(section), self.offsets.sync())?;
617
618        Ok(())
619    }
620
621    /// Remove any underlying blobs created by the journal.
622    ///
623    /// This destroys both the data journal and the offsets journal.
624    pub async fn destroy(self) -> Result<(), Error> {
625        let inner = self.inner.into_inner();
626        inner.data.destroy().await?;
627        self.offsets.destroy().await
628    }
629
630    /// Clear all data and reset the journal to a new starting position.
631    ///
632    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
633    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
634    #[commonware_macros::stability(ALPHA)]
635    pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
636        let mut inner = self.inner.write().await;
637        inner.data.clear().await?;
638
639        self.offsets.clear_to_size(new_size).await?;
640        inner.size = new_size;
641        inner.pruning_boundary = new_size;
642        Ok(())
643    }
644
645    /// Align the offsets journal and data journal to be consistent in case a crash occurred
646    /// on a previous run and left the journals in an inconsistent state.
647    ///
648    /// The data journal is the source of truth. This function scans it to determine
649    /// what SHOULD be in the offsets journal, then fixes any mismatches.
650    ///
651    /// # Returns
652    ///
653    /// Returns `(pruning_boundary, size)` for the contiguous journal.
654    async fn align_journals(
655        data: &mut variable::Journal<E, V>,
656        offsets: &mut fixed::Journal<E, u64>,
657        items_per_section: u64,
658    ) -> Result<(u64, u64), Error> {
659        // === Handle empty data journal case ===
660        let items_in_last_section = match data.newest_section() {
661            Some(last_section) => {
662                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
663                futures::pin_mut!(stream);
664                let mut count = 0u64;
665                while let Some(result) = stream.next().await {
666                    result?; // Propagate replay errors (corruption, etc.)
667                    count += 1;
668                }
669                count
670            }
671            None => 0,
672        };
673
674        // Data journal is empty if there are no sections or if there is one section and it has no items.
675        // The latter should only occur if a crash occured after opening a data journal blob but
676        // before writing to it.
677        let data_empty =
678            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
679        if data_empty {
680            let offsets_bounds = {
681                let offsets_reader = offsets.reader().await;
682                offsets_reader.bounds()
683            };
684            let size = offsets_bounds.end;
685
686            if !data.is_empty() {
687                // A section exists but contains 0 items. This can happen in two cases:
688                // 1. Rewind crash: we rewound the data journal but crashed before rewinding offsets
689                // 2. First append crash: we opened the first section blob but crashed before writing to it
690                // In both cases, calculate target position from the first remaining section
691                // SAFETY: data is non-empty (checked above)
692                let data_first_section = data.oldest_section().unwrap();
693                let data_section_start = data_first_section * items_per_section;
694                let target_pos = data_section_start.max(offsets_bounds.start);
695
696                warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
697                offsets.clear_to_size(target_pos).await?;
698                return Ok((target_pos, target_pos));
699            }
700
701            // data.blobs is empty. This can happen in two cases:
702            // 1. We completely pruned the data journal but crashed before pruning
703            //    the offsets journal.
704            // 2. The data journal was never opened.
705            if !offsets_bounds.is_empty() && offsets_bounds.start < size {
706                // Offsets has unpruned entries but data is gone - clear to match empty state.
707                // We use clear_to_size (not prune) to ensure bounds.start == bounds.end,
708                // even when size is mid-section.
709                warn!("crash repair: clearing offsets to {size} (prune-all crash)");
710                offsets.clear_to_size(size).await?;
711            }
712
713            return Ok((size, size));
714        }
715
716        // === Handle non-empty data journal case ===
717        let data_first_section = data.oldest_section().unwrap();
718        let data_last_section = data.newest_section().unwrap();
719
720        // data_oldest_pos is ALWAYS section-aligned because it's computed from the section index.
721        // This differs from offsets bounds start which can be mid-section after init_at_size.
722        let data_oldest_pos = data_first_section * items_per_section;
723
724        // Align pruning state
725        // We always prune data before offsets, so offsets should never be "ahead" by a section.
726        {
727            let offsets_bounds = {
728                let offsets_reader = offsets.reader().await;
729                offsets_reader.bounds()
730            };
731            match (
732                offsets_bounds.is_empty(),
733                offsets_bounds.start.cmp(&data_oldest_pos),
734            ) {
735                (true, _) => {
736                    // Offsets journal is empty but data journal isn't.
737                    // It should always be in the same section as the data journal, though.
738                    let offsets_first_section = offsets_bounds.start / items_per_section;
739                    if offsets_first_section != data_first_section {
740                        return Err(Error::Corruption(format!(
741                            "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
742                        )));
743                    }
744                    warn!(
745                        "crash repair: offsets journal empty at {}, will rebuild from data",
746                        offsets_bounds.start
747                    );
748                }
749                (false, std::cmp::Ordering::Less) => {
750                    // Offsets behind on pruning -- prune to catch up
751                    warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
752                    offsets.prune(data_oldest_pos).await?;
753                }
754                (false, std::cmp::Ordering::Greater) => {
755                    // Compare sections: same section = valid, different section = corruption.
756                    if offsets_bounds.start / items_per_section > data_first_section {
757                        return Err(Error::Corruption(format!(
758                            "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
759                            offsets_bounds.start
760                        )));
761                    }
762                }
763                (false, std::cmp::Ordering::Equal) => {
764                    // Both journals are pruned to the same position.
765                }
766            }
767        }
768
769        // Compute the correct logical size
770        // Uses bounds.start from offsets as the anchor because it tracks the exact starting
771        // position, which may be mid-section after init_at_size.
772        //
773        // Note: Corruption checks above ensure bounds.start is in data_first_section,
774        // so the subtraction in oldest_items cannot underflow.
775        // Re-fetch bounds since prune may have been called above.
776        let (offsets_bounds, data_size) = {
777            let offsets_reader = offsets.reader().await;
778            let offsets_bounds = offsets_reader.bounds();
779            let data_size = if data_first_section == data_last_section {
780                offsets_bounds.start + items_in_last_section
781            } else {
782                let oldest_items =
783                    (data_first_section + 1) * items_per_section - offsets_bounds.start;
784                let middle_items = (data_last_section - data_first_section - 1) * items_per_section;
785                offsets_bounds.start + oldest_items + middle_items + items_in_last_section
786            };
787            (offsets_bounds, data_size)
788        };
789
790        // Align sizes
791        let offsets_size = offsets_bounds.end;
792        if offsets_size > data_size {
793            // Crashed after writing offsets but before writing data.
794            warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
795            offsets.rewind(data_size).await?;
796        } else if offsets_size < data_size {
797            // Crashed after writing data but before writing offsets.
798            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
799        }
800
801        // Final invariant checks
802        let pruning_boundary = {
803            let offsets_reader = offsets.reader().await;
804            let offsets_bounds = offsets_reader.bounds();
805            assert_eq!(offsets_bounds.end, data_size);
806
807            // After alignment, offsets and data must be in the same section.
808            // We return bounds.start from offsets as the true boundary.
809            assert!(
810                !offsets_bounds.is_empty(),
811                "offsets should have data after alignment"
812            );
813            assert_eq!(
814                offsets_bounds.start / items_per_section,
815                data_first_section,
816                "offsets and data should be in same oldest section"
817            );
818            offsets_bounds.start
819        };
820
821        offsets.sync().await?;
822        Ok((pruning_boundary, data_size))
823    }
824
825    /// Rebuild missing offset entries by replaying the data journal and
826    /// appending the missing entries to the offsets journal.
827    ///
828    /// The data journal is the source of truth. This function brings the offsets
829    /// journal up to date by replaying data items and indexing their positions.
830    ///
831    /// # Warning
832    ///
833    /// - Panics if data journal is empty
834    /// - Panics if `offsets_size` >= `data.size()`
835    async fn add_missing_offsets(
836        data: &variable::Journal<E, V>,
837        offsets: &mut fixed::Journal<E, u64>,
838        offsets_size: u64,
839        items_per_section: u64,
840    ) -> Result<(), Error> {
841        assert!(
842            !data.is_empty(),
843            "rebuild_offsets called with empty data journal"
844        );
845
846        // Find where to start replaying
847        let (start_section, resume_offset, skip_first) = {
848            let offsets_reader = offsets.reader().await;
849            let offsets_bounds = offsets_reader.bounds();
850            if offsets_bounds.is_empty() {
851                // Offsets empty -- start from first data section
852                // SAFETY: data is non-empty (checked above)
853                let first_section = data.oldest_section().unwrap();
854                (first_section, 0, false)
855            } else if offsets_bounds.start < offsets_size {
856                // Offsets has items -- resume from last indexed position
857                let last_offset = offsets_reader.read(offsets_size - 1).await?;
858                let last_section = position_to_section(offsets_size - 1, items_per_section);
859                (last_section, last_offset, true)
860            } else {
861                // Offsets fully pruned but data has items -- start from first data section
862                // SAFETY: data is non-empty (checked above)
863                let first_section = data.oldest_section().unwrap();
864                (first_section, 0, false)
865            }
866        };
867
868        // Replay data journal from start position through the end and index all items.
869        // The data journal is the source of truth, so we consume the entire stream.
870        // (replay streams from start_section onwards through all subsequent sections)
871        let stream = data
872            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
873            .await?;
874        futures::pin_mut!(stream);
875
876        let mut skipped_first = false;
877        while let Some(result) = stream.next().await {
878            let (_section, offset, _size, _item) = result?;
879
880            // Skip first item if resuming from last indexed offset
881            if skip_first && !skipped_first {
882                skipped_first = true;
883                continue;
884            }
885
886            offsets.append(&offset).await?;
887        }
888
889        Ok(())
890    }
891}
892
// Implement Contiguous trait for variable-length items.
//
// Each trait method delegates to the inherent method of the same name. The
// fully-qualified `Self::method(self)` form resolves to the inherent item
// (inherent items take precedence over trait items), making it unambiguous
// that the trait method is not calling itself.
impl<E: Clock + Storage + Metrics, V: CodecShared> Contiguous for Journal<E, V> {
    type Item = V;

    async fn reader(&self) -> impl super::Reader<Item = V> + '_ {
        Self::reader(self).await
    }

    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}
905
// Implement Mutable trait for variable-length items. Each trait method
// delegates to the inherent method of the same name via the fully-qualified
// `Self::method(self)` form (inherent items take precedence over trait items).
impl<E: Clock + Storage + Metrics, V: CodecShared> Mutable for Journal<E, V> {
    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
919
920impl<E: Clock + Storage + Metrics, V: CodecShared> Persistable for Journal<E, V> {
921    type Error = Error;
922
923    async fn commit(&self) -> Result<(), Error> {
924        self.commit().await
925    }
926
927    async fn sync(&self) -> Result<(), Error> {
928        self.sync().await
929    }
930
931    async fn destroy(self) -> Result<(), Error> {
932        self.destroy().await
933    }
934}
935
#[cfg(test)]
impl<E: Clock + Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Test helper: Read the item at the given position.
    pub(crate) async fn read(&self, position: u64) -> Result<V, Error> {
        self.reader().await.read(position).await
    }

    /// Test helper: Return the bounds of the journal
    /// (oldest retained position .. logical size).
    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
        self.reader().await.bounds()
    }

    /// Test helper: Prune the internal data journal directly, bypassing the
    /// offsets journal (simulates a crash between the two prune steps).
    pub(crate) async fn test_prune_data(&self, section: u64) -> Result<bool, Error> {
        let mut inner = self.inner.write().await;
        inner.data.prune(section).await
    }

    /// Test helper: Prune the internal offsets journal directly, bypassing the
    /// data journal (simulates crash scenario).
    pub(crate) async fn test_prune_offsets(&self, position: u64) -> Result<bool, Error> {
        self.offsets.prune(position).await
    }

    /// Test helper: Rewind the internal offsets journal directly (simulates crash scenario).
    pub(crate) async fn test_rewind_offsets(&self, position: u64) -> Result<(), Error> {
        self.offsets.rewind(position).await
    }

    /// Test helper: Get the size of the internal offsets journal.
    pub(crate) async fn test_offsets_size(&self) -> u64 {
        self.offsets.size().await
    }

    /// Test helper: Append directly to the internal data journal without
    /// updating the offsets journal (simulates a crash between the two writes).
    pub(crate) async fn test_append_data(
        &self,
        section: u64,
        item: V,
    ) -> Result<(u64, u32), Error> {
        let mut inner = self.inner.write().await;
        inner.data.append(section, &item).await
    }

    /// Test helper: Sync the internal data journal.
    pub(crate) async fn test_sync_data(&self) -> Result<(), Error> {
        let inner = self.inner.read().await;
        inner
            .data
            // Sync the newest section; fall back to section 0 when empty.
            .sync(inner.data.newest_section().unwrap_or(0))
            .await
    }
}
988
989#[cfg(test)]
990mod tests {
991    use super::*;
992    use crate::journal::contiguous::tests::run_contiguous_tests;
993    use commonware_macros::test_traced;
994    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
995    use commonware_utils::{NZUsize, NZU16, NZU64};
996    use futures::FutureExt as _;
997    use std::num::NonZeroU16;
998
    // Use some jank sizes to exercise boundary conditions.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: usize = 2;
    // Larger page sizes for tests that need more buffer space.
    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
    // NOTE(review): only LARGE_PAGE_SIZE is referenced by the tests visible in
    // this portion of the module; PAGE_SIZE, PAGE_CACHE_SIZE, and
    // SMALL_PAGE_SIZE are presumably used by other tests in this file --
    // confirm before removing.
1005
    /// Test that complete offsets partition loss after pruning is detected as unrecoverable.
    ///
    /// When the offsets partition is completely lost and the data has been pruned, we cannot
    /// rebuild the index with correct position alignment (would require creating placeholder blobs).
    /// This is a genuine external failure that should be detected and reported clearly.
    #[test_traced]
    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "offsets-loss-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal with data and prune ===
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections (0-3)
            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            // Prune to position 20 (removes sections 0-1, keeps sections 2-3)
            journal.prune(20).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);

            // Persist everything before simulating the external failure.
            journal.sync().await.unwrap();
            drop(journal);

            // === Phase 2: Simulate complete offsets partition loss ===
            // Remove both the offsets data partition and its metadata partition.
            // NOTE(review): the "-blobs"/"-metadata" suffixes mirror the internal
            // partition layout of the fixed offsets journal -- keep in sync with
            // that implementation if it changes.
            context
                .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets blobs partition");
            context
                .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets metadata partition");

            // === Phase 3: Verify this is detected as unrecoverable ===
            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1059
    /// Test that init aligns state when data is pruned/lost but offsets survives.
    ///
    /// This handles both:
    /// 1. Crash during prune-all (data pruned, offsets not yet)
    /// 2. External data partition loss
    ///
    /// In both cases, we align by pruning offsets to match.
    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data-loss-test".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Setup: Create journal with data ===
            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 20 items across 2 sections
            for i in 0..20u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            variable.sync().await.unwrap();
            drop(variable);

            // === Simulate data loss: Delete data partition but keep offsets ===
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            // === Verify init aligns the mismatch ===
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            // Size should be preserved (size counts all positions ever appended,
            // including pruned ones)
            assert_eq!(journal.size().await, 20);

            // But no items remain (both journals pruned)
            assert!(journal.bounds().await.is_empty());

            // All reads should fail with ItemPruned
            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Can append new data starting at position 20 -- positions are never
            // reused after alignment.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
1126
    /// Test replay behavior for variable-length items.
    ///
    /// Covers full replay, mid-section and boundary starts, replay after
    /// pruning (both failing and succeeding starts), replay from the end
    /// (empty stream), and replay past the end (error).
    #[test_traced]
    fn test_variable_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "replay".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize journal
            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            // Append 40 items across 4 sections (0-3)
            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            // Test 1: Full replay
            // (first replay argument is the replay buffer capacity; second is the
            // starting position)
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 0).await.unwrap();
                futures::pin_mut!(stream);
                for i in 0..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Test 2: Partial replay from middle of section
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 15).await.unwrap();
                futures::pin_mut!(stream);
                for i in 15..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Test 3: Partial replay from section boundary
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
                futures::pin_mut!(stream);
                for i in 20..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Test 4: Prune and verify replay from pruned positions fails
            journal.prune(20).await.unwrap();
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 0).await;
                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
            }
            {
                // Position 19 is the last pruned position (boundary - 1).
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 19).await;
                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
            }

            // Test 5: Replay from exactly at pruning boundary after prune
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
                futures::pin_mut!(stream);
                for i in 20..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Test 6: Replay from the end (valid start, but the stream is empty)
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 40).await.unwrap();
                futures::pin_mut!(stream);
                assert!(stream.next().await.is_none());
            }

            // Test 7: Replay beyond the end (should error)
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 41).await;
                assert!(matches!(
                    res,
                    Err(crate::journal::Error::ItemOutOfRange(41))
                ));
            }

            journal.destroy().await.unwrap();
        });
    }
1235
    /// Run the shared `Contiguous` test suite against the variable-length journal.
    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // The factory closure is invoked once per (test_name, idx) pair to
            // build a fresh journal instance for each generic test case.
            run_contiguous_tests(move |test_name: String, idx: usize| {
                // NOTE(review): '-' is replaced with '_' for the metrics label --
                // presumably labels disallow hyphens; confirm against the
                // Metrics implementation.
                let label = test_name.replace('-', "_");
                let context = context.with_label(&format!("{label}_{idx}"));
                async move {
                    let cfg = Config {
                        partition: format!("generic-test-{test_name}"),
                        items_per_section: NZU64!(10),
                        compression: None,
                        codec_config: (),
                        page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                        write_buffer: NZUsize!(1024),
                    };
                    Journal::<_, u64>::init(context, cfg).await
                }
                .boxed()
            })
            .await;
        });
    }
1259
1260    /// Test multiple sequential prunes with Variable-specific guarantees.
1261    #[test_traced]
1262    fn test_variable_multiple_sequential_prunes() {
1263        let executor = deterministic::Runner::default();
1264        executor.start(|context| async move {
1265            let cfg = Config {
1266                partition: "sequential-prunes".into(),
1267                items_per_section: NZU64!(10),
1268                compression: None,
1269                codec_config: (),
1270                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1271                write_buffer: NZUsize!(1024),
1272            };
1273
1274            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1275
1276            // Append items across 4 sections: [0-9], [10-19], [20-29], [30-39]
1277            for i in 0..40u64 {
1278                journal.append(&(i * 100)).await.unwrap();
1279            }
1280
1281            // Initial state: all items accessible
1282            let bounds = journal.bounds().await;
1283            assert_eq!(bounds.start, 0);
1284            assert_eq!(bounds.end, 40);
1285
1286            // First prune: remove section 0 (positions 0-9)
1287            let pruned = journal.prune(10).await.unwrap();
1288            assert!(pruned);
1289
1290            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1291            assert_eq!(journal.bounds().await.start, 10);
1292
1293            // Items 0-9 should be pruned, 10+ should be accessible
1294            assert!(matches!(
1295                journal.read(0).await,
1296                Err(crate::journal::Error::ItemPruned(_))
1297            ));
1298            assert_eq!(journal.read(10).await.unwrap(), 1000);
1299            assert_eq!(journal.read(19).await.unwrap(), 1900);
1300
1301            // Second prune: remove section 1 (positions 10-19)
1302            let pruned = journal.prune(20).await.unwrap();
1303            assert!(pruned);
1304
1305            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1306            assert_eq!(journal.bounds().await.start, 20);
1307
1308            // Items 0-19 should be pruned, 20+ should be accessible
1309            assert!(matches!(
1310                journal.read(10).await,
1311                Err(crate::journal::Error::ItemPruned(_))
1312            ));
1313            assert!(matches!(
1314                journal.read(19).await,
1315                Err(crate::journal::Error::ItemPruned(_))
1316            ));
1317            assert_eq!(journal.read(20).await.unwrap(), 2000);
1318            assert_eq!(journal.read(29).await.unwrap(), 2900);
1319
1320            // Third prune: remove section 2 (positions 20-29)
1321            let pruned = journal.prune(30).await.unwrap();
1322            assert!(pruned);
1323
1324            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1325            assert_eq!(journal.bounds().await.start, 30);
1326
1327            // Items 0-29 should be pruned, 30+ should be accessible
1328            assert!(matches!(
1329                journal.read(20).await,
1330                Err(crate::journal::Error::ItemPruned(_))
1331            ));
1332            assert!(matches!(
1333                journal.read(29).await,
1334                Err(crate::journal::Error::ItemPruned(_))
1335            ));
1336            assert_eq!(journal.read(30).await.unwrap(), 3000);
1337            assert_eq!(journal.read(39).await.unwrap(), 3900);
1338
1339            // Size should still be 40 (pruning doesn't affect size)
1340            assert_eq!(journal.size().await, 40);
1341
1342            journal.destroy().await.unwrap();
1343        });
1344    }
1345
    /// Test that pruning all data and re-initializing preserves positions.
    ///
    /// Even when every item has been pruned, the journal's logical size (and
    /// therefore the next append position) must survive a restart.
    #[test_traced]
    fn test_variable_prune_all_then_reinit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune-all-reinit".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal and append data ===
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // 100 items fill 10 full sections (10 items each).
            for i in 0..100u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert_eq!(bounds.start, 0);

            // === Phase 2: Prune all data ===
            let pruned = journal.prune(100).await.unwrap();
            assert!(pruned);

            // All data is pruned - no items remain
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert!(bounds.is_empty());

            // All reads should fail with ItemPruned
            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Persist before restart so Phase 3 sees the pruned state.
            journal.sync().await.unwrap();
            drop(journal);

            // === Phase 3: Re-init and verify position preserved ===
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Size should be preserved, but no items remain
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert!(bounds.is_empty());

            // All reads should still fail
            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // === Phase 4: Append new data ===
            // Next append should get position 100 -- positions are never reused.
            journal.append(&10000).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 101);
            // Now we have one item at position 100
            assert_eq!(bounds.start, 100);

            // Can read the new item
            assert_eq!(journal.read(100).await.unwrap(), 10000);

            // Old positions still fail
            assert!(matches!(
                journal.read(99).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            journal.destroy().await.unwrap();
        });
    }
1431
    /// Test recovery from crash after data journal pruned but before offsets journal.
    ///
    /// Init must detect the offsets journal lagging behind the data journal's
    /// pruning boundary and repair by pruning the offsets journal to match.
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery-prune-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections to both journals
            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune to position 10 normally (both data and offsets journals pruned)
            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // === Simulate crash: Prune data journal but not offsets journal ===
            // Manually prune data journal to section 2 (position 20) via the
            // test helper, bypassing the offsets journal.
            variable.test_prune_data(2).await.unwrap();
            // Offsets journal still has data from position 10-19

            variable.sync().await.unwrap();
            drop(variable);

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should auto-repair: offsets journal pruned to match data journal
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);

            // Reads before position 20 should fail (pruned from both journals)
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Reads at position 20+ should succeed
            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }
1491
    /// Test recovery detects corruption when offsets journal pruned ahead of data journal.
    ///
    /// Simulates an impossible state (offsets journal pruned more than data journal) which
    /// should never happen due to write ordering. Verifies that init() returns corruption error.
    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery-offsets-ahead".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections to both journals
            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune offsets journal ahead of data journal (impossible state).
            // The test helpers bypass the wrapper's prune ordering (data before
            // offsets) that normally prevents this.
            variable.test_prune_offsets(20).await.unwrap(); // Prune to position 20
            variable.test_prune_data(1).await.unwrap(); // Only prune data journal to section 1 (position 10)

            variable.sync().await.unwrap();
            drop(variable);

            // === Verify corruption detected ===
            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1531
    /// Test recovery from crash after appending to data journal but before appending to offsets journal.
    ///
    /// The data journal acts as the source of truth: items present in data but missing
    /// from the offsets index are re-indexed during init() by replaying the data journal.
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with partial data ===
            let cfg = Config {
                partition: "recovery-append-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 15 items to both journals (fills section 0, partial section 1)
            for i in 0..15u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            assert_eq!(variable.size().await, 15);

            // Manually append 5 more items directly to data journal only,
            // bypassing the wrapper to simulate a crash between the two writes
            for i in 15..20u64 {
                variable.test_append_data(1, i * 100).await.unwrap();
            }
            // Offsets journal still has only 15 entries

            variable.sync().await.unwrap();
            drop(variable); // simulate process restart

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should rebuild offsets journal from data journal replay
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 0);

            // All items should be readable from both journals
            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Offsets journal should be fully rebuilt to match data journal
            assert_eq!(variable.test_offsets_size().await, 20);

            variable.destroy().await.unwrap();
        });
    }
1588
    /// Test recovery from multiple prune operations with crash.
    ///
    /// After a normal prune to position 10, the data journal alone is pruned further
    /// (to section 3). On restart, init() must auto-repair by pruning the offsets
    /// journal forward to match the data journal.
    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery-multiple-prunes".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 50 items across 5 sections to both journals
            for i in 0..50u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune to position 10 normally (both data and offsets journals pruned)
            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // === Simulate crash: Multiple prunes on data journal, not on offsets journal ===
            // Manually prune data journal to section 3 (position 30)
            variable.test_prune_data(3).await.unwrap();
            // Offsets journal still thinks oldest is position 10

            variable.sync().await.unwrap();
            drop(variable); // simulate process restart

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should auto-repair: offsets journal pruned to match data journal
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 30);
            assert_eq!(bounds.end, 50);

            // Reads before position 30 should fail (pruned from both journals)
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Reads at position 30+ should succeed
            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }
1652
    /// Test recovery from crash during rewind operation.
    ///
    /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes.
    /// This creates a situation where offsets journal has been rewound but data journal still
    /// contains items across multiple sections. Verifies that init() correctly rebuilds the
    /// offsets index across all sections to match the data journal.
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data across multiple sections ===
            let cfg = Config {
                partition: "recovery-rewind-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 25 items across 3 sections (section 0: 0-9, section 1: 10-19, section 2: 20-24)
            for i in 0..25u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            assert_eq!(variable.size().await, 25);

            // === Simulate crash during rewind(5) ===
            // Rewind offsets journal to size 5 (keeps positions 0-4)
            variable.test_rewind_offsets(5).await.unwrap();
            // CRASH before data.rewind() completes - data still has all 3 sections

            variable.sync().await.unwrap();
            drop(variable); // simulate process restart

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Init should rebuild offsets[5-24] from data journal across all 3 sections;
            // the partially-rewound offsets index is superseded by the data journal contents.
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 25);
            assert_eq!(bounds.start, 0);

            // All items should be readable - offsets rebuilt correctly across all sections
            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Verify offsets journal fully rebuilt
            assert_eq!(variable.test_offsets_size().await, 25);

            // Verify next append gets position 25
            let pos = variable.append(&2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }
1718
    /// Test recovery from crash after data sync but before offsets sync when journal was
    /// previously emptied by pruning.
    ///
    /// Exercises the case where the offsets journal is empty (everything pruned) while the
    /// data journal contains a newly-written section that was synced before the crash.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-empty-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal with one full section ===
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Append 10 items (positions 0-9), fills section 0
            for i in 0..10u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 0);

            // === Phase 2: Prune to create empty journal ===
            // Size stays at 10 but no items remain retained.
            journal.prune(10).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert!(bounds.is_empty()); // Empty!

            // === Phase 3: Append directly to data journal to simulate crash ===
            // Manually append to data journal only (bypassing Variable's append logic)
            // This simulates the case where data was synced but offsets wasn't
            for i in 10..20u64 {
                journal.test_append_data(1, i * 100).await.unwrap();
            }
            // Sync the data journal (section 1)
            journal.test_sync_data().await.unwrap();
            // Do NOT sync offsets journal - simulates crash before offsets.sync()

            // Close without syncing offsets
            drop(journal);

            // === Phase 4: Verify recovery succeeds ===
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            // All data should be recovered
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 10);

            // All items from position 10-19 should be readable
            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Items 0-9 should be pruned
            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1789
1790    /// Test that offsets index is rebuilt from data after sync writes data but not offsets.
1791    #[test_traced]
1792    fn test_variable_concurrent_sync_recovery() {
1793        let executor = deterministic::Runner::default();
1794        executor.start(|context| async move {
1795            let cfg = Config {
1796                partition: "concurrent-sync-recovery".into(),
1797                items_per_section: NZU64!(10),
1798                compression: None,
1799                codec_config: (),
1800                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1801                write_buffer: NZUsize!(1024),
1802            };
1803
1804            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1805                .await
1806                .unwrap();
1807
1808            // Append items across a section boundary
1809            for i in 0..15u64 {
1810                journal.append(&(i * 100)).await.unwrap();
1811            }
1812
1813            // Manually sync only data to simulate crash during concurrent sync
1814            journal.commit().await.unwrap();
1815
1816            // Simulate a crash (offsets not synced)
1817            drop(journal);
1818
1819            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1820                .await
1821                .unwrap();
1822
1823            // Data should be intact and offsets rebuilt
1824            assert_eq!(journal.size().await, 15);
1825            for i in 0..15u64 {
1826                assert_eq!(journal.read(i).await.unwrap(), i * 100);
1827            }
1828
1829            journal.destroy().await.unwrap();
1830        });
1831    }
1832
1833    #[test_traced]
1834    fn test_init_at_size_zero() {
1835        let executor = deterministic::Runner::default();
1836        executor.start(|context| async move {
1837            let cfg = Config {
1838                partition: "init-at-size-zero".into(),
1839                items_per_section: NZU64!(5),
1840                compression: None,
1841                codec_config: (),
1842                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1843                write_buffer: NZUsize!(1024),
1844            };
1845
1846            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1847                .await
1848                .unwrap();
1849
1850            // Size should be 0
1851            assert_eq!(journal.size().await, 0);
1852
1853            // No oldest retained position (empty journal)
1854            assert!(journal.bounds().await.is_empty());
1855
1856            // Next append should get position 0
1857            let pos = journal.append(&100).await.unwrap();
1858            assert_eq!(pos, 0);
1859            assert_eq!(journal.size().await, 1);
1860            assert_eq!(journal.read(0).await.unwrap(), 100);
1861
1862            journal.destroy().await.unwrap();
1863        });
1864    }
1865
1866    #[test_traced]
1867    fn test_init_at_size_section_boundary() {
1868        let executor = deterministic::Runner::default();
1869        executor.start(|context| async move {
1870            let cfg = Config {
1871                partition: "init-at-size-boundary".into(),
1872                items_per_section: NZU64!(5),
1873                compression: None,
1874                codec_config: (),
1875                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1876                write_buffer: NZUsize!(1024),
1877            };
1878
1879            // Initialize at position 10 (exactly at section 1 boundary with items_per_section=5)
1880            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1881                .await
1882                .unwrap();
1883
1884            // Size should be 10
1885            let bounds = journal.bounds().await;
1886            assert_eq!(bounds.end, 10);
1887
1888            // No data yet, so no oldest retained position
1889            assert!(bounds.is_empty());
1890
1891            // Next append should get position 10
1892            let pos = journal.append(&1000).await.unwrap();
1893            assert_eq!(pos, 10);
1894            assert_eq!(journal.size().await, 11);
1895            assert_eq!(journal.read(10).await.unwrap(), 1000);
1896
1897            // Can continue appending
1898            let pos = journal.append(&1001).await.unwrap();
1899            assert_eq!(pos, 11);
1900            assert_eq!(journal.read(11).await.unwrap(), 1001);
1901
1902            journal.destroy().await.unwrap();
1903        });
1904    }
1905
1906    #[test_traced]
1907    fn test_init_at_size_mid_section() {
1908        let executor = deterministic::Runner::default();
1909        executor.start(|context| async move {
1910            let cfg = Config {
1911                partition: "init-at-size-mid".into(),
1912                items_per_section: NZU64!(5),
1913                compression: None,
1914                codec_config: (),
1915                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1916                write_buffer: NZUsize!(1024),
1917            };
1918
1919            // Initialize at position 7 (middle of section 1 with items_per_section=5)
1920            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1921                .await
1922                .unwrap();
1923
1924            // Size should be 7
1925            let bounds = journal.bounds().await;
1926            assert_eq!(bounds.end, 7);
1927
1928            // No data yet, so no oldest retained position
1929            assert!(bounds.is_empty());
1930
1931            // Next append should get position 7
1932            let pos = journal.append(&700).await.unwrap();
1933            assert_eq!(pos, 7);
1934            assert_eq!(journal.size().await, 8);
1935            assert_eq!(journal.read(7).await.unwrap(), 700);
1936
1937            journal.destroy().await.unwrap();
1938        });
1939    }
1940
1941    #[test_traced]
1942    fn test_init_at_size_persistence() {
1943        let executor = deterministic::Runner::default();
1944        executor.start(|context| async move {
1945            let cfg = Config {
1946                partition: "init-at-size-persist".into(),
1947                items_per_section: NZU64!(5),
1948                compression: None,
1949                codec_config: (),
1950                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1951                write_buffer: NZUsize!(1024),
1952            };
1953
1954            // Initialize at position 15
1955            let journal =
1956                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
1957                    .await
1958                    .unwrap();
1959
1960            // Append some items
1961            for i in 0..5u64 {
1962                let pos = journal.append(&(1500 + i)).await.unwrap();
1963                assert_eq!(pos, 15 + i);
1964            }
1965
1966            assert_eq!(journal.size().await, 20);
1967
1968            // Sync and reopen
1969            journal.sync().await.unwrap();
1970            drop(journal);
1971
1972            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1973                .await
1974                .unwrap();
1975
1976            // Size and data should be preserved
1977            let bounds = journal.bounds().await;
1978            assert_eq!(bounds.end, 20);
1979            assert_eq!(bounds.start, 15);
1980
1981            // Verify data
1982            for i in 0..5u64 {
1983                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
1984            }
1985
1986            // Can continue appending
1987            let pos = journal.append(&9999).await.unwrap();
1988            assert_eq!(pos, 20);
1989            assert_eq!(journal.read(20).await.unwrap(), 9999);
1990
1991            journal.destroy().await.unwrap();
1992        });
1993    }
1994
1995    #[test_traced]
1996    fn test_init_at_size_persistence_without_data() {
1997        let executor = deterministic::Runner::default();
1998        executor.start(|context| async move {
1999            let cfg = Config {
2000                partition: "init-at-size-persist-empty".into(),
2001                items_per_section: NZU64!(5),
2002                compression: None,
2003                codec_config: (),
2004                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2005                write_buffer: NZUsize!(1024),
2006            };
2007
2008            // Initialize at position 15
2009            let journal =
2010                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2011                    .await
2012                    .unwrap();
2013
2014            let bounds = journal.bounds().await;
2015            assert_eq!(bounds.end, 15);
2016            assert!(bounds.is_empty());
2017
2018            // Drop without writing any data
2019            drop(journal);
2020
2021            // Reopen and verify size persisted
2022            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2023                .await
2024                .unwrap();
2025
2026            let bounds = journal.bounds().await;
2027            assert_eq!(bounds.end, 15);
2028            assert!(bounds.is_empty());
2029
2030            // Can append starting at position 15
2031            let pos = journal.append(&1500).await.unwrap();
2032            assert_eq!(pos, 15);
2033            assert_eq!(journal.read(15).await.unwrap(), 1500);
2034
2035            journal.destroy().await.unwrap();
2036        });
2037    }
2038
2039    /// Test init_at_size with mid-section value persists correctly across restart.
2040    #[test_traced]
2041    fn test_init_at_size_mid_section_persistence() {
2042        let executor = deterministic::Runner::default();
2043        executor.start(|context| async move {
2044            let cfg = Config {
2045                partition: "init-at-size-mid-section".into(),
2046                items_per_section: NZU64!(5),
2047                compression: None,
2048                codec_config: (),
2049                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2050                write_buffer: NZUsize!(1024),
2051            };
2052
2053            // Initialize at position 7 (mid-section, 7 % 5 = 2)
2054            let journal =
2055                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2056                    .await
2057                    .unwrap();
2058
2059            // Append 3 items at positions 7, 8, 9 (fills rest of section 1)
2060            for i in 0..3u64 {
2061                let pos = journal.append(&(700 + i)).await.unwrap();
2062                assert_eq!(pos, 7 + i);
2063            }
2064
2065            let bounds = journal.bounds().await;
2066            assert_eq!(bounds.end, 10);
2067            assert_eq!(bounds.start, 7);
2068
2069            // Sync and reopen
2070            journal.sync().await.unwrap();
2071            drop(journal);
2072
2073            // Reopen
2074            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2075                .await
2076                .unwrap();
2077
2078            // Size and bounds.start should be preserved correctly
2079            let bounds = journal.bounds().await;
2080            assert_eq!(bounds.end, 10);
2081            assert_eq!(bounds.start, 7);
2082
2083            // Verify data
2084            for i in 0..3u64 {
2085                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2086            }
2087
2088            // Positions before 7 should be pruned
2089            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2090
2091            journal.destroy().await.unwrap();
2092        });
2093    }
2094
2095    /// Test init_at_size mid-section with data spanning multiple sections.
2096    #[test_traced]
2097    fn test_init_at_size_mid_section_multi_section_persistence() {
2098        let executor = deterministic::Runner::default();
2099        executor.start(|context| async move {
2100            let cfg = Config {
2101                partition: "init-at-size-multi-section".into(),
2102                items_per_section: NZU64!(5),
2103                compression: None,
2104                codec_config: (),
2105                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2106                write_buffer: NZUsize!(1024),
2107            };
2108
2109            // Initialize at position 7 (mid-section)
2110            let journal =
2111                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2112                    .await
2113                    .unwrap();
2114
2115            // Append 8 items: positions 7-14 (section 1: 3 items, section 2: 5 items)
2116            for i in 0..8u64 {
2117                let pos = journal.append(&(700 + i)).await.unwrap();
2118                assert_eq!(pos, 7 + i);
2119            }
2120
2121            let bounds = journal.bounds().await;
2122            assert_eq!(bounds.end, 15);
2123            assert_eq!(bounds.start, 7);
2124
2125            // Sync and reopen
2126            journal.sync().await.unwrap();
2127            drop(journal);
2128
2129            // Reopen
2130            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2131                .await
2132                .unwrap();
2133
2134            // Verify state preserved
2135            let bounds = journal.bounds().await;
2136            assert_eq!(bounds.end, 15);
2137            assert_eq!(bounds.start, 7);
2138
2139            // Verify all data
2140            for i in 0..8u64 {
2141                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2142            }
2143
2144            journal.destroy().await.unwrap();
2145        });
2146    }
2147
    /// Regression test: data-empty crash repair must preserve mid-section pruning boundary.
    ///
    /// When init finds the data journal empty but the offsets journal populated, it repairs
    /// by treating the journal as fully pruned at its previous size — which here falls
    /// mid-section (7 with 5 items/section). A later data-only-synced append must survive.
    #[test_traced]
    fn test_align_journals_data_empty_mid_section_pruning_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "align-journals-mid-section-pruning-boundary".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Phase 1: Create data and offsets, then simulate data-only pruning crash.
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for i in 0..7u64 {
                journal.append(&(100 + i)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Simulate crash after data was cleared but before offsets were pruned.
            journal.inner.write().await.data.clear().await.unwrap();
            drop(journal);

            // Phase 2: Init triggers data-empty repair and should treat journal as fully pruned at size 7.
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 7);
            assert!(bounds.is_empty());

            // Append one item at position 7.
            let pos = journal.append(&777).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size().await, 8);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            // Sync only the data journal to simulate a crash before offsets are synced.
            // Position 7 with 5 items/section resides in section 1.
            let section = 7 / cfg.items_per_section.get();
            journal
                .inner
                .write()
                .await
                .data
                .sync(section)
                .await
                .unwrap();
            drop(journal);

            // Phase 3: Reopen and verify we did not lose the appended item.
            let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 8);
            assert_eq!(bounds.start, 7);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            journal.destroy().await.unwrap();
        });
    }
2213
    /// Test crash recovery: init_at_size + append + crash with data synced but offsets not.
    #[test_traced]
    fn test_init_at_size_crash_data_synced_offsets_not() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-crash-recovery".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 7 (mid-section)
            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Append 3 items (positions 7, 8, 9)
            for i in 0..3u64 {
                journal.append(&(700 + i)).await.unwrap();
            }

            // Sync only the data journal, not offsets (simulate crash).
            // Section 1 holds positions 5-9, where all three appends landed.
            journal.inner.write().await.data.sync(1).await.unwrap();
            // Don't sync offsets - simulates crash after data write but before offsets write
            drop(journal);

            // Reopen - should recover by rebuilding offsets from data
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Verify recovery
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            // Verify data is accessible
            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
2262
2263    #[test_traced]
2264    fn test_prune_does_not_move_oldest_retained_backwards() {
2265        let executor = deterministic::Runner::default();
2266        executor.start(|context| async move {
2267            let cfg = Config {
2268                partition: "prune-no-backwards".into(),
2269                items_per_section: NZU64!(5),
2270                compression: None,
2271                codec_config: (),
2272                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2273                write_buffer: NZUsize!(1024),
2274            };
2275
2276            let journal =
2277                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2278                    .await
2279                    .unwrap();
2280
2281            // Append a few items at positions 7..9
2282            for i in 0..3u64 {
2283                let pos = journal.append(&(700 + i)).await.unwrap();
2284                assert_eq!(pos, 7 + i);
2285            }
2286            assert_eq!(journal.bounds().await.start, 7);
2287
2288            // Prune to a position within the same section should not move bounds.start backwards.
2289            journal.prune(8).await.unwrap();
2290            assert_eq!(journal.bounds().await.start, 7);
2291            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2292            assert_eq!(journal.read(7).await.unwrap(), 700);
2293
2294            journal.destroy().await.unwrap();
2295        });
2296    }
2297
2298    #[test_traced]
2299    fn test_init_at_size_large_offset() {
2300        let executor = deterministic::Runner::default();
2301        executor.start(|context| async move {
2302            let cfg = Config {
2303                partition: "init-at-size-large".into(),
2304                items_per_section: NZU64!(5),
2305                compression: None,
2306                codec_config: (),
2307                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2308                write_buffer: NZUsize!(1024),
2309            };
2310
2311            // Initialize at a large position (position 1000)
2312            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
2313                .await
2314                .unwrap();
2315
2316            let bounds = journal.bounds().await;
2317            assert_eq!(bounds.end, 1000);
2318            // No data yet, so no oldest retained position
2319            assert!(bounds.is_empty());
2320
2321            // Next append should get position 1000
2322            let pos = journal.append(&100000).await.unwrap();
2323            assert_eq!(pos, 1000);
2324            assert_eq!(journal.read(1000).await.unwrap(), 100000);
2325
2326            journal.destroy().await.unwrap();
2327        });
2328    }
2329
2330    #[test_traced]
2331    fn test_init_at_size_prune_and_append() {
2332        let executor = deterministic::Runner::default();
2333        executor.start(|context| async move {
2334            let cfg = Config {
2335                partition: "init-at-size-prune".into(),
2336                items_per_section: NZU64!(5),
2337                compression: None,
2338                codec_config: (),
2339                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2340                write_buffer: NZUsize!(1024),
2341            };
2342
2343            // Initialize at position 20
2344            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
2345                .await
2346                .unwrap();
2347
2348            // Append items 20-29
2349            for i in 0..10u64 {
2350                journal.append(&(2000 + i)).await.unwrap();
2351            }
2352
2353            assert_eq!(journal.size().await, 30);
2354
2355            // Prune to position 25
2356            journal.prune(25).await.unwrap();
2357
2358            let bounds = journal.bounds().await;
2359            assert_eq!(bounds.end, 30);
2360            assert_eq!(bounds.start, 25);
2361
2362            // Verify remaining items are readable
2363            for i in 25..30u64 {
2364                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
2365            }
2366
2367            // Continue appending
2368            let pos = journal.append(&3000).await.unwrap();
2369            assert_eq!(pos, 30);
2370
2371            journal.destroy().await.unwrap();
2372        });
2373    }
2374
2375    /// Test `init_sync` when there is no existing data on disk.
2376    #[test_traced]
2377    fn test_init_sync_no_existing_data() {
2378        let executor = deterministic::Runner::default();
2379        executor.start(|context| async move {
2380            let cfg = Config {
2381                partition: "test-fresh-start".into(),
2382                items_per_section: NZU64!(5),
2383                compression: None,
2384                codec_config: (),
2385                write_buffer: NZUsize!(1024),
2386                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2387            };
2388
2389            // Initialize journal with sync boundaries when no existing data exists
2390            let lower_bound = 10;
2391            let upper_bound = 26;
2392            let journal =
2393                Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
2394                    .await
2395                    .expect("Failed to initialize journal with sync boundaries");
2396
2397            let bounds = journal.bounds().await;
2398            assert_eq!(bounds.end, lower_bound);
2399            assert!(bounds.is_empty());
2400
2401            // Append items using the contiguous API
2402            let pos1 = journal.append(&42u64).await.unwrap();
2403            assert_eq!(pos1, lower_bound);
2404            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
2405
2406            let pos2 = journal.append(&43u64).await.unwrap();
2407            assert_eq!(pos2, lower_bound + 1);
2408            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
2409
2410            journal.destroy().await.unwrap();
2411        });
2412    }
2413
2414    /// Test `init_sync` when there is existing data that overlaps with the sync target range.
2415    #[test_traced]
2416    fn test_init_sync_existing_data_overlap() {
2417        let executor = deterministic::Runner::default();
2418        executor.start(|context| async move {
2419            let cfg = Config {
2420                partition: "test-overlap".into(),
2421                items_per_section: NZU64!(5),
2422                compression: None,
2423                codec_config: (),
2424                write_buffer: NZUsize!(1024),
2425                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2426            };
2427
2428            // Create initial journal with data in multiple sections
2429            let journal =
2430                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2431                    .await
2432                    .expect("Failed to create initial journal");
2433
2434            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
2435            for i in 0..20u64 {
2436                journal.append(&(i * 100)).await.unwrap();
2437            }
2438            journal.sync().await.unwrap();
2439            drop(journal);
2440
2441            // Initialize with sync boundaries that overlap with existing data
2442            // lower_bound: 8 (section 1), upper_bound: 31 (last location 30, section 6)
2443            let lower_bound = 8;
2444            let upper_bound = 31;
2445            let journal = Journal::<deterministic::Context, u64>::init_sync(
2446                context.clone(),
2447                cfg.clone(),
2448                lower_bound..upper_bound,
2449            )
2450            .await
2451            .expect("Failed to initialize journal with overlap");
2452
2453            assert_eq!(journal.size().await, 20);
2454
2455            // Verify oldest retained is pruned to lower_bound's section boundary (5)
2456            assert_eq!(journal.bounds().await.start, 5); // Section 1 starts at position 5
2457
2458            // Verify data integrity: positions before 5 are pruned
2459            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2460            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2461
2462            // Positions 5-19 should be accessible
2463            assert_eq!(journal.read(5).await.unwrap(), 500);
2464            assert_eq!(journal.read(8).await.unwrap(), 800);
2465            assert_eq!(journal.read(19).await.unwrap(), 1900);
2466
2467            // Position 20+ should not exist yet
2468            assert!(matches!(
2469                journal.read(20).await,
2470                Err(Error::ItemOutOfRange(_))
2471            ));
2472
2473            // Assert journal can accept new items
2474            let pos = journal.append(&999).await.unwrap();
2475            assert_eq!(pos, 20);
2476            assert_eq!(journal.read(20).await.unwrap(), 999);
2477
2478            journal.destroy().await.unwrap();
2479        });
2480    }
2481
2482    /// Test `init_sync` with invalid parameters.
2483    #[should_panic]
2484    #[test_traced]
2485    fn test_init_sync_invalid_parameters() {
2486        let executor = deterministic::Runner::default();
2487        executor.start(|context| async move {
2488            let cfg = Config {
2489                partition: "test-invalid".into(),
2490                items_per_section: NZU64!(5),
2491                compression: None,
2492                codec_config: (),
2493                write_buffer: NZUsize!(1024),
2494                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2495            };
2496
2497            #[allow(clippy::reversed_empty_ranges)]
2498            let _result = Journal::<deterministic::Context, u64>::init_sync(
2499                context.clone(),
2500                cfg,
2501                10..5, // invalid range: lower > upper
2502            )
2503            .await;
2504        });
2505    }
2506
2507    /// Test `init_sync` when existing data exactly matches the sync range.
2508    #[test_traced]
2509    fn test_init_sync_existing_data_exact_match() {
2510        let executor = deterministic::Runner::default();
2511        executor.start(|context| async move {
2512            let items_per_section = NZU64!(5);
2513            let cfg = Config {
2514                partition: "test-exact-match".into(),
2515                items_per_section,
2516                compression: None,
2517                codec_config: (),
2518                write_buffer: NZUsize!(1024),
2519                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2520            };
2521
2522            // Create initial journal with data exactly matching sync range
2523            let journal =
2524                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2525                    .await
2526                    .expect("Failed to create initial journal");
2527
2528            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
2529            for i in 0..20u64 {
2530                journal.append(&(i * 100)).await.unwrap();
2531            }
2532            journal.sync().await.unwrap();
2533            drop(journal);
2534
2535            // Initialize with sync boundaries that exactly match existing data
2536            let lower_bound = 5; // section 1
2537            let upper_bound = 20; // section 3
2538            let journal = Journal::<deterministic::Context, u64>::init_sync(
2539                context.clone(),
2540                cfg.clone(),
2541                lower_bound..upper_bound,
2542            )
2543            .await
2544            .expect("Failed to initialize journal with exact match");
2545
2546            assert_eq!(journal.size().await, 20);
2547
2548            // Verify pruning to lower bound (section 1 boundary = position 5)
2549            assert_eq!(journal.bounds().await.start, 5); // Section 1 starts at position 5
2550
2551            // Verify positions before 5 are pruned
2552            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2553            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2554
2555            // Positions 5-19 should be accessible
2556            assert_eq!(journal.read(5).await.unwrap(), 500);
2557            assert_eq!(journal.read(10).await.unwrap(), 1000);
2558            assert_eq!(journal.read(19).await.unwrap(), 1900);
2559
2560            // Position 20+ should not exist yet
2561            assert!(matches!(
2562                journal.read(20).await,
2563                Err(Error::ItemOutOfRange(_))
2564            ));
2565
2566            // Assert journal can accept new operations
2567            let pos = journal.append(&999).await.unwrap();
2568            assert_eq!(pos, 20);
2569            assert_eq!(journal.read(20).await.unwrap(), 999);
2570
2571            journal.destroy().await.unwrap();
2572        });
2573    }
2574
2575    /// Test `init_sync` when existing data exceeds the sync target range.
2576    /// This tests that UnexpectedData error is returned when existing data goes beyond the upper bound.
2577    #[test_traced]
2578    fn test_init_sync_existing_data_exceeds_upper_bound() {
2579        let executor = deterministic::Runner::default();
2580        executor.start(|context| async move {
2581            let items_per_section = NZU64!(5);
2582            let cfg = Config {
2583                partition: "test-unexpected-data".into(),
2584                items_per_section,
2585                compression: None,
2586                codec_config: (),
2587                write_buffer: NZUsize!(1024),
2588                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2589            };
2590
2591            // Create initial journal with data beyond sync range
2592            let journal = Journal::<deterministic::Context, u64>::init(
2593                context.with_label("initial"),
2594                cfg.clone(),
2595            )
2596            .await
2597            .expect("Failed to create initial journal");
2598
2599            // Add data at positions 0-29 (sections 0-5 with items_per_section=5)
2600            for i in 0..30u64 {
2601                journal.append(&(i * 1000)).await.unwrap();
2602            }
2603            journal.sync().await.unwrap();
2604            drop(journal);
2605
2606            // Initialize with sync boundaries that are exceeded by existing data
2607            let lower_bound = 8; // section 1
2608            for (i, upper_bound) in (9..29).enumerate() {
2609                let result = Journal::<deterministic::Context, u64>::init_sync(
2610                    context.with_label(&format!("sync_{i}")),
2611                    cfg.clone(),
2612                    lower_bound..upper_bound,
2613                )
2614                .await;
2615
2616                // Should return ItemOutOfRange error since data exists beyond upper_bound
2617                assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
2618            }
2619        });
2620    }
2621
2622    /// Test `init_sync` when all existing data is stale (before lower bound).
2623    #[test_traced]
2624    fn test_init_sync_existing_data_stale() {
2625        let executor = deterministic::Runner::default();
2626        executor.start(|context| async move {
2627            let items_per_section = NZU64!(5);
2628            let cfg = Config {
2629                partition: "test-stale".into(),
2630                items_per_section,
2631                compression: None,
2632                codec_config: (),
2633                write_buffer: NZUsize!(1024),
2634                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2635            };
2636
2637            // Create initial journal with stale data
2638            let journal = Journal::<deterministic::Context, u64>::init(
2639                context.with_label("first"),
2640                cfg.clone(),
2641            )
2642            .await
2643            .expect("Failed to create initial journal");
2644
2645            // Add data at positions 0-9 (sections 0-1 with items_per_section=5)
2646            for i in 0..10u64 {
2647                journal.append(&(i * 100)).await.unwrap();
2648            }
2649            journal.sync().await.unwrap();
2650            drop(journal);
2651
2652            // Initialize with sync boundaries beyond all existing data
2653            let lower_bound = 15; // section 3
2654            let upper_bound = 26; // last element in section 5
2655            let journal = Journal::<deterministic::Context, u64>::init_sync(
2656                context.with_label("second"),
2657                cfg.clone(),
2658                lower_bound..upper_bound,
2659            )
2660            .await
2661            .expect("Failed to initialize journal with stale data");
2662
2663            assert_eq!(journal.size().await, 15);
2664
2665            // Verify fresh journal (all old data destroyed, starts at position 15)
2666            assert!(journal.bounds().await.is_empty());
2667
2668            // Verify old positions don't exist
2669            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2670            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2671            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2672
2673            journal.destroy().await.unwrap();
2674        });
2675    }
2676
2677    /// Test `init_sync` with section boundary edge cases.
2678    #[test_traced]
2679    fn test_init_sync_section_boundaries() {
2680        let executor = deterministic::Runner::default();
2681        executor.start(|context| async move {
2682            let items_per_section = NZU64!(5);
2683            let cfg = Config {
2684                partition: "test-boundaries".into(),
2685                items_per_section,
2686                compression: None,
2687                codec_config: (),
2688                write_buffer: NZUsize!(1024),
2689                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2690            };
2691
2692            // Create journal with data at section boundaries
2693            let journal =
2694                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2695                    .await
2696                    .expect("Failed to create initial journal");
2697
2698            // Add data at positions 0-24 (sections 0-4 with items_per_section=5)
2699            for i in 0..25u64 {
2700                journal.append(&(i * 100)).await.unwrap();
2701            }
2702            journal.sync().await.unwrap();
2703            drop(journal);
2704
2705            // Test sync boundaries exactly at section boundaries
2706            let lower_bound = 15; // Exactly at section boundary (15/5 = 3)
2707            let upper_bound = 25; // Last element exactly at section boundary (24/5 = 4)
2708            let journal = Journal::<deterministic::Context, u64>::init_sync(
2709                context.clone(),
2710                cfg.clone(),
2711                lower_bound..upper_bound,
2712            )
2713            .await
2714            .expect("Failed to initialize journal at boundaries");
2715
2716            assert_eq!(journal.size().await, 25);
2717
2718            // Verify oldest retained is at section 3 boundary (position 15)
2719            assert_eq!(journal.bounds().await.start, 15);
2720
2721            // Verify positions before 15 are pruned
2722            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2723            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2724
2725            // Verify positions 15-24 are accessible
2726            assert_eq!(journal.read(15).await.unwrap(), 1500);
2727            assert_eq!(journal.read(20).await.unwrap(), 2000);
2728            assert_eq!(journal.read(24).await.unwrap(), 2400);
2729
2730            // Position 25+ should not exist yet
2731            assert!(matches!(
2732                journal.read(25).await,
2733                Err(Error::ItemOutOfRange(_))
2734            ));
2735
2736            // Assert journal can accept new operations
2737            let pos = journal.append(&999).await.unwrap();
2738            assert_eq!(pos, 25);
2739            assert_eq!(journal.read(25).await.unwrap(), 999);
2740
2741            journal.destroy().await.unwrap();
2742        });
2743    }
2744
2745    /// Test `init_sync` when range.start and range.end-1 are in the same section.
2746    #[test_traced]
2747    fn test_init_sync_same_section_bounds() {
2748        let executor = deterministic::Runner::default();
2749        executor.start(|context| async move {
2750            let items_per_section = NZU64!(5);
2751            let cfg = Config {
2752                partition: "test-same-section".into(),
2753                items_per_section,
2754                compression: None,
2755                codec_config: (),
2756                write_buffer: NZUsize!(1024),
2757                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2758            };
2759
2760            // Create journal with data in multiple sections
2761            let journal =
2762                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2763                    .await
2764                    .expect("Failed to create initial journal");
2765
2766            // Add data at positions 0-14 (sections 0-2 with items_per_section=5)
2767            for i in 0..15u64 {
2768                journal.append(&(i * 100)).await.unwrap();
2769            }
2770            journal.sync().await.unwrap();
2771            drop(journal);
2772
2773            // Test sync boundaries within the same section
2774            let lower_bound = 10; // operation 10 (section 2: 10/5 = 2)
2775            let upper_bound = 15; // Last operation 14 (section 2: 14/5 = 2)
2776            let journal = Journal::<deterministic::Context, u64>::init_sync(
2777                context.clone(),
2778                cfg.clone(),
2779                lower_bound..upper_bound,
2780            )
2781            .await
2782            .expect("Failed to initialize journal with same-section bounds");
2783
2784            assert_eq!(journal.size().await, 15);
2785
2786            // Both operations are in section 2, so sections 0, 1 should be pruned, section 2 retained
2787            // Oldest retained position should be at section 2 boundary (position 10)
2788            assert_eq!(journal.bounds().await.start, 10);
2789
2790            // Verify positions before 10 are pruned
2791            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2792            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2793
2794            // Verify positions 10-14 are accessible
2795            assert_eq!(journal.read(10).await.unwrap(), 1000);
2796            assert_eq!(journal.read(11).await.unwrap(), 1100);
2797            assert_eq!(journal.read(14).await.unwrap(), 1400);
2798
2799            // Position 15+ should not exist yet
2800            assert!(matches!(
2801                journal.read(15).await,
2802                Err(Error::ItemOutOfRange(_))
2803            ));
2804
2805            // Assert journal can accept new operations
2806            let pos = journal.append(&999).await.unwrap();
2807            assert_eq!(pos, 15);
2808            assert_eq!(journal.read(15).await.unwrap(), 999);
2809
2810            journal.destroy().await.unwrap();
2811        });
2812    }
2813
    /// Test contiguous variable journal with items_per_section=1.
    ///
    /// This is a regression test for a bug where reading from size()-1 fails
    /// when using items_per_section=1, particularly after pruning and restart.
    ///
    /// With one item per section, every append closes a section, which makes
    /// this configuration a worst case for section-boundary arithmetic.
    #[test_traced]
    fn test_single_item_per_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single-item-per-section".into(),
                // One item per section: every position maps to its own section.
                items_per_section: NZU64!(1),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Test 1: Basic single item operation ===
            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Verify empty state
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 0);
            assert!(bounds.is_empty());

            // Append 1 item (value = position * 100, so position 0 has value 0)
            let pos = journal.append(&0).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size().await, 1);

            // Sync
            journal.sync().await.unwrap();

            // Read from size() - 1
            let value = journal.read(journal.size().await - 1).await.unwrap();
            assert_eq!(value, 0);

            // === Test 2: Multiple items with single item per section ===
            // Each append lands in its own section (position == section number).
            for i in 1..10u64 {
                let pos = journal.append(&(i * 100)).await.unwrap();
                assert_eq!(pos, i);
                assert_eq!(journal.size().await, i + 1);

                // Verify we can read the just-appended item at size() - 1
                let value = journal.read(journal.size().await - 1).await.unwrap();
                assert_eq!(value, i * 100);
            }

            // Verify all items can be read
            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.sync().await.unwrap();

            // === Test 3: Pruning with single item per section ===
            // Prune to position 5 (removes positions 0-4)
            let pruned = journal.prune(5).await.unwrap();
            assert!(pruned);

            // Size should still be 10
            assert_eq!(journal.size().await, 10);

            // bounds.start should be 5
            assert_eq!(journal.bounds().await.start, 5);

            // Reading from bounds.end - 1 (position 9) should still work
            let value = journal.read(journal.size().await - 1).await.unwrap();
            assert_eq!(value, 900);

            // Reading from pruned positions should return ItemPruned
            for i in 0..5 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Reading from retained positions should work
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Append more items after pruning
            for i in 10..15u64 {
                let pos = journal.append(&(i * 100)).await.unwrap();
                assert_eq!(pos, i);

                // Verify we can read from size() - 1
                let value = journal.read(journal.size().await - 1).await.unwrap();
                assert_eq!(value, i * 100);
            }

            journal.sync().await.unwrap();
            drop(journal);

            // === Test 4: Restart persistence with single item per section ===
            // Reopen the same partition; state from before the drop must survive.
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Verify size is preserved
            assert_eq!(journal.size().await, 15);

            // Verify bounds.start is preserved
            assert_eq!(journal.bounds().await.start, 5);

            // Reading from bounds.end - 1 should work after restart
            let value = journal.read(journal.size().await - 1).await.unwrap();
            assert_eq!(value, 1400);

            // Reading all retained positions should work
            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();

            // === Test 5: Restart after pruning with non-zero index (KEY SCENARIO) ===
            // This is the exact sequence that triggered the original bug:
            // prune, sync, restart, then read from size() - 1.
            // Fresh journal for this test
            let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();

            // Append 10 items (positions 0-9)
            for i in 0..10u64 {
                journal.append(&(i * 1000)).await.unwrap();
            }

            // Prune to position 5 (removes positions 0-4)
            journal.prune(5).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 5);

            // Sync and restart
            journal.sync().await.unwrap();
            drop(journal);

            // Re-open journal
            let journal = Journal::<_, u64>::init(context.with_label("fourth"), cfg.clone())
                .await
                .unwrap();

            // Verify state after restart
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 5);

            // KEY TEST: Reading from bounds.end - 1 (position 9) should work
            let value = journal.read(journal.size().await - 1).await.unwrap();
            assert_eq!(value, 9000);

            // Verify all retained positions (5-9) work
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 1000);
            }

            journal.destroy().await.unwrap();

            // === Test 6: Prune all items (edge case) ===
            // This tests the scenario where prune removes everything.
            // Callers must check bounds().is_empty() before reading.
            let journal = Journal::<_, u64>::init(context.with_label("fifth"), cfg.clone())
                .await
                .unwrap();

            for i in 0..5u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Prune all items
            journal.prune(5).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 5); // Size unchanged
            assert!(bounds.is_empty()); // All pruned

            // bounds.end - 1 = 4, but position 4 is pruned
            let result = journal.read(journal.size().await - 1).await;
            assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));

            // After appending, reading works again
            journal.append(&500).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 5);
            assert_eq!(journal.read(bounds.end - 1).await.unwrap(), 500);

            journal.destroy().await.unwrap();
        });
    }
3007
3008    #[test_traced]
3009    fn test_variable_journal_clear_to_size() {
3010        let executor = deterministic::Runner::default();
3011        executor.start(|context| async move {
3012            let cfg = Config {
3013                partition: "clear-test".into(),
3014                items_per_section: NZU64!(10),
3015                compression: None,
3016                codec_config: (),
3017                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
3018                write_buffer: NZUsize!(1024),
3019            };
3020
3021            let journal = Journal::<_, u64>::init(context.with_label("journal"), cfg.clone())
3022                .await
3023                .unwrap();
3024
3025            // Append 25 items (spanning multiple sections)
3026            for i in 0..25u64 {
3027                journal.append(&(i * 100)).await.unwrap();
3028            }
3029            let bounds = journal.bounds().await;
3030            assert_eq!(bounds.end, 25);
3031            assert_eq!(bounds.start, 0);
3032            journal.sync().await.unwrap();
3033
3034            // Clear to position 100, effectively resetting the journal
3035            journal.clear_to_size(100).await.unwrap();
3036            let bounds = journal.bounds().await;
3037            assert_eq!(bounds.end, 100);
3038            assert!(bounds.is_empty());
3039
3040            // Old positions should fail
3041            for i in 0..25 {
3042                assert!(matches!(
3043                    journal.read(i).await,
3044                    Err(crate::journal::Error::ItemPruned(_))
3045                ));
3046            }
3047
3048            // Verify size persists after restart without writing any data
3049            drop(journal);
3050            let journal =
3051                Journal::<_, u64>::init(context.with_label("journal_after_clear"), cfg.clone())
3052                    .await
3053                    .unwrap();
3054            let bounds = journal.bounds().await;
3055            assert_eq!(bounds.end, 100);
3056            assert!(bounds.is_empty());
3057
3058            // Append new data starting at position 100
3059            for i in 100..105u64 {
3060                let pos = journal.append(&(i * 100)).await.unwrap();
3061                assert_eq!(pos, i);
3062            }
3063            let bounds = journal.bounds().await;
3064            assert_eq!(bounds.end, 105);
3065            assert_eq!(bounds.start, 100);
3066
3067            // New positions should be readable
3068            for i in 100..105u64 {
3069                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3070            }
3071
3072            // Sync and re-init to verify persistence
3073            journal.sync().await.unwrap();
3074            drop(journal);
3075
3076            let journal = Journal::<_, u64>::init(context.with_label("journal_reopened"), cfg)
3077                .await
3078                .unwrap();
3079
3080            let bounds = journal.bounds().await;
3081            assert_eq!(bounds.end, 105);
3082            assert_eq!(bounds.start, 100);
3083            for i in 100..105u64 {
3084                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3085            }
3086
3087            journal.destroy().await.unwrap();
3088        });
3089    }
3090}