commonware_storage/journal/contiguous/
variable.rs

1//! Position-based journal for variable-length items.
2//!
3//! This journal enforces section fullness: all non-final sections are full and synced.
4//! On init, only the last section needs to be replayed to determine the exact size.
5
6use crate::{
7    journal::{
8        contiguous::{fixed, Contiguous, MutableContiguous},
9        segmented::variable,
10        Error,
11    },
12    mmr::Location,
13    Persistable,
14};
15use commonware_codec::{Codec, CodecShared};
16use commonware_runtime::{buffer::PoolRef, Metrics, Storage};
17use commonware_utils::NZUsize;
18use core::ops::Range;
19use futures::{future::Either, stream, Stream, StreamExt as _};
20use std::num::{NonZeroU64, NonZeroUsize};
21use tracing::{debug, info};
22
/// Buffer size handed to the data journal's `replay` when it is scanned during
/// initialization (see `align_journals` and `add_missing_offsets`).
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to the base partition name for the data journal.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to the base partition name for the offsets journal.
const OFFSETS_SUFFIX: &str = "_offsets";
30
/// Map an absolute journal position to the section that stores it.
///
/// Every section holds exactly `items_per_section` items, so the section index is the
/// integer quotient of `position` by `items_per_section`.
///
/// # Arguments
///
/// * `position` - The absolute position in the journal
/// * `items_per_section` - The number of items stored in each section
///
/// # Returns
///
/// The section number where the item at `position` should be stored.
///
/// # Panics
///
/// Panics if `items_per_section` is zero (division by zero).
///
/// # Examples
///
/// ```ignore
/// // With 10 items per section:
/// assert_eq!(position_to_section(0, 10), 0);   // position 0 -> section 0
/// assert_eq!(position_to_section(9, 10), 0);   // position 9 -> section 0
/// assert_eq!(position_to_section(10, 10), 1);  // position 10 -> section 1
/// assert_eq!(position_to_section(25, 10), 2);  // position 25 -> section 2
/// assert_eq!(position_to_section(30, 10), 3);  // position 30 -> section 3
/// ```
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    // For unsigned integers Euclidean division is plain truncating division.
    position.div_euclid(items_per_section)
}
55
/// Configuration for a [Journal].
///
/// `C` is the codec configuration type used to encode/decode items.
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name. The data and offsets journals live in sub-partitions whose names
    /// are formed by appending `DATA_SUFFIX` and `OFFSETS_SUFFIX` to this value.
    pub partition: String,

    /// The number of items to store in each section.
    ///
    /// Once set, this value cannot be changed across restarts.
    /// All non-final sections will be full and persisted.
    pub items_per_section: NonZeroU64,

    /// Optional compression level for stored items (forwarded to the data journal).
    pub compression: Option<u8>,

    /// [Codec] configuration for encoding/decoding items.
    pub codec_config: C,

    /// Buffer pool for caching data (shared by the data and offsets journals).
    pub buffer_pool: PoolRef,

    /// Write buffer size for each section (used for both the data and offsets journals).
    pub write_buffer: NonZeroUsize,
}
80
81impl<C> Config<C> {
82    /// Returns the partition name for the data journal.
83    fn data_partition(&self) -> String {
84        format!("{}{}", self.partition, DATA_SUFFIX)
85    }
86
87    /// Returns the partition name for the offsets journal.
88    fn offsets_partition(&self) -> String {
89        format!("{}{}", self.partition, OFFSETS_SUFFIX)
90    }
91}
92
/// A contiguous journal with variable-size entries.
///
/// This journal manages section assignment automatically, allowing callers to append items
/// sequentially without manually tracking section numbers. Items live in a segmented
/// variable-length data journal; a fixed-size offsets journal maps each position to the byte
/// offset of its item within the corresponding data section.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob](commonware_runtime::Blob) will be truncated to the last valid item). Repair occurs during
/// init via the underlying segmented journals.
///
/// # Invariants
///
/// ## 1. Section Fullness
///
/// All non-final sections are full (`items_per_section` items) and persisted. This ensures
/// that on `init()`, we only need to replay the last section to determine the exact size.
///
/// ## 2. Data Journal is Source of Truth
///
/// The data journal is always the source of truth. The offsets journal is an index
/// that may temporarily diverge during crashes. Divergences are automatically
/// aligned during `init()`:
/// * If `offsets.size() < data.size()`: Rebuild missing offsets by replaying data.
///   (This can happen if we crash after writing data journal but before writing offsets journal)
/// * If `offsets.size() > data.size()`: Rewind offsets to match data size.
///   (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
/// * If `offsets.oldest_retained_pos() < data.oldest_retained_pos()`: Prune offsets to match.
///   (This can happen if we crash after pruning data journal but before pruning offsets journal)
///
/// Note that we don't recover from the case where `offsets.oldest_retained_pos() >
/// data.oldest_retained_pos()`; it is reported as corruption during init. This should never
/// occur because we always prune the data journal before the offsets journal.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// The underlying variable-length data journal.
    data: variable::Journal<E, V>,

    /// Index mapping positions to byte offsets within the data journal.
    /// The section can be calculated from the position using items_per_section.
    offsets: fixed::Journal<E, u64>,

    /// The number of items per section.
    ///
    /// # Invariant
    ///
    /// This value is immutable after initialization and must remain consistent
    /// across restarts. Changing this value will result in data loss or corruption.
    items_per_section: u64,

    /// The next position to be assigned on append (total items appended).
    ///
    /// # Invariant
    ///
    /// Always >= `oldest_retained_pos`. Equal when data journal is empty or fully pruned.
    size: u64,

    /// The position of the first item that remains after pruning.
    ///
    /// # Invariant
    ///
    /// Always section-aligned: `oldest_retained_pos % items_per_section == 0`.
    /// Never decreases (pruning only moves forward).
    oldest_retained_pos: u64,
}
161
162impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
163    /// Initialize a contiguous variable journal.
164    ///
165    /// # Crash Recovery
166    ///
167    /// The data journal is the source of truth. If the offsets journal is inconsistent
168    /// it will be updated to match the data journal.
169    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
170        let items_per_section = cfg.items_per_section.get();
171        let data_partition = cfg.data_partition();
172        let offsets_partition = cfg.offsets_partition();
173
174        // Initialize underlying variable data journal
175        let mut data = variable::Journal::init(
176            context.clone(),
177            variable::Config {
178                partition: data_partition,
179                compression: cfg.compression,
180                codec_config: cfg.codec_config,
181                buffer_pool: cfg.buffer_pool.clone(),
182                write_buffer: cfg.write_buffer,
183            },
184        )
185        .await?;
186
187        // Initialize offsets journal
188        let mut offsets = fixed::Journal::init(
189            context,
190            fixed::Config {
191                partition: offsets_partition,
192                items_per_blob: cfg.items_per_section,
193                buffer_pool: cfg.buffer_pool,
194                write_buffer: cfg.write_buffer,
195            },
196        )
197        .await?;
198
199        // Validate and align offsets journal to match data journal
200        let (oldest_retained_pos, size) =
201            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
202        assert!(
203            oldest_retained_pos.is_multiple_of(items_per_section),
204            "oldest_retained_pos is not section-aligned"
205        );
206
207        Ok(Self {
208            data,
209            offsets,
210            items_per_section,
211            size,
212            oldest_retained_pos,
213        })
214    }
215
216    /// Initialize a [Journal] in a fully pruned state at a specific logical size.
217    ///
218    /// This creates a journal that reports `size()` as `size` but contains no data.
219    /// The `oldest_retained_pos()` will return `None`, indicating all positions before
220    /// `size` have been pruned. This is useful for state sync when starting from
221    /// a non-zero position without historical data.
222    ///
223    /// # Arguments
224    ///
225    /// * `size` - The logical size to initialize at.
226    ///
227    /// # Post-conditions
228    ///
229    /// * `size()` returns `size`
230    /// * `oldest_retained_pos()` returns `None` (fully pruned)
231    /// * Next append receives position `size`
232    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
233        // Initialize empty data journal
234        let data = variable::Journal::init(
235            context.clone(),
236            variable::Config {
237                partition: cfg.data_partition(),
238                compression: cfg.compression,
239                codec_config: cfg.codec_config.clone(),
240                buffer_pool: cfg.buffer_pool.clone(),
241                write_buffer: cfg.write_buffer,
242            },
243        )
244        .await?;
245
246        // Initialize offsets journal at the target size
247        let offsets = fixed::Journal::init_at_size(
248            context,
249            fixed::Config {
250                partition: cfg.offsets_partition(),
251                items_per_blob: cfg.items_per_section,
252                buffer_pool: cfg.buffer_pool,
253                write_buffer: cfg.write_buffer,
254            },
255            size,
256        )
257        .await?;
258
259        Ok(Self {
260            data,
261            offsets,
262            items_per_section: cfg.items_per_section.get(),
263            size,
264            oldest_retained_pos: size,
265        })
266    }
267
268    /// Initialize a [Journal] for use in state sync.
269    ///
270    /// The bounds are item locations (not section numbers). This function prepares the
271    /// on-disk journal so that subsequent appends go to the correct physical location for the
272    /// requested range.
273    ///
274    /// Behavior by existing on-disk state:
275    /// - Fresh (no data): returns an empty journal.
276    /// - Stale (all data strictly before `range.start`): destroys existing data and returns an
277    ///   empty journal.
278    /// - Overlap within [`range.start`, `range.end`]:
279    ///   - Prunes to `range.start`
280    /// - Unexpected data beyond `range.end`: returns [crate::qmdb::Error::UnexpectedData].
281    ///
282    /// # Arguments
283    /// - `context`: storage context
284    /// - `cfg`: journal configuration
285    /// - `range`: range of item locations to retain
286    ///
287    /// # Returns
288    /// A contiguous journal ready for sync operations. The journal's size will be within the range.
289    ///
290    /// # Errors
291    /// Returns [crate::qmdb::Error::UnexpectedData] if existing data extends beyond `range.end`.
292    pub(crate) async fn init_sync(
293        context: E,
294        cfg: Config<V::Cfg>,
295        range: Range<u64>,
296    ) -> Result<Self, crate::qmdb::Error> {
297        assert!(!range.is_empty(), "range must not be empty");
298
299        debug!(
300            range.start,
301            range.end,
302            items_per_section = cfg.items_per_section.get(),
303            "initializing contiguous variable journal for sync"
304        );
305
306        // Initialize contiguous journal
307        let mut journal = Self::init(context.with_label("journal"), cfg.clone()).await?;
308
309        let size = journal.size();
310
311        // No existing data - initialize at the start of the sync range if needed
312        if size == 0 {
313            if range.start == 0 {
314                debug!("no existing journal data, returning empty journal");
315                return Ok(journal);
316            } else {
317                debug!(
318                    range.start,
319                    "no existing journal data, initializing at sync range start"
320                );
321                journal.destroy().await?;
322                return Ok(Self::init_at_size(context, cfg, range.start).await?);
323            }
324        }
325
326        // Check if data exceeds the sync range
327        if size > range.end {
328            return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
329                size,
330            )));
331        }
332
333        // If all existing data is before our sync range, destroy and recreate fresh
334        if size <= range.start {
335            // All data is stale (ends at or before range.start)
336            debug!(
337                size,
338                range.start, "existing journal data is stale, re-initializing at start position"
339            );
340            journal.destroy().await?;
341            return Ok(Self::init_at_size(context, cfg, range.start).await?);
342        }
343
344        // Prune to lower bound if needed
345        let oldest = journal.oldest_retained_pos();
346        if let Some(oldest_pos) = oldest {
347            if oldest_pos < range.start {
348                debug!(
349                    oldest_pos,
350                    range.start, "pruning journal to sync range start"
351                );
352                journal.prune(range.start).await?;
353            }
354        }
355
356        Ok(journal)
357    }
358
359    /// Rewind the journal to the given size, discarding items from the end.
360    ///
361    /// After rewinding to size N, the journal will contain exactly N items, and the next append
362    /// will receive position N.
363    ///
364    /// # Errors
365    ///
366    /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
367    ///
368    /// # Warning
369    ///
370    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
371    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
372        // Validate rewind target
373        match size.cmp(&self.size) {
374            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
375            std::cmp::Ordering::Equal => return Ok(()), // No-op
376            std::cmp::Ordering::Less => {}
377        }
378
379        // Rewind never updates oldest_retained_pos.
380        if size < self.oldest_retained_pos {
381            return Err(Error::ItemPruned(size));
382        }
383
384        // Read the offset of the first item to discard (at position 'size').
385        let discard_offset = self.offsets.read(size).await?;
386        let discard_section = position_to_section(size, self.items_per_section);
387
388        self.data
389            .rewind_to_offset(discard_section, discard_offset)
390            .await?;
391        self.offsets.rewind(size).await?;
392
393        // Update our size
394        self.size = size;
395
396        Ok(())
397    }
398
399    /// Append a new item to the journal, returning its position.
400    ///
401    /// The position returned is a stable, consecutively increasing value starting from 0.
402    /// This position remains constant after pruning.
403    ///
404    /// When a section becomes full, both the data journal and offsets journal are persisted
405    /// to maintain the invariant that all non-final sections are full and consistent.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the underlying storage operation fails or if the item cannot
410    /// be encoded.
411    ///
412    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
413    /// reopened to trigger alignment in [Journal::init].
414    pub async fn append(&mut self, item: V) -> Result<u64, Error> {
415        // Calculate which section this position belongs to
416        let section = self.current_section();
417
418        // Append to data journal, get offset
419        let (offset, _size) = self.data.append(section, item).await?;
420
421        // Append offset to offsets journal
422        let offsets_pos = self.offsets.append(offset).await?;
423        assert_eq!(offsets_pos, self.size);
424
425        // Return the current position
426        let position = self.size;
427        self.size += 1;
428
429        // Maintain invariant that all full sections are persisted.
430        if self.size.is_multiple_of(self.items_per_section) {
431            futures::try_join!(self.data.sync(section), self.offsets.sync())?;
432        }
433
434        Ok(position)
435    }
436
437    /// Return the total number of items that have been appended to the journal.
438    ///
439    /// This count is NOT affected by pruning. The next appended item will receive this
440    /// position as its value.
441    pub const fn size(&self) -> u64 {
442        self.size
443    }
444
445    /// Return the position of the oldest item still retained in the journal.
446    ///
447    /// Returns `None` if the journal is empty or if all items have been pruned.
448    pub const fn oldest_retained_pos(&self) -> Option<u64> {
449        if self.size == self.oldest_retained_pos {
450            // No items retained: either never had data or fully pruned
451            None
452        } else {
453            Some(self.oldest_retained_pos)
454        }
455    }
456
457    /// Returns the location before which all items have been pruned.
458    pub fn pruning_boundary(&self) -> u64 {
459        self.oldest_retained_pos().unwrap_or(self.size)
460    }
461
462    /// Prune items at positions strictly less than `min_position`.
463    ///
464    /// Returns `true` if any data was pruned, `false` otherwise.
465    ///
466    /// # Errors
467    ///
468    /// Returns an error if the underlying storage operation fails.
469    ///
470    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
471    /// reopened to trigger alignment in [Journal::init].
472    pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
473        if min_position <= self.oldest_retained_pos {
474            return Ok(false);
475        }
476
477        // Cap min_position to size to maintain the invariant oldest_retained_pos <= size
478        let min_position = min_position.min(self.size);
479
480        // Calculate section number
481        let min_section = position_to_section(min_position, self.items_per_section);
482
483        let pruned = self.data.prune(min_section).await?;
484        if pruned {
485            self.oldest_retained_pos = min_section * self.items_per_section;
486            self.offsets.prune(self.oldest_retained_pos).await?;
487        }
488        Ok(pruned)
489    }
490
491    /// Return a stream of all items in the journal starting from `start_pos`.
492    ///
493    /// Each item is yielded as a tuple `(position, item)` where position is the item's
494    /// position in the journal.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if `start_pos` exceeds the journal size or if any storage/decoding
499    /// errors occur during replay.
500    pub async fn replay(
501        &self,
502        start_pos: u64,
503        buffer_size: NonZeroUsize,
504    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + '_, Error> {
505        // Validate start position is within bounds.
506        if start_pos < self.oldest_retained_pos {
507            return Err(Error::ItemPruned(start_pos));
508        }
509        if start_pos > self.size {
510            return Err(Error::ItemOutOfRange(start_pos));
511        }
512
513        // If replaying at exactly size, return empty stream
514        if start_pos == self.size {
515            return Ok(Either::Left(stream::empty()));
516        }
517
518        // Use offsets index to find offset to start from, calculate section from position
519        let start_offset = self.offsets.read(start_pos).await?;
520        let start_section = position_to_section(start_pos, self.items_per_section);
521        let data_stream = self
522            .data
523            .replay(start_section, start_offset, buffer_size)
524            .await?;
525
526        // Transform the stream to include position information
527        let transformed = data_stream.enumerate().map(move |(idx, result)| {
528            result.map(|(_section, _offset, _size, item)| {
529                // Calculate position: start_pos + items read
530                let pos = start_pos + idx as u64;
531                (pos, item)
532            })
533        });
534
535        Ok(Either::Right(transformed))
536    }
537
538    /// Read the item at the given position.
539    ///
540    /// # Errors
541    ///
542    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
543    /// - Returns [Error::ItemOutOfRange] if `position` is beyond the journal size.
544    /// - Returns other errors if storage or decoding fails.
545    pub async fn read(&self, position: u64) -> Result<V, Error> {
546        // Check bounds
547        if position >= self.size {
548            return Err(Error::ItemOutOfRange(position));
549        }
550
551        if position < self.oldest_retained_pos {
552            return Err(Error::ItemPruned(position));
553        }
554
555        // Read offset from journal and calculate section from position
556        let offset = self.offsets.read(position).await?;
557        let section = position_to_section(position, self.items_per_section);
558
559        // Read item from data journal
560        self.data.get(section, offset).await
561    }
562
563    /// Durably persist the journal.
564    ///
565    /// This is faster than `sync()` but recovery will be required on startup if a crash occurs
566    /// before the next call to `sync()`.
567    pub async fn commit(&mut self) -> Result<(), Error> {
568        let section = self.current_section();
569        self.data.sync(section).await
570    }
571
572    /// Durably persist the journal and ensure recovery is not required on startup.
573    ///
574    /// This is slower than `commit()` but ensures the journal doesn't require recovery on startup.
575    pub async fn sync(&mut self) -> Result<(), Error> {
576        // Persist only the current (final) section of the data journal.
577        // All non-final sections are already persisted per Invariant #1.
578        let section = self.current_section();
579
580        // Persist both journals concurrently
581        futures::try_join!(self.data.sync(section), self.offsets.sync())?;
582
583        Ok(())
584    }
585
586    /// Remove any underlying blobs created by the journal.
587    ///
588    /// This destroys both the data journal and the offsets journal.
589    pub async fn destroy(self) -> Result<(), Error> {
590        self.data.destroy().await?;
591        self.offsets.destroy().await
592    }
593
594    /// Return the section number where the next append will write.
595    const fn current_section(&self) -> u64 {
596        position_to_section(self.size, self.items_per_section)
597    }
598
    /// Align the offsets journal and data journal to be consistent in case a crash occurred
    /// on a previous run and left the journals in an inconsistent state.
    ///
    /// The data journal is the source of truth. This function replays its newest section to
    /// determine what SHOULD be in the offsets journal, then fixes any mismatches by pruning,
    /// rewinding, or appending to the offsets journal. The offsets journal is synced after
    /// any repair.
    ///
    /// # Returns
    ///
    /// Returns `(oldest_retained_pos, size)` for the contiguous journal.
    ///
    /// # Errors
    ///
    /// Returns [Error::Corruption] if the offsets journal is pruned further than the data
    /// journal, or if it is empty at a size that does not match the data journal's oldest
    /// position. Replay and storage errors are propagated.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // === Handle empty data journal case ===
        // Count the items in the newest data section (0 if there are no sections at all).
        let items_in_last_section = match data.newest_section() {
            Some(last_section) => {
                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; // Propagate replay errors (corruption, etc.)
                    count += 1;
                }
                count
            }
            None => 0,
        };

        // Data journal is empty if there are no sections or if there is one section and it has no items.
        // The latter should only occur if a crash occurred after opening a data journal blob but
        // before writing to it.
        let data_empty =
            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
        if data_empty {
            let size = offsets.size();

            if !data.is_empty() {
                // A section exists but contains 0 items. This can happen in two cases:
                // 1. Rewind crash: we rewound the data journal but crashed before rewinding offsets
                // 2. First append crash: we opened the first section blob but crashed before writing to it
                // In both cases, calculate target position from the first remaining section
                // SAFETY: data is non-empty (checked above)
                let first_section = data.oldest_section().unwrap();
                let target_pos = first_section * items_per_section;

                info!("crash repair: rewinding offsets from {size} to {target_pos}");
                offsets.rewind(target_pos).await?;
                offsets.sync().await?;
                return Ok((target_pos, target_pos));
            }

            // data.blobs is empty. This can happen in two cases:
            // 1. We completely pruned the data journal but crashed before pruning
            //    the offsets journal.
            // 2. The data journal was never opened.
            if let Some(oldest) = offsets.oldest_retained_pos() {
                if oldest < size {
                    // Offsets has unpruned entries but data is gone - align by pruning
                    info!("crash repair: pruning offsets to {size} (prune-all crash)");
                    offsets.prune(size).await?;
                    offsets.sync().await?;
                }
            }

            // With no data at all, the offsets journal's size is the only record of how many
            // items were ever appended; report a fully pruned journal at that size.
            return Ok((size, size));
        }

        // === Handle non-empty data journal case ===
        let (data_oldest_pos, data_size) = {
            // SAFETY: data is non-empty (empty case returns early above)
            let first_section = data.oldest_section().unwrap();
            let last_section = data.newest_section().unwrap();

            let oldest_pos = first_section * items_per_section;

            // Invariant 1 on `Variable` guarantees that all sections except possibly the last
            // are full. Therefore, the size of the journal is the number of items in the last
            // section plus the number of items in the other sections.
            let size = (last_section * items_per_section) + items_in_last_section;
            (oldest_pos, size)
        };
        assert_ne!(
            data_oldest_pos, data_size,
            "data journal expected to be non-empty"
        );

        // Align pruning state. We always prune the data journal before the offsets journal,
        // so we validate that invariant and repair crash faults or detect corruption.
        match offsets.oldest_retained_pos() {
            Some(oldest_retained_pos) if oldest_retained_pos < data_oldest_pos => {
                // Offsets behind on pruning -- prune to catch up
                info!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            Some(oldest_retained_pos) if oldest_retained_pos > data_oldest_pos => {
                // Offsets pruned further than data: not a state any crash ordering produces.
                return Err(Error::Corruption(format!(
                    "offsets oldest pos ({oldest_retained_pos}) > data oldest pos ({data_oldest_pos})"
                )));
            }
            Some(_) => {
                // Both journals are pruned to the same position.
            }
            None if data_oldest_pos > 0 => {
                // Offsets journal is empty (size == oldest_retained_pos).
                // This can happen if we pruned all data, then appended new data, persisted the
                // data journal, but crashed before persisting the offsets journal.
                // We can recover if offsets.size() matches data_oldest_pos (proper pruning).
                let offsets_size = offsets.size();
                if offsets_size != data_oldest_pos {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty: size ({offsets_size}) != data oldest pos ({data_oldest_pos})"
                    )));
                }
                info!("crash repair: offsets journal empty at {data_oldest_pos}");
            }
            None => {
                // Both journals are empty/fully pruned.
            }
        }

        // Align sizes: the offsets journal must index exactly the items the data journal holds.
        let offsets_size = offsets.size();
        if offsets_size > data_size {
            // We must have crashed after writing offsets but before writing data.
            info!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            // We must have crashed after writing the data journal but before writing the offsets
            // journal.
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        assert_eq!(offsets.size(), data_size);
        // Oldest retained position is always Some because the data journal is non-empty.
        assert_eq!(offsets.oldest_retained_pos(), Some(data_oldest_pos));

        offsets.sync().await?;
        Ok((data_oldest_pos, data_size))
    }
