Skip to main content

commonware_storage/journal/contiguous/
variable.rs

1//! Position-based journal for variable-length items.
2//!
3//! This journal enforces section fullness: all non-final sections are full and synced.
4//! On init, only the last section needs to be replayed to determine the exact size.
5
6use super::Reader as _;
7use crate::{
8    journal::{
9        contiguous::{fixed, metrics::VariableMetrics as Metrics, Contiguous, Many, Mutable},
10        segmented::variable,
11        Error,
12    },
13    Context, Persistable,
14};
15use commonware_codec::{Codec, CodecShared};
16use commonware_runtime::buffer::paged::CacheRef;
17use commonware_utils::{
18    sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock},
19    NZUsize,
20};
21#[commonware_macros::stability(ALPHA)]
22use core::ops::Range;
23use futures::{stream, Stream, StreamExt as _};
24use std::num::{NonZeroU64, NonZeroUsize};
25#[commonware_macros::stability(ALPHA)]
26use tracing::debug;
27use tracing::warn;
28
29const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);
30
31/// Suffix appended to the base partition name for the data journal.
32const DATA_SUFFIX: &str = "_data";
33
34/// Suffix appended to the base partition name for the offsets journal.
35const OFFSETS_SUFFIX: &str = "_offsets";
36
/// Map an absolute journal position to the section that holds it.
///
/// Every section holds `items_per_section` consecutive positions, so the section number is the
/// integer quotient of the two arguments.
///
/// # Arguments
///
/// * `position` - Absolute position of the item in the journal
/// * `items_per_section` - Capacity of each section, in items
///
/// # Returns
///
/// The number of the section in which the item at `position` is stored.
///
/// # Panics
///
/// Panics if `items_per_section` is zero (integer division by zero).
///
/// # Examples
///
/// ```ignore
/// // Sections of 10 items each:
/// assert_eq!(position_to_section(0, 10), 0);   // first slot of section 0
/// assert_eq!(position_to_section(9, 10), 0);   // last slot of section 0
/// assert_eq!(position_to_section(10, 10), 1);  // first slot of section 1
/// assert_eq!(position_to_section(25, 10), 2);  // middle of section 2
/// assert_eq!(position_to_section(30, 10), 3);  // first slot of section 3
/// ```
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    position / items_per_section
}
61
62/// Configuration for a [Journal].
63#[derive(Clone)]
64pub struct Config<C> {
65    /// Base partition name. Sub-partitions will be created by appending DATA_SUFFIX and OFFSETS_SUFFIX.
66    pub partition: String,
67
68    /// The number of items to store in each section.
69    ///
70    /// Once set, this value cannot be changed across restarts.
71    /// All non-final sections will be full and persisted.
72    pub items_per_section: NonZeroU64,
73
74    /// Optional compression level for stored items.
75    pub compression: Option<u8>,
76
77    /// [Codec] configuration for encoding/decoding items.
78    pub codec_config: C,
79
80    /// Page cache for buffering reads from the underlying storage.
81    pub page_cache: CacheRef,
82
83    /// Write buffer size for each section.
84    pub write_buffer: NonZeroUsize,
85}
86
87impl<C> Config<C> {
88    /// Returns the partition name for the data journal.
89    fn data_partition(&self) -> String {
90        format!("{}{}", self.partition, DATA_SUFFIX)
91    }
92
93    /// Returns the partition name for the offsets journal.
94    fn offsets_partition(&self) -> String {
95        format!("{}{}", self.partition, OFFSETS_SUFFIX)
96    }
97}
98
99/// Inner journal state protected by a lock for interior mutability.
100struct Inner<E: Context, V: Codec> {
101    /// The underlying variable-length data journal.
102    data: variable::Journal<E, V>,
103
104    /// The next position to be assigned on append (total items appended).
105    ///
106    /// # Invariant
107    ///
108    /// Always >= `pruning_boundary`. Equal when data journal is empty or fully pruned.
109    size: u64,
110
111    /// The position before which all items have been pruned.
112    ///
113    /// After normal operation and pruning, the value is section-aligned.
114    /// After `init_at_size(N)`, the value may be mid-section.
115    ///
116    /// # Invariant
117    ///
118    /// Never decreases (pruning only moves forward).
119    pruning_boundary: u64,
120}
121
122impl<E: Context, V: CodecShared> Inner<E, V> {
123    /// Read the item at the given position using the provided offsets reader.
124    ///
125    /// # Errors
126    ///
127    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
128    /// - Returns [Error::ItemOutOfRange] if `position` is beyond the journal size.
129    /// - Returns other errors if storage or decoding fails.
130    async fn read(
131        &self,
132        position: u64,
133        items_per_section: u64,
134        offsets: &impl super::Reader<Item = u64>,
135    ) -> Result<V, Error> {
136        if position >= self.size {
137            return Err(Error::ItemOutOfRange(position));
138        }
139        if position < self.pruning_boundary {
140            return Err(Error::ItemPruned(position));
141        }
142
143        let offset = offsets.read(position).await?;
144        let section = position_to_section(position, items_per_section);
145
146        self.data.get(section, offset).await
147    }
148
149    /// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
150    fn try_read_sync(
151        &self,
152        position: u64,
153        items_per_section: u64,
154        offsets: &impl super::Reader<Item = u64>,
155    ) -> Option<V> {
156        let mut buf = Vec::new();
157        self.try_read_sync_into(position, items_per_section, offsets, &mut buf)
158    }
159
160    /// Read an item synchronously using caller-provided buffer.
161    fn try_read_sync_into(
162        &self,
163        position: u64,
164        items_per_section: u64,
165        offsets: &impl super::Reader<Item = u64>,
166        buf: &mut Vec<u8>,
167    ) -> Option<V> {
168        if position >= self.size || position < self.pruning_boundary {
169            return None;
170        }
171        let offset = offsets.try_read_sync(position)?;
172        let section = position_to_section(position, items_per_section);
173        self.data.try_get_sync_into(section, offset, buf)
174    }
175}
176
177/// A contiguous journal with variable-size entries.
178///
179/// This journal manages section assignment automatically, allowing callers to append items
180/// sequentially without manually tracking section numbers.
181///
182/// # Repair
183///
184/// Like
185/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
186/// and
187/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
188/// the first invalid data read will be considered the new end of the journal (and the
189/// underlying [Blob](commonware_runtime::Blob) will be truncated to the last valid item). Repair occurs during
190/// init via the underlying segmented journals.
191///
192/// # Invariants
193///
194/// ## 1. Section Fullness
195///
196/// All non-final sections are full (`items_per_section` items) and persisted. This ensures
197/// that on `init()`, we only need to replay the last section to determine the exact size.
198///
199/// ## 2. Data Journal is Source of Truth
200///
201/// The data journal is always the source of truth. The offsets journal is an index
202/// that may temporarily diverge during crashes. Divergences are automatically
203/// aligned during init():
204/// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data.
205///   (This can happen if we crash after writing data journal but before writing offsets journal)
206/// * If offsets.size() > data.size(): Rewind offsets to match data size.
207///   (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
208/// * If offsets.bounds().start < data.bounds().start: Prune offsets to match
209///   (This can happen if we crash after pruning data journal but before pruning offsets journal)
210///
211/// Note that we don't recover from the case where offsets.bounds().start >
212/// data.bounds().start. This should never occur because we always prune the data journal
213/// before the offsets journal.
214pub struct Journal<E: Context, V: Codec> {
215    /// Inner state for data journal metadata.
216    ///
217    /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent
218    /// race conditions while allowing concurrent reads during sync.
219    inner: UpgradableAsyncRwLock<Inner<E, V>>,
220
221    /// Index mapping positions to byte offsets within the data journal.
222    /// The section can be calculated from the position using items_per_section.
223    offsets: fixed::Journal<E, u64>,
224
225    /// The number of items per section.
226    ///
227    /// # Invariant
228    ///
229    /// This value is immutable after initialization and must remain consistent
230    /// across restarts. Changing this value will result in data loss or corruption.
231    items_per_section: u64,
232
233    /// Optional compression level when encoding items.
234    compression: Option<u8>,
235
236    /// Metrics for monitoring journal state and activity.
237    metrics: Metrics<E>,
238}
239
240/// A reader guard that holds a consistent snapshot of the variable journal's bounds.
241pub struct Reader<'a, E: Context, V: Codec> {
242    guard: AsyncRwLockReadGuard<'a, Inner<E, V>>,
243    offsets: fixed::Reader<'a, E, u64>,
244    items_per_section: u64,
245    metrics: &'a Metrics<E>,
246}
247
248impl<E: Context, V: CodecShared> super::Reader for Reader<'_, E, V> {
249    type Item = V;
250
251    fn bounds(&self) -> std::ops::Range<u64> {
252        self.guard.pruning_boundary..self.guard.size
253    }
254
255    async fn read(&self, position: u64) -> Result<V, Error> {
256        let _timer = self.metrics.read_timer();
257        self.metrics.read_calls.inc();
258        let result = match self
259            .guard
260            .read(position, self.items_per_section, &self.offsets)
261            .await
262        {
263            Ok(item) => {
264                self.metrics.items_read.inc();
265                Ok(item)
266            }
267            Err(error) => Err(error),
268        };
269        result
270    }
271
272    async fn read_many(&self, positions: &[u64]) -> Result<Vec<V>, Error> {
273        if positions.is_empty() {
274            return Ok(Vec::new());
275        }
276        let _timer = self.metrics.read_many_timer();
277        self.metrics.read_many_calls.inc();
278        assert!(
279            positions.windows(2).all(|w| w[0] < w[1]),
280            "positions must be strictly increasing"
281        );
282        if positions[0] < self.guard.pruning_boundary {
283            return Err(Error::ItemPruned(positions[0]));
284        }
285        let last_position = *positions.last().expect("positions is not empty");
286        if last_position >= self.guard.size {
287            return Err(Error::ItemOutOfRange(last_position));
288        }
289
290        // Read the items from cache if possible.
291        let mut result: Vec<Option<V>> = Vec::with_capacity(positions.len());
292        let mut miss_indices = Vec::with_capacity(positions.len());
293        let mut miss_positions = Vec::with_capacity(positions.len());
294        let mut buf = Vec::new();
295        for (i, &position) in positions.iter().enumerate() {
296            if let Some(item) = self.guard.try_read_sync_into(
297                position,
298                self.items_per_section,
299                &self.offsets,
300                &mut buf,
301            ) {
302                result.push(Some(item));
303            } else {
304                result.push(None);
305                miss_indices.push(i);
306                miss_positions.push(position);
307            }
308        }
309
310        if miss_positions.is_empty() {
311            self.metrics.items_read.inc_by(positions.len() as u64);
312            return Ok(result.into_iter().map(|r| r.unwrap()).collect());
313        }
314
315        // Read the offsets of all items that were not found in the cache.
316        let miss_offsets = self
317            .offsets
318            .read_many(&miss_positions)
319            .await
320            .map_err(|e| match e {
321                Error::ItemOutOfRange(e) | Error::ItemPruned(e) => {
322                    Error::Corruption(format!("section/item should be found, but got: {e}"))
323                }
324                other => other,
325            })?;
326
327        // Group runs of consecutive positions that fall into the same section and perform a
328        // consecutive read for each run.
329        let mut group_start = 0;
330        while group_start < miss_positions.len() {
331            let section = position_to_section(miss_positions[group_start], self.items_per_section);
332            let mut group_end = group_start + 1;
333            while group_end < miss_positions.len()
334                && position_to_section(miss_positions[group_end], self.items_per_section) == section
335            {
336                group_end += 1;
337            }
338
339            let mut run_start = group_start;
340            while run_start < group_end {
341                let mut run_end = run_start + 1;
342                while run_end < group_end
343                    && miss_positions[run_end - 1].checked_add(1) == Some(miss_positions[run_end])
344                {
345                    run_end += 1;
346                }
347
348                let items = self
349                    .guard
350                    .data
351                    .get_many_consecutive(section, &miss_offsets[run_start..run_end])
352                    .await?;
353
354                for (item, &miss_idx) in items.into_iter().zip(&miss_indices[run_start..run_end]) {
355                    result[miss_idx] = Some(item);
356                }
357                run_start = run_end;
358            }
359            group_start = group_end;
360        }
361
362        self.metrics.items_read.inc_by(positions.len() as u64);
363        Ok(result.into_iter().map(|r| r.unwrap()).collect())
364    }
365
366    fn try_read_sync(&self, position: u64) -> Option<V> {
367        let item = self
368            .guard
369            .try_read_sync(position, self.items_per_section, &self.offsets)?;
370        self.metrics.try_read_sync_hits.inc();
371        self.metrics.items_read.inc();
372        Some(item)
373    }
374
375    async fn replay(
376        &self,
377        buffer_size: NonZeroUsize,
378        start_pos: u64,
379    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + Send, Error> {
380        // Validate bounds.
381        if start_pos < self.guard.pruning_boundary {
382            return Err(Error::ItemPruned(start_pos));
383        }
384        if start_pos > self.guard.size {
385            return Err(Error::ItemOutOfRange(start_pos));
386        }
387
388        // Get the starting offset and section. For empty range (start_pos == size),
389        // use a section beyond existing data so data.replay returns empty naturally.
390        let (start_section, start_offset) = if start_pos < self.guard.size {
391            let offset = self.offsets.read(start_pos).await?;
392            let section = position_to_section(start_pos, self.items_per_section);
393            (section, offset)
394        } else {
395            (u64::MAX, 0)
396        };
397
398        let inner_stream = self
399            .guard
400            .data
401            .replay(start_section, start_offset, buffer_size)
402            .await?;
403
404        // Map the stream to add positions.
405        let stream = inner_stream
406            .zip(stream::iter(start_pos..))
407            .map(|(result, pos)| result.map(|(_section, _offset, _size, item)| (pos, item)));
408
409        Ok(stream)
410    }
411}
412
413impl<E: Context, V: CodecShared> Journal<E, V> {
414    /// Initialize a contiguous variable journal.
415    ///
416    /// # Crash Recovery
417    ///
418    /// The data journal is the source of truth. If the offsets journal is inconsistent
419    /// it will be updated to match the data journal.
420    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
421        let items_per_section = cfg.items_per_section.get();
422        let data_partition = cfg.data_partition();
423        let offsets_partition = cfg.offsets_partition();
424
425        // Initialize underlying variable data journal
426        let mut data = variable::Journal::init(
427            context.child("data"),
428            variable::Config {
429                partition: data_partition,
430                compression: cfg.compression,
431                codec_config: cfg.codec_config,
432                page_cache: cfg.page_cache.clone(),
433                write_buffer: cfg.write_buffer,
434            },
435        )
436        .await?;
437
438        // Initialize offsets journal
439        let mut offsets = fixed::Journal::init(
440            context.child("offsets"),
441            fixed::Config {
442                partition: offsets_partition,
443                items_per_blob: cfg.items_per_section,
444                page_cache: cfg.page_cache,
445                write_buffer: cfg.write_buffer,
446            },
447        )
448        .await?;
449
450        // Validate and align offsets journal to match data journal
451        let (pruning_boundary, size) =
452            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
453
454        let metrics = Metrics::new(context);
455        metrics.update(size, pruning_boundary, items_per_section);
456
457        Ok(Self {
458            inner: UpgradableAsyncRwLock::new(Inner {
459                data,
460                size,
461                pruning_boundary,
462            }),
463            offsets,
464            items_per_section,
465            compression: cfg.compression,
466            metrics,
467        })
468    }
469
470    /// Initialize an empty [Journal] at the given logical `size`.
471    ///
472    /// Returns a journal with journal.bounds() == Range{start: size, end: size}
473    /// and next append at position `size`.
474    #[commonware_macros::stability(ALPHA)]
475    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
476        // Initialize empty data journal
477        let data = variable::Journal::init(
478            context.child("data"),
479            variable::Config {
480                partition: cfg.data_partition(),
481                compression: cfg.compression,
482                codec_config: cfg.codec_config.clone(),
483                page_cache: cfg.page_cache.clone(),
484                write_buffer: cfg.write_buffer,
485            },
486        )
487        .await?;
488
489        // Initialize offsets journal at the target size
490        let offsets = fixed::Journal::init_at_size(
491            context.child("offsets"),
492            fixed::Config {
493                partition: cfg.offsets_partition(),
494                items_per_blob: cfg.items_per_section,
495                page_cache: cfg.page_cache,
496                write_buffer: cfg.write_buffer,
497            },
498            size,
499        )
500        .await?;
501
502        let items_per_section = cfg.items_per_section.get();
503        let metrics = Metrics::new(context);
504        metrics.update(size, size, items_per_section);
505
506        Ok(Self {
507            inner: UpgradableAsyncRwLock::new(Inner {
508                data,
509                size,
510                pruning_boundary: size,
511            }),
512            offsets,
513            items_per_section,
514            compression: cfg.compression,
515            metrics,
516        })
517    }
518
519    /// Initialize a [Journal] for use in state sync.
520    ///
521    /// The bounds are item locations (not section numbers). This function prepares the
522    /// on-disk journal so that subsequent appends go to the correct physical location for the
523    /// requested range.
524    ///
525    /// Behavior by existing on-disk state:
526    /// - Fresh (no data): returns an empty journal.
527    /// - Stale (all data strictly before `range.start`): destroys existing data and returns an
528    ///   empty journal.
529    /// - Overlap within [`range.start`, `range.end`]:
530    ///   - Prunes toward `range.start` (section-aligned, so some items before
531    ///     `range.start` may be retained)
532    /// - Unexpected data beyond `range.end`: returns [crate::qmdb::Error::UnexpectedData].
533    ///
534    /// # Arguments
535    /// - `context`: storage context
536    /// - `cfg`: journal configuration
537    /// - `range`: range of item locations to retain
538    ///
539    /// # Returns
540    /// A contiguous journal ready for sync operations. The journal's size will be within the range.
541    ///
542    /// # Errors
543    /// Returns [crate::qmdb::Error::UnexpectedData] if existing data extends beyond `range.end`.
544    #[commonware_macros::stability(ALPHA)]
545    pub(crate) async fn init_sync(
546        context: E,
547        cfg: Config<V::Cfg>,
548        range: Range<u64>,
549    ) -> Result<Self, Error> {
550        assert!(!range.is_empty(), "range must not be empty");
551
552        debug!(
553            range.start,
554            range.end,
555            items_per_section = cfg.items_per_section.get(),
556            "initializing contiguous variable journal for sync"
557        );
558
559        // Initialize contiguous journal
560        let journal = Self::init(context.child("journal"), cfg.clone()).await?;
561
562        let size = journal.size().await;
563
564        // No existing data - initialize at the start of the sync range if needed
565        if size == 0 {
566            if range.start == 0 {
567                debug!("no existing journal data, returning empty journal");
568                return Ok(journal);
569            } else {
570                debug!(
571                    range.start,
572                    "no existing journal data, initializing at sync range start"
573                );
574                journal.destroy().await?;
575                return Self::init_at_size(context, cfg, range.start).await;
576            }
577        }
578
579        // After a same-section crash during a previous clear_to_size, the journal may recover to a
580        // stale position ahead of the requested start.
581        let bounds = journal.reader().await.bounds();
582        if bounds.is_empty() && bounds.start > range.start {
583            journal.clear_to_size(range.start).await?;
584            return Ok(journal);
585        }
586
587        // Check if data exceeds the sync range
588        if size > range.end {
589            return Err(Error::ItemOutOfRange(size));
590        }
591
592        // If all existing data is before our sync range, destroy and recreate fresh
593        if size <= range.start {
594            // All data is stale (ends at or before range.start)
595            debug!(
596                size,
597                range.start, "existing journal data is stale, re-initializing at start position"
598            );
599            journal.destroy().await?;
600            return Self::init_at_size(context, cfg, range.start).await;
601        }
602
603        // Prune to lower bound if needed
604        if !bounds.is_empty() && bounds.start < range.start {
605            debug!(
606                oldest_pos = bounds.start,
607                range.start, "pruning journal to sync range start"
608            );
609            journal.prune(range.start).await?;
610        }
611
612        Ok(journal)
613    }
614
615    /// Rewind the journal to the given size, discarding items from the end.
616    ///
617    /// After rewinding to size N, the journal will contain exactly N items, and the next append
618    /// will receive position N.
619    ///
620    /// # Errors
621    ///
622    /// Returns [Error::InvalidRewind] if `size` is larger than current size.
623    /// Returns [Error::ItemPruned] if `size` is smaller than the pruning boundary.
624    ///
625    /// # Warning
626    ///
627    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
628    pub async fn rewind(&self, size: u64) -> Result<(), Error> {
629        let mut inner = self.inner.write().await;
630
631        // Validate rewind target
632        match size.cmp(&inner.size) {
633            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
634            std::cmp::Ordering::Equal => return Ok(()), // No-op
635            std::cmp::Ordering::Less => {}
636        }
637
638        // Rewind never updates the pruning boundary.
639        if size < inner.pruning_boundary {
640            return Err(Error::ItemPruned(size));
641        }
642
643        // Read the offset of the first item to discard (at position 'size').
644        let discard_offset = {
645            let offsets_reader = self.offsets.reader().await;
646            offsets_reader.read(size).await?
647        };
648        let discard_section = position_to_section(size, self.items_per_section);
649
650        inner
651            .data
652            .rewind_to_offset(discard_section, discard_offset)
653            .await?;
654        self.offsets.rewind(size).await?;
655
656        // Update our size
657        inner.size = size;
658        self.metrics
659            .update(inner.size, inner.pruning_boundary, self.items_per_section);
660
661        Ok(())
662    }
663
664    /// Append a new item to the journal, returning its position.
665    ///
666    /// The position returned is a stable, consecutively increasing value starting from 0.
667    /// This position remains constant after pruning.
668    ///
669    /// When a section becomes full, both the data journal and offsets journal are persisted
670    /// to maintain the invariant that all non-final sections are full and consistent.
671    ///
672    /// # Errors
673    ///
674    /// Returns an error if the underlying storage operation fails or if the item cannot
675    /// be encoded.
676    ///
677    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
678    /// reopened to trigger alignment in [Journal::init].
679    pub async fn append(&self, item: &V) -> Result<u64, Error> {
680        let _timer = self.metrics.append_timer();
681        self.metrics.append_calls.inc();
682        self.append_many_inner(Many::Flat(std::slice::from_ref(item)))
683            .await
684    }
685
686    /// Append items to the journal, returning the position of the last item appended.
687    ///
688    /// Acquires the write lock once for all items instead of per-item.
689    /// Returns [Error::EmptyAppend] if items is empty.
690    pub async fn append_many<'a>(&'a self, items: Many<'a, V>) -> Result<u64, Error> {
691        let _timer = self.metrics.append_many_timer();
692        self.metrics.append_many_calls.inc();
693        self.append_many_inner(items).await
694    }
695
696    async fn append_many_inner<'a>(&'a self, items: Many<'a, V>) -> Result<u64, Error> {
697        if items.is_empty() {
698            return Err(Error::EmptyAppend);
699        }
700        let items_count = items.len();
701
702        // Encode every item into a single buffer for bulk-writing before grabbing write guard.
703        let mut encoded = Vec::new();
704        let mut item_starts = Vec::with_capacity(items_count);
705        let mut encode = |item: &V| {
706            item_starts.push(encoded.len());
707            variable::Journal::<E, V>::encode_item_into(self.compression, item, &mut encoded)
708        };
709        match &items {
710            Many::Flat(items) => {
711                for item in *items {
712                    encode(item)?;
713                }
714            }
715            Many::Nested(nested_items) => {
716                for items in *nested_items {
717                    for item in *items {
718                        encode(item)?;
719                    }
720                }
721            }
722        }
723
724        // Mutating operations are serialized by taking the write guard.
725        let mut inner = self.inner.write().await;
726
727        let mut written = 0;
728        while written < items_count {
729            let section = position_to_section(inner.size, self.items_per_section);
730            let pos_in_section = inner.size % self.items_per_section;
731            let remaining_space = (self.items_per_section - pos_in_section) as usize;
732            let batch_count = remaining_space.min(items_count - written);
733            let batch_start = item_starts[written];
734            let batch_end = item_starts
735                .get(written + batch_count)
736                .copied()
737                .unwrap_or(encoded.len());
738
739            // Append pre-encoded data to the data journal, then convert relative item starts
740            // into absolute offsets.
741            let base_offset = inner
742                .data
743                .append_raw(section, &encoded[batch_start..batch_end])
744                .await?;
745
746            let absolute_offsets = item_starts[written..written + batch_count]
747                .iter()
748                .map(|&start| {
749                    base_offset
750                        .checked_add((start - batch_start) as u64)
751                        .ok_or(Error::OffsetOverflow)
752                })
753                .collect::<Result<Vec<u64>, _>>()?;
754
755            // Persist the offsets for this section batch in the offsets journal.
756            let last_offsets_pos = self
757                .offsets
758                .append_many(Many::Flat(&absolute_offsets))
759                .await?;
760            assert_eq!(last_offsets_pos, inner.size + batch_count as u64 - 1);
761
762            inner.size += batch_count as u64;
763            written += batch_count;
764
765            // The section was filled and must be synced. Downgrade so readers can continue
766            // during the sync while mutators remain blocked.
767            if inner.size.is_multiple_of(self.items_per_section) {
768                let inner_ref = inner.downgrade_to_upgradable();
769                futures::try_join!(inner_ref.data.sync(section), self.offsets.sync())?;
770                if written == items_count {
771                    self.metrics.update(
772                        inner_ref.size,
773                        inner_ref.pruning_boundary,
774                        self.items_per_section,
775                    );
776                    return Ok(inner_ref.size - 1);
777                }
778                inner = inner_ref.upgrade().await;
779            }
780        }
781
782        self.metrics
783            .update(inner.size, inner.pruning_boundary, self.items_per_section);
784        Ok(inner.size - 1)
785    }
786
787    /// Acquire a reader guard that holds a consistent view of the journal.
788    pub async fn reader(&self) -> Reader<'_, E, V> {
789        Reader {
790            guard: self.inner.read().await,
791            offsets: self.offsets.reader().await,
792            items_per_section: self.items_per_section,
793            metrics: &self.metrics,
794        }
795    }
796
797    /// Return the total number of items in the journal, irrespective of pruning. The next value
798    /// appended to the journal will be at this position.
799    pub async fn size(&self) -> u64 {
800        self.inner.read().await.size
801    }
802
803    /// Prune items at positions strictly less than `min_position`.
804    ///
805    /// Returns `true` if any data was pruned, `false` otherwise.
806    ///
807    /// # Errors
808    ///
809    /// Returns an error if the underlying storage operation fails.
810    ///
811    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
812    /// reopened to trigger alignment in [Journal::init].
813    pub async fn prune(&self, min_position: u64) -> Result<bool, Error> {
814        let mut inner = self.inner.write().await;
815
816        if min_position <= inner.pruning_boundary {
817            return Ok(false);
818        }
819
820        // Cap min_position to size to maintain the invariant pruning_boundary <= size
821        let min_position = min_position.min(inner.size);
822
823        // Calculate section number
824        let min_section = position_to_section(min_position, self.items_per_section);
825
826        let pruned = inner.data.prune(min_section).await?;
827        if pruned {
828            let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary);
829            inner.pruning_boundary = new_oldest;
830            self.offsets.prune(new_oldest).await?;
831            self.metrics
832                .update(inner.size, inner.pruning_boundary, self.items_per_section);
833        }
834        Ok(pruned)
835    }
836
837    /// Durably persist the journal.
838    ///
839    /// This is faster than `sync()` but recovery will be required on startup if a crash occurs
840    /// before the next call to `sync()`.
841    pub async fn commit(&self) -> Result<(), Error> {
842        let _timer = self.metrics.commit_timer();
843        self.metrics.record_commit();
844        // Serialize with append/prune/rewind so section selection is stable, while still allowing
845        // concurrent readers.
846        let inner = self.inner.upgradable_read().await;
847
848        let section = position_to_section(inner.size, self.items_per_section);
849        inner.data.sync(section).await?;
850        Ok(())
851    }
852
853    /// Durably persist the journal and ensure recovery is not required on startup.
854    ///
855    /// This is slower than `commit()` but ensures the journal doesn't require recovery on startup.
856    pub async fn sync(&self) -> Result<(), Error> {
857        let _timer = self.metrics.sync_timer();
858        self.metrics.sync_calls.inc();
859        // Serialize with append/prune/rewind so section selection is stable, while still allowing
860        // concurrent readers.
861        let inner = self.inner.upgradable_read().await;
862
863        // Persist only the current (final) section of the data journal.
864        // All non-final sections are already persisted per Invariant #1.
865        let section = position_to_section(inner.size, self.items_per_section);
866
867        // Persist both journals concurrently. These journals may not exist yet if the
868        // previous section was just filled. This is checked internally.
869        futures::try_join!(inner.data.sync(section), self.offsets.sync())?;
870
871        Ok(())
872    }
873
874    /// Remove any underlying blobs created by the journal.
875    ///
876    /// This destroys both the data journal and the offsets journal.
877    pub async fn destroy(self) -> Result<(), Error> {
878        let inner = self.inner.into_inner();
879        inner.data.destroy().await?;
880        self.offsets.destroy().await
881    }
882
883    /// Clear all data and reset the journal to a new starting position.
884    ///
885    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
886    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
887    #[commonware_macros::stability(ALPHA)]
888    pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
889        let mut inner = self.inner.write().await;
890        inner.data.clear().await?;
891
892        self.offsets.clear_to_size(new_size).await?;
893        inner.size = new_size;
894        inner.pruning_boundary = new_size;
895        self.metrics
896            .update(inner.size, inner.pruning_boundary, self.items_per_section);
897        Ok(())
898    }
899
    /// Align the offsets journal and data journal to be consistent in case a crash occurred
    /// on a previous run and left the journals in an inconsistent state.
    ///
    /// The data journal is the source of truth. This function scans it to determine
    /// what SHOULD be in the offsets journal, then fixes any mismatches by clearing, pruning,
    /// rewinding, or extending the offsets journal.
    ///
    /// # Arguments
    ///
    /// * `data` - The segmented data journal holding the items
    /// * `offsets` - The fixed journal holding one data offset per item position
    /// * `items_per_section` - The number of items stored in each data section
    ///
    /// # Returns
    ///
    /// Returns `(pruning_boundary, size)` for the contiguous journal.
    ///
    /// # Errors
    ///
    /// Returns [Error::Corruption] if the oldest offsets position lies in a different section
    /// than the oldest data section in a way that cannot be explained by a crash, and
    /// propagates any error from replaying or mutating the underlying journals.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // Count the items in the newest data section by replaying it (0 if no section exists).
        // Only this section needs to be replayed to determine the exact size.
        let items_in_last_section = match data.newest_section() {
            Some(last_section) => {
                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; // Propagate replay errors (corruption, etc.)
                    count += 1;
                }
                count
            }
            None => 0,
        };

        // === Handle empty data journal case ===
        // Data journal is empty if there are no sections or if there is one section and it has no items.
        // The latter should only occur if a crash occurred after opening a data journal blob but
        // before writing to it.
        let data_empty =
            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
        if data_empty {
            let offsets_bounds = {
                let offsets_reader = offsets.reader().await;
                offsets_reader.bounds()
            };
            let size = offsets_bounds.end;

            if !data.is_empty() {
                // A section exists but contains 0 items. This can happen in two cases:
                // 1. Rewind crash: we rewound the data journal but crashed before rewinding offsets
                // 2. First append crash: we opened the first section blob but crashed before writing to it
                // In both cases, calculate target position from the first remaining section
                // SAFETY: data is non-empty (checked above)
                let data_first_section = data.oldest_section().unwrap();
                let data_section_start = data_first_section * items_per_section;
                // Never move below the offsets start, which may be mid-section.
                let target_pos = data_section_start.max(offsets_bounds.start);

                warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
                offsets.clear_to_size(target_pos).await?;
                return Ok((target_pos, target_pos));
            }

            // data.blobs is empty. This can happen in two cases:
            // 1. We completely pruned the data journal but crashed before pruning
            //    the offsets journal.
            // 2. The data journal was never opened.
            if !offsets_bounds.is_empty() && offsets_bounds.start < size {
                // Offsets has unpruned entries but data is gone - clear to match empty state.
                // We use clear_to_size (not prune) to ensure bounds.start == bounds.end,
                // even when size is mid-section.
                warn!("crash repair: clearing offsets to {size} (prune-all crash)");
                offsets.clear_to_size(size).await?;
            }

            // The journal holds no items, so boundary and size coincide.
            return Ok((size, size));
        }

        // === Handle non-empty data journal case ===
        // SAFETY: data is non-empty here (the empty case returned above)
        let data_first_section = data.oldest_section().unwrap();
        let data_last_section = data.newest_section().unwrap();

        // data_oldest_pos is ALWAYS section-aligned because it's computed from the section index.
        // This differs from offsets bounds start which can be mid-section after init_at_size.
        let data_oldest_pos = data_first_section * items_per_section;

        // Align pruning state
        // We always prune data before offsets, so offsets should never be "ahead" by a section.
        {
            let offsets_bounds = {
                let offsets_reader = offsets.reader().await;
                offsets_reader.bounds()
            };
            match (
                offsets_bounds.is_empty(),
                offsets_bounds.start.cmp(&data_oldest_pos),
            ) {
                (true, _) => {
                    // Offsets journal is empty but data journal isn't.
                    // It should always be in the same section as the data journal, though.
                    let offsets_first_section = offsets_bounds.start / items_per_section;
                    if offsets_first_section != data_first_section {
                        return Err(Error::Corruption(format!(
                            "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
                        )));
                    }
                    warn!(
                        "crash repair: offsets journal empty at {}, will rebuild from data",
                        offsets_bounds.start
                    );
                }
                (false, std::cmp::Ordering::Less) => {
                    // Offsets behind on pruning -- prune to catch up
                    warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
                    offsets.prune(data_oldest_pos).await?;
                }
                (false, std::cmp::Ordering::Greater) => {
                    // Compare sections: same section = valid (offsets may start mid-section),
                    // different section = corruption.
                    if offsets_bounds.start / items_per_section > data_first_section {
                        return Err(Error::Corruption(format!(
                            "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
                            offsets_bounds.start
                        )));
                    }
                }
                (false, std::cmp::Ordering::Equal) => {
                    // Both journals are pruned to the same position.
                }
            }
        }

        // Compute the correct logical size
        // Uses bounds.start from offsets as the anchor because it tracks the exact starting
        // position, which may be mid-section after init_at_size.
        //
        // Note: Corruption checks above ensure bounds.start is in data_first_section,
        // so the subtraction in oldest_items cannot underflow.
        // Re-fetch bounds since prune may have been called above.
        let (offsets_bounds, data_size) = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            let data_size = if data_first_section == data_last_section {
                // Single section: every counted item sits after the offsets start.
                offsets_bounds.start + items_in_last_section
            } else {
                // Multiple sections: the remainder of the oldest section, plus all (full)
                // middle sections, plus the items counted in the newest section.
                let oldest_items =
                    (data_first_section + 1) * items_per_section - offsets_bounds.start;
                let middle_items = (data_last_section - data_first_section - 1) * items_per_section;
                offsets_bounds.start + oldest_items + middle_items + items_in_last_section
            };
            (offsets_bounds, data_size)
        };

        // Align sizes
        let offsets_size = offsets_bounds.end;
        if offsets_size > data_size {
            // Crashed after writing offsets but before writing data.
            warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            // Crashed after writing data but before writing offsets.
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        // Final invariant checks
        let pruning_boundary = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            assert_eq!(offsets_bounds.end, data_size);

            // After alignment, offsets and data must be in the same section.
            // We return bounds.start from offsets as the true boundary.
            assert!(
                !offsets_bounds.is_empty(),
                "offsets should have data after alignment"
            );
            assert_eq!(
                offsets_bounds.start / items_per_section,
                data_first_section,
                "offsets and data should be in same oldest section"
            );
            offsets_bounds.start
        };

        // Persist any repairs made to the offsets journal before reporting the aligned state.
        offsets.sync().await?;
        Ok((pruning_boundary, data_size))
    }
1079
1080    /// Rebuild missing offset entries by replaying the data journal and
1081    /// appending the missing entries to the offsets journal.
1082    ///
1083    /// The data journal is the source of truth. This function brings the offsets
1084    /// journal up to date by replaying data items and indexing their positions.
1085    ///
1086    /// # Warning
1087    ///
1088    /// - Panics if data journal is empty
1089    /// - Panics if `offsets_size` >= `data.size()`
1090    async fn add_missing_offsets(
1091        data: &variable::Journal<E, V>,
1092        offsets: &mut fixed::Journal<E, u64>,
1093        offsets_size: u64,
1094        items_per_section: u64,
1095    ) -> Result<(), Error> {
1096        assert!(
1097            !data.is_empty(),
1098            "rebuild_offsets called with empty data journal"
1099        );
1100
1101        // Find where to start replaying
1102        let (start_section, resume_offset, skip_first) = {
1103            let offsets_reader = offsets.reader().await;
1104            let offsets_bounds = offsets_reader.bounds();
1105            if offsets_bounds.is_empty() {
1106                // Offsets empty -- start from first data section
1107                // SAFETY: data is non-empty (checked above)
1108                let first_section = data.oldest_section().unwrap();
1109                (first_section, 0, false)
1110            } else if offsets_bounds.start < offsets_size {
1111                // Offsets has items -- resume from last indexed position
1112                let last_offset = offsets_reader.read(offsets_size - 1).await?;
1113                let last_section = position_to_section(offsets_size - 1, items_per_section);
1114                (last_section, last_offset, true)
1115            } else {
1116                // Offsets fully pruned but data has items -- start from first data section
1117                // SAFETY: data is non-empty (checked above)
1118                let first_section = data.oldest_section().unwrap();
1119                (first_section, 0, false)
1120            }
1121        };
1122
1123        // Replay data journal from start position through the end and index all items.
1124        // The data journal is the source of truth, so we consume the entire stream.
1125        // (replay streams from start_section onwards through all subsequent sections)
1126        let stream = data
1127            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
1128            .await?;
1129        futures::pin_mut!(stream);
1130
1131        let mut skipped_first = false;
1132        while let Some(result) = stream.next().await {
1133            let (_section, offset, _size, _item) = result?;
1134
1135            // Skip first item if resuming from last indexed offset
1136            if skip_first && !skipped_first {
1137                skipped_first = true;
1138                continue;
1139            }
1140
1141            offsets.append(&offset).await?;
1142        }
1143
1144        Ok(())
1145    }
1146}
1147
// Implement Contiguous trait for variable-length items.
//
// Every method forwards to the inherent `Journal` method of the same name through a
// `Self::`-qualified path.
impl<E: Context, V: CodecShared> Contiguous for Journal<E, V> {
    type Item = V;