738
739    /// Rebuild missing offset entries by replaying the data journal and
740    /// appending the missing entries to the offsets journal.
741    ///
742    /// The data journal is the source of truth. This function brings the offsets
743    /// journal up to date by replaying data items and indexing their positions.
744    ///
745    /// # Warning
746    ///
747    /// - Panics if data journal is empty
748    /// - Panics if `offsets_size` >= `data.size()`
749    async fn add_missing_offsets(
750        data: &variable::Journal<E, V>,
751        offsets: &mut fixed::Journal<E, u64>,
752        offsets_size: u64,
753        items_per_section: u64,
754    ) -> Result<(), Error> {
755        assert!(
756            !data.is_empty(),
757            "rebuild_offsets called with empty data journal"
758        );
759
760        // Find where to start replaying
761        let (start_section, resume_offset, skip_first) =
762            if let Some(oldest) = offsets.oldest_retained_pos() {
763                if oldest < offsets_size {
764                    // Offsets has items -- resume from last indexed position
765                    let last_offset = offsets.read(offsets_size - 1).await?;
766                    let last_section = position_to_section(offsets_size - 1, items_per_section);
767                    (last_section, last_offset, true)
768                } else {
769                    // Offsets fully pruned but data has items -- start from first data section
770                    // SAFETY: data is non-empty (checked above)
771                    let first_section = data.oldest_section().unwrap();
772                    (first_section, 0, false)
773                }
774            } else {
775                // Offsets empty -- start from first data section
776                // SAFETY: data is non-empty (checked above)
777                let first_section = data.oldest_section().unwrap();
778                (first_section, 0, false)
779            };
780
781        // Replay data journal from start position through the end and index all items.
782        // The data journal is the source of truth, so we consume the entire stream.
783        // (replay streams from start_section onwards through all subsequent sections)
784        let stream = data
785            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
786            .await?;
787        futures::pin_mut!(stream);
788
789        let mut skipped_first = false;
790        while let Some(result) = stream.next().await {
791            let (_section, offset, _size, _item) = result?;
792
793            // Skip first item if resuming from last indexed offset
794            if skip_first && !skipped_first {
795                skipped_first = true;
796                continue;
797            }
798
799            offsets.append(offset).await?;
800        }
801
802        Ok(())
803    }
804}
805
806// Implement Contiguous trait for variable-length items
807impl<E: Storage + Metrics, V: CodecShared> Contiguous for Journal<E, V> {
808    type Item = V;
809
810    fn size(&self) -> u64 {
811        Self::size(self)
812    }
813
814    fn oldest_retained_pos(&self) -> Option<u64> {
815        Self::oldest_retained_pos(self)
816    }
817
818    fn pruning_boundary(&self) -> u64 {
819        Self::pruning_boundary(self)
820    }
821
822    async fn replay(
823        &self,
824        start_pos: u64,
825        buffer: NonZeroUsize,
826    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
827        Self::replay(self, start_pos, buffer).await
828    }
829
830    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
831        Self::read(self, position).await
832    }
833}
834
835impl<E: Storage + Metrics, V: CodecShared> MutableContiguous for Journal<E, V> {
836    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
837        Self::append(self, item).await
838    }
839
840    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
841        Self::prune(self, min_position).await
842    }
843
844    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
845        Self::rewind(self, size).await
846    }
847}
848
849impl<E: Storage + Metrics, V: CodecShared> Persistable for Journal<E, V> {
850    type Error = Error;
851
852    async fn commit(&mut self) -> Result<(), Error> {
853        Self::commit(self).await
854    }
855
856    async fn sync(&mut self) -> Result<(), Error> {
857        Self::sync(self).await
858    }
859
860    async fn destroy(self) -> Result<(), Error> {
861        Self::destroy(self).await
862    }
863}
864#[cfg(test)]
865mod tests {
866    use super::*;
867    use crate::journal::contiguous::tests::run_contiguous_tests;
868    use commonware_macros::test_traced;
869    use commonware_runtime::{buffer::pool::PoolRef, deterministic, Runner};
870    use commonware_utils::{NZUsize, NZU16, NZU64};
871    use futures::FutureExt as _;
872    use std::num::NonZeroU16;
873
    // Use some jank sizes to exercise boundary conditions.
    // 101 is deliberately not a power of two.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    // NOTE(review): presumably the number of pages held by the cache -- confirm at the use sites.
    const PAGE_CACHE_SIZE: usize = 2;
    // Larger page sizes for tests that need more buffer space.
    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
880
881    /// Test that complete offsets partition loss after pruning is detected as unrecoverable.
882    ///
883    /// When the offsets partition is completely lost and the data has been pruned, we cannot
884    /// rebuild the index with correct position alignment (would require creating placeholder blobs).
885    /// This is a genuine external failure that should be detected and reported clearly.
886    #[test_traced]
887    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
888        let executor = deterministic::Runner::default();
889        executor.start(|context| async move {
890            let cfg = Config {
891                partition: "offsets_loss_after_prune".to_string(),
892                items_per_section: NZU64!(10),
893                compression: None,
894                codec_config: (),
895                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
896                write_buffer: NZUsize!(1024),
897            };
898
899            // === Phase 1: Create journal with data and prune ===
900            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
901                .await
902                .unwrap();
903
904            // Append 40 items across 4 sections (0-3)
905            for i in 0..40u64 {
906                journal.append(i * 100).await.unwrap();
907            }
908
909            // Prune to position 20 (removes sections 0-1, keeps sections 2-3)
910            journal.prune(20).await.unwrap();
911            assert_eq!(journal.oldest_retained_pos(), Some(20));
912            assert_eq!(journal.size(), 40);
913
914            journal.sync().await.unwrap();
915            drop(journal);
916
917            // === Phase 2: Simulate complete offsets partition loss ===
918            context
919                .remove(&cfg.offsets_partition(), None)
920                .await
921                .expect("Failed to remove offsets partition");
922
923            // === Phase 3: Verify this is detected as unrecoverable ===
924            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
925            assert!(matches!(result, Err(Error::Corruption(_))));
926        });
927    }
928
929    /// Test that init aligns state when data is pruned/lost but offsets survives.
930    ///
931    /// This handles both:
932    /// 1. Crash during prune-all (data pruned, offsets not yet)
933    /// 2. External data partition loss
934    ///
935    /// In both cases, we align by pruning offsets to match.
936    #[test_traced]
937    fn test_variable_align_data_offsets_mismatch() {
938        let executor = deterministic::Runner::default();
939        executor.start(|context| async move {
940            let cfg = Config {
941                partition: "data_loss_test".to_string(),
942                items_per_section: NZU64!(10),
943                compression: None,
944                codec_config: (),
945                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
946                write_buffer: NZUsize!(1024),
947            };
948
949            // === Setup: Create journal with data ===
950            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
951                .await
952                .unwrap();
953
954            // Append 20 items across 2 sections
955            for i in 0..20u64 {
956                variable.append(i * 100).await.unwrap();
957            }
958
959            variable.sync().await.unwrap();
960            drop(variable);
961
962            // === Simulate data loss: Delete data partition but keep offsets ===
963            context
964                .remove(&cfg.data_partition(), None)
965                .await
966                .expect("Failed to remove data partition");
967
968            // === Verify init aligns the mismatch ===
969            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
970                .await
971                .expect("Should align offsets to match empty data");
972
973            // Size should be preserved
974            assert_eq!(journal.size(), 20);
975
976            // But no items remain (both journals pruned)
977            assert_eq!(journal.oldest_retained_pos(), None);
978
979            // All reads should fail with ItemPruned
980            for i in 0..20 {
981                assert!(matches!(
982                    journal.read(i).await,
983                    Err(crate::journal::Error::ItemPruned(_))
984                ));
985            }
986
987            // Can append new data starting at position 20
988            let pos = journal.append(999).await.unwrap();
989            assert_eq!(pos, 20);
990            assert_eq!(journal.read(20).await.unwrap(), 999);
991
992            journal.destroy().await.unwrap();
993        });
994    }
995
996    #[test_traced]
997    fn test_variable_contiguous() {
998        let executor = deterministic::Runner::default();
999        executor.start(|context| async move {
1000            run_contiguous_tests(move |test_name: String| {
1001                let context = context.clone();
1002                async move {
1003                    Journal::<_, u64>::init(
1004                        context,
1005                        Config {
1006                            partition: format!("generic_test_{test_name}"),
1007                            items_per_section: NZU64!(10),
1008                            compression: None,
1009                            codec_config: (),
1010                            buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1011                            write_buffer: NZUsize!(1024),
1012                        },
1013                    )
1014                    .await
1015                }
1016                .boxed()
1017            })
1018            .await;
1019        });
1020    }
1021
1022    /// Test multiple sequential prunes with Variable-specific guarantees.
1023    #[test_traced]
1024    fn test_variable_multiple_sequential_prunes() {
1025        let executor = deterministic::Runner::default();
1026        executor.start(|context| async move {
1027            let cfg = Config {
1028                partition: "sequential_prunes".to_string(),
1029                items_per_section: NZU64!(10),
1030                compression: None,
1031                codec_config: (),
1032                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1033                write_buffer: NZUsize!(1024),
1034            };
1035
1036            let mut journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1037
1038            // Append items across 4 sections: [0-9], [10-19], [20-29], [30-39]
1039            for i in 0..40u64 {
1040                journal.append(i * 100).await.unwrap();
1041            }
1042
1043            // Initial state: all items accessible
1044            assert_eq!(journal.oldest_retained_pos(), Some(0));
1045            assert_eq!(journal.size(), 40);
1046
1047            // First prune: remove section 0 (positions 0-9)
1048            let pruned = journal.prune(10).await.unwrap();
1049            assert!(pruned);
1050
1051            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1052            let oldest = journal.oldest_retained_pos().unwrap();
1053            assert_eq!(oldest, 10);
1054
1055            // Items 0-9 should be pruned, 10+ should be accessible
1056            assert!(matches!(
1057                journal.read(0).await,
1058                Err(crate::journal::Error::ItemPruned(_))
1059            ));
1060            assert_eq!(journal.read(10).await.unwrap(), 1000);
1061            assert_eq!(journal.read(19).await.unwrap(), 1900);
1062
1063            // Second prune: remove section 1 (positions 10-19)
1064            let pruned = journal.prune(20).await.unwrap();
1065            assert!(pruned);
1066
1067            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1068            let oldest = journal.oldest_retained_pos().unwrap();
1069            assert_eq!(oldest, 20);
1070
1071            // Items 0-19 should be pruned, 20+ should be accessible
1072            assert!(matches!(
1073                journal.read(10).await,
1074                Err(crate::journal::Error::ItemPruned(_))
1075            ));
1076            assert!(matches!(
1077                journal.read(19).await,
1078                Err(crate::journal::Error::ItemPruned(_))
1079            ));
1080            assert_eq!(journal.read(20).await.unwrap(), 2000);
1081            assert_eq!(journal.read(29).await.unwrap(), 2900);
1082
1083            // Third prune: remove section 2 (positions 20-29)
1084            let pruned = journal.prune(30).await.unwrap();
1085            assert!(pruned);
1086
1087            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1088            let oldest = journal.oldest_retained_pos().unwrap();
1089            assert_eq!(oldest, 30);
1090
1091            // Items 0-29 should be pruned, 30+ should be accessible
1092            assert!(matches!(
1093                journal.read(20).await,
1094                Err(crate::journal::Error::ItemPruned(_))
1095            ));
1096            assert!(matches!(
1097                journal.read(29).await,
1098                Err(crate::journal::Error::ItemPruned(_))
1099            ));
1100            assert_eq!(journal.read(30).await.unwrap(), 3000);
1101            assert_eq!(journal.read(39).await.unwrap(), 3900);
1102
1103            // Size should still be 40 (pruning doesn't affect size)
1104            assert_eq!(journal.size(), 40);
1105
1106            journal.destroy().await.unwrap();
1107        });
1108    }
1109
1110    /// Test that pruning all data and re-initializing preserves positions.
1111    #[test_traced]
1112    fn test_variable_prune_all_then_reinit() {
1113        let executor = deterministic::Runner::default();
1114        executor.start(|context| async move {
1115            let cfg = Config {
1116                partition: "prune_all_reinit".to_string(),
1117                items_per_section: NZU64!(10),
1118                compression: None,
1119                codec_config: (),
1120                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1121                write_buffer: NZUsize!(1024),
1122            };
1123
1124            // === Phase 1: Create journal and append data ===
1125            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1126                .await
1127                .unwrap();
1128
1129            for i in 0..100u64 {
1130                journal.append(i * 100).await.unwrap();
1131            }
1132
1133            assert_eq!(journal.size(), 100);
1134            assert_eq!(journal.oldest_retained_pos(), Some(0));
1135
1136            // === Phase 2: Prune all data ===
1137            let pruned = journal.prune(100).await.unwrap();
1138            assert!(pruned);
1139
1140            // All data is pruned - no items remain
1141            assert_eq!(journal.size(), 100);
1142            assert_eq!(journal.oldest_retained_pos(), None);
1143
1144            // All reads should fail with ItemPruned
1145            for i in 0..100 {
1146                assert!(matches!(
1147                    journal.read(i).await,
1148                    Err(crate::journal::Error::ItemPruned(_))
1149                ));
1150            }
1151
1152            journal.sync().await.unwrap();
1153            drop(journal);
1154
1155            // === Phase 3: Re-init and verify position preserved ===
1156            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1157                .await
1158                .unwrap();
1159
1160            // Size should be preserved, but no items remain
1161            assert_eq!(journal.size(), 100);
1162            assert_eq!(journal.oldest_retained_pos(), None);
1163
1164            // All reads should still fail
1165            for i in 0..100 {
1166                assert!(matches!(
1167                    journal.read(i).await,
1168                    Err(crate::journal::Error::ItemPruned(_))
1169                ));
1170            }
1171
1172            // === Phase 4: Append new data ===
1173            // Next append should get position 100
1174            journal.append(10000).await.unwrap();
1175            assert_eq!(journal.size(), 101);
1176            // Now we have one item at position 100
1177            assert_eq!(journal.oldest_retained_pos(), Some(100));
1178
1179            // Can read the new item
1180            assert_eq!(journal.read(100).await.unwrap(), 10000);
1181
1182            // Old positions still fail
1183            assert!(matches!(
1184                journal.read(99).await,
1185                Err(crate::journal::Error::ItemPruned(_))
1186            ));
1187
1188            journal.destroy().await.unwrap();
1189        });
1190    }
1191
1192    /// Test recovery from crash after data journal pruned but before offsets journal.
1193    #[test_traced]
1194    fn test_variable_recovery_prune_crash_offsets_behind() {
1195        let executor = deterministic::Runner::default();
1196        executor.start(|context| async move {
1197            // === Setup: Create Variable wrapper with data ===
1198            let cfg = Config {
1199                partition: "recovery_prune_crash".to_string(),
1200                items_per_section: NZU64!(10),
1201                compression: None,
1202                codec_config: (),
1203                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1204                write_buffer: NZUsize!(1024),
1205            };
1206
1207            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1208                .await
1209                .unwrap();
1210
1211            // Append 40 items across 4 sections to both journals
1212            for i in 0..40u64 {
1213                variable.append(i * 100).await.unwrap();
1214            }
1215
1216            // Prune to position 10 normally (both data and offsets journals pruned)
1217            variable.prune(10).await.unwrap();
1218            assert_eq!(variable.oldest_retained_pos(), Some(10));
1219
1220            // === Simulate crash: Prune data journal but not offsets journal ===
1221            // Manually prune data journal to section 2 (position 20)
1222            variable.data.prune(2).await.unwrap();
1223            // Offsets journal still has data from position 10-19
1224
1225            variable.sync().await.unwrap();
1226            drop(variable);
1227
1228            // === Verify recovery ===
1229            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1230                .await
1231                .unwrap();
1232
1233            // Init should auto-repair: offsets journal pruned to match data journal
1234            assert_eq!(variable.oldest_retained_pos(), Some(20));
1235            assert_eq!(variable.size(), 40);
1236
1237            // Reads before position 20 should fail (pruned from both journals)
1238            assert!(matches!(
1239                variable.read(10).await,
1240                Err(crate::journal::Error::ItemPruned(_))
1241            ));
1242
1243            // Reads at position 20+ should succeed
1244            assert_eq!(variable.read(20).await.unwrap(), 2000);
1245            assert_eq!(variable.read(39).await.unwrap(), 3900);
1246
1247            variable.destroy().await.unwrap();
1248        });
1249    }
1250
1251    /// Test recovery detects corruption when offsets journal pruned ahead of data journal.
1252    ///
1253    /// Simulates an impossible state (offsets journal pruned more than data journal) which
1254    /// should never happen due to write ordering. Verifies that init() returns corruption error.
1255    #[test_traced]
1256    fn test_variable_recovery_offsets_ahead_corruption() {
1257        let executor = deterministic::Runner::default();
1258        executor.start(|context| async move {
1259            // === Setup: Create Variable wrapper with data ===
1260            let cfg = Config {
1261                partition: "recovery_offsets_ahead".to_string(),
1262                items_per_section: NZU64!(10),
1263                compression: None,
1264                codec_config: (),
1265                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1266                write_buffer: NZUsize!(1024),
1267            };
1268
1269            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1270                .await
1271                .unwrap();
1272
1273            // Append 40 items across 4 sections to both journals
1274            for i in 0..40u64 {
1275                variable.append(i * 100).await.unwrap();
1276            }
1277
1278            // Prune offsets journal ahead of data journal (impossible state)
1279            variable.offsets.prune(20).await.unwrap(); // Prune to position 20
1280            variable.data.prune(1).await.unwrap(); // Only prune data journal to section 1 (position 10)
1281
1282            variable.sync().await.unwrap();
1283            drop(variable);
1284
1285            // === Verify corruption detected ===
1286            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
1287            assert!(matches!(result, Err(Error::Corruption(_))));
1288        });
1289    }
1290
1291    /// Test recovery from crash after appending to data journal but before appending to offsets journal.
1292    #[test_traced]
1293    fn test_variable_recovery_append_crash_offsets_behind() {
1294        let executor = deterministic::Runner::default();
1295        executor.start(|context| async move {
1296            // === Setup: Create Variable wrapper with partial data ===
1297            let cfg = Config {
1298                partition: "recovery_append_crash".to_string(),
1299                items_per_section: NZU64!(10),
1300                compression: None,
1301                codec_config: (),
1302                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1303                write_buffer: NZUsize!(1024),
1304            };
1305
1306            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1307                .await
1308                .unwrap();
1309
1310            // Append 15 items to both journals (fills section 0, partial section 1)
1311            for i in 0..15u64 {
1312                variable.append(i * 100).await.unwrap();
1313            }
1314
1315            assert_eq!(variable.size(), 15);
1316
1317            // Manually append 5 more items directly to data journal only
1318            for i in 15..20u64 {
1319                variable.data.append(1, i * 100).await.unwrap();
1320            }
1321            // Offsets journal still has only 15 entries
1322
1323            variable.sync().await.unwrap();
1324            drop(variable);
1325
1326            // === Verify recovery ===
1327            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1328                .await
1329                .unwrap();
1330
1331            // Init should rebuild offsets journal from data journal replay
1332            assert_eq!(variable.size(), 20);
1333            assert_eq!(variable.oldest_retained_pos(), Some(0));
1334
1335            // All items should be readable from both journals
1336            for i in 0..20u64 {
1337                assert_eq!(variable.read(i).await.unwrap(), i * 100);
1338            }
1339
1340            // Offsets journal should be fully rebuilt to match data journal
1341            assert_eq!(variable.offsets.size(), 20);
1342
1343            variable.destroy().await.unwrap();
1344        });
1345    }
1346
1347    /// Test recovery from multiple prune operations with crash.
1348    #[test_traced]
1349    fn test_variable_recovery_multiple_prunes_crash() {
1350        let executor = deterministic::Runner::default();
1351        executor.start(|context| async move {
1352            // === Setup: Create Variable wrapper with data ===
1353            let cfg = Config {
1354                partition: "recovery_multiple_prunes".to_string(),
1355                items_per_section: NZU64!(10),
1356                compression: None,
1357                codec_config: (),
1358                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1359                write_buffer: NZUsize!(1024),
1360            };
1361
1362            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1363                .await
1364                .unwrap();
1365
1366            // Append 50 items across 5 sections to both journals
1367            for i in 0..50u64 {
1368                variable.append(i * 100).await.unwrap();
1369            }
1370
1371            // Prune to position 10 normally (both data and offsets journals pruned)
1372            variable.prune(10).await.unwrap();
1373            assert_eq!(variable.oldest_retained_pos(), Some(10));
1374
1375            // === Simulate crash: Multiple prunes on data journal, not on offsets journal ===
1376            // Manually prune data journal to section 3 (position 30)
1377            variable.data.prune(3).await.unwrap();
1378            // Offsets journal still thinks oldest is position 10
1379
1380            variable.sync().await.unwrap();
1381            drop(variable);
1382
1383            // === Verify recovery ===
1384            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1385                .await
1386                .unwrap();
1387
1388            // Init should auto-repair: offsets journal pruned to match data journal
1389            assert_eq!(variable.oldest_retained_pos(), Some(30));
1390            assert_eq!(variable.size(), 50);
1391
1392            // Reads before position 30 should fail (pruned from both journals)
1393            assert!(matches!(
1394                variable.read(10).await,
1395                Err(crate::journal::Error::ItemPruned(_))
1396            ));
1397            assert!(matches!(
1398                variable.read(20).await,
1399                Err(crate::journal::Error::ItemPruned(_))
1400            ));
1401
1402            // Reads at position 30+ should succeed
1403            assert_eq!(variable.read(30).await.unwrap(), 3000);
1404            assert_eq!(variable.read(49).await.unwrap(), 4900);
1405
1406            variable.destroy().await.unwrap();
1407        });
1408    }
1409
1410    /// Test recovery from crash during rewind operation.
1411    ///
1412    /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes.
1413    /// This creates a situation where offsets journal has been rewound but data journal still
1414    /// contains items across multiple sections. Verifies that init() correctly rebuilds the
1415    /// offsets index across all sections to match the data journal.
1416    #[test_traced]
1417    fn test_variable_recovery_rewind_crash_multi_section() {
1418        let executor = deterministic::Runner::default();
1419        executor.start(|context| async move {
1420            // === Setup: Create Variable wrapper with data across multiple sections ===
1421            let cfg = Config {
1422                partition: "recovery_rewind_crash".to_string(),
1423                items_per_section: NZU64!(10),
1424                compression: None,
1425                codec_config: (),
1426                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1427                write_buffer: NZUsize!(1024),
1428            };
1429
1430            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1431                .await
1432                .unwrap();
1433
1434            // Append 25 items across 3 sections (section 0: 0-9, section 1: 10-19, section 2: 20-24)
1435            for i in 0..25u64 {
1436                variable.append(i * 100).await.unwrap();
1437            }
1438
1439            assert_eq!(variable.size(), 25);
1440
1441            // === Simulate crash during rewind(5) ===
1442            // Rewind offsets journal to size 5 (keeps positions 0-4)
1443            variable.offsets.rewind(5).await.unwrap();
1444            // CRASH before data.rewind() completes - data still has all 3 sections
1445
1446            variable.sync().await.unwrap();
1447            drop(variable);
1448
1449            // === Verify recovery ===
1450            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1451                .await
1452                .unwrap();
1453
1454            // Init should rebuild offsets[5-24] from data journal across all 3 sections
1455            assert_eq!(variable.size(), 25);
1456            assert_eq!(variable.oldest_retained_pos(), Some(0));
1457
1458            // All items should be readable - offsets rebuilt correctly across all sections
1459            for i in 0..25u64 {
1460                assert_eq!(variable.read(i).await.unwrap(), i * 100);
1461            }
1462
1463            // Verify offsets journal fully rebuilt
1464            assert_eq!(variable.offsets.size(), 25);
1465
1466            // Verify next append gets position 25
1467            let pos = variable.append(2500).await.unwrap();
1468            assert_eq!(pos, 25);
1469            assert_eq!(variable.read(25).await.unwrap(), 2500);
1470
1471            variable.destroy().await.unwrap();
1472        });
1473    }
1474
1475    /// Test recovery from crash after data sync but before offsets sync when journal was
1476    /// previously emptied by pruning.
1477    #[test_traced]
1478    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
1479        let executor = deterministic::Runner::default();
1480        executor.start(|context| async move {
1481            let cfg = Config {
1482                partition: "recovery_empty_after_prune".to_string(),
1483                items_per_section: NZU64!(10),
1484                compression: None,
1485                codec_config: (),
1486                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1487                write_buffer: NZUsize!(1024),
1488            };
1489
1490            // === Phase 1: Create journal with one full section ===
1491            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1492                .await
1493                .unwrap();
1494
1495            // Append 10 items (positions 0-9), fills section 0
1496            for i in 0..10u64 {
1497                journal.append(i * 100).await.unwrap();
1498            }
1499            assert_eq!(journal.size(), 10);
1500            assert_eq!(journal.oldest_retained_pos(), Some(0));
1501
1502            // === Phase 2: Prune to create empty journal ===
1503            journal.prune(10).await.unwrap();
1504            assert_eq!(journal.size(), 10);
1505            assert_eq!(journal.oldest_retained_pos(), None); // Empty!
1506
1507            // === Phase 3: Append directly to data journal to simulate crash ===
1508            // Manually append to data journal only (bypassing Variable's append logic)
1509            // This simulates the case where data was synced but offsets wasn't
1510            for i in 10..20u64 {
1511                journal.data.append(1, i * 100).await.unwrap();
1512            }
1513            // Sync the data journal (section 1)
1514            journal.data.sync(1).await.unwrap();
1515            // Do NOT sync offsets journal - simulates crash before offsets.sync()
1516
1517            // Close without syncing offsets
1518            drop(journal);
1519
1520            // === Phase 4: Verify recovery succeeds ===
1521            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1522                .await
1523                .expect("Should recover from crash after data sync but before offsets sync");
1524
1525            // All data should be recovered
1526            assert_eq!(journal.size(), 20);
1527            assert_eq!(journal.oldest_retained_pos(), Some(10));
1528
1529            // All items from position 10-19 should be readable
1530            for i in 10..20u64 {
1531                assert_eq!(journal.read(i).await.unwrap(), i * 100);
1532            }
1533
1534            // Items 0-9 should be pruned
1535            for i in 0..10 {
1536                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
1537            }
1538
1539            journal.destroy().await.unwrap();
1540        });
1541    }
1542
1543    /// Test that offsets index is rebuilt from data after sync writes data but not offsets.
1544    #[test_traced]
1545    fn test_variable_concurrent_sync_recovery() {
1546        let executor = deterministic::Runner::default();
1547        executor.start(|context| async move {
1548            let cfg = Config {
1549                partition: "concurrent_sync_recovery".to_string(),
1550                items_per_section: NZU64!(10),
1551                compression: None,
1552                codec_config: (),
1553                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
1554                write_buffer: NZUsize!(1024),
1555            };
1556
1557            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1558                .await
1559                .unwrap();
1560
1561            // Append items across a section boundary
1562            for i in 0..15u64 {
1563                journal.append(i * 100).await.unwrap();
1564            }
1565
1566            // Manually sync only data to simulate crash during concurrent sync
1567            journal.commit().await.unwrap();
1568
1569            // Simulate a crash (offsets not synced)
1570            drop(journal);
1571
1572            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1573                .await
1574                .unwrap();
1575
1576            // Data should be intact and offsets rebuilt
1577            assert_eq!(journal.size(), 15);
1578            for i in 0..15u64 {
1579                assert_eq!(journal.read(i).await.unwrap(), i * 100);
1580            }
1581
1582            journal.destroy().await.unwrap();
1583        });
1584    }
1585
1586    #[test_traced]
1587    fn test_init_at_size_zero() {
1588        let executor = deterministic::Runner::default();
1589        executor.start(|context| async move {
1590            let cfg = Config {
1591                partition: "init_at_size_zero".to_string(),
1592                items_per_section: NZU64!(5),
1593                compression: None,
1594                codec_config: (),
1595                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1596                write_buffer: NZUsize!(1024),
1597            };
1598
1599            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1600                .await
1601                .unwrap();
1602
1603            // Size should be 0
1604            assert_eq!(journal.size(), 0);
1605
1606            // No oldest retained position (empty journal)
1607            assert_eq!(journal.oldest_retained_pos(), None);
1608
1609            // Next append should get position 0
1610            let pos = journal.append(100).await.unwrap();
1611            assert_eq!(pos, 0);
1612            assert_eq!(journal.size(), 1);
1613            assert_eq!(journal.read(0).await.unwrap(), 100);
1614
1615            journal.destroy().await.unwrap();
1616        });
1617    }
1618
1619    #[test_traced]
1620    fn test_init_at_size_section_boundary() {
1621        let executor = deterministic::Runner::default();
1622        executor.start(|context| async move {
1623            let cfg = Config {
1624                partition: "init_at_size_boundary".to_string(),
1625                items_per_section: NZU64!(5),
1626                compression: None,
1627                codec_config: (),
1628                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1629                write_buffer: NZUsize!(1024),
1630            };
1631
1632            // Initialize at position 10 (exactly at section 1 boundary with items_per_section=5)
1633            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1634                .await
1635                .unwrap();
1636
1637            // Size should be 10
1638            assert_eq!(journal.size(), 10);
1639
1640            // No data yet, so no oldest retained position
1641            assert_eq!(journal.oldest_retained_pos(), None);
1642
1643            // Next append should get position 10
1644            let pos = journal.append(1000).await.unwrap();
1645            assert_eq!(pos, 10);
1646            assert_eq!(journal.size(), 11);
1647            assert_eq!(journal.read(10).await.unwrap(), 1000);
1648
1649            // Can continue appending
1650            let pos = journal.append(1001).await.unwrap();
1651            assert_eq!(pos, 11);
1652            assert_eq!(journal.read(11).await.unwrap(), 1001);
1653
1654            journal.destroy().await.unwrap();
1655        });
1656    }
1657
1658    #[test_traced]
1659    fn test_init_at_size_mid_section() {
1660        let executor = deterministic::Runner::default();
1661        executor.start(|context| async move {
1662            let cfg = Config {
1663                partition: "init_at_size_mid".to_string(),
1664                items_per_section: NZU64!(5),
1665                compression: None,
1666                codec_config: (),
1667                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1668                write_buffer: NZUsize!(1024),
1669            };
1670
1671            // Initialize at position 7 (middle of section 1 with items_per_section=5)
1672            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1673                .await
1674                .unwrap();
1675
1676            // Size should be 7
1677            assert_eq!(journal.size(), 7);
1678
1679            // No data yet, so no oldest retained position
1680            assert_eq!(journal.oldest_retained_pos(), None);
1681
1682            // Next append should get position 7
1683            let pos = journal.append(700).await.unwrap();
1684            assert_eq!(pos, 7);
1685            assert_eq!(journal.size(), 8);
1686            assert_eq!(journal.read(7).await.unwrap(), 700);
1687
1688            journal.destroy().await.unwrap();
1689        });
1690    }
1691
1692    #[test_traced]
1693    fn test_init_at_size_persistence() {
1694        let executor = deterministic::Runner::default();
1695        executor.start(|context| async move {
1696            let cfg = Config {
1697                partition: "init_at_size_persist".to_string(),
1698                items_per_section: NZU64!(5),
1699                compression: None,
1700                codec_config: (),
1701                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1702                write_buffer: NZUsize!(1024),
1703            };
1704
1705            // Initialize at position 15
1706            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 15)
1707                .await
1708                .unwrap();
1709
1710            // Append some items
1711            for i in 0..5u64 {
1712                let pos = journal.append(1500 + i).await.unwrap();
1713                assert_eq!(pos, 15 + i);
1714            }
1715
1716            assert_eq!(journal.size(), 20);
1717
1718            // Sync and reopen
1719            journal.sync().await.unwrap();
1720            drop(journal);
1721
1722            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1723                .await
1724                .unwrap();
1725
1726            // Size and data should be preserved
1727            assert_eq!(journal.size(), 20);
1728            assert_eq!(journal.oldest_retained_pos(), Some(15));
1729
1730            // Verify data
1731            for i in 0..5u64 {
1732                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
1733            }
1734
1735            // Can continue appending
1736            let pos = journal.append(9999).await.unwrap();
1737            assert_eq!(pos, 20);
1738            assert_eq!(journal.read(20).await.unwrap(), 9999);
1739
1740            journal.destroy().await.unwrap();
1741        });
1742    }
1743
1744    #[test_traced]
1745    fn test_init_at_size_large_offset() {
1746        let executor = deterministic::Runner::default();
1747        executor.start(|context| async move {
1748            let cfg = Config {
1749                partition: "init_at_size_large".to_string(),
1750                items_per_section: NZU64!(5),
1751                compression: None,
1752                codec_config: (),
1753                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1754                write_buffer: NZUsize!(1024),
1755            };
1756
1757            // Initialize at a large position (position 1000)
1758            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
1759                .await
1760                .unwrap();
1761
1762            assert_eq!(journal.size(), 1000);
1763            // No data yet, so no oldest retained position
1764            assert_eq!(journal.oldest_retained_pos(), None);
1765
1766            // Next append should get position 1000
1767            let pos = journal.append(100000).await.unwrap();
1768            assert_eq!(pos, 1000);
1769            assert_eq!(journal.read(1000).await.unwrap(), 100000);
1770
1771            journal.destroy().await.unwrap();
1772        });
1773    }
1774
1775    #[test_traced]
1776    fn test_init_at_size_prune_and_append() {
1777        let executor = deterministic::Runner::default();
1778        executor.start(|context| async move {
1779            let cfg = Config {
1780                partition: "init_at_size_prune".to_string(),
1781                items_per_section: NZU64!(5),
1782                compression: None,
1783                codec_config: (),
1784                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1785                write_buffer: NZUsize!(1024),
1786            };
1787
1788            // Initialize at position 20
1789            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
1790                .await
1791                .unwrap();
1792
1793            // Append items 20-29
1794            for i in 0..10u64 {
1795                journal.append(2000 + i).await.unwrap();
1796            }
1797
1798            assert_eq!(journal.size(), 30);
1799
1800            // Prune to position 25
1801            journal.prune(25).await.unwrap();
1802
1803            assert_eq!(journal.size(), 30);
1804            assert_eq!(journal.oldest_retained_pos(), Some(25));
1805
1806            // Verify remaining items are readable
1807            for i in 25..30u64 {
1808                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
1809            }
1810
1811            // Continue appending
1812            let pos = journal.append(3000).await.unwrap();
1813            assert_eq!(pos, 30);
1814
1815            journal.destroy().await.unwrap();
1816        });
1817    }
1818
1819    /// Test `init_sync` when there is no existing data on disk.
1820    #[test_traced]
1821    fn test_init_sync_no_existing_data() {
1822        let executor = deterministic::Runner::default();
1823        executor.start(|context| async move {
1824            let cfg = Config {
1825                partition: "test_fresh_start".into(),
1826                items_per_section: NZU64!(5),
1827                compression: None,
1828                codec_config: (),
1829                write_buffer: NZUsize!(1024),
1830                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
1831            };
1832
1833            // Initialize journal with sync boundaries when no existing data exists
1834            let lower_bound = 10;
1835            let upper_bound = 26;
1836            let mut journal =
1837                Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
1838                    .await
1839                    .expect("Failed to initialize journal with sync boundaries");
1840
1841            assert_eq!(journal.size(), lower_bound);
1842            assert_eq!(journal.oldest_retained_pos(), None);
1843
1844            // Append items using the contiguous API
1845            let pos1 = journal.append(42u64).await.unwrap();
1846            assert_eq!(pos1, lower_bound);
1847            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
1848
1849            let pos2 = journal.append(43u64).await.unwrap();
1850            assert_eq!(pos2, lower_bound + 1);
1851            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
1852
1853            journal.destroy().await.unwrap();
1854        });
1855    }
1856
1857    /// Test `init_sync` when there is existing data that overlaps with the sync target range.
1858    #[test_traced]
1859    fn test_init_sync_existing_data_overlap() {
1860        let executor = deterministic::Runner::default();
1861        executor.start(|context| async move {
1862            let cfg = Config {
1863                partition: "test_overlap".into(),
1864                items_per_section: NZU64!(5),
1865                compression: None,
1866                codec_config: (),
1867                write_buffer: NZUsize!(1024),
1868                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
1869            };
1870
1871            // Create initial journal with data in multiple sections
1872            let mut journal =
1873                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
1874                    .await
1875                    .expect("Failed to create initial journal");
1876
1877            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
1878            for i in 0..20u64 {
1879                journal.append(i * 100).await.unwrap();
1880            }
1881            journal.sync().await.unwrap();
1882            drop(journal);
1883
1884            // Initialize with sync boundaries that overlap with existing data
1885            // lower_bound: 8 (section 1), upper_bound: 31 (last location 30, section 6)
1886            let lower_bound = 8;
1887            let upper_bound = 31;
1888            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
1889                context.clone(),
1890                cfg.clone(),
1891                lower_bound..upper_bound,
1892            )
1893            .await
1894            .expect("Failed to initialize journal with overlap");
1895
1896            assert_eq!(journal.size(), 20);
1897
1898            // Verify oldest retained is pruned to lower_bound's section boundary (5)
1899            let oldest = journal.oldest_retained_pos();
1900            assert_eq!(oldest, Some(5)); // Section 1 starts at position 5
1901
1902            // Verify data integrity: positions before 5 are pruned
1903            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
1904            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
1905
1906            // Positions 5-19 should be accessible
1907            assert_eq!(journal.read(5).await.unwrap(), 500);
1908            assert_eq!(journal.read(8).await.unwrap(), 800);
1909            assert_eq!(journal.read(19).await.unwrap(), 1900);
1910
1911            // Position 20+ should not exist yet
1912            assert!(matches!(
1913                journal.read(20).await,
1914                Err(Error::ItemOutOfRange(_))
1915            ));
1916
1917            // Assert journal can accept new items
1918            let pos = journal.append(999).await.unwrap();
1919            assert_eq!(pos, 20);
1920            assert_eq!(journal.read(20).await.unwrap(), 999);
1921
1922            journal.destroy().await.unwrap();
1923        });
1924    }
1925
1926    /// Test `init_sync` with invalid parameters.
1927    #[should_panic]
1928    #[test_traced]
1929    fn test_init_sync_invalid_parameters() {
1930        let executor = deterministic::Runner::default();
1931        executor.start(|context| async move {
1932            let cfg = Config {
1933                partition: "test_invalid".into(),
1934                items_per_section: NZU64!(5),
1935                compression: None,
1936                codec_config: (),
1937                write_buffer: NZUsize!(1024),
1938                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
1939            };
1940
1941            #[allow(clippy::reversed_empty_ranges)]
1942            let _result = Journal::<deterministic::Context, u64>::init_sync(
1943                context.clone(),
1944                cfg,
1945                10..5, // invalid range: lower > upper
1946            )
1947            .await;
1948        });
1949    }
1950
1951    /// Test `init_sync` when existing data exactly matches the sync range.
1952    #[test_traced]
1953    fn test_init_sync_existing_data_exact_match() {
1954        let executor = deterministic::Runner::default();
1955        executor.start(|context| async move {
1956            let items_per_section = NZU64!(5);
1957            let cfg = Config {
1958                partition: "test_exact_match".to_string(),
1959                items_per_section,
1960                compression: None,
1961                codec_config: (),
1962                write_buffer: NZUsize!(1024),
1963                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
1964            };
1965
1966            // Create initial journal with data exactly matching sync range
1967            let mut journal =
1968                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
1969                    .await
1970                    .expect("Failed to create initial journal");
1971
1972            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
1973            for i in 0..20u64 {
1974                journal.append(i * 100).await.unwrap();
1975            }
1976            journal.sync().await.unwrap();
1977            drop(journal);
1978
1979            // Initialize with sync boundaries that exactly match existing data
1980            let lower_bound = 5; // section 1
1981            let upper_bound = 20; // section 3
1982            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
1983                context.clone(),
1984                cfg.clone(),
1985                lower_bound..upper_bound,
1986            )
1987            .await
1988            .expect("Failed to initialize journal with exact match");
1989
1990            assert_eq!(journal.size(), 20);
1991
1992            // Verify pruning to lower bound (section 1 boundary = position 5)
1993            let oldest = journal.oldest_retained_pos();
1994            assert_eq!(oldest, Some(5)); // Section 1 starts at position 5
1995
1996            // Verify positions before 5 are pruned
1997            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
1998            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
1999
2000            // Positions 5-19 should be accessible
2001            assert_eq!(journal.read(5).await.unwrap(), 500);
2002            assert_eq!(journal.read(10).await.unwrap(), 1000);
2003            assert_eq!(journal.read(19).await.unwrap(), 1900);
2004
2005            // Position 20+ should not exist yet
2006            assert!(matches!(
2007                journal.read(20).await,
2008                Err(Error::ItemOutOfRange(_))
2009            ));
2010
2011            // Assert journal can accept new operations
2012            let pos = journal.append(999).await.unwrap();
2013            assert_eq!(pos, 20);
2014            assert_eq!(journal.read(20).await.unwrap(), 999);
2015
2016            journal.destroy().await.unwrap();
2017        });
2018    }
2019
2020    /// Test `init_sync` when existing data exceeds the sync target range.
2021    /// This tests that UnexpectedData error is returned when existing data goes beyond the upper bound.
2022    #[test_traced]
2023    fn test_init_sync_existing_data_exceeds_upper_bound() {
2024        let executor = deterministic::Runner::default();
2025        executor.start(|context| async move {
2026            let items_per_section = NZU64!(5);
2027            let cfg = Config {
2028                partition: "test_unexpected_data".into(),
2029                items_per_section,
2030                compression: None,
2031                codec_config: (),
2032                write_buffer: NZUsize!(1024),
2033                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2034            };
2035
2036            // Create initial journal with data beyond sync range
2037            let mut journal =
2038                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2039                    .await
2040                    .expect("Failed to create initial journal");
2041
2042            // Add data at positions 0-29 (sections 0-5 with items_per_section=5)
2043            for i in 0..30u64 {
2044                journal.append(i * 1000).await.unwrap();
2045            }
2046            journal.sync().await.unwrap();
2047            drop(journal);
2048
2049            // Initialize with sync boundaries that are exceeded by existing data
2050            let lower_bound = 8; // section 1
2051            for upper_bound in 9..29 {
2052                let result = Journal::<deterministic::Context, u64>::init_sync(
2053                    context.clone(),
2054                    cfg.clone(),
2055                    lower_bound..upper_bound,
2056                )
2057                .await;
2058
2059                // Should return UnexpectedData error since data exists beyond upper_bound
2060                assert!(matches!(result, Err(crate::qmdb::Error::UnexpectedData(_))));
2061            }
2062        });
2063    }
2064
2065    /// Test `init_sync` when all existing data is stale (before lower bound).
2066    #[test_traced]
2067    fn test_init_sync_existing_data_stale() {
2068        let executor = deterministic::Runner::default();
2069        executor.start(|context| async move {
2070            let items_per_section = NZU64!(5);
2071            let cfg = Config {
2072                partition: "test_stale".into(),
2073                items_per_section,
2074                compression: None,
2075                codec_config: (),
2076                write_buffer: NZUsize!(1024),
2077                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2078            };
2079
2080            // Create initial journal with stale data
2081            let mut journal =
2082                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2083                    .await
2084                    .expect("Failed to create initial journal");
2085
2086            // Add data at positions 0-9 (sections 0-1 with items_per_section=5)
2087            for i in 0..10u64 {
2088                journal.append(i * 100).await.unwrap();
2089            }
2090            journal.sync().await.unwrap();
2091            drop(journal);
2092
2093            // Initialize with sync boundaries beyond all existing data
2094            let lower_bound = 15; // section 3
2095            let upper_bound = 26; // last element in section 5
2096            let journal = Journal::<deterministic::Context, u64>::init_sync(
2097                context.clone(),
2098                cfg.clone(),
2099                lower_bound..upper_bound,
2100            )
2101            .await
2102            .expect("Failed to initialize journal with stale data");
2103
2104            assert_eq!(journal.size(), 15);
2105
2106            // Verify fresh journal (all old data destroyed, starts at position 15)
2107            assert_eq!(journal.oldest_retained_pos(), None);
2108
2109            // Verify old positions don't exist
2110            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2111            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2112            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2113
2114            journal.destroy().await.unwrap();
2115        });
2116    }
2117
2118    /// Test `init_sync` with section boundary edge cases.
2119    #[test_traced]
2120    fn test_init_sync_section_boundaries() {
2121        let executor = deterministic::Runner::default();
2122        executor.start(|context| async move {
2123            let items_per_section = NZU64!(5);
2124            let cfg = Config {
2125                partition: "test_boundaries".into(),
2126                items_per_section,
2127                compression: None,
2128                codec_config: (),
2129                write_buffer: NZUsize!(1024),
2130                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2131            };
2132
2133            // Create journal with data at section boundaries
2134            let mut journal =
2135                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2136                    .await
2137                    .expect("Failed to create initial journal");
2138
2139            // Add data at positions 0-24 (sections 0-4 with items_per_section=5)
2140            for i in 0..25u64 {
2141                journal.append(i * 100).await.unwrap();
2142            }
2143            journal.sync().await.unwrap();
2144            drop(journal);
2145
2146            // Test sync boundaries exactly at section boundaries
2147            let lower_bound = 15; // Exactly at section boundary (15/5 = 3)
2148            let upper_bound = 25; // Last element exactly at section boundary (24/5 = 4)
2149            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2150                context.clone(),
2151                cfg.clone(),
2152                lower_bound..upper_bound,
2153            )
2154            .await
2155            .expect("Failed to initialize journal at boundaries");
2156
2157            assert_eq!(journal.size(), 25);
2158
2159            // Verify oldest retained is at section 3 boundary (position 15)
2160            let oldest = journal.oldest_retained_pos();
2161            assert_eq!(oldest, Some(15));
2162
2163            // Verify positions before 15 are pruned
2164            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2165            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2166
2167            // Verify positions 15-24 are accessible
2168            assert_eq!(journal.read(15).await.unwrap(), 1500);
2169            assert_eq!(journal.read(20).await.unwrap(), 2000);
2170            assert_eq!(journal.read(24).await.unwrap(), 2400);
2171
2172            // Position 25+ should not exist yet
2173            assert!(matches!(
2174                journal.read(25).await,
2175                Err(Error::ItemOutOfRange(_))
2176            ));
2177
2178            // Assert journal can accept new operations
2179            let pos = journal.append(999).await.unwrap();
2180            assert_eq!(pos, 25);
2181            assert_eq!(journal.read(25).await.unwrap(), 999);
2182
2183            journal.destroy().await.unwrap();
2184        });
2185    }
2186
2187    /// Test `init_sync` when range.start and range.end-1 are in the same section.
2188    #[test_traced]
2189    fn test_init_sync_same_section_bounds() {
2190        let executor = deterministic::Runner::default();
2191        executor.start(|context| async move {
2192            let items_per_section = NZU64!(5);
2193            let cfg = Config {
2194                partition: "test_same_section".into(),
2195                items_per_section,
2196                compression: None,
2197                codec_config: (),
2198                write_buffer: NZUsize!(1024),
2199                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2200            };
2201
2202            // Create journal with data in multiple sections
2203            let mut journal =
2204                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2205                    .await
2206                    .expect("Failed to create initial journal");
2207
2208            // Add data at positions 0-14 (sections 0-2 with items_per_section=5)
2209            for i in 0..15u64 {
2210                journal.append(i * 100).await.unwrap();
2211            }
2212            journal.sync().await.unwrap();
2213            drop(journal);
2214
2215            // Test sync boundaries within the same section
2216            let lower_bound = 10; // operation 10 (section 2: 10/5 = 2)
2217            let upper_bound = 15; // Last operation 14 (section 2: 14/5 = 2)
2218            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2219                context.clone(),
2220                cfg.clone(),
2221                lower_bound..upper_bound,
2222            )
2223            .await
2224            .expect("Failed to initialize journal with same-section bounds");
2225
2226            assert_eq!(journal.size(), 15);
2227
2228            // Both operations are in section 2, so sections 0, 1 should be pruned, section 2 retained
2229            // Oldest retained position should be at section 2 boundary (position 10)
2230            let oldest = journal.oldest_retained_pos();
2231            assert_eq!(oldest, Some(10));
2232
2233            // Verify positions before 10 are pruned
2234            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2235            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2236
2237            // Verify positions 10-14 are accessible
2238            assert_eq!(journal.read(10).await.unwrap(), 1000);
2239            assert_eq!(journal.read(11).await.unwrap(), 1100);
2240            assert_eq!(journal.read(14).await.unwrap(), 1400);
2241
2242            // Position 15+ should not exist yet
2243            assert!(matches!(
2244                journal.read(15).await,
2245                Err(Error::ItemOutOfRange(_))
2246            ));
2247
2248            // Assert journal can accept new operations
2249            let pos = journal.append(999).await.unwrap();
2250            assert_eq!(pos, 15);
2251            assert_eq!(journal.read(15).await.unwrap(), 999);
2252
2253            journal.destroy().await.unwrap();
2254        });
2255    }
2256
2257    /// Test contiguous variable journal with items_per_section=1.
2258    ///
2259    /// This is a regression test for a bug where reading from size()-1 fails
2260    /// when using items_per_section=1, particularly after pruning and restart.
2261    #[test_traced]
2262    fn test_single_item_per_section() {
2263        let executor = deterministic::Runner::default();
2264        executor.start(|context| async move {
2265            let cfg = Config {
2266                partition: "single_item_per_section".to_string(),
2267                items_per_section: NZU64!(1),
2268                compression: None,
2269                codec_config: (),
2270                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
2271                write_buffer: NZUsize!(1024),
2272            };
2273
2274            // === Test 1: Basic single item operation ===
2275            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
2276                .await
2277                .unwrap();
2278
2279            // Verify empty state
2280            assert_eq!(journal.size(), 0);
2281            assert_eq!(journal.oldest_retained_pos(), None);
2282
2283            // Append 1 item (value = position * 100, so position 0 has value 0)
2284            let pos = journal.append(0).await.unwrap();
2285            assert_eq!(pos, 0);
2286            assert_eq!(journal.size(), 1);
2287
2288            // Sync
2289            journal.sync().await.unwrap();
2290
2291            // Read from size() - 1
2292            let value = journal.read(journal.size() - 1).await.unwrap();
2293            assert_eq!(value, 0);
2294
2295            // === Test 2: Multiple items with single item per section ===
2296            for i in 1..10u64 {
2297                let pos = journal.append(i * 100).await.unwrap();
2298                assert_eq!(pos, i);
2299                assert_eq!(journal.size(), i + 1);
2300
2301                // Verify we can read the just-appended item at size() - 1
2302                let value = journal.read(journal.size() - 1).await.unwrap();
2303                assert_eq!(value, i * 100);
2304            }
2305
2306            // Verify all items can be read
2307            for i in 0..10u64 {
2308                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2309            }
2310
2311            journal.sync().await.unwrap();
2312
2313            // === Test 3: Pruning with single item per section ===
2314            // Prune to position 5 (removes positions 0-4)
2315            let pruned = journal.prune(5).await.unwrap();
2316            assert!(pruned);
2317
2318            // Size should still be 10
2319            assert_eq!(journal.size(), 10);
2320
2321            // oldest_retained_pos should be 5
2322            assert_eq!(journal.oldest_retained_pos(), Some(5));
2323
2324            // Reading from size() - 1 (position 9) should still work
2325            let value = journal.read(journal.size() - 1).await.unwrap();
2326            assert_eq!(value, 900);
2327
2328            // Reading from pruned positions should return ItemPruned
2329            for i in 0..5 {
2330                assert!(matches!(
2331                    journal.read(i).await,
2332                    Err(crate::journal::Error::ItemPruned(_))
2333                ));
2334            }
2335
2336            // Reading from retained positions should work
2337            for i in 5..10u64 {
2338                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2339            }
2340
2341            // Append more items after pruning
2342            for i in 10..15u64 {
2343                let pos = journal.append(i * 100).await.unwrap();
2344                assert_eq!(pos, i);
2345
2346                // Verify we can read from size() - 1
2347                let value = journal.read(journal.size() - 1).await.unwrap();
2348                assert_eq!(value, i * 100);
2349            }
2350
2351            journal.sync().await.unwrap();
2352            drop(journal);
2353
2354            // === Test 4: Restart persistence with single item per section ===
2355            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
2356                .await
2357                .unwrap();
2358
2359            // Verify size is preserved
2360            assert_eq!(journal.size(), 15);
2361
2362            // Verify oldest_retained_pos is preserved
2363            assert_eq!(journal.oldest_retained_pos(), Some(5));
2364
2365            // Reading from size() - 1 should work after restart
2366            let value = journal.read(journal.size() - 1).await.unwrap();
2367            assert_eq!(value, 1400);
2368
2369            // Reading all retained positions should work
2370            for i in 5..15u64 {
2371                assert_eq!(journal.read(i).await.unwrap(), i * 100);
2372            }
2373
2374            journal.destroy().await.unwrap();
2375
2376            // === Test 5: Restart after pruning with non-zero index (KEY SCENARIO) ===
2377            // Fresh journal for this test
2378            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
2379                .await
2380                .unwrap();
2381
2382            // Append 10 items (positions 0-9)
2383            for i in 0..10u64 {
2384                journal.append(i * 1000).await.unwrap();
2385            }
2386
2387            // Prune to position 5 (removes positions 0-4)
2388            journal.prune(5).await.unwrap();
2389            assert_eq!(journal.size(), 10);
2390            assert_eq!(journal.oldest_retained_pos(), Some(5));
2391
2392            // Sync and restart
2393            journal.sync().await.unwrap();
2394            drop(journal);
2395
2396            // Re-open journal
2397            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
2398                .await
2399                .unwrap();
2400
2401            // Verify state after restart
2402            assert_eq!(journal.size(), 10);
2403            assert_eq!(journal.oldest_retained_pos(), Some(5));
2404
2405            // KEY TEST: Reading from size() - 1 (position 9) should work
2406            let value = journal.read(journal.size() - 1).await.unwrap();
2407            assert_eq!(value, 9000);
2408
2409            // Verify all retained positions (5-9) work
2410            for i in 5..10u64 {
2411                assert_eq!(journal.read(i).await.unwrap(), i * 1000);
2412            }
2413
2414            journal.destroy().await.unwrap();
2415
2416            // === Test 6: Prune all items (edge case) ===
2417            // This tests the scenario where prune removes everything.
2418            // Callers must check oldest_retained_pos() before reading.
2419            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
2420                .await
2421                .unwrap();
2422
2423            for i in 0..5u64 {
2424                journal.append(i * 100).await.unwrap();
2425            }
2426            journal.sync().await.unwrap();
2427
2428            // Prune all items
2429            journal.prune(5).await.unwrap();
2430            assert_eq!(journal.size(), 5); // Size unchanged
2431            assert_eq!(journal.oldest_retained_pos(), None); // All pruned
2432
2433            // size() - 1 = 4, but position 4 is pruned
2434            let result = journal.read(journal.size() - 1).await;
2435            assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));
2436
2437            // After appending, reading works again
2438            journal.append(500).await.unwrap();
2439            assert_eq!(journal.oldest_retained_pos(), Some(5));
2440            assert_eq!(journal.read(journal.size() - 1).await.unwrap(), 500);
2441
2442            journal.destroy().await.unwrap();
2443        });
2444    }
2445}