    /// Acquire a reader by delegating to the inherent [Journal::reader].
    async fn reader(&self) -> impl super::Reader<Item = V> + '_ {
        Self::reader(self).await
    }

    /// Return the journal size by delegating to the inherent [Journal::size].
    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}
1160
// Implement Mutable by forwarding to the same-named `Journal` methods.
//
// The trait methods take `&mut self`. The calls use `Self::`-qualified paths rather than
// method syntax. NOTE(review): this presumably keeps resolution on the inherent methods
// (e.g. `prune`, which takes `&self`) instead of re-entering these trait methods -- keep the
// qualified form when editing.
impl<E: Context, V: CodecShared> Mutable for Journal<E, V> {
    /// Append a single item; forwards to `Journal::append`.
    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    /// Append a batch of items; forwards to `Journal::append_many`.
    async fn append_many<'a>(&'a mut self, items: Many<'a, Self::Item>) -> Result<u64, Error> {
        Self::append_many(self, items).await
    }

    /// Prune items below `min_position`; forwards to `Journal::prune`.
    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    /// Rewind the journal to `size`; forwards to `Journal::rewind`.
    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
1178
1179impl<E: Context, V: CodecShared> Persistable for Journal<E, V> {
1180    type Error = Error;
1181
1182    async fn commit(&self) -> Result<(), Error> {
1183        self.commit().await
1184    }
1185
1186    async fn sync(&self) -> Result<(), Error> {
1187        self.sync().await
1188    }
1189
1190    async fn destroy(self) -> Result<(), Error> {
1191        self.destroy().await
1192    }
1193}
1194
// Allow this journal to serve as the inner journal of an authenticated journal.
#[commonware_macros::stability(ALPHA)]
impl<E: Context, V: CodecShared> crate::journal::authenticated::Inner<E> for Journal<E, V> {
    /// Configuration for this journal, parameterized by the item's codec configuration.
    type Config = Config<V::Cfg>;

    /// Construct an authenticated journal backed by this journal type.
    ///
    /// All arguments are passed through unchanged to
    /// `crate::journal::authenticated::Journal::new`, and its result is returned as-is.
    async fn init<
        F: crate::merkle::Family,
        H: commonware_cryptography::Hasher,
        S: commonware_parallel::Strategy,
    >(
        context: E,
        merkle_cfg: crate::merkle::full::Config<S>,
        journal_cfg: Self::Config,
        rewind_predicate: fn(&V) -> bool,
        bagging: crate::merkle::Bagging,
    ) -> Result<
        crate::journal::authenticated::Journal<F, E, Self, H, S>,
        crate::journal::authenticated::Error<F>,
    > {
        crate::journal::authenticated::Journal::<F, E, Self, H, S>::new(
            context,
            merkle_cfg,
            journal_cfg,
            rewind_predicate,
            bagging,
        )
        .await
    }
}
1223
// Test-only helpers that reach into the journal's internals, mostly to simulate crashes that
// leave the data and offsets journals out of step.
#[cfg(test)]
impl<E: Context, V: CodecShared> Journal<E, V> {
    /// Test helper: Read the item stored at `position`.
    pub(crate) async fn read(&self, position: u64) -> Result<V, Error> {
        let reader = self.reader().await;
        reader.read(position).await
    }

    /// Test helper: Return the range of positions currently readable from the journal.
    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
        let reader = self.reader().await;
        reader.bounds()
    }

    /// Test helper: Prune only the data journal, leaving offsets untouched (simulates a crash
    /// between the two prune steps).
    pub(crate) async fn test_prune_data(&self, section: u64) -> Result<bool, Error> {
        let mut guard = self.inner.write().await;
        guard.data.prune(section).await
    }

    /// Test helper: Prune only the offsets journal (simulates crash scenario).
    pub(crate) async fn test_prune_offsets(&self, position: u64) -> Result<bool, Error> {
        let offsets = &self.offsets;
        offsets.prune(position).await
    }

    /// Test helper: Rewind only the offsets journal (simulates crash scenario).
    pub(crate) async fn test_rewind_offsets(&self, position: u64) -> Result<(), Error> {
        let offsets = &self.offsets;
        offsets.rewind(position).await
    }

    /// Test helper: Report the size of the offsets journal.
    pub(crate) async fn test_offsets_size(&self) -> u64 {
        let offsets = &self.offsets;
        offsets.size().await
    }

    /// Test helper: Append `item` to `section` of the data journal without recording an
    /// offset for it (simulates crash scenario).
    pub(crate) async fn test_append_data(
        &self,
        section: u64,
        item: V,
    ) -> Result<(u64, u32), Error> {
        let mut guard = self.inner.write().await;
        guard.data.append(section, &item).await
    }

    /// Test helper: Sync the newest section of the data journal (section 0 if none exists).
    pub(crate) async fn test_sync_data(&self) -> Result<(), Error> {
        let guard = self.inner.read().await;
        let newest = guard.data.newest_section().unwrap_or(0);
        guard.data.sync(newest).await
    }
}
1276
1277#[cfg(test)]
1278mod tests {
1279    use super::*;
1280    use crate::journal::contiguous::tests::run_contiguous_tests;
1281    use commonware_macros::test_traced;
1282    use commonware_runtime::{
1283        buffer::paged::CacheRef, deterministic, Metrics as _, Runner, Storage, Supervisor as _,
1284    };
1285    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
1286    use futures::FutureExt as _;
1287    use std::num::NonZeroU16;
1288
    // Use some jank sizes to exercise boundary conditions: the page size is deliberately not a
    // power of two.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    // NOTE(review): presumably the page cache capacity used alongside PAGE_SIZE -- confirm
    // against the tests that use it.
    const PAGE_CACHE_SIZE: usize = 2;
    // Larger page sizes for tests that need more buffer space.
    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
1295
1296    #[test_traced]
1297    fn test_variable_append_many_compressed() {
1298        let executor = deterministic::Runner::default();
1299        executor.start(|context| async move {
1300            let cfg = Config {
1301                partition: "append-many-compressed".into(),
1302                items_per_section: NZU64!(3),
1303                compression: Some(1),
1304                codec_config: (),
1305                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1306                write_buffer: NZUsize!(1024),
1307            };
1308            let journal = Journal::<_, FixedBytes<32>>::init(context.child("journal"), cfg)
1309                .await
1310                .unwrap();
1311            let items = [
1312                FixedBytes::new([0; 32]),
1313                FixedBytes::new([1; 32]),
1314                FixedBytes::new([2; 32]),
1315                FixedBytes::new([3; 32]),
1316                FixedBytes::new([4; 32]),
1317            ];
1318
1319            let last = journal.append_many(Many::Flat(&items)).await.unwrap();
1320            assert_eq!(last, 4);
1321            for (pos, item) in items.iter().enumerate() {
1322                assert_eq!(journal.read(pos as u64).await.unwrap(), *item);
1323            }
1324
1325            journal.destroy().await.unwrap();
1326        });
1327    }
1328
1329    #[test_traced]
1330    fn test_variable_read_many_after_reopen() {
1331        let executor = deterministic::Runner::default();
1332        executor.start(|context| async move {
1333            let cfg = Config {
1334                partition: "read-many-after-reopen".into(),
1335                items_per_section: NZU64!(5),
1336                compression: None,
1337                codec_config: (),
1338                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1339                write_buffer: NZUsize!(1024),
1340            };
1341
1342            let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1343                .await
1344                .unwrap();
1345            for i in 0..20u64 {
1346                journal.append(&(i * 100)).await.unwrap();
1347            }
1348            journal.sync().await.unwrap();
1349            drop(journal);
1350
1351            let cfg = Config {
1352                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1353                ..cfg
1354            };
1355            let journal = Journal::<_, u64>::init(context.child("second"), cfg)
1356                .await
1357                .unwrap();
1358            let reader = journal.reader().await;
1359            let items = reader.read_many(&[1, 2, 3, 7, 8, 12, 18]).await.unwrap();
1360            assert_eq!(items, vec![100, 200, 300, 700, 800, 1200, 1800]);
1361            drop(reader);
1362
1363            journal.destroy().await.unwrap();
1364        });
1365    }
1366
1367    #[test_traced]
1368    fn test_variable_read_many_consecutive_after_reopen() {
1369        let executor = deterministic::Runner::default();
1370        executor.start(|context| async move {
1371            let cfg = Config {
1372                partition: "read-many-consecutive-after-reopen".into(),
1373                items_per_section: NZU64!(20),
1374                compression: None,
1375                codec_config: (),
1376                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1377                write_buffer: NZUsize!(1024),
1378            };
1379
1380            let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1381                .await
1382                .unwrap();
1383            for i in 0..20u64 {
1384                journal.append(&(i * 100)).await.unwrap();
1385            }
1386            journal.sync().await.unwrap();
1387            drop(journal);
1388
1389            let cfg = Config {
1390                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1391                ..cfg
1392            };
1393            let journal = Journal::<_, u64>::init(context.child("second"), cfg)
1394                .await
1395                .unwrap();
1396            let reader = journal.reader().await;
1397            let positions: Vec<u64> = (3..10).collect();
1398            let items = reader.read_many(&positions).await.unwrap();
1399            assert_eq!(items, vec![300, 400, 500, 600, 700, 800, 900]);
1400            drop(reader);
1401
1402            journal.destroy().await.unwrap();
1403        });
1404    }
1405
1406    /// Test that complete offsets partition loss after pruning is detected as unrecoverable.
1407    ///
1408    /// When the offsets partition is completely lost and the data has been pruned, we cannot
1409    /// rebuild the index with correct position alignment (would require creating placeholder blobs).
1410    /// This is a genuine external failure that should be detected and reported clearly.
1411    #[test_traced]
1412    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
1413        let executor = deterministic::Runner::default();
1414        executor.start(|context| async move {
1415            let cfg = Config {
1416                partition: "offsets-loss-after-prune".into(),
1417                items_per_section: NZU64!(10),
1418                compression: None,
1419                codec_config: (),
1420                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1421                write_buffer: NZUsize!(1024),
1422            };
1423
1424            // === Phase 1: Create journal with data and prune ===
1425            let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1426                .await
1427                .unwrap();
1428
1429            // Append 40 items across 4 sections (0-3)
1430            for i in 0..40u64 {
1431                journal.append(&(i * 100)).await.unwrap();
1432            }
1433
1434            // Prune to position 20 (removes sections 0-1, keeps sections 2-3)
1435            journal.prune(20).await.unwrap();
1436            let bounds = journal.bounds().await;
1437            assert_eq!(bounds.start, 20);
1438            assert_eq!(bounds.end, 40);
1439
1440            journal.sync().await.unwrap();
1441            drop(journal);
1442
1443            // === Phase 2: Simulate complete offsets partition loss ===
1444            // Remove both the offsets data partition and its metadata partition
1445            context
1446                .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
1447                .await
1448                .expect("Failed to remove offsets blobs partition");
1449            context
1450                .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
1451                .await
1452                .expect("Failed to remove offsets metadata partition");
1453
1454            // === Phase 3: Verify this is detected as unrecoverable ===
1455            let result = Journal::<_, u64>::init(context.child("second"), cfg.clone()).await;
1456            assert!(matches!(result, Err(Error::Corruption(_))));
1457        });
1458    }
1459
1460    /// Test that init aligns state when data is pruned/lost but offsets survives.
1461    ///
1462    /// This handles both:
1463    /// 1. Crash during prune-all (data pruned, offsets not yet)
1464    /// 2. External data partition loss
1465    ///
1466    /// In both cases, we align by pruning offsets to match.
1467    #[test_traced]
1468    fn test_variable_align_data_offsets_mismatch() {
1469        let executor = deterministic::Runner::default();
1470        executor.start(|context| async move {
1471            let cfg = Config {
1472                partition: "data-loss-test".into(),
1473                items_per_section: NZU64!(10),
1474                compression: None,
1475                codec_config: (),
1476                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1477                write_buffer: NZUsize!(1024),
1478            };
1479
1480            // === Setup: Create journal with data ===
1481            let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1482                .await
1483                .unwrap();
1484
1485            // Append 20 items across 2 sections
1486            for i in 0..20u64 {
1487                variable.append(&(i * 100)).await.unwrap();
1488            }
1489
1490            variable.sync().await.unwrap();
1491            drop(variable);
1492
1493            // === Simulate data loss: Delete data partition but keep offsets ===
1494            context
1495                .remove(&cfg.data_partition(), None)
1496                .await
1497                .expect("Failed to remove data partition");
1498
1499            // === Verify init aligns the mismatch ===
1500            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
1501                .await
1502                .expect("Should align offsets to match empty data");
1503
1504            // Size should be preserved
1505            assert_eq!(journal.size().await, 20);
1506
1507            // But no items remain (both journals pruned)
1508            assert!(journal.bounds().await.is_empty());
1509
1510            // All reads should fail with ItemPruned
1511            for i in 0..20 {
1512                assert!(matches!(
1513                    journal.read(i).await,
1514                    Err(crate::journal::Error::ItemPruned(_))
1515                ));
1516            }
1517
1518            // Can append new data starting at position 20
1519            let pos = journal.append(&999).await.unwrap();
1520            assert_eq!(pos, 20);
1521            assert_eq!(journal.read(20).await.unwrap(), 999);
1522
1523            journal.destroy().await.unwrap();
1524        });
1525    }
1526
1527    /// Test replay behavior for variable-length items.
1528    #[test_traced]
1529    fn test_variable_replay() {
1530        let executor = deterministic::Runner::default();
1531        executor.start(|context| async move {
1532            let cfg = Config {
1533                partition: "replay".into(),
1534                items_per_section: NZU64!(10),
1535                compression: None,
1536                codec_config: (),
1537                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1538                write_buffer: NZUsize!(1024),
1539            };
1540
1541            // Initialize journal
1542            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1543
1544            // Append 40 items across 4 sections (0-3)
1545            for i in 0..40u64 {
1546                journal.append(&(i * 100)).await.unwrap();
1547            }
1548
1549            // Test 1: Full replay
1550            {
1551                let reader = journal.reader().await;
1552                let stream = reader.replay(NZUsize!(20), 0).await.unwrap();
1553                futures::pin_mut!(stream);
1554                for i in 0..40u64 {
1555                    let (pos, item) = stream.next().await.unwrap().unwrap();
1556                    assert_eq!(pos, i);
1557                    assert_eq!(item, i * 100);
1558                }
1559                assert!(stream.next().await.is_none());
1560            }
1561
1562            // Test 2: Partial replay from middle of section
1563            {
1564                let reader = journal.reader().await;
1565                let stream = reader.replay(NZUsize!(20), 15).await.unwrap();
1566                futures::pin_mut!(stream);
1567                for i in 15..40u64 {
1568                    let (pos, item) = stream.next().await.unwrap().unwrap();
1569                    assert_eq!(pos, i);
1570                    assert_eq!(item, i * 100);
1571                }
1572                assert!(stream.next().await.is_none());
1573            }
1574
1575            // Test 3: Partial replay from section boundary
1576            {
1577                let reader = journal.reader().await;
1578                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
1579                futures::pin_mut!(stream);
1580                for i in 20..40u64 {
1581                    let (pos, item) = stream.next().await.unwrap().unwrap();
1582                    assert_eq!(pos, i);
1583                    assert_eq!(item, i * 100);
1584                }
1585                assert!(stream.next().await.is_none());
1586            }
1587
1588            // Test 4: Prune and verify replay from pruned
1589            journal.prune(20).await.unwrap();
1590            {
1591                let reader = journal.reader().await;
1592                let res = reader.replay(NZUsize!(20), 0).await;
1593                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
1594            }
1595            {
1596                let reader = journal.reader().await;
1597                let res = reader.replay(NZUsize!(20), 19).await;
1598                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
1599            }
1600
1601            // Test 5: Replay from exactly at pruning boundary after prune
1602            {
1603                let reader = journal.reader().await;
1604                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
1605                futures::pin_mut!(stream);
1606                for i in 20..40u64 {
1607                    let (pos, item) = stream.next().await.unwrap().unwrap();
1608                    assert_eq!(pos, i);
1609                    assert_eq!(item, i * 100);
1610                }
1611                assert!(stream.next().await.is_none());
1612            }
1613
1614            // Test 6: Replay from the end
1615            {
1616                let reader = journal.reader().await;
1617                let stream = reader.replay(NZUsize!(20), 40).await.unwrap();
1618                futures::pin_mut!(stream);
1619                assert!(stream.next().await.is_none());
1620            }
1621
1622            // Test 7: Replay beyond the end (should error)
1623            {
1624                let reader = journal.reader().await;
1625                let res = reader.replay(NZUsize!(20), 41).await;
1626                assert!(matches!(
1627                    res,
1628                    Err(crate::journal::Error::ItemOutOfRange(41))
1629                ));
1630            }
1631
1632            journal.destroy().await.unwrap();
1633        });
1634    }
1635
1636    #[test_traced]
1637    fn test_variable_contiguous() {
1638        let executor = deterministic::Runner::default();
1639        executor.start(|context| async move {
1640            run_contiguous_tests(move |test_name: String, idx: usize| {
1641                let label = test_name.replace('-', "_");
1642                let context = context
1643                    .child("test")
1644                    .with_attribute("name", &label)
1645                    .with_attribute("index", idx);
1646                async move {
1647                    let cfg = Config {
1648                        partition: format!("generic-test-{test_name}"),
1649                        items_per_section: NZU64!(10),
1650                        compression: None,
1651                        codec_config: (),
1652                        page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1653                        write_buffer: NZUsize!(1024),
1654                    };
1655                    Journal::<_, u64>::init(context, cfg).await
1656                }
1657                .boxed()
1658            })
1659            .await;
1660        });
1661    }
1662
1663    /// Test multiple sequential prunes with Variable-specific guarantees.
1664    #[test_traced]
1665    fn test_variable_multiple_sequential_prunes() {
1666        let executor = deterministic::Runner::default();
1667        executor.start(|context| async move {
1668            let cfg = Config {
1669                partition: "sequential-prunes".into(),
1670                items_per_section: NZU64!(10),
1671                compression: None,
1672                codec_config: (),
1673                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1674                write_buffer: NZUsize!(1024),
1675            };
1676
1677            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1678
1679            // Append items across 4 sections: [0-9], [10-19], [20-29], [30-39]
1680            for i in 0..40u64 {
1681                journal.append(&(i * 100)).await.unwrap();
1682            }
1683
1684            // Initial state: all items accessible
1685            let bounds = journal.bounds().await;
1686            assert_eq!(bounds.start, 0);
1687            assert_eq!(bounds.end, 40);
1688
1689            // First prune: remove section 0 (positions 0-9)
1690            let pruned = journal.prune(10).await.unwrap();
1691            assert!(pruned);
1692
1693            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1694            assert_eq!(journal.bounds().await.start, 10);
1695
1696            // Items 0-9 should be pruned, 10+ should be accessible
1697            assert!(matches!(
1698                journal.read(0).await,
1699                Err(crate::journal::Error::ItemPruned(_))
1700            ));
1701            assert_eq!(journal.read(10).await.unwrap(), 1000);
1702            assert_eq!(journal.read(19).await.unwrap(), 1900);
1703
1704            // Second prune: remove section 1 (positions 10-19)
1705            let pruned = journal.prune(20).await.unwrap();
1706            assert!(pruned);
1707
1708            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1709            assert_eq!(journal.bounds().await.start, 20);
1710
1711            // Items 0-19 should be pruned, 20+ should be accessible
1712            assert!(matches!(
1713                journal.read(10).await,
1714                Err(crate::journal::Error::ItemPruned(_))
1715            ));
1716            assert!(matches!(
1717                journal.read(19).await,
1718                Err(crate::journal::Error::ItemPruned(_))
1719            ));
1720            assert_eq!(journal.read(20).await.unwrap(), 2000);
1721            assert_eq!(journal.read(29).await.unwrap(), 2900);
1722
1723            // Third prune: remove section 2 (positions 20-29)
1724            let pruned = journal.prune(30).await.unwrap();
1725            assert!(pruned);
1726
1727            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1728            assert_eq!(journal.bounds().await.start, 30);
1729
1730            // Items 0-29 should be pruned, 30+ should be accessible
1731            assert!(matches!(
1732                journal.read(20).await,
1733                Err(crate::journal::Error::ItemPruned(_))
1734            ));
1735            assert!(matches!(
1736                journal.read(29).await,
1737                Err(crate::journal::Error::ItemPruned(_))
1738            ));
1739            assert_eq!(journal.read(30).await.unwrap(), 3000);
1740            assert_eq!(journal.read(39).await.unwrap(), 3900);
1741
1742            // Size should still be 40 (pruning doesn't affect size)
1743            assert_eq!(journal.size().await, 40);
1744
1745            journal.destroy().await.unwrap();
1746        });
1747    }
1748
1749    /// Test that pruning all data and re-initializing preserves positions.
1750    #[test_traced]
1751    fn test_variable_prune_all_then_reinit() {
1752        let executor = deterministic::Runner::default();
1753        executor.start(|context| async move {
1754            let cfg = Config {
1755                partition: "prune-all-reinit".into(),
1756                items_per_section: NZU64!(10),
1757                compression: None,
1758                codec_config: (),
1759                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1760                write_buffer: NZUsize!(1024),
1761            };
1762
1763            // === Phase 1: Create journal and append data ===
1764            let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1765                .await
1766                .unwrap();
1767
1768            for i in 0..100u64 {
1769                journal.append(&(i * 100)).await.unwrap();
1770            }
1771
1772            let bounds = journal.bounds().await;
1773            assert_eq!(bounds.end, 100);
1774            assert_eq!(bounds.start, 0);
1775
1776            // === Phase 2: Prune all data ===
1777            let pruned = journal.prune(100).await.unwrap();
1778            assert!(pruned);
1779
1780            // All data is pruned - no items remain
1781            let bounds = journal.bounds().await;
1782            assert_eq!(bounds.end, 100);
1783            assert!(bounds.is_empty());
1784
1785            // All reads should fail with ItemPruned
1786            for i in 0..100 {
1787                assert!(matches!(
1788                    journal.read(i).await,
1789                    Err(crate::journal::Error::ItemPruned(_))
1790                ));
1791            }
1792
1793            journal.sync().await.unwrap();
1794            drop(journal);
1795
1796            // === Phase 3: Re-init and verify position preserved ===
1797            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
1798                .await
1799                .unwrap();
1800
1801            // Size should be preserved, but no items remain
1802            let bounds = journal.bounds().await;
1803            assert_eq!(bounds.end, 100);
1804            assert!(bounds.is_empty());
1805
1806            // All reads should still fail
1807            for i in 0..100 {
1808                assert!(matches!(
1809                    journal.read(i).await,
1810                    Err(crate::journal::Error::ItemPruned(_))
1811                ));
1812            }
1813
1814            // === Phase 4: Append new data ===
1815            // Next append should get position 100
1816            journal.append(&10000).await.unwrap();
1817            let bounds = journal.bounds().await;
1818            assert_eq!(bounds.end, 101);
1819            // Now we have one item at position 100
1820            assert_eq!(bounds.start, 100);
1821
1822            // Can read the new item
1823            assert_eq!(journal.read(100).await.unwrap(), 10000);
1824
1825            // Old positions still fail
1826            assert!(matches!(
1827                journal.read(99).await,
1828                Err(crate::journal::Error::ItemPruned(_))
1829            ));
1830
1831            journal.destroy().await.unwrap();
1832        });
1833    }
1834
1835    /// Test recovery from crash after data journal pruned but before offsets journal.
1836    #[test_traced]
1837    fn test_variable_recovery_prune_crash_offsets_behind() {
1838        let executor = deterministic::Runner::default();
1839        executor.start(|context| async move {
1840            // === Setup: Create Variable wrapper with data ===
1841            let cfg = Config {
1842                partition: "recovery-prune-crash".into(),
1843                items_per_section: NZU64!(10),
1844                compression: None,
1845                codec_config: (),
1846                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1847                write_buffer: NZUsize!(1024),
1848            };
1849
1850            let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1851                .await
1852                .unwrap();
1853
1854            // Append 40 items across 4 sections to both journals
1855            for i in 0..40u64 {
1856                variable.append(&(i * 100)).await.unwrap();
1857            }
1858
1859            // Prune to position 10 normally (both data and offsets journals pruned)
1860            variable.prune(10).await.unwrap();
1861            assert_eq!(variable.bounds().await.start, 10);
1862
1863            // === Simulate crash: Prune data journal but not offsets journal ===
1864            // Manually prune data journal to section 2 (position 20)
1865            variable.test_prune_data(2).await.unwrap();
1866            // Offsets journal still has data from position 10-19
1867
1868            variable.sync().await.unwrap();
1869            drop(variable);
1870
1871            // === Verify recovery ===
1872            let variable = Journal::<_, u64>::init(context.child("second"), cfg.clone())
1873                .await
1874                .unwrap();
1875
1876            // Init should auto-repair: offsets journal pruned to match data journal
1877            let bounds = variable.bounds().await;
1878            assert_eq!(bounds.start, 20);
1879            assert_eq!(bounds.end, 40);
1880
1881            // Reads before position 20 should fail (pruned from both journals)
1882            assert!(matches!(
1883                variable.read(10).await,
1884                Err(crate::journal::Error::ItemPruned(_))
1885            ));
1886
1887            // Reads at position 20+ should succeed
1888            assert_eq!(variable.read(20).await.unwrap(), 2000);
1889            assert_eq!(variable.read(39).await.unwrap(), 3900);
1890
1891            variable.destroy().await.unwrap();
1892        });
1893    }
1894
1895    /// Test recovery detects corruption when offsets journal pruned ahead of data journal.
1896    ///
1897    /// Simulates an impossible state (offsets journal pruned more than data journal) which
1898    /// should never happen due to write ordering. Verifies that init() returns corruption error.
1899    #[test_traced]
1900    fn test_variable_recovery_offsets_ahead_corruption() {
1901        let executor = deterministic::Runner::default();
1902        executor.start(|context| async move {
1903            // === Setup: Create Variable wrapper with data ===
1904            let cfg = Config {
1905                partition: "recovery-offsets-ahead".into(),
1906                items_per_section: NZU64!(10),
1907                compression: None,
1908                codec_config: (),
1909                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1910                write_buffer: NZUsize!(1024),
1911            };
1912
1913            let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1914                .await
1915                .unwrap();
1916
1917            // Append 40 items across 4 sections to both journals
1918            for i in 0..40u64 {
1919                variable.append(&(i * 100)).await.unwrap();
1920            }
1921
1922            // Prune offsets journal ahead of data journal (impossible state)
1923            variable.test_prune_offsets(20).await.unwrap(); // Prune to position 20
1924            variable.test_prune_data(1).await.unwrap(); // Only prune data journal to section 1 (position 10)
1925
1926            variable.sync().await.unwrap();
1927            drop(variable);
1928
1929            // === Verify corruption detected ===
1930            let result = Journal::<_, u64>::init(context.child("second"), cfg.clone()).await;
1931            assert!(matches!(result, Err(Error::Corruption(_))));
1932        });
1933    }
1934
1935    /// Test recovery from crash after appending to data journal but before appending to offsets journal.
1936    #[test_traced]
1937    fn test_variable_recovery_append_crash_offsets_behind() {
1938        let executor = deterministic::Runner::default();
1939        executor.start(|context| async move {
1940            // === Setup: Create Variable wrapper with partial data ===
1941            let cfg = Config {
1942                partition: "recovery-append-crash".into(),
1943                items_per_section: NZU64!(10),
1944                compression: None,
1945                codec_config: (),
1946                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1947                write_buffer: NZUsize!(1024),
1948            };
1949
1950            let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1951                .await
1952                .unwrap();
1953
1954            // Append 15 items to both journals (fills section 0, partial section 1)
1955            for i in 0..15u64 {
1956                variable.append(&(i * 100)).await.unwrap();
1957            }
1958
1959            assert_eq!(variable.size().await, 15);
1960
1961            // Manually append 5 more items directly to data journal only
1962            for i in 15..20u64 {
1963                variable.test_append_data(1, i * 100).await.unwrap();
1964            }
1965            // Offsets journal still has only 15 entries
1966
1967            variable.sync().await.unwrap();
1968            drop(variable);
1969
1970            // === Verify recovery ===
1971            let variable = Journal::<_, u64>::init(context.child("second"), cfg.clone())
1972                .await
1973                .unwrap();
1974
1975            // Init should rebuild offsets journal from data journal replay
1976            let bounds = variable.bounds().await;
1977            assert_eq!(bounds.end, 20);
1978            assert_eq!(bounds.start, 0);
1979
1980            // All items should be readable from both journals
1981            for i in 0..20u64 {
1982                assert_eq!(variable.read(i).await.unwrap(), i * 100);
1983            }
1984
1985            // Offsets journal should be fully rebuilt to match data journal
1986            assert_eq!(variable.test_offsets_size().await, 20);
1987
1988            variable.destroy().await.unwrap();
1989        });
1990    }
1991
1992    /// Test recovery from multiple prune operations with crash.
1993    #[test_traced]
1994    fn test_variable_recovery_multiple_prunes_crash() {
1995        let executor = deterministic::Runner::default();
1996        executor.start(|context| async move {
1997            // === Setup: Create Variable wrapper with data ===
1998            let cfg = Config {
1999                partition: "recovery-multiple-prunes".into(),
2000                items_per_section: NZU64!(10),
2001                compression: None,
2002                codec_config: (),
2003                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2004                write_buffer: NZUsize!(1024),
2005            };
2006
2007            let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2008                .await
2009                .unwrap();
2010
2011            // Append 50 items across 5 sections to both journals
2012            for i in 0..50u64 {
2013                variable.append(&(i * 100)).await.unwrap();
2014            }
2015
2016            // Prune to position 10 normally (both data and offsets journals pruned)
2017            variable.prune(10).await.unwrap();
2018            assert_eq!(variable.bounds().await.start, 10);
2019
2020            // === Simulate crash: Multiple prunes on data journal, not on offsets journal ===
2021            // Manually prune data journal to section 3 (position 30)
2022            variable.test_prune_data(3).await.unwrap();
2023            // Offsets journal still thinks oldest is position 10
2024
2025            variable.sync().await.unwrap();
2026            drop(variable);
2027
2028            // === Verify recovery ===
2029            let variable = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2030                .await
2031                .unwrap();
2032
2033            // Init should auto-repair: offsets journal pruned to match data journal
2034            let bounds = variable.bounds().await;
2035            assert_eq!(bounds.start, 30);
2036            assert_eq!(bounds.end, 50);
2037
2038            // Reads before position 30 should fail (pruned from both journals)
2039            assert!(matches!(
2040                variable.read(10).await,
2041                Err(crate::journal::Error::ItemPruned(_))
2042            ));
2043            assert!(matches!(
2044                variable.read(20).await,
2045                Err(crate::journal::Error::ItemPruned(_))
2046            ));
2047
2048            // Reads at position 30+ should succeed
2049            assert_eq!(variable.read(30).await.unwrap(), 3000);
2050            assert_eq!(variable.read(49).await.unwrap(), 4900);
2051
2052            variable.destroy().await.unwrap();
2053        });
2054    }
2055
2056    /// Test recovery from crash during rewind operation.
2057    ///
2058    /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes.
2059    /// This creates a situation where offsets journal has been rewound but data journal still
2060    /// contains items across multiple sections. Verifies that init() correctly rebuilds the
2061    /// offsets index across all sections to match the data journal.
2062    #[test_traced]
2063    fn test_variable_recovery_rewind_crash_multi_section() {
2064        let executor = deterministic::Runner::default();
2065        executor.start(|context| async move {
2066            // === Setup: Create Variable wrapper with data across multiple sections ===
2067            let cfg = Config {
2068                partition: "recovery-rewind-crash".into(),
2069                items_per_section: NZU64!(10),
2070                compression: None,
2071                codec_config: (),
2072                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2073                write_buffer: NZUsize!(1024),
2074            };
2075
2076            let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2077                .await
2078                .unwrap();
2079
2080            // Append 25 items across 3 sections (section 0: 0-9, section 1: 10-19, section 2: 20-24)
2081            for i in 0..25u64 {
2082                variable.append(&(i * 100)).await.unwrap();
2083            }
2084
2085            assert_eq!(variable.size().await, 25);
2086
2087            // === Simulate crash during rewind(5) ===
2088            // Rewind offsets journal to size 5 (keeps positions 0-4)
2089            variable.test_rewind_offsets(5).await.unwrap();
2090            // CRASH before data.rewind() completes - data still has all 3 sections
2091
2092            variable.sync().await.unwrap();
2093            drop(variable);
2094
2095            // === Verify recovery ===
2096            let variable = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2097                .await
2098                .unwrap();
2099
2100            // Init should rebuild offsets[5-24] from data journal across all 3 sections
2101            let bounds = variable.bounds().await;
2102            assert_eq!(bounds.end, 25);
2103            assert_eq!(bounds.start, 0);
2104
2105            // All items should be readable - offsets rebuilt correctly across all sections
2106            for i in 0..25u64 {
2107                assert_eq!(variable.read(i).await.unwrap(), i * 100);
2108            }
2109
2110            // Verify offsets journal fully rebuilt
2111            assert_eq!(variable.test_offsets_size().await, 25);
2112
2113            // Verify next append gets position 25
2114            let pos = variable.append(&2500).await.unwrap();
2115            assert_eq!(pos, 25);
2116            assert_eq!(variable.read(25).await.unwrap(), 2500);
2117
2118            variable.destroy().await.unwrap();
2119        });
2120    }
2121
2122    /// Test recovery from crash after data sync but before offsets sync when journal was
2123    /// previously emptied by pruning.
2124    #[test_traced]
2125    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
2126        let executor = deterministic::Runner::default();
2127        executor.start(|context| async move {
2128            let cfg = Config {
2129                partition: "recovery-empty-after-prune".into(),
2130                items_per_section: NZU64!(10),
2131                compression: None,
2132                codec_config: (),
2133                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2134                write_buffer: NZUsize!(1024),
2135            };
2136
2137            // === Phase 1: Create journal with one full section ===
2138            let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2139                .await
2140                .unwrap();
2141
2142            // Append 10 items (positions 0-9), fills section 0
2143            for i in 0..10u64 {
2144                journal.append(&(i * 100)).await.unwrap();
2145            }
2146            let bounds = journal.bounds().await;
2147            assert_eq!(bounds.end, 10);
2148            assert_eq!(bounds.start, 0);
2149
2150            // === Phase 2: Prune to create empty journal ===
2151            journal.prune(10).await.unwrap();
2152            let bounds = journal.bounds().await;
2153            assert_eq!(bounds.end, 10);
2154            assert!(bounds.is_empty()); // Empty!
2155
2156            // === Phase 3: Append directly to data journal to simulate crash ===
2157            // Manually append to data journal only (bypassing Variable's append logic)
2158            // This simulates the case where data was synced but offsets wasn't
2159            for i in 10..20u64 {
2160                journal.test_append_data(1, i * 100).await.unwrap();
2161            }
2162            // Sync the data journal (section 1)
2163            journal.test_sync_data().await.unwrap();
2164            // Do NOT sync offsets journal - simulates crash before offsets.sync()
2165
2166            // Close without syncing offsets
2167            drop(journal);
2168
2169            // === Phase 4: Verify recovery succeeds ===
2170            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2171                .await
2172                .expect("Should recover from crash after data sync but before offsets sync");
2173
2174            // All data should be recovered
2175            let bounds = journal.bounds().await;
2176            assert_eq!(bounds.end, 20);
2177            assert_eq!(bounds.start, 10);
2178
2179            // All items from position 10-19 should be readable
2180            for i in 10..20u64 {
2181                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2182            }
2183
2184            // Items 0-9 should be pruned
2185            for i in 0..10 {
2186                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2187            }
2188
2189            journal.destroy().await.unwrap();
2190        });
2191    }
2192
2193    /// Test that offsets index is rebuilt from data after sync writes data but not offsets.
2194    #[test_traced]
2195    fn test_variable_concurrent_sync_recovery() {
2196        let executor = deterministic::Runner::default();
2197        executor.start(|context| async move {
2198            let cfg = Config {
2199                partition: "concurrent-sync-recovery".into(),
2200                items_per_section: NZU64!(10),
2201                compression: None,
2202                codec_config: (),
2203                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2204                write_buffer: NZUsize!(1024),
2205            };
2206
2207            let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2208                .await
2209                .unwrap();
2210
2211            // Append items across a section boundary
2212            for i in 0..15u64 {
2213                journal.append(&(i * 100)).await.unwrap();
2214            }
2215
2216            // Manually sync only data to simulate crash during concurrent sync
2217            journal.commit().await.unwrap();
2218
2219            // Simulate a crash (offsets not synced)
2220            drop(journal);
2221
2222            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2223                .await
2224                .unwrap();
2225
2226            // Data should be intact and offsets rebuilt
2227            assert_eq!(journal.size().await, 15);
2228            for i in 0..15u64 {
2229                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2230            }
2231
2232            journal.destroy().await.unwrap();
2233        });
2234    }
2235
2236    #[test_traced]
2237    fn test_init_at_size_zero() {
2238        let executor = deterministic::Runner::default();
2239        executor.start(|context| async move {
2240            let cfg = Config {
2241                partition: "init-at-size-zero".into(),
2242                items_per_section: NZU64!(5),
2243                compression: None,
2244                codec_config: (),
2245                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2246                write_buffer: NZUsize!(1024),
2247            };
2248
2249            let journal = Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 0)
2250                .await
2251                .unwrap();
2252
2253            // Size should be 0
2254            assert_eq!(journal.size().await, 0);
2255
2256            // No oldest retained position (empty journal)
2257            assert!(journal.bounds().await.is_empty());
2258
2259            // Next append should get position 0
2260            let pos = journal.append(&100).await.unwrap();
2261            assert_eq!(pos, 0);
2262            assert_eq!(journal.size().await, 1);
2263            assert_eq!(journal.read(0).await.unwrap(), 100);
2264
2265            journal.destroy().await.unwrap();
2266        });
2267    }
2268
2269    #[test_traced]
2270    fn test_init_at_size_section_boundary() {
2271        let executor = deterministic::Runner::default();
2272        executor.start(|context| async move {
2273            let cfg = Config {
2274                partition: "init-at-size-boundary".into(),
2275                items_per_section: NZU64!(5),
2276                compression: None,
2277                codec_config: (),
2278                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2279                write_buffer: NZUsize!(1024),
2280            };
2281
2282            // Initialize at position 10 (exactly at section 1 boundary with items_per_section=5)
2283            let journal =
2284                Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 10)
2285                    .await
2286                    .unwrap();
2287
2288            // Size should be 10
2289            let bounds = journal.bounds().await;
2290            assert_eq!(bounds.end, 10);
2291
2292            // No data yet, so no oldest retained position
2293            assert!(bounds.is_empty());
2294
2295            // Next append should get position 10
2296            let pos = journal.append(&1000).await.unwrap();
2297            assert_eq!(pos, 10);
2298            assert_eq!(journal.size().await, 11);
2299            assert_eq!(journal.read(10).await.unwrap(), 1000);
2300
2301            // Can continue appending
2302            let pos = journal.append(&1001).await.unwrap();
2303            assert_eq!(pos, 11);
2304            assert_eq!(journal.read(11).await.unwrap(), 1001);
2305
2306            journal.destroy().await.unwrap();
2307        });
2308    }
2309
2310    #[test_traced]
2311    fn test_init_at_size_mid_section() {
2312        let executor = deterministic::Runner::default();
2313        executor.start(|context| async move {
2314            let cfg = Config {
2315                partition: "init-at-size-mid".into(),
2316                items_per_section: NZU64!(5),
2317                compression: None,
2318                codec_config: (),
2319                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2320                write_buffer: NZUsize!(1024),
2321            };
2322
2323            // Initialize at position 7 (middle of section 1 with items_per_section=5)
2324            let journal = Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 7)
2325                .await
2326                .unwrap();
2327
2328            // Size should be 7
2329            let bounds = journal.bounds().await;
2330            assert_eq!(bounds.end, 7);
2331
2332            // No data yet, so no oldest retained position
2333            assert!(bounds.is_empty());
2334
2335            // Next append should get position 7
2336            let pos = journal.append(&700).await.unwrap();
2337            assert_eq!(pos, 7);
2338            assert_eq!(journal.size().await, 8);
2339            assert_eq!(journal.read(7).await.unwrap(), 700);
2340
2341            journal.destroy().await.unwrap();
2342        });
2343    }
2344
2345    #[test_traced]
2346    fn test_init_at_size_persistence() {
2347        let executor = deterministic::Runner::default();
2348        executor.start(|context| async move {
2349            let cfg = Config {
2350                partition: "init-at-size-persist".into(),
2351                items_per_section: NZU64!(5),
2352                compression: None,
2353                codec_config: (),
2354                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2355                write_buffer: NZUsize!(1024),
2356            };
2357
2358            // Initialize at position 15
2359            let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 15)
2360                .await
2361                .unwrap();
2362
2363            // Append some items
2364            for i in 0..5u64 {
2365                let pos = journal.append(&(1500 + i)).await.unwrap();
2366                assert_eq!(pos, 15 + i);
2367            }
2368
2369            assert_eq!(journal.size().await, 20);
2370
2371            // Sync and reopen
2372            journal.sync().await.unwrap();
2373            drop(journal);
2374
2375            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2376                .await
2377                .unwrap();
2378
2379            // Size and data should be preserved
2380            let bounds = journal.bounds().await;
2381            assert_eq!(bounds.end, 20);
2382            assert_eq!(bounds.start, 15);
2383
2384            // Verify data
2385            for i in 0..5u64 {
2386                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
2387            }
2388
2389            // Can continue appending
2390            let pos = journal.append(&9999).await.unwrap();
2391            assert_eq!(pos, 20);
2392            assert_eq!(journal.read(20).await.unwrap(), 9999);
2393
2394            journal.destroy().await.unwrap();
2395        });
2396    }
2397
2398    #[test_traced]
2399    fn test_init_at_size_persistence_without_data() {
2400        let executor = deterministic::Runner::default();
2401        executor.start(|context| async move {
2402            let cfg = Config {
2403                partition: "init-at-size-persist-empty".into(),
2404                items_per_section: NZU64!(5),
2405                compression: None,
2406                codec_config: (),
2407                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2408                write_buffer: NZUsize!(1024),
2409            };
2410
2411            // Initialize at position 15
2412            let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 15)
2413                .await
2414                .unwrap();
2415
2416            let bounds = journal.bounds().await;
2417            assert_eq!(bounds.end, 15);
2418            assert!(bounds.is_empty());
2419
2420            // Drop without writing any data
2421            drop(journal);
2422
2423            // Reopen and verify size persisted
2424            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2425                .await
2426                .unwrap();
2427
2428            let bounds = journal.bounds().await;
2429            assert_eq!(bounds.end, 15);
2430            assert!(bounds.is_empty());
2431
2432            // Can append starting at position 15
2433            let pos = journal.append(&1500).await.unwrap();
2434            assert_eq!(pos, 15);
2435            assert_eq!(journal.read(15).await.unwrap(), 1500);
2436
2437            journal.destroy().await.unwrap();
2438        });
2439    }
2440
2441    /// Test init_at_size with mid-section value persists correctly across restart.
2442    #[test_traced]
2443    fn test_init_at_size_mid_section_persistence() {
2444        let executor = deterministic::Runner::default();
2445        executor.start(|context| async move {
2446            let cfg = Config {
2447                partition: "init-at-size-mid-section".into(),
2448                items_per_section: NZU64!(5),
2449                compression: None,
2450                codec_config: (),
2451                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2452                write_buffer: NZUsize!(1024),
2453            };
2454
2455            // Initialize at position 7 (mid-section, 7 % 5 = 2)
2456            let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7)
2457                .await
2458                .unwrap();
2459
2460            // Append 3 items at positions 7, 8, 9 (fills rest of section 1)
2461            for i in 0..3u64 {
2462                let pos = journal.append(&(700 + i)).await.unwrap();
2463                assert_eq!(pos, 7 + i);
2464            }
2465
2466            let bounds = journal.bounds().await;
2467            assert_eq!(bounds.end, 10);
2468            assert_eq!(bounds.start, 7);
2469
2470            // Sync and reopen
2471            journal.sync().await.unwrap();
2472            drop(journal);
2473
2474            // Reopen
2475            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2476                .await
2477                .unwrap();
2478
2479            // Size and bounds.start should be preserved correctly
2480            let bounds = journal.bounds().await;
2481            assert_eq!(bounds.end, 10);
2482            assert_eq!(bounds.start, 7);
2483
2484            // Verify data
2485            for i in 0..3u64 {
2486                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2487            }
2488
2489            // Positions before 7 should be pruned
2490            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2491
2492            journal.destroy().await.unwrap();
2493        });
2494    }
2495
2496    /// Test init_at_size mid-section with data spanning multiple sections.
2497    #[test_traced]
2498    fn test_init_at_size_mid_section_multi_section_persistence() {
2499        let executor = deterministic::Runner::default();
2500        executor.start(|context| async move {
2501            let cfg = Config {
2502                partition: "init-at-size-multi-section".into(),
2503                items_per_section: NZU64!(5),
2504                compression: None,
2505                codec_config: (),
2506                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2507                write_buffer: NZUsize!(1024),
2508            };
2509
2510            // Initialize at position 7 (mid-section)
2511            let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7)
2512                .await
2513                .unwrap();
2514
2515            // Append 8 items: positions 7-14 (section 1: 3 items, section 2: 5 items)
2516            for i in 0..8u64 {
2517                let pos = journal.append(&(700 + i)).await.unwrap();
2518                assert_eq!(pos, 7 + i);
2519            }
2520
2521            let bounds = journal.bounds().await;
2522            assert_eq!(bounds.end, 15);
2523            assert_eq!(bounds.start, 7);
2524
2525            // Sync and reopen
2526            journal.sync().await.unwrap();
2527            drop(journal);
2528
2529            // Reopen
2530            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2531                .await
2532                .unwrap();
2533
2534            // Verify state preserved
2535            let bounds = journal.bounds().await;
2536            assert_eq!(bounds.end, 15);
2537            assert_eq!(bounds.start, 7);
2538
2539            // Verify all data
2540            for i in 0..8u64 {
2541                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2542            }
2543
2544            journal.destroy().await.unwrap();
2545        });
2546    }
2547
2548    /// Regression test: data-empty crash repair must preserve mid-section pruning boundary.
2549    #[test_traced]
2550    fn test_align_journals_data_empty_mid_section_pruning_boundary() {
2551        let executor = deterministic::Runner::default();
2552        executor.start(|context| async move {
2553            let cfg = Config {
2554                partition: "align-journals-mid-section-pruning-boundary".into(),
2555                items_per_section: NZU64!(5),
2556                compression: None,
2557                codec_config: (),
2558                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2559                write_buffer: NZUsize!(1024),
2560            };
2561
2562            // Phase 1: Create data and offsets, then simulate data-only pruning crash.
2563            let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2564                .await
2565                .unwrap();
2566            for i in 0..7u64 {
2567                journal.append(&(100 + i)).await.unwrap();
2568            }
2569            journal.sync().await.unwrap();
2570
2571            // Simulate crash after data was cleared but before offsets were pruned.
2572            journal.inner.write().await.data.clear().await.unwrap();
2573            drop(journal);
2574
2575            // Phase 2: Init triggers data-empty repair and should treat journal as fully pruned at size 7.
2576            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2577                .await
2578                .unwrap();
2579            let bounds = journal.bounds().await;
2580            assert_eq!(bounds.end, 7);
2581            assert!(bounds.is_empty());
2582
2583            // Append one item at position 7.
2584            let pos = journal.append(&777).await.unwrap();
2585            assert_eq!(pos, 7);
2586            assert_eq!(journal.size().await, 8);
2587            assert_eq!(journal.read(7).await.unwrap(), 777);
2588
2589            // Sync only the data journal to simulate a crash before offsets are synced.
2590            let section = 7 / cfg.items_per_section.get();
2591            journal
2592                .inner
2593                .write()
2594                .await
2595                .data
2596                .sync(section)
2597                .await
2598                .unwrap();
2599            drop(journal);
2600
2601            // Phase 3: Reopen and verify we did not lose the appended item.
2602            let journal = Journal::<_, u64>::init(context.child("third"), cfg.clone())
2603                .await
2604                .unwrap();
2605            let bounds = journal.bounds().await;
2606            assert_eq!(bounds.end, 8);
2607            assert_eq!(bounds.start, 7);
2608            assert_eq!(journal.read(7).await.unwrap(), 777);
2609
2610            journal.destroy().await.unwrap();
2611        });
2612    }
2613
2614    /// Test crash recovery: init_at_size + append + crash with data synced but offsets not.
2615    #[test_traced]
2616    fn test_init_at_size_crash_data_synced_offsets_not() {
2617        let executor = deterministic::Runner::default();
2618        executor.start(|context| async move {
2619            let cfg = Config {
2620                partition: "init-at-size-crash-recovery".into(),
2621                items_per_section: NZU64!(5),
2622                compression: None,
2623                codec_config: (),
2624                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2625                write_buffer: NZUsize!(1024),
2626            };
2627
2628            // Initialize at position 7 (mid-section)
2629            let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7)
2630                .await
2631                .unwrap();
2632
2633            // Append 3 items
2634            for i in 0..3u64 {
2635                journal.append(&(700 + i)).await.unwrap();
2636            }
2637
2638            // Sync only the data journal, not offsets (simulate crash)
2639            journal.inner.write().await.data.sync(1).await.unwrap();
2640            // Don't sync offsets - simulates crash after data write but before offsets write
2641            drop(journal);
2642
2643            // Reopen - should recover by rebuilding offsets from data
2644            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2645                .await
2646                .unwrap();
2647
2648            // Verify recovery
2649            let bounds = journal.bounds().await;
2650            assert_eq!(bounds.end, 10);
2651            assert_eq!(bounds.start, 7);
2652
2653            // Verify data is accessible
2654            for i in 0..3u64 {
2655                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2656            }
2657
2658            journal.destroy().await.unwrap();
2659        });
2660    }
2661
2662    #[test_traced]
2663    fn test_prune_does_not_move_oldest_retained_backwards() {
2664        let executor = deterministic::Runner::default();
2665        executor.start(|context| async move {
2666            let cfg = Config {
2667                partition: "prune-no-backwards".into(),
2668                items_per_section: NZU64!(5),
2669                compression: None,
2670                codec_config: (),
2671                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2672                write_buffer: NZUsize!(1024),
2673            };
2674
2675            let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7)
2676                .await
2677                .unwrap();
2678
2679            // Append a few items at positions 7..9
2680            for i in 0..3u64 {
2681                let pos = journal.append(&(700 + i)).await.unwrap();
2682                assert_eq!(pos, 7 + i);
2683            }
2684            assert_eq!(journal.bounds().await.start, 7);
2685
2686            // Prune to a position within the same section should not move bounds.start backwards.
2687            journal.prune(8).await.unwrap();
2688            assert_eq!(journal.bounds().await.start, 7);
2689            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2690            assert_eq!(journal.read(7).await.unwrap(), 700);
2691
2692            journal.destroy().await.unwrap();
2693        });
2694    }
2695
2696    #[test_traced]
2697    fn test_init_at_size_large_offset() {
2698        let executor = deterministic::Runner::default();
2699        executor.start(|context| async move {
2700            let cfg = Config {
2701                partition: "init-at-size-large".into(),
2702                items_per_section: NZU64!(5),
2703                compression: None,
2704                codec_config: (),
2705                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2706                write_buffer: NZUsize!(1024),
2707            };
2708
2709            // Initialize at a large position (position 1000)
2710            let journal =
2711                Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 1000)
2712                    .await
2713                    .unwrap();
2714
2715            let bounds = journal.bounds().await;
2716            assert_eq!(bounds.end, 1000);
2717            // No data yet, so no oldest retained position
2718            assert!(bounds.is_empty());
2719
2720            // Next append should get position 1000
2721            let pos = journal.append(&100000).await.unwrap();
2722            assert_eq!(pos, 1000);
2723            assert_eq!(journal.read(1000).await.unwrap(), 100000);
2724
2725            journal.destroy().await.unwrap();
2726        });
2727    }
2728
2729    #[test_traced]
2730    fn test_init_at_size_prune_and_append() {
2731        let executor = deterministic::Runner::default();
2732        executor.start(|context| async move {
2733            let cfg = Config {
2734                partition: "init-at-size-prune".into(),
2735                items_per_section: NZU64!(5),
2736                compression: None,
2737                codec_config: (),
2738                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2739                write_buffer: NZUsize!(1024),
2740            };
2741
2742            // Initialize at position 20
2743            let journal =
2744                Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 20)
2745                    .await
2746                    .unwrap();
2747
2748            // Append items 20-29
2749            for i in 0..10u64 {
2750                journal.append(&(2000 + i)).await.unwrap();
2751            }
2752
2753            assert_eq!(journal.size().await, 30);
2754
2755            // Prune to position 25
2756            journal.prune(25).await.unwrap();
2757
2758            let bounds = journal.bounds().await;
2759            assert_eq!(bounds.end, 30);
2760            assert_eq!(bounds.start, 25);
2761
2762            // Verify remaining items are readable
2763            for i in 25..30u64 {
2764                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
2765            }
2766
2767            // Continue appending
2768            let pos = journal.append(&3000).await.unwrap();
2769            assert_eq!(pos, 30);
2770
2771            journal.destroy().await.unwrap();
2772        });
2773    }
2774
2775    /// Test `init_sync` when there is no existing data on disk.
2776    #[test_traced]
2777    fn test_init_sync_no_existing_data() {
2778        let executor = deterministic::Runner::default();
2779        executor.start(|context| async move {
2780            let cfg = Config {
2781                partition: "test-fresh-start".into(),
2782                items_per_section: NZU64!(5),
2783                compression: None,
2784                codec_config: (),
2785                write_buffer: NZUsize!(1024),
2786                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2787            };
2788
2789            // Initialize journal with sync boundaries when no existing data exists
2790            let lower_bound = 10;
2791            let upper_bound = 26;
2792            let journal = Journal::init_sync(
2793                context.child("storage"),
2794                cfg.clone(),
2795                lower_bound..upper_bound,
2796            )
2797            .await
2798            .expect("Failed to initialize journal with sync boundaries");
2799
2800            let bounds = journal.bounds().await;
2801            assert_eq!(bounds.end, lower_bound);
2802            assert!(bounds.is_empty());
2803
2804            // Append items using the contiguous API
2805            let pos1 = journal.append(&42u64).await.unwrap();
2806            assert_eq!(pos1, lower_bound);
2807            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
2808
2809            let pos2 = journal.append(&43u64).await.unwrap();
2810            assert_eq!(pos2, lower_bound + 1);
2811            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
2812
2813            journal.destroy().await.unwrap();
2814        });
2815    }
2816
2817    /// Test `init_sync` when there is existing data that overlaps with the sync target range.
2818    #[test_traced]
2819    fn test_init_sync_existing_data_overlap() {
2820        let executor = deterministic::Runner::default();
2821        executor.start(|context| async move {
2822            let cfg = Config {
2823                partition: "test-overlap".into(),
2824                items_per_section: NZU64!(5),
2825                compression: None,
2826                codec_config: (),
2827                write_buffer: NZUsize!(1024),
2828                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2829            };
2830
2831            // Create initial journal with data in multiple sections
2832            let journal =
2833                Journal::<deterministic::Context, u64>::init(context.child("storage"), cfg.clone())
2834                    .await
2835                    .expect("Failed to create initial journal");
2836
2837            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
2838            for i in 0..20u64 {
2839                journal.append(&(i * 100)).await.unwrap();
2840            }
2841            journal.sync().await.unwrap();
2842            drop(journal);
2843
2844            // Initialize with sync boundaries that overlap with existing data
2845            // lower_bound: 8 (section 1), upper_bound: 31 (last location 30, section 6)
2846            let lower_bound = 8;
2847            let upper_bound = 31;
2848            let journal = Journal::<deterministic::Context, u64>::init_sync(
2849                context.child("storage"),
2850                cfg.clone(),
2851                lower_bound..upper_bound,
2852            )
2853            .await
2854            .expect("Failed to initialize journal with overlap");
2855
2856            assert_eq!(journal.size().await, 20);
2857
2858            // Verify oldest retained is pruned to lower_bound's section boundary (5)
2859            assert_eq!(journal.bounds().await.start, 5); // Section 1 starts at position 5
2860
2861            // Verify data integrity: positions before 5 are pruned
2862            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2863            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2864
2865            // Positions 5-19 should be accessible
2866            assert_eq!(journal.read(5).await.unwrap(), 500);
2867            assert_eq!(journal.read(8).await.unwrap(), 800);
2868            assert_eq!(journal.read(19).await.unwrap(), 1900);
2869
2870            // Position 20+ should not exist yet
2871            assert!(matches!(
2872                journal.read(20).await,
2873                Err(Error::ItemOutOfRange(_))
2874            ));
2875
2876            // Assert journal can accept new items
2877            let pos = journal.append(&999).await.unwrap();
2878            assert_eq!(pos, 20);
2879            assert_eq!(journal.read(20).await.unwrap(), 999);
2880
2881            journal.destroy().await.unwrap();
2882        });
2883    }
2884
2885    /// Test `init_sync` with invalid parameters.
2886    #[should_panic]
2887    #[test_traced]
2888    fn test_init_sync_invalid_parameters() {
2889        let executor = deterministic::Runner::default();
2890        executor.start(|context| async move {
2891            let cfg = Config {
2892                partition: "test-invalid".into(),
2893                items_per_section: NZU64!(5),
2894                compression: None,
2895                codec_config: (),
2896                write_buffer: NZUsize!(1024),
2897                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2898            };
2899
2900            #[allow(clippy::reversed_empty_ranges)]
2901            let _result = Journal::<deterministic::Context, u64>::init_sync(
2902                context.child("storage"),
2903                cfg,
2904                10..5, // invalid range: lower > upper
2905            )
2906            .await;
2907        });
2908    }
2909
2910    /// Test `init_sync` when existing data exactly matches the sync range.
2911    #[test_traced]
2912    fn test_init_sync_existing_data_exact_match() {
2913        let executor = deterministic::Runner::default();
2914        executor.start(|context| async move {
2915            let items_per_section = NZU64!(5);
2916            let cfg = Config {
2917                partition: "test-exact-match".into(),
2918                items_per_section,
2919                compression: None,
2920                codec_config: (),
2921                write_buffer: NZUsize!(1024),
2922                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2923            };
2924
2925            // Create initial journal with data exactly matching sync range
2926            let journal =
2927                Journal::<deterministic::Context, u64>::init(context.child("storage"), cfg.clone())
2928                    .await
2929                    .expect("Failed to create initial journal");
2930
2931            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
2932            for i in 0..20u64 {
2933                journal.append(&(i * 100)).await.unwrap();
2934            }
2935            journal.sync().await.unwrap();
2936            drop(journal);
2937
2938            // Initialize with sync boundaries that exactly match existing data
2939            let lower_bound = 5; // section 1
2940            let upper_bound = 20; // section 3
2941            let journal = Journal::<deterministic::Context, u64>::init_sync(
2942                context.child("storage"),
2943                cfg.clone(),
2944                lower_bound..upper_bound,
2945            )
2946            .await
2947            .expect("Failed to initialize journal with exact match");
2948
2949            assert_eq!(journal.size().await, 20);
2950
2951            // Verify pruning to lower bound (section 1 boundary = position 5)
2952            assert_eq!(journal.bounds().await.start, 5); // Section 1 starts at position 5
2953
2954            // Verify positions before 5 are pruned
2955            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2956            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2957
2958            // Positions 5-19 should be accessible
2959            assert_eq!(journal.read(5).await.unwrap(), 500);
2960            assert_eq!(journal.read(10).await.unwrap(), 1000);
2961            assert_eq!(journal.read(19).await.unwrap(), 1900);
2962
2963            // Position 20+ should not exist yet
2964            assert!(matches!(
2965                journal.read(20).await,
2966                Err(Error::ItemOutOfRange(_))
2967            ));
2968
2969            // Assert journal can accept new operations
2970            let pos = journal.append(&999).await.unwrap();
2971            assert_eq!(pos, 20);
2972            assert_eq!(journal.read(20).await.unwrap(), 999);
2973
2974            journal.destroy().await.unwrap();
2975        });
2976    }
2977
2978    /// Test `init_sync` when existing data exceeds the sync target range.
2979    /// This tests that UnexpectedData error is returned when existing data goes beyond the upper bound.
2980    #[test_traced]
2981    fn test_init_sync_existing_data_exceeds_upper_bound() {
2982        let executor = deterministic::Runner::default();
2983        executor.start(|context| async move {
2984            let items_per_section = NZU64!(5);
2985            let cfg = Config {
2986                partition: "test-unexpected-data".into(),
2987                items_per_section,
2988                compression: None,
2989                codec_config: (),
2990                write_buffer: NZUsize!(1024),
2991                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2992            };
2993
2994            // Create initial journal with data beyond sync range
2995            let journal =
2996                Journal::<deterministic::Context, u64>::init(context.child("initial"), cfg.clone())
2997                    .await
2998                    .expect("Failed to create initial journal");
2999
3000            // Add data at positions 0-29 (sections 0-5 with items_per_section=5)
3001            for i in 0..30u64 {
3002                journal.append(&(i * 1000)).await.unwrap();
3003            }
3004            journal.sync().await.unwrap();
3005            drop(journal);
3006
3007            // Initialize with sync boundaries that are exceeded by existing data
3008            let lower_bound = 8; // section 1
3009            for (i, upper_bound) in (9..29).enumerate() {
3010                let result = Journal::<deterministic::Context, u64>::init_sync(
3011                    context.child("sync").with_attribute("index", i),
3012                    cfg.clone(),
3013                    lower_bound..upper_bound,
3014                )
3015                .await;
3016
3017                // Should return ItemOutOfRange error since data exists beyond upper_bound
3018                assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
3019            }
3020        });
3021    }
3022
3023    /// Test `init_sync` repairs an empty journal recovered at a stale position beyond the range.
3024    #[test_traced]
3025    fn test_init_sync_empty_stale_position_beyond_upper_bound() {
3026        let executor = deterministic::Runner::default();
3027        executor.start(|context| async move {
3028            let cfg = Config {
3029                partition: "test-empty-stale-position".into(),
3030                items_per_section: NZU64!(5),
3031                compression: None,
3032                codec_config: (),
3033                write_buffer: NZUsize!(1024),
3034                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3035            };
3036
3037            let stale_size = 30;
3038            let journal = Journal::<deterministic::Context, u64>::init_at_size(
3039                context.child("first"),
3040                cfg.clone(),
3041                stale_size,
3042            )
3043            .await
3044            .expect("Failed to create stale empty journal");
3045            assert_eq!(journal.size().await, stale_size);
3046            assert!(journal.bounds().await.is_empty());
3047            drop(journal);
3048
3049            let lower_bound = 10;
3050            let upper_bound = 26;
3051            let journal = Journal::<deterministic::Context, u64>::init_sync(
3052                context.child("second"),
3053                cfg.clone(),
3054                lower_bound..upper_bound,
3055            )
3056            .await
3057            .expect("Failed to repair stale empty journal");
3058
3059            assert_eq!(journal.size().await, lower_bound);
3060            assert!(journal.bounds().await.is_empty());
3061
3062            let pos = journal.append(&999).await.unwrap();
3063            assert_eq!(pos, lower_bound);
3064            assert_eq!(journal.read(pos).await.unwrap(), 999);
3065
3066            journal.destroy().await.unwrap();
3067        });
3068    }
3069
3070    /// Test `init_sync` repairs an empty journal recovered after a `clear_to_size` crash.
3071    #[test_traced]
3072    fn test_init_sync_recovers_from_stale_clear_to_size() {
3073        let executor = deterministic::Runner::default();
3074        executor.start(|context| async move {
3075            let cfg = Config {
3076                partition: "test-stale-clear-to-size".into(),
3077                items_per_section: NZU64!(5),
3078                compression: None,
3079                codec_config: (),
3080                write_buffer: NZUsize!(1024),
3081                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3082            };
3083
3084            let journal = Journal::<deterministic::Context, u64>::init_at_size(
3085                context.child("first"),
3086                cfg.clone(),
3087                9,
3088            )
3089            .await
3090            .expect("Failed to create stale empty journal");
3091            journal.sync().await.unwrap();
3092            drop(journal);
3093
3094            // Simulate clear_to_size(7) crashing after clearing data, but before offsets were
3095            // re-cleared. Recovery will initially see the old empty offsets boundary at 9.
3096            match context.remove(&cfg.data_partition(), None).await {
3097                Ok(()) | Err(commonware_runtime::Error::PartitionMissing(_)) => {}
3098                Err(error) => panic!("failed to clear data partition: {error}"),
3099            }
3100
3101            let lower_bound = 7;
3102            let upper_bound = 20;
3103            let journal = Journal::<deterministic::Context, u64>::init_sync(
3104                context.child("second"),
3105                cfg.clone(),
3106                lower_bound..upper_bound,
3107            )
3108            .await
3109            .expect("Failed to repair stale empty journal");
3110
3111            assert_eq!(journal.size().await, lower_bound);
3112            let bounds = journal.bounds().await;
3113            assert!(bounds.is_empty());
3114            assert_eq!(bounds.start, lower_bound);
3115
3116            journal.destroy().await.unwrap();
3117        });
3118    }
3119
3120    /// Test `init_sync` when all existing data is stale (before lower bound).
3121    #[test_traced]
3122    fn test_init_sync_existing_data_stale() {
3123        let executor = deterministic::Runner::default();
3124        executor.start(|context| async move {
3125            let items_per_section = NZU64!(5);
3126            let cfg = Config {
3127                partition: "test-stale".into(),
3128                items_per_section,
3129                compression: None,
3130                codec_config: (),
3131                write_buffer: NZUsize!(1024),
3132                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3133            };
3134
3135            // Create initial journal with stale data
3136            let journal =
3137                Journal::<deterministic::Context, u64>::init(context.child("first"), cfg.clone())
3138                    .await
3139                    .expect("Failed to create initial journal");
3140
3141            // Add data at positions 0-9 (sections 0-1 with items_per_section=5)
3142            for i in 0..10u64 {
3143                journal.append(&(i * 100)).await.unwrap();
3144            }
3145            journal.sync().await.unwrap();
3146            drop(journal);
3147
3148            // Initialize with sync boundaries beyond all existing data
3149            let lower_bound = 15; // section 3
3150            let upper_bound = 26; // last element in section 5
3151            let journal = Journal::<deterministic::Context, u64>::init_sync(
3152                context.child("second"),
3153                cfg.clone(),
3154                lower_bound..upper_bound,
3155            )
3156            .await
3157            .expect("Failed to initialize journal with stale data");
3158
3159            assert_eq!(journal.size().await, 15);
3160
3161            // Verify fresh journal (all old data destroyed, starts at position 15)
3162            assert!(journal.bounds().await.is_empty());
3163
3164            // Verify old positions don't exist
3165            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
3166            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
3167            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
3168
3169            journal.destroy().await.unwrap();
3170        });
3171    }
3172
3173    /// Test `init_sync` with section boundary edge cases.
3174    #[test_traced]
3175    fn test_init_sync_section_boundaries() {
3176        let executor = deterministic::Runner::default();
3177        executor.start(|context| async move {
3178            let items_per_section = NZU64!(5);
3179            let cfg = Config {
3180                partition: "test-boundaries".into(),
3181                items_per_section,
3182                compression: None,
3183                codec_config: (),
3184                write_buffer: NZUsize!(1024),
3185                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3186            };
3187
3188            // Create journal with data at section boundaries
3189            let journal =
3190                Journal::<deterministic::Context, u64>::init(context.child("storage"), cfg.clone())
3191                    .await
3192                    .expect("Failed to create initial journal");
3193
3194            // Add data at positions 0-24 (sections 0-4 with items_per_section=5)
3195            for i in 0..25u64 {
3196                journal.append(&(i * 100)).await.unwrap();
3197            }
3198            journal.sync().await.unwrap();
3199            drop(journal);
3200
3201            // Test sync boundaries exactly at section boundaries
3202            let lower_bound = 15; // Exactly at section boundary (15/5 = 3)
3203            let upper_bound = 25; // Last element exactly at section boundary (24/5 = 4)
3204            let journal = Journal::<deterministic::Context, u64>::init_sync(
3205                context.child("storage"),
3206                cfg.clone(),
3207                lower_bound..upper_bound,
3208            )
3209            .await
3210            .expect("Failed to initialize journal at boundaries");
3211
3212            assert_eq!(journal.size().await, 25);
3213
3214            // Verify oldest retained is at section 3 boundary (position 15)
3215            assert_eq!(journal.bounds().await.start, 15);
3216
3217            // Verify positions before 15 are pruned
3218            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
3219            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
3220
3221            // Verify positions 15-24 are accessible
3222            assert_eq!(journal.read(15).await.unwrap(), 1500);
3223            assert_eq!(journal.read(20).await.unwrap(), 2000);
3224            assert_eq!(journal.read(24).await.unwrap(), 2400);
3225
3226            // Position 25+ should not exist yet
3227            assert!(matches!(
3228                journal.read(25).await,
3229                Err(Error::ItemOutOfRange(_))
3230            ));
3231
3232            // Assert journal can accept new operations
3233            let pos = journal.append(&999).await.unwrap();
3234            assert_eq!(pos, 25);
3235            assert_eq!(journal.read(25).await.unwrap(), 999);
3236
3237            journal.destroy().await.unwrap();
3238        });
3239    }
3240
3241    /// Test `init_sync` when range.start and range.end-1 are in the same section.
3242    #[test_traced]
3243    fn test_init_sync_same_section_bounds() {
3244        let executor = deterministic::Runner::default();
3245        executor.start(|context| async move {
3246            let items_per_section = NZU64!(5);
3247            let cfg = Config {
3248                partition: "test-same-section".into(),
3249                items_per_section,
3250                compression: None,
3251                codec_config: (),
3252                write_buffer: NZUsize!(1024),
3253                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3254            };
3255
3256            // Create journal with data in multiple sections
3257            let journal =
3258                Journal::<deterministic::Context, u64>::init(context.child("storage"), cfg.clone())
3259                    .await
3260                    .expect("Failed to create initial journal");
3261
3262            // Add data at positions 0-14 (sections 0-2 with items_per_section=5)
3263            for i in 0..15u64 {
3264                journal.append(&(i * 100)).await.unwrap();
3265            }
3266            journal.sync().await.unwrap();
3267            drop(journal);
3268
3269            // Test sync boundaries within the same section
3270            let lower_bound = 10; // operation 10 (section 2: 10/5 = 2)
3271            let upper_bound = 15; // Last operation 14 (section 2: 14/5 = 2)
3272            let journal = Journal::<deterministic::Context, u64>::init_sync(
3273                context.child("storage"),
3274                cfg.clone(),
3275                lower_bound..upper_bound,
3276            )
3277            .await
3278            .expect("Failed to initialize journal with same-section bounds");
3279
3280            assert_eq!(journal.size().await, 15);
3281
3282            // Both operations are in section 2, so sections 0, 1 should be pruned, section 2 retained
3283            // Oldest retained position should be at section 2 boundary (position 10)
3284            assert_eq!(journal.bounds().await.start, 10);
3285
3286            // Verify positions before 10 are pruned
3287            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
3288            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
3289
3290            // Verify positions 10-14 are accessible
3291            assert_eq!(journal.read(10).await.unwrap(), 1000);
3292            assert_eq!(journal.read(11).await.unwrap(), 1100);
3293            assert_eq!(journal.read(14).await.unwrap(), 1400);
3294
3295            // Position 15+ should not exist yet
3296            assert!(matches!(
3297                journal.read(15).await,
3298                Err(Error::ItemOutOfRange(_))
3299            ));
3300
3301            // Assert journal can accept new operations
3302            let pos = journal.append(&999).await.unwrap();
3303            assert_eq!(pos, 15);
3304            assert_eq!(journal.read(15).await.unwrap(), 999);
3305
3306            journal.destroy().await.unwrap();
3307        });
3308    }
3309
3310    /// Test contiguous variable journal with items_per_section=1.
3311    ///
3312    /// This is a regression test for a bug where reading from size()-1 fails
3313    /// when using items_per_section=1, particularly after pruning and restart.
3314    #[test_traced]
3315    fn test_single_item_per_section() {
3316        let executor = deterministic::Runner::default();
3317        executor.start(|context| async move {
3318            let cfg = Config {
3319                partition: "single-item-per-section".into(),
3320                items_per_section: NZU64!(1),
3321                compression: None,
3322                codec_config: (),
3323                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
3324                write_buffer: NZUsize!(1024),
3325            };
3326
3327            // === Test 1: Basic single item operation ===
3328            let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
3329                .await
3330                .unwrap();
3331
3332            // Verify empty state
3333            let bounds = journal.bounds().await;
3334            assert_eq!(bounds.end, 0);
3335            assert!(bounds.is_empty());
3336
3337            // Append 1 item (value = position * 100, so position 0 has value 0)
3338            let pos = journal.append(&0).await.unwrap();
3339            assert_eq!(pos, 0);
3340            assert_eq!(journal.size().await, 1);
3341
3342            // Sync
3343            journal.sync().await.unwrap();
3344
3345            // Read from size() - 1
3346            let value = journal.read(journal.size().await - 1).await.unwrap();
3347            assert_eq!(value, 0);
3348
3349            // === Test 2: Multiple items with single item per section ===
3350            for i in 1..10u64 {
3351                let pos = journal.append(&(i * 100)).await.unwrap();
3352                assert_eq!(pos, i);
3353                assert_eq!(journal.size().await, i + 1);
3354
3355                // Verify we can read the just-appended item at size() - 1
3356                let value = journal.read(journal.size().await - 1).await.unwrap();
3357                assert_eq!(value, i * 100);
3358            }
3359
3360            // Verify all items can be read
3361            for i in 0..10u64 {
3362                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3363            }
3364
3365            journal.sync().await.unwrap();
3366
3367            // === Test 3: Pruning with single item per section ===
3368            // Prune to position 5 (removes positions 0-4)
3369            let pruned = journal.prune(5).await.unwrap();
3370            assert!(pruned);
3371
3372            // Size should still be 10
3373            assert_eq!(journal.size().await, 10);
3374
3375            // bounds.start should be 5
3376            assert_eq!(journal.bounds().await.start, 5);
3377
3378            // Reading from bounds.end - 1 (position 9) should still work
3379            let value = journal.read(journal.size().await - 1).await.unwrap();
3380            assert_eq!(value, 900);
3381
3382            // Reading from pruned positions should return ItemPruned
3383            for i in 0..5 {
3384                assert!(matches!(
3385                    journal.read(i).await,
3386                    Err(crate::journal::Error::ItemPruned(_))
3387                ));
3388            }
3389
3390            // Reading from retained positions should work
3391            for i in 5..10u64 {
3392                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3393            }
3394
3395            // Append more items after pruning
3396            for i in 10..15u64 {
3397                let pos = journal.append(&(i * 100)).await.unwrap();
3398                assert_eq!(pos, i);
3399
3400                // Verify we can read from size() - 1
3401                let value = journal.read(journal.size().await - 1).await.unwrap();
3402                assert_eq!(value, i * 100);
3403            }
3404
3405            journal.sync().await.unwrap();
3406            drop(journal);
3407
3408            // === Test 4: Restart persistence with single item per section ===
3409            let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
3410                .await
3411                .unwrap();
3412
3413            // Verify size is preserved
3414            assert_eq!(journal.size().await, 15);
3415
3416            // Verify bounds.start is preserved
3417            assert_eq!(journal.bounds().await.start, 5);
3418
3419            // Reading from bounds.end - 1 should work after restart
3420            let value = journal.read(journal.size().await - 1).await.unwrap();
3421            assert_eq!(value, 1400);
3422
3423            // Reading all retained positions should work
3424            for i in 5..15u64 {
3425                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3426            }
3427
3428            journal.destroy().await.unwrap();
3429
3430            // === Test 5: Restart after pruning with non-zero index (KEY SCENARIO) ===
3431            // Fresh journal for this test
3432            let journal = Journal::<_, u64>::init(context.child("third"), cfg.clone())
3433                .await
3434                .unwrap();
3435
3436            // Append 10 items (positions 0-9)
3437            for i in 0..10u64 {
3438                journal.append(&(i * 1000)).await.unwrap();
3439            }
3440
3441            // Prune to position 5 (removes positions 0-4)
3442            journal.prune(5).await.unwrap();
3443            let bounds = journal.bounds().await;
3444            assert_eq!(bounds.end, 10);
3445            assert_eq!(bounds.start, 5);
3446
3447            // Sync and restart
3448            journal.sync().await.unwrap();
3449            drop(journal);
3450
3451            // Re-open journal
3452            let journal = Journal::<_, u64>::init(context.child("fourth"), cfg.clone())
3453                .await
3454                .unwrap();
3455
3456            // Verify state after restart
3457            let bounds = journal.bounds().await;
3458            assert_eq!(bounds.end, 10);
3459            assert_eq!(bounds.start, 5);
3460
3461            // KEY TEST: Reading from bounds.end - 1 (position 9) should work
3462            let value = journal.read(journal.size().await - 1).await.unwrap();
3463            assert_eq!(value, 9000);
3464
3465            // Verify all retained positions (5-9) work
3466            for i in 5..10u64 {
3467                assert_eq!(journal.read(i).await.unwrap(), i * 1000);
3468            }
3469
3470            journal.destroy().await.unwrap();
3471
3472            // === Test 6: Prune all items (edge case) ===
3473            // This tests the scenario where prune removes everything.
3474            // Callers must check bounds().is_empty() before reading.
3475            let journal = Journal::<_, u64>::init(context.child("fifth"), cfg.clone())
3476                .await
3477                .unwrap();
3478
3479            for i in 0..5u64 {
3480                journal.append(&(i * 100)).await.unwrap();
3481            }
3482            journal.sync().await.unwrap();
3483
3484            // Prune all items
3485            journal.prune(5).await.unwrap();
3486            let bounds = journal.bounds().await;
3487            assert_eq!(bounds.end, 5); // Size unchanged
3488            assert!(bounds.is_empty()); // All pruned
3489
3490            // bounds.end - 1 = 4, but position 4 is pruned
3491            let result = journal.read(journal.size().await - 1).await;
3492            assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));
3493
3494            // After appending, reading works again
3495            journal.append(&500).await.unwrap();
3496            let bounds = journal.bounds().await;
3497            assert_eq!(bounds.start, 5);
3498            assert_eq!(journal.read(bounds.end - 1).await.unwrap(), 500);
3499
3500            journal.destroy().await.unwrap();
3501        });
3502    }
3503
3504    #[test_traced]
3505    fn test_variable_journal_clear_to_size() {
3506        let executor = deterministic::Runner::default();
3507        executor.start(|context| async move {
3508            let cfg = Config {
3509                partition: "clear-test".into(),
3510                items_per_section: NZU64!(10),
3511                compression: None,
3512                codec_config: (),
3513                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
3514                write_buffer: NZUsize!(1024),
3515            };
3516
3517            let journal = Journal::<_, u64>::init(context.child("journal"), cfg.clone())
3518                .await
3519                .unwrap();
3520
3521            // Append 25 items (spanning multiple sections)
3522            for i in 0..25u64 {
3523                journal.append(&(i * 100)).await.unwrap();
3524            }
3525            let bounds = journal.bounds().await;
3526            assert_eq!(bounds.end, 25);
3527            assert_eq!(bounds.start, 0);
3528            journal.sync().await.unwrap();
3529
3530            // Clear to position 100, effectively resetting the journal
3531            journal.clear_to_size(100).await.unwrap();
3532            let bounds = journal.bounds().await;
3533            assert_eq!(bounds.end, 100);
3534            assert!(bounds.is_empty());
3535
3536            // Old positions should fail
3537            for i in 0..25 {
3538                assert!(matches!(
3539                    journal.read(i).await,
3540                    Err(crate::journal::Error::ItemPruned(_))
3541                ));
3542            }
3543
3544            // Verify size persists after restart without writing any data
3545            drop(journal);
3546            let journal =
3547                Journal::<_, u64>::init(context.child("journal_after_clear"), cfg.clone())
3548                    .await
3549                    .unwrap();
3550            let bounds = journal.bounds().await;
3551            assert_eq!(bounds.end, 100);
3552            assert!(bounds.is_empty());
3553
3554            // Append new data starting at position 100
3555            for i in 100..105u64 {
3556                let pos = journal.append(&(i * 100)).await.unwrap();
3557                assert_eq!(pos, i);
3558            }
3559            let bounds = journal.bounds().await;
3560            assert_eq!(bounds.end, 105);
3561            assert_eq!(bounds.start, 100);
3562
3563            // New positions should be readable
3564            for i in 100..105u64 {
3565                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3566            }
3567
3568            // Sync and re-init to verify persistence
3569            journal.sync().await.unwrap();
3570            drop(journal);
3571
3572            let journal = Journal::<_, u64>::init(context.child("journal_reopened"), cfg)
3573                .await
3574                .unwrap();
3575
3576            let bounds = journal.bounds().await;
3577            assert_eq!(bounds.end, 105);
3578            assert_eq!(bounds.start, 100);
3579            for i in 100..105u64 {
3580                assert_eq!(journal.read(i).await.unwrap(), i * 100);
3581            }
3582
3583            journal.destroy().await.unwrap();
3584        });
3585    }
3586
3587    #[test_traced]
3588    fn test_variable_journal_metrics() {
3589        let executor = deterministic::Runner::default();
3590        executor.start(|context| async move {
3591            let cfg = Config {
3592                partition: "metrics".into(),
3593                items_per_section: NZU64!(2),
3594                compression: None,
3595                codec_config: (),
3596                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10)),
3597                write_buffer: NZUsize!(1024),
3598            };
3599            let journal = Journal::<_, u64>::init(context.child("variable_metrics"), cfg)
3600                .await
3601                .unwrap();
3602
3603            let items = [0, 1, 2, 3, 4];
3604            journal.append_many(Many::Flat(&items)).await.unwrap();
3605            journal.append(&5).await.unwrap();
3606            let reader = journal.reader().await;
3607            reader.read(0).await.unwrap();
3608            reader.read_many(&[1, 2]).await.unwrap();
3609            reader.try_read_sync(3).unwrap();
3610            drop(reader);
3611            journal.commit().await.unwrap();
3612            journal.sync().await.unwrap();
3613            journal.prune(2).await.unwrap();
3614            journal.rewind(4).await.unwrap();
3615
3616            let buffer = context.encode();
3617            for expected in [
3618                "variable_metrics_size 4",
3619                "variable_metrics_pruning_boundary 2",
3620                "variable_metrics_retained 2",
3621                "variable_metrics_tail_items 2",
3622                "variable_metrics_append_calls_total 1",
3623                "variable_metrics_append_many_calls_total 1",
3624                "variable_metrics_read_calls_total 1",
3625                "variable_metrics_read_many_calls_total 1",
3626                "variable_metrics_try_read_sync_hits_total 1",
3627                "variable_metrics_items_read_total 4",
3628                "variable_metrics_commit_calls_total 1",
3629                "variable_metrics_sync_calls_total 1",
3630                "variable_metrics_append_duration_count 1",
3631                "variable_metrics_append_many_duration_count 1",
3632                "variable_metrics_read_duration_count 1",
3633                "variable_metrics_read_many_duration_count 1",
3634                "variable_metrics_commit_duration_count 1",
3635                "variable_metrics_sync_duration_count 1",
3636                "variable_metrics_data_tracked",
3637                "variable_metrics_offsets_size 4",
3638                "variable_metrics_offsets_blobs_tracked",
3639            ] {
3640                assert!(buffer.contains(expected), "{expected}\n{buffer}");
3641            }
3642            for unexpected in [
3643                "variable_metrics_cache_hits_total",
3644                "variable_metrics_cache_misses_total",
3645            ] {
3646                assert!(!buffer.contains(unexpected), "{unexpected}\n{buffer}");
3647            }
3648
3649            journal.destroy().await.unwrap();
3650        });
3651    }
3652}