commonware_storage/journal/contiguous/
variable.rs

1//! Position-based journal for variable-length items.
2//!
3//! This journal enforces section fullness: all non-final sections are full and synced.
4//! On init, only the last section needs to be replayed to determine the exact size.
5
6use crate::{
7    journal::{
8        contiguous::{fixed, Contiguous, MutableContiguous, PersistableContiguous},
9        segmented::variable,
10        Error,
11    },
12    mmr::Location,
13};
14use commonware_codec::Codec;
15use commonware_runtime::{buffer::PoolRef, Metrics, Storage};
16use commonware_utils::NZUsize;
17use core::ops::Range;
18use futures::{future::Either, stream, Stream, StreamExt as _};
19use std::num::{NonZeroU64, NonZeroUsize};
20use tracing::{debug, info};
21
/// Buffer size handed to the data journal's `replay` when it is scanned during
/// initialization (see `align_journals` and `add_missing_offsets`).
// NOTE(review): presumably a size in bytes -- confirm against `variable::Journal::replay`.
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to the base partition name to form the data journal's partition.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to the base partition name to form the offsets journal's partition.
const OFFSETS_SUFFIX: &str = "_offsets";
29
/// Map an absolute journal position to the section that holds it.
///
/// Sections are numbered from zero and each one covers `items_per_section`
/// consecutive positions, so the mapping is an integer division that rounds down.
///
/// # Arguments
///
/// * `position` - The absolute position in the journal
/// * `items_per_section` - The number of items stored in each section
///
/// # Returns
///
/// The number of the section that contains (or will contain) `position`.
///
/// # Panics
///
/// Panics if `items_per_section` is zero.
///
/// # Examples
///
/// ```ignore
/// // With 10 items per section:
/// assert_eq!(position_to_section(0, 10), 0);   // first item of section 0
/// assert_eq!(position_to_section(9, 10), 0);   // last item of section 0
/// assert_eq!(position_to_section(10, 10), 1);  // first item of section 1
/// assert_eq!(position_to_section(25, 10), 2);  // middle of section 2
/// assert_eq!(position_to_section(30, 10), 3);  // first item of section 3
/// ```
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    // For unsigned integers Euclidean division is exactly `/`.
    position.div_euclid(items_per_section)
}
54
/// Configuration for a [Journal].
///
/// One `Config` drives both underlying journals: the data journal (variable-length
/// items) and the offsets journal (fixed-size index entries). `C` is the codec
/// configuration type of the stored items.
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name. Sub-partitions will be created by appending DATA_SUFFIX and OFFSETS_SUFFIX.
    pub partition: String,

    /// The number of items to store in each section.
    ///
    /// Once set, this value cannot be changed across restarts.
    /// All non-final sections will be full and persisted.
    ///
    /// The same value is used as `items_per_blob` for the offsets journal.
    pub items_per_section: NonZeroU64,

    /// Optional compression level for stored items.
    ///
    /// Forwarded unchanged to the data journal; the offsets journal does not use it.
    pub compression: Option<u8>,

    /// [Codec] configuration for encoding/decoding items.
    pub codec_config: C,

    /// Buffer pool for caching data.
    ///
    /// Shared by the data journal and the offsets journal.
    pub buffer_pool: PoolRef,

    /// Write buffer size for each section.
    ///
    /// Applied to both the data journal and the offsets journal.
    pub write_buffer: NonZeroUsize,
}
79
80impl<C> Config<C> {
81    /// Returns the partition name for the data journal.
82    fn data_partition(&self) -> String {
83        format!("{}{}", self.partition, DATA_SUFFIX)
84    }
85
86    /// Returns the partition name for the offsets journal.
87    fn offsets_partition(&self) -> String {
88        format!("{}{}", self.partition, OFFSETS_SUFFIX)
89    }
90}
91
/// A position-based journal for variable-length items.
///
/// This journal manages section assignment automatically, allowing callers to append items
/// sequentially without manually tracking section numbers. Internally it pairs a data
/// journal (the items themselves) with an offsets journal (one `u32` offset per item).
///
/// # Invariants
///
/// ## 1. Section Fullness
///
/// All non-final sections are full (`items_per_section` items) and persisted. This ensures
/// that on `init()`, we only need to replay the last section to determine the exact size.
///
/// ## 2. Data Journal is Source of Truth
///
/// The data journal is always the source of truth. The offsets journal is an index
/// that may temporarily diverge during crashes. Divergences are automatically
/// aligned during init():
/// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data.
///   (This can happen if we crash after writing data journal but before writing offsets journal)
/// * If offsets.size() > data.size(): Rewind offsets to match data size.
///   (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
/// * If offsets.oldest_retained_pos() < data.oldest_retained_pos(): Prune offsets to match.
///   (This can happen if we crash after pruning data journal but before pruning offsets journal)
///
/// Note that we don't recover from the case where offsets.oldest_retained_pos() >
/// data.oldest_retained_pos(); init() reports it as corruption. This should never occur
/// because we always prune the data journal before the offsets journal.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// The underlying variable-length data journal.
    pub(crate) data: variable::Journal<E, V>,

    /// Index mapping positions to byte offsets within the data journal.
    /// The section can be calculated from the position using items_per_section.
    pub(crate) offsets: fixed::Journal<E, u32>,

    /// The number of items per section.
    ///
    /// # Invariant
    ///
    /// This value is immutable after initialization and must remain consistent
    /// across restarts. Changing this value will result in data loss or corruption.
    items_per_section: u64,

    /// The next position to be assigned on append (total items appended).
    ///
    /// # Invariant
    ///
    /// Always >= `oldest_retained_pos`. Equal when data journal is empty or fully pruned.
    size: u64,

    /// The position of the first item that remains after pruning.
    ///
    /// # Invariant
    ///
    /// Always section-aligned: `oldest_retained_pos % items_per_section == 0`.
    /// Never decreases (pruning only moves forward).
    // NOTE(review): `init_at_size` stores its `size` argument here verbatim, so the
    // alignment invariant only holds if callers pass a section-aligned size -- confirm.
    oldest_retained_pos: u64,
}
150
151impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
152    /// Initialize a contiguous variable journal.
153    ///
154    /// # Crash Recovery
155    ///
156    /// The data journal is the source of truth. If the offsets journal is inconsistent
157    /// it will be updated to match the data journal.
158    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
159        let items_per_section = cfg.items_per_section.get();
160        let data_partition = cfg.data_partition();
161        let offsets_partition = cfg.offsets_partition();
162
163        // Initialize underlying variable data journal
164        let mut data = variable::Journal::init(
165            context.clone(),
166            variable::Config {
167                partition: data_partition,
168                compression: cfg.compression,
169                codec_config: cfg.codec_config,
170                buffer_pool: cfg.buffer_pool.clone(),
171                write_buffer: cfg.write_buffer,
172            },
173        )
174        .await?;
175
176        // Initialize offsets journal
177        let mut offsets = fixed::Journal::init(
178            context,
179            fixed::Config {
180                partition: offsets_partition,
181                items_per_blob: cfg.items_per_section,
182                buffer_pool: cfg.buffer_pool,
183                write_buffer: cfg.write_buffer,
184            },
185        )
186        .await?;
187
188        // Validate and align offsets journal to match data journal
189        let (oldest_retained_pos, size) =
190            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
191        assert!(
192            oldest_retained_pos.is_multiple_of(items_per_section),
193            "oldest_retained_pos is not section-aligned"
194        );
195
196        Ok(Self {
197            data,
198            offsets,
199            items_per_section,
200            size,
201            oldest_retained_pos,
202        })
203    }
204
205    /// Initialize a [Journal] in a fully pruned state at a specific logical size.
206    ///
207    /// This creates a journal that reports `size()` as `size` but contains no data.
208    /// The `oldest_retained_pos()` will return `None`, indicating all positions before
209    /// `size` have been pruned. This is useful for state sync when starting from
210    /// a non-zero position without historical data.
211    ///
212    /// # Arguments
213    ///
214    /// * `size` - The logical size to initialize at.
215    ///
216    /// # Post-conditions
217    ///
218    /// * `size()` returns `size`
219    /// * `oldest_retained_pos()` returns `None` (fully pruned)
220    /// * Next append receives position `size`
221    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
222        // Initialize empty data journal
223        let data = variable::Journal::init(
224            context.clone(),
225            variable::Config {
226                partition: cfg.data_partition(),
227                compression: cfg.compression,
228                codec_config: cfg.codec_config.clone(),
229                buffer_pool: cfg.buffer_pool.clone(),
230                write_buffer: cfg.write_buffer,
231            },
232        )
233        .await?;
234
235        // Initialize offsets journal at the target size
236        let offsets = crate::qmdb::any::unordered::sync::init_journal_at_size(
237            context,
238            fixed::Config {
239                partition: cfg.offsets_partition(),
240                items_per_blob: cfg.items_per_section,
241                buffer_pool: cfg.buffer_pool,
242                write_buffer: cfg.write_buffer,
243            },
244            size,
245        )
246        .await?;
247
248        Ok(Self {
249            data,
250            offsets,
251            items_per_section: cfg.items_per_section.get(),
252            size,
253            oldest_retained_pos: size,
254        })
255    }
256
257    /// Initialize a [Journal] for use in state sync.
258    ///
259    /// The bounds are item locations (not section numbers). This function prepares the
260    /// on-disk journal so that subsequent appends go to the correct physical location for the
261    /// requested range.
262    ///
263    /// Behavior by existing on-disk state:
264    /// - Fresh (no data): returns an empty journal.
265    /// - Stale (all data strictly before `range.start`): destroys existing data and returns an
266    ///   empty journal.
267    /// - Overlap within [`range.start`, `range.end`]:
268    ///   - Prunes to `range.start`
269    /// - Unexpected data beyond `range.end`: returns [crate::qmdb::Error::UnexpectedData].
270    ///
271    /// # Arguments
272    /// - `context`: storage context
273    /// - `cfg`: journal configuration
274    /// - `range`: range of item locations to retain
275    ///
276    /// # Returns
277    /// A contiguous journal ready for sync operations. The journal's size will be within the range.
278    ///
279    /// # Errors
280    /// Returns [crate::qmdb::Error::UnexpectedData] if existing data extends beyond `range.end`.
281    pub(crate) async fn init_sync(
282        context: E,
283        cfg: Config<V::Cfg>,
284        range: Range<u64>,
285    ) -> Result<Self, crate::qmdb::Error> {
286        assert!(!range.is_empty(), "range must not be empty");
287
288        debug!(
289            range.start,
290            range.end,
291            items_per_section = cfg.items_per_section.get(),
292            "initializing contiguous variable journal for sync"
293        );
294
295        // Initialize contiguous journal
296        let mut journal = Self::init(context.with_label("journal"), cfg.clone()).await?;
297
298        let size = journal.size();
299
300        // No existing data - initialize at the start of the sync range if needed
301        if size == 0 {
302            if range.start == 0 {
303                debug!("no existing journal data, returning empty journal");
304                return Ok(journal);
305            } else {
306                debug!(
307                    range.start,
308                    "no existing journal data, initializing at sync range start"
309                );
310                journal.destroy().await?;
311                return Ok(Self::init_at_size(context, cfg, range.start).await?);
312            }
313        }
314
315        // Check if data exceeds the sync range
316        if size > range.end {
317            return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
318                size,
319            )));
320        }
321
322        // If all existing data is before our sync range, destroy and recreate fresh
323        if size <= range.start {
324            // All data is stale (ends at or before range.start)
325            debug!(
326                size,
327                range.start, "existing journal data is stale, re-initializing at start position"
328            );
329            journal.destroy().await?;
330            return Ok(Self::init_at_size(context, cfg, range.start).await?);
331        }
332
333        // Prune to lower bound if needed
334        let oldest = journal.oldest_retained_pos();
335        if let Some(oldest_pos) = oldest {
336            if oldest_pos < range.start {
337                debug!(
338                    oldest_pos,
339                    range.start, "pruning journal to sync range start"
340                );
341                journal.prune(range.start).await?;
342            }
343        }
344
345        Ok(journal)
346    }
347
348    /// Rewind the journal to the given size, discarding items from the end.
349    ///
350    /// After rewinding to size N, the journal will contain exactly N items, and the next append
351    /// will receive position N.
352    ///
353    /// # Errors
354    ///
355    /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
356    ///
357    /// # Warning
358    ///
359    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
360    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
361        // Validate rewind target
362        match size.cmp(&self.size) {
363            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
364            std::cmp::Ordering::Equal => return Ok(()), // No-op
365            std::cmp::Ordering::Less => {}
366        }
367
368        // Rewind never updates oldest_retained_pos.
369        if size < self.oldest_retained_pos {
370            return Err(Error::ItemPruned(size));
371        }
372
373        // Read the offset of the first item to discard (at position 'size').
374        let discard_offset = self.offsets.read(size).await?;
375        let discard_section = position_to_section(size, self.items_per_section);
376
377        self.data
378            .rewind_to_offset(discard_section, discard_offset)
379            .await?;
380        self.offsets.rewind(size).await?;
381
382        // Update our size
383        self.size = size;
384
385        Ok(())
386    }
387
388    /// Append a new item to the journal, returning its position.
389    ///
390    /// The position returned is a stable, consecutively increasing value starting from 0.
391    /// This position remains constant after pruning.
392    ///
393    /// When a section becomes full, both the data journal and offsets journal are persisted
394    /// to maintain the invariant that all non-final sections are full and consistent.
395    ///
396    /// # Errors
397    ///
398    /// Returns an error if the underlying storage operation fails or if the item cannot
399    /// be encoded.
400    ///
401    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
402    /// reopened to trigger alignment in [Journal::init].
403    pub async fn append(&mut self, item: V) -> Result<u64, Error> {
404        // Calculate which section this position belongs to
405        let section = self.current_section();
406
407        // Append to data journal, get offset
408        let (offset, _size) = self.data.append(section, item).await?;
409
410        // Append offset to offsets journal
411        let offsets_pos = self.offsets.append(offset).await?;
412        assert_eq!(offsets_pos, self.size);
413
414        // Return the current position
415        let position = self.size;
416        self.size += 1;
417
418        // Maintain invariant that all full sections are persisted.
419        if self.size.is_multiple_of(self.items_per_section) {
420            futures::try_join!(self.data.sync(section), self.offsets.sync())?;
421        }
422
423        Ok(position)
424    }
425
    /// Return the total number of items that have been appended to the journal.
    ///
    /// This count is NOT affected by pruning: [Journal::prune] never changes it. It shrinks
    /// only on [Journal::rewind]. The next appended item will receive this position as its
    /// value.
    pub const fn size(&self) -> u64 {
        self.size
    }
433
434    /// Return the position of the oldest item still retained in the journal.
435    ///
436    /// Returns `None` if the journal is empty or if all items have been pruned.
437    pub const fn oldest_retained_pos(&self) -> Option<u64> {
438        if self.size == self.oldest_retained_pos {
439            // No items retained: either never had data or fully pruned
440            None
441        } else {
442            Some(self.oldest_retained_pos)
443        }
444    }
445
446    /// Returns the location before which all items have been pruned.
447    pub fn pruning_boundary(&self) -> u64 {
448        self.oldest_retained_pos().unwrap_or(self.size)
449    }
450
451    /// Prune items at positions strictly less than `min_position`.
452    ///
453    /// Returns `true` if any data was pruned, `false` otherwise.
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if the underlying storage operation fails.
458    ///
459    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
460    /// reopened to trigger alignment in [Journal::init].
461    pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
462        if min_position <= self.oldest_retained_pos {
463            return Ok(false);
464        }
465
466        // Cap min_position to size to maintain the invariant oldest_retained_pos <= size
467        let min_position = min_position.min(self.size);
468
469        // Calculate section number
470        let min_section = position_to_section(min_position, self.items_per_section);
471
472        let pruned = self.data.prune(min_section).await?;
473        if pruned {
474            self.oldest_retained_pos = min_section * self.items_per_section;
475            self.offsets.prune(self.oldest_retained_pos).await?;
476        }
477        Ok(pruned)
478    }
479
480    /// Return a stream of all items in the journal starting from `start_pos`.
481    ///
482    /// Each item is yielded as a tuple `(position, item)` where position is the item's
483    /// position in the journal.
484    ///
485    /// # Errors
486    ///
487    /// Returns an error if `start_pos` exceeds the journal size or if any storage/decoding
488    /// errors occur during replay.
489    pub async fn replay(
490        &self,
491        start_pos: u64,
492        buffer_size: NonZeroUsize,
493    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + '_, Error> {
494        // Validate start position is within bounds.
495        if start_pos < self.oldest_retained_pos {
496            return Err(Error::ItemPruned(start_pos));
497        }
498        if start_pos > self.size {
499            return Err(Error::ItemOutOfRange(start_pos));
500        }
501
502        // If replaying at exactly size, return empty stream
503        if start_pos == self.size {
504            return Ok(Either::Left(stream::empty()));
505        }
506
507        // Use offsets index to find offset to start from, calculate section from position
508        let start_offset = self.offsets.read(start_pos).await?;
509        let start_section = position_to_section(start_pos, self.items_per_section);
510        let data_stream = self
511            .data
512            .replay(start_section, start_offset, buffer_size)
513            .await?;
514
515        // Transform the stream to include position information
516        let transformed = data_stream.enumerate().map(move |(idx, result)| {
517            result.map(|(_section, _offset, _size, item)| {
518                // Calculate position: start_pos + items read
519                let pos = start_pos + idx as u64;
520                (pos, item)
521            })
522        });
523
524        Ok(Either::Right(transformed))
525    }
526
527    /// Read the item at the given position.
528    ///
529    /// # Errors
530    ///
531    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
532    /// - Returns [Error::ItemOutOfRange] if `position` is beyond the journal size.
533    /// - Returns other errors if storage or decoding fails.
534    pub async fn read(&self, position: u64) -> Result<V, Error> {
535        // Check bounds
536        if position >= self.size {
537            return Err(Error::ItemOutOfRange(position));
538        }
539
540        if position < self.oldest_retained_pos {
541            return Err(Error::ItemPruned(position));
542        }
543
544        // Read offset from journal and calculate section from position
545        let offset = self.offsets.read(position).await?;
546        let section = position_to_section(position, self.items_per_section);
547
548        // Read item from data journal
549        self.data.get(section, offset).await
550    }
551
552    /// Durably persist the journal.
553    ///
554    /// This is faster than `sync()` but recovery will be required on startup if a crash occurs
555    /// before the next call to `sync()`.
556    pub async fn commit(&mut self) -> Result<(), Error> {
557        let section = self.current_section();
558        self.data.sync(section).await
559    }
560
561    /// Durably persist the journal and ensure recovery is not required on startup.
562    ///
563    /// This is slower than `commit()` but ensures the journal doesn't require recovery on startup.
564    pub async fn sync(&mut self) -> Result<(), Error> {
565        // Persist only the current (final) section of the data journal.
566        // All non-final sections are already persisted per Invariant #1.
567        let section = self.current_section();
568
569        // Persist both journals concurrently
570        futures::try_join!(self.data.sync(section), self.offsets.sync())?;
571
572        Ok(())
573    }
574
575    /// Close the journal, persisting all pending writes.
576    ///
577    /// This closes both the data journal and the offsets journal.
578    pub async fn close(mut self) -> Result<(), Error> {
579        self.sync().await?;
580        self.data.close().await?;
581        self.offsets.close().await
582    }
583
584    /// Remove any underlying blobs created by the journal.
585    ///
586    /// This destroys both the data journal and the offsets journal.
587    pub async fn destroy(self) -> Result<(), Error> {
588        self.data.destroy().await?;
589        self.offsets.destroy().await
590    }
591
    /// Return the section number where the next append will write.
    ///
    /// This is the section containing position `size`, so once a section fills up the
    /// result already points at the next section.
    const fn current_section(&self) -> u64 {
        position_to_section(self.size, self.items_per_section)
    }
596
    /// Align the offsets journal and data journal to be consistent in case a crash occurred
    /// on a previous run and left the journals in an inconsistent state.
    ///
    /// The data journal is the source of truth. This function replays its last section to
    /// determine what SHOULD be in the offsets journal, then fixes any mismatches by
    /// pruning, rewinding, or extending the offsets journal.
    ///
    /// # Returns
    ///
    /// Returns `(oldest_retained_pos, size)` for the contiguous journal.
    ///
    /// # Errors
    ///
    /// Returns [Error::Corruption] if the offsets journal is pruned further than the data
    /// journal, or if the offsets journal retains nothing and its size does not match the
    /// data journal's oldest position. Replay and storage errors are propagated.
    ///
    /// # Panics
    ///
    /// Panics if, after repair, the offsets journal's size or oldest retained position does
    /// not match the data journal.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u32>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // Count the items in the last data section (0 if the data journal has no sections).
        let items_in_last_section = match data.blobs.last_key_value() {
            Some((last_section, _)) => {
                let stream = data.replay(*last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; // Propagate replay errors (corruption, etc.)
                    count += 1;
                }
                count
            }
            None => 0,
        };

        // === Handle empty data journal case ===
        // Data journal is empty if there are no sections or if there is one section and it has no items.
        // The latter should only occur if a crash occurred after opening a data journal blob but
        // before writing to it.
        let data_empty =
            data.blobs.is_empty() || (data.blobs.len() == 1 && items_in_last_section == 0);
        if data_empty {
            let size = offsets.size();

            if !data.blobs.is_empty() {
                // A section exists but contains 0 items. This can happen in two cases:
                // 1. Rewind crash: we rewound the data journal but crashed before rewinding offsets
                // 2. First append crash: we opened the first section blob but crashed before writing to it
                // In both cases, calculate target position from the first remaining section
                let first_section = *data.blobs.first_key_value().unwrap().0;
                let target_pos = first_section * items_per_section;

                info!("crash repair: rewinding offsets from {size} to {target_pos}");
                offsets.rewind(target_pos).await?;
                offsets.sync().await?;
                return Ok((target_pos, target_pos));
            }

            // data.blobs is empty. This can happen in two cases:
            // 1. We completely pruned the data journal but crashed before pruning
            //    the offsets journal.
            // 2. The data journal was never opened.
            if let Some(oldest) = offsets.oldest_retained_pos() {
                if oldest < size {
                    // Offsets has unpruned entries but data is gone - align by pruning
                    info!("crash repair: pruning offsets to {size} (prune-all crash)");
                    offsets.prune(size).await?;
                    offsets.sync().await?;
                }
            }

            // With no data, the journal is fully pruned at the offsets journal's size.
            return Ok((size, size));
        }

        // === Handle non-empty data journal case ===
        let (data_oldest_pos, data_size) = {
            // Data exists -- derive its bounds from the first and last section numbers.
            let first_section = *data.blobs.first_key_value().unwrap().0;
            let last_section = *data.blobs.last_key_value().unwrap().0;

            let oldest_pos = first_section * items_per_section;

            // Invariant 1 (section fullness) guarantees that all sections except possibly the
            // last are full. Therefore, the size of the journal is the number of items in the
            // last section plus the number of items in the other sections.
            let size = (last_section * items_per_section) + items_in_last_section;
            (oldest_pos, size)
        };
        assert_ne!(
            data_oldest_pos, data_size,
            "data journal expected to be non-empty"
        );

        // Align pruning state. We always prune the data journal before the offsets journal,
        // so we validate that invariant and repair crash faults or detect corruption.
        match offsets.oldest_retained_pos() {
            Some(oldest_retained_pos) if oldest_retained_pos < data_oldest_pos => {
                // Offsets behind on pruning -- prune to catch up
                info!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            Some(oldest_retained_pos) if oldest_retained_pos > data_oldest_pos => {
                // Offsets pruned further than data: not a state a crash can produce.
                return Err(Error::Corruption(format!(
                    "offsets oldest pos ({oldest_retained_pos}) > data oldest pos ({data_oldest_pos})"
                )));
            }
            Some(_) => {
                // Both journals are pruned to the same position.
            }
            None if data_oldest_pos > 0 => {
                // Offsets journal is empty (size == oldest_retained_pos).
                // This can happen if we pruned all data, then appended new data, persisted the
                // data journal, but crashed before persisting the offsets journal.
                // We can recover if offsets.size() matches data_oldest_pos (proper pruning).
                let offsets_size = offsets.size();
                if offsets_size != data_oldest_pos {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty: size ({offsets_size}) != data oldest pos ({data_oldest_pos})"
                    )));
                }
                info!("crash repair: offsets journal empty at {data_oldest_pos}");
            }
            None => {
                // Offsets journal retains nothing and the data journal starts at position 0;
                // any missing offsets are rebuilt by the size alignment below.
            }
        }

        // Align size: the offsets journal must index exactly the items in the data journal.
        let offsets_size = offsets.size();
        if offsets_size > data_size {
            // Offsets index items the data journal does not hold (e.g. a crash between
            // rewinding the data journal and rewinding the offsets journal).
            info!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            // We must have crashed after writing the data journal but before writing the offsets
            // journal.
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        assert_eq!(offsets.size(), data_size);
        // Oldest retained position is always Some because the data journal is non-empty.
        assert_eq!(offsets.oldest_retained_pos(), Some(data_oldest_pos));

        // Persist the repaired index so the same repair is not needed on the next start.
        offsets.sync().await?;
        Ok((data_oldest_pos, data_size))
    }
735
736    /// Rebuild missing offset entries by replaying the data journal and
737    /// appending the missing entries to the offsets journal.
738    ///
739    /// The data journal is the source of truth. This function brings the offsets
740    /// journal up to date by replaying data items and indexing their positions.
741    ///
742    /// # Warning
743    ///
744    /// - Panics if `data.blobs` is empty
745    /// - Panics if `offsets_size` >= `data.size()`
746    async fn add_missing_offsets(
747        data: &variable::Journal<E, V>,
748        offsets: &mut fixed::Journal<E, u32>,
749        offsets_size: u64,
750        items_per_section: u64,
751    ) -> Result<(), Error> {
752        assert!(
753            !data.blobs.is_empty(),
754            "rebuild_offsets called with empty data journal"
755        );
756
757        // Find where to start replaying
758        let (start_section, resume_offset, skip_first) =
759            if let Some(oldest) = offsets.oldest_retained_pos() {
760                if oldest < offsets_size {
761                    // Offsets has items -- resume from last indexed position
762                    let last_offset = offsets.read(offsets_size - 1).await?;
763                    let last_section = position_to_section(offsets_size - 1, items_per_section);
764                    (last_section, last_offset, true)
765                } else {
766                    // Offsets fully pruned but data has items -- start from first data section
767                    // SAFETY: data.blobs is non-empty (checked above)
768                    let first_section = *data.blobs.first_key_value().unwrap().0;
769                    (first_section, 0, false)
770                }
771            } else {
772                // Offsets empty -- start from first data section
773                // SAFETY: data.blobs is non-empty (checked above)
774                let first_section = *data.blobs.first_key_value().unwrap().0;
775                (first_section, 0, false)
776            };
777
778        // Replay data journal from start position through the end and index all items.
779        // The data journal is the source of truth, so we consume the entire stream.
780        // (replay streams from start_section onwards through all subsequent sections)
781        let stream = data
782            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
783            .await?;
784        futures::pin_mut!(stream);
785
786        let mut skipped_first = false;
787        while let Some(result) = stream.next().await {
788            let (_section, offset, _size, _item) = result?;
789
790            // Skip first item if resuming from last indexed offset
791            if skip_first && !skipped_first {
792                skipped_first = true;
793                continue;
794            }
795
796            offsets.append(offset).await?;
797        }
798
799        Ok(())
800    }
801}
802
803// Implement Contiguous trait for variable-length items
804impl<E: Storage + Metrics, V: Codec> Contiguous for Journal<E, V> {
805    type Item = V;
806
807    fn size(&self) -> u64 {
808        Self::size(self)
809    }
810
811    fn oldest_retained_pos(&self) -> Option<u64> {
812        Self::oldest_retained_pos(self)
813    }
814
815    fn pruning_boundary(&self) -> u64 {
816        Self::pruning_boundary(self)
817    }
818
819    async fn replay(
820        &self,
821        start_pos: u64,
822        buffer: NonZeroUsize,
823    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
824        Self::replay(self, start_pos, buffer).await
825    }
826
827    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
828        Self::read(self, position).await
829    }
830}
831
832impl<E: Storage + Metrics, V: Codec> MutableContiguous for Journal<E, V> {
833    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
834        Self::append(self, item).await
835    }
836
837    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
838        Self::prune(self, min_position).await
839    }
840
841    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
842        Self::rewind(self, size).await
843    }
844}
845
846impl<E: Storage + Metrics, V: Codec> PersistableContiguous for Journal<E, V> {
847    async fn commit(&mut self) -> Result<(), Error> {
848        Self::commit(self).await
849    }
850
851    async fn sync(&mut self) -> Result<(), Error> {
852        Self::sync(self).await
853    }
854
855    async fn close(self) -> Result<(), Error> {
856        Self::close(self).await
857    }
858
859    async fn destroy(self) -> Result<(), Error> {
860        Self::destroy(self).await
861    }
862}
863#[cfg(test)]
864mod tests {
865    use super::*;
866    use crate::journal::contiguous::tests::run_contiguous_tests;
867    use commonware_macros::test_traced;
868    use commonware_runtime::{buffer::PoolRef, deterministic, Runner};
869    use commonware_utils::{NZUsize, NZU64};
870    use futures::FutureExt as _;
871
    // Use some deliberately odd sizes (a page size that is not a round number and a very small
    // page cache) to exercise boundary conditions.
    // NOTE(review): the tests visible in this part of the module build their buffer pools from
    // literal sizes instead; presumably these constants are used by tests further down -- confirm.
    const PAGE_SIZE: usize = 101;
    const PAGE_CACHE_SIZE: usize = 2;
875
876    /// Test that complete offsets partition loss after pruning is detected as unrecoverable.
877    ///
878    /// When the offsets partition is completely lost and the data has been pruned, we cannot
879    /// rebuild the index with correct position alignment (would require creating placeholder blobs).
880    /// This is a genuine external failure that should be detected and reported clearly.
881    #[test_traced]
882    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
883        let executor = deterministic::Runner::default();
884        executor.start(|context| async move {
885            let cfg = Config {
886                partition: "offsets_loss_after_prune".to_string(),
887                items_per_section: NZU64!(10),
888                compression: None,
889                codec_config: (),
890                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
891                write_buffer: NZUsize!(1024),
892            };
893
894            // === Phase 1: Create journal with data and prune ===
895            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
896                .await
897                .unwrap();
898
899            // Append 40 items across 4 sections (0-3)
900            for i in 0..40u64 {
901                journal.append(i * 100).await.unwrap();
902            }
903
904            // Prune to position 20 (removes sections 0-1, keeps sections 2-3)
905            journal.prune(20).await.unwrap();
906            assert_eq!(journal.oldest_retained_pos(), Some(20));
907            assert_eq!(journal.size(), 40);
908
909            journal.close().await.unwrap();
910
911            // === Phase 2: Simulate complete offsets partition loss ===
912            context
913                .remove(&cfg.offsets_partition(), None)
914                .await
915                .expect("Failed to remove offsets partition");
916
917            // === Phase 3: Verify this is detected as unrecoverable ===
918            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
919            assert!(matches!(result, Err(Error::Corruption(_))));
920        });
921    }
922
923    /// Test that init aligns state when data is pruned/lost but offsets survives.
924    ///
925    /// This handles both:
926    /// 1. Crash during prune-all (data pruned, offsets not yet)
927    /// 2. External data partition loss
928    ///
929    /// In both cases, we align by pruning offsets to match.
930    #[test_traced]
931    fn test_variable_align_data_offsets_mismatch() {
932        let executor = deterministic::Runner::default();
933        executor.start(|context| async move {
934            let cfg = Config {
935                partition: "data_loss_test".to_string(),
936                items_per_section: NZU64!(10),
937                compression: None,
938                codec_config: (),
939                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
940                write_buffer: NZUsize!(1024),
941            };
942
943            // === Setup: Create journal with data ===
944            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
945                .await
946                .unwrap();
947
948            // Append 20 items across 2 sections
949            for i in 0..20u64 {
950                variable.append(i * 100).await.unwrap();
951            }
952
953            variable.close().await.unwrap();
954
955            // === Simulate data loss: Delete data partition but keep offsets ===
956            context
957                .remove(&cfg.data_partition(), None)
958                .await
959                .expect("Failed to remove data partition");
960
961            // === Verify init aligns the mismatch ===
962            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
963                .await
964                .expect("Should align offsets to match empty data");
965
966            // Size should be preserved
967            assert_eq!(journal.size(), 20);
968
969            // But no items remain (both journals pruned)
970            assert_eq!(journal.oldest_retained_pos(), None);
971
972            // All reads should fail with ItemPruned
973            for i in 0..20 {
974                assert!(matches!(
975                    journal.read(i).await,
976                    Err(crate::journal::Error::ItemPruned(_))
977                ));
978            }
979
980            // Can append new data starting at position 20
981            let pos = journal.append(999).await.unwrap();
982            assert_eq!(pos, 20);
983            assert_eq!(journal.read(20).await.unwrap(), 999);
984
985            journal.destroy().await.unwrap();
986        });
987    }
988
989    #[test_traced]
990    fn test_variable_contiguous() {
991        let executor = deterministic::Runner::default();
992        executor.start(|context| async move {
993            run_contiguous_tests(move |test_name: String| {
994                let context = context.clone();
995                async move {
996                    Journal::<_, u64>::init(
997                        context,
998                        Config {
999                            partition: format!("generic_test_{test_name}"),
1000                            items_per_section: NZU64!(10),
1001                            compression: None,
1002                            codec_config: (),
1003                            buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1004                            write_buffer: NZUsize!(1024),
1005                        },
1006                    )
1007                    .await
1008                }
1009                .boxed()
1010            })
1011            .await;
1012        });
1013    }
1014
1015    /// Test multiple sequential prunes with Variable-specific guarantees.
1016    #[test_traced]
1017    fn test_variable_multiple_sequential_prunes() {
1018        let executor = deterministic::Runner::default();
1019        executor.start(|context| async move {
1020            let cfg = Config {
1021                partition: "sequential_prunes".to_string(),
1022                items_per_section: NZU64!(10),
1023                compression: None,
1024                codec_config: (),
1025                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1026                write_buffer: NZUsize!(1024),
1027            };
1028
1029            let mut journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1030
1031            // Append items across 4 sections: [0-9], [10-19], [20-29], [30-39]
1032            for i in 0..40u64 {
1033                journal.append(i * 100).await.unwrap();
1034            }
1035
1036            // Initial state: all items accessible
1037            assert_eq!(journal.oldest_retained_pos(), Some(0));
1038            assert_eq!(journal.size(), 40);
1039
1040            // First prune: remove section 0 (positions 0-9)
1041            let pruned = journal.prune(10).await.unwrap();
1042            assert!(pruned);
1043
1044            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1045            let oldest = journal.oldest_retained_pos().unwrap();
1046            assert_eq!(oldest, 10);
1047
1048            // Items 0-9 should be pruned, 10+ should be accessible
1049            assert!(matches!(
1050                journal.read(0).await,
1051                Err(crate::journal::Error::ItemPruned(_))
1052            ));
1053            assert_eq!(journal.read(10).await.unwrap(), 1000);
1054            assert_eq!(journal.read(19).await.unwrap(), 1900);
1055
1056            // Second prune: remove section 1 (positions 10-19)
1057            let pruned = journal.prune(20).await.unwrap();
1058            assert!(pruned);
1059
1060            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1061            let oldest = journal.oldest_retained_pos().unwrap();
1062            assert_eq!(oldest, 20);
1063
1064            // Items 0-19 should be pruned, 20+ should be accessible
1065            assert!(matches!(
1066                journal.read(10).await,
1067                Err(crate::journal::Error::ItemPruned(_))
1068            ));
1069            assert!(matches!(
1070                journal.read(19).await,
1071                Err(crate::journal::Error::ItemPruned(_))
1072            ));
1073            assert_eq!(journal.read(20).await.unwrap(), 2000);
1074            assert_eq!(journal.read(29).await.unwrap(), 2900);
1075
1076            // Third prune: remove section 2 (positions 20-29)
1077            let pruned = journal.prune(30).await.unwrap();
1078            assert!(pruned);
1079
1080            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1081            let oldest = journal.oldest_retained_pos().unwrap();
1082            assert_eq!(oldest, 30);
1083
1084            // Items 0-29 should be pruned, 30+ should be accessible
1085            assert!(matches!(
1086                journal.read(20).await,
1087                Err(crate::journal::Error::ItemPruned(_))
1088            ));
1089            assert!(matches!(
1090                journal.read(29).await,
1091                Err(crate::journal::Error::ItemPruned(_))
1092            ));
1093            assert_eq!(journal.read(30).await.unwrap(), 3000);
1094            assert_eq!(journal.read(39).await.unwrap(), 3900);
1095
1096            // Size should still be 40 (pruning doesn't affect size)
1097            assert_eq!(journal.size(), 40);
1098
1099            journal.destroy().await.unwrap();
1100        });
1101    }
1102
1103    /// Test that pruning all data and re-initializing preserves positions.
1104    #[test_traced]
1105    fn test_variable_prune_all_then_reinit() {
1106        let executor = deterministic::Runner::default();
1107        executor.start(|context| async move {
1108            let cfg = Config {
1109                partition: "prune_all_reinit".to_string(),
1110                items_per_section: NZU64!(10),
1111                compression: None,
1112                codec_config: (),
1113                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1114                write_buffer: NZUsize!(1024),
1115            };
1116
1117            // === Phase 1: Create journal and append data ===
1118            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1119                .await
1120                .unwrap();
1121
1122            for i in 0..100u64 {
1123                journal.append(i * 100).await.unwrap();
1124            }
1125
1126            assert_eq!(journal.size(), 100);
1127            assert_eq!(journal.oldest_retained_pos(), Some(0));
1128
1129            // === Phase 2: Prune all data ===
1130            let pruned = journal.prune(100).await.unwrap();
1131            assert!(pruned);
1132
1133            // All data is pruned - no items remain
1134            assert_eq!(journal.size(), 100);
1135            assert_eq!(journal.oldest_retained_pos(), None);
1136
1137            // All reads should fail with ItemPruned
1138            for i in 0..100 {
1139                assert!(matches!(
1140                    journal.read(i).await,
1141                    Err(crate::journal::Error::ItemPruned(_))
1142                ));
1143            }
1144
1145            journal.close().await.unwrap();
1146
1147            // === Phase 3: Re-init and verify position preserved ===
1148            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1149                .await
1150                .unwrap();
1151
1152            // Size should be preserved, but no items remain
1153            assert_eq!(journal.size(), 100);
1154            assert_eq!(journal.oldest_retained_pos(), None);
1155
1156            // All reads should still fail
1157            for i in 0..100 {
1158                assert!(matches!(
1159                    journal.read(i).await,
1160                    Err(crate::journal::Error::ItemPruned(_))
1161                ));
1162            }
1163
1164            // === Phase 4: Append new data ===
1165            // Next append should get position 100
1166            journal.append(10000).await.unwrap();
1167            assert_eq!(journal.size(), 101);
1168            // Now we have one item at position 100
1169            assert_eq!(journal.oldest_retained_pos(), Some(100));
1170
1171            // Can read the new item
1172            assert_eq!(journal.read(100).await.unwrap(), 10000);
1173
1174            // Old positions still fail
1175            assert!(matches!(
1176                journal.read(99).await,
1177                Err(crate::journal::Error::ItemPruned(_))
1178            ));
1179
1180            journal.destroy().await.unwrap();
1181        });
1182    }
1183
1184    /// Test recovery from crash after data journal pruned but before offsets journal.
1185    #[test_traced]
1186    fn test_variable_recovery_prune_crash_offsets_behind() {
1187        let executor = deterministic::Runner::default();
1188        executor.start(|context| async move {
1189            // === Setup: Create Variable wrapper with data ===
1190            let cfg = Config {
1191                partition: "recovery_prune_crash".to_string(),
1192                items_per_section: NZU64!(10),
1193                compression: None,
1194                codec_config: (),
1195                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1196                write_buffer: NZUsize!(1024),
1197            };
1198
1199            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1200                .await
1201                .unwrap();
1202
1203            // Append 40 items across 4 sections to both journals
1204            for i in 0..40u64 {
1205                variable.append(i * 100).await.unwrap();
1206            }
1207
1208            // Prune to position 10 normally (both data and offsets journals pruned)
1209            variable.prune(10).await.unwrap();
1210            assert_eq!(variable.oldest_retained_pos(), Some(10));
1211
1212            // === Simulate crash: Prune data journal but not offsets journal ===
1213            // Manually prune data journal to section 2 (position 20)
1214            variable.data.prune(2).await.unwrap();
1215            // Offsets journal still has data from position 10-19
1216
1217            variable.close().await.unwrap();
1218
1219            // === Verify recovery ===
1220            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1221                .await
1222                .unwrap();
1223
1224            // Init should auto-repair: offsets journal pruned to match data journal
1225            assert_eq!(variable.oldest_retained_pos(), Some(20));
1226            assert_eq!(variable.size(), 40);
1227
1228            // Reads before position 20 should fail (pruned from both journals)
1229            assert!(matches!(
1230                variable.read(10).await,
1231                Err(crate::journal::Error::ItemPruned(_))
1232            ));
1233
1234            // Reads at position 20+ should succeed
1235            assert_eq!(variable.read(20).await.unwrap(), 2000);
1236            assert_eq!(variable.read(39).await.unwrap(), 3900);
1237
1238            variable.destroy().await.unwrap();
1239        });
1240    }
1241
1242    /// Test recovery detects corruption when offsets journal pruned ahead of data journal.
1243    ///
1244    /// Simulates an impossible state (offsets journal pruned more than data journal) which
1245    /// should never happen due to write ordering. Verifies that init() returns corruption error.
1246    #[test_traced]
1247    fn test_variable_recovery_offsets_ahead_corruption() {
1248        let executor = deterministic::Runner::default();
1249        executor.start(|context| async move {
1250            // === Setup: Create Variable wrapper with data ===
1251            let cfg = Config {
1252                partition: "recovery_offsets_ahead".to_string(),
1253                items_per_section: NZU64!(10),
1254                compression: None,
1255                codec_config: (),
1256                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1257                write_buffer: NZUsize!(1024),
1258            };
1259
1260            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1261                .await
1262                .unwrap();
1263
1264            // Append 40 items across 4 sections to both journals
1265            for i in 0..40u64 {
1266                variable.append(i * 100).await.unwrap();
1267            }
1268
1269            // Prune offsets journal ahead of data journal (impossible state)
1270            variable.offsets.prune(20).await.unwrap(); // Prune to position 20
1271            variable.data.prune(1).await.unwrap(); // Only prune data journal to section 1 (position 10)
1272
1273            variable.close().await.unwrap();
1274
1275            // === Verify corruption detected ===
1276            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
1277            assert!(matches!(result, Err(Error::Corruption(_))));
1278        });
1279    }
1280
1281    /// Test recovery from crash after appending to data journal but before appending to offsets journal.
1282    #[test_traced]
1283    fn test_variable_recovery_append_crash_offsets_behind() {
1284        let executor = deterministic::Runner::default();
1285        executor.start(|context| async move {
1286            // === Setup: Create Variable wrapper with partial data ===
1287            let cfg = Config {
1288                partition: "recovery_append_crash".to_string(),
1289                items_per_section: NZU64!(10),
1290                compression: None,
1291                codec_config: (),
1292                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1293                write_buffer: NZUsize!(1024),
1294            };
1295
1296            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1297                .await
1298                .unwrap();
1299
1300            // Append 15 items to both journals (fills section 0, partial section 1)
1301            for i in 0..15u64 {
1302                variable.append(i * 100).await.unwrap();
1303            }
1304
1305            assert_eq!(variable.size(), 15);
1306
1307            // Manually append 5 more items directly to data journal only
1308            for i in 15..20u64 {
1309                variable.data.append(1, i * 100).await.unwrap();
1310            }
1311            // Offsets journal still has only 15 entries
1312
1313            variable.close().await.unwrap();
1314
1315            // === Verify recovery ===
1316            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1317                .await
1318                .unwrap();
1319
1320            // Init should rebuild offsets journal from data journal replay
1321            assert_eq!(variable.size(), 20);
1322            assert_eq!(variable.oldest_retained_pos(), Some(0));
1323
1324            // All items should be readable from both journals
1325            for i in 0..20u64 {
1326                assert_eq!(variable.read(i).await.unwrap(), i * 100);
1327            }
1328
1329            // Offsets journal should be fully rebuilt to match data journal
1330            assert_eq!(variable.offsets.size(), 20);
1331
1332            variable.destroy().await.unwrap();
1333        });
1334    }
1335
1336    /// Test recovery from multiple prune operations with crash.
1337    #[test_traced]
1338    fn test_variable_recovery_multiple_prunes_crash() {
1339        let executor = deterministic::Runner::default();
1340        executor.start(|context| async move {
1341            // === Setup: Create Variable wrapper with data ===
1342            let cfg = Config {
1343                partition: "recovery_multiple_prunes".to_string(),
1344                items_per_section: NZU64!(10),
1345                compression: None,
1346                codec_config: (),
1347                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1348                write_buffer: NZUsize!(1024),
1349            };
1350
1351            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1352                .await
1353                .unwrap();
1354
1355            // Append 50 items across 5 sections to both journals
1356            for i in 0..50u64 {
1357                variable.append(i * 100).await.unwrap();
1358            }
1359
1360            // Prune to position 10 normally (both data and offsets journals pruned)
1361            variable.prune(10).await.unwrap();
1362            assert_eq!(variable.oldest_retained_pos(), Some(10));
1363
1364            // === Simulate crash: Multiple prunes on data journal, not on offsets journal ===
1365            // Manually prune data journal to section 3 (position 30)
1366            variable.data.prune(3).await.unwrap();
1367            // Offsets journal still thinks oldest is position 10
1368
1369            variable.close().await.unwrap();
1370
1371            // === Verify recovery ===
1372            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1373                .await
1374                .unwrap();
1375
1376            // Init should auto-repair: offsets journal pruned to match data journal
1377            assert_eq!(variable.oldest_retained_pos(), Some(30));
1378            assert_eq!(variable.size(), 50);
1379
1380            // Reads before position 30 should fail (pruned from both journals)
1381            assert!(matches!(
1382                variable.read(10).await,
1383                Err(crate::journal::Error::ItemPruned(_))
1384            ));
1385            assert!(matches!(
1386                variable.read(20).await,
1387                Err(crate::journal::Error::ItemPruned(_))
1388            ));
1389
1390            // Reads at position 30+ should succeed
1391            assert_eq!(variable.read(30).await.unwrap(), 3000);
1392            assert_eq!(variable.read(49).await.unwrap(), 4900);
1393
1394            variable.destroy().await.unwrap();
1395        });
1396    }
1397
1398    /// Test recovery from crash during rewind operation.
1399    ///
1400    /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes.
1401    /// This creates a situation where offsets journal has been rewound but data journal still
1402    /// contains items across multiple sections. Verifies that init() correctly rebuilds the
1403    /// offsets index across all sections to match the data journal.
1404    #[test_traced]
1405    fn test_variable_recovery_rewind_crash_multi_section() {
1406        let executor = deterministic::Runner::default();
1407        executor.start(|context| async move {
1408            // === Setup: Create Variable wrapper with data across multiple sections ===
1409            let cfg = Config {
1410                partition: "recovery_rewind_crash".to_string(),
1411                items_per_section: NZU64!(10),
1412                compression: None,
1413                codec_config: (),
1414                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1415                write_buffer: NZUsize!(1024),
1416            };
1417
1418            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1419                .await
1420                .unwrap();
1421
1422            // Append 25 items across 3 sections (section 0: 0-9, section 1: 10-19, section 2: 20-24)
1423            for i in 0..25u64 {
1424                variable.append(i * 100).await.unwrap();
1425            }
1426
1427            assert_eq!(variable.size(), 25);
1428
1429            // === Simulate crash during rewind(5) ===
1430            // Rewind offsets journal to size 5 (keeps positions 0-4)
1431            variable.offsets.rewind(5).await.unwrap();
1432            // CRASH before data.rewind() completes - data still has all 3 sections
1433
1434            variable.close().await.unwrap();
1435
1436            // === Verify recovery ===
1437            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
1438                .await
1439                .unwrap();
1440
1441            // Init should rebuild offsets[5-24] from data journal across all 3 sections
1442            assert_eq!(variable.size(), 25);
1443            assert_eq!(variable.oldest_retained_pos(), Some(0));
1444
1445            // All items should be readable - offsets rebuilt correctly across all sections
1446            for i in 0..25u64 {
1447                assert_eq!(variable.read(i).await.unwrap(), i * 100);
1448            }
1449
1450            // Verify offsets journal fully rebuilt
1451            assert_eq!(variable.offsets.size(), 25);
1452
1453            // Verify next append gets position 25
1454            let pos = variable.append(2500).await.unwrap();
1455            assert_eq!(pos, 25);
1456            assert_eq!(variable.read(25).await.unwrap(), 2500);
1457
1458            variable.destroy().await.unwrap();
1459        });
1460    }
1461
1462    /// Test recovery from crash after data sync but before offsets sync when journal was
1463    /// previously emptied by pruning.
1464    #[test_traced]
1465    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
1466        let executor = deterministic::Runner::default();
1467        executor.start(|context| async move {
1468            let cfg = Config {
1469                partition: "recovery_empty_after_prune".to_string(),
1470                items_per_section: NZU64!(10),
1471                compression: None,
1472                codec_config: (),
1473                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1474                write_buffer: NZUsize!(1024),
1475            };
1476
1477            // === Phase 1: Create journal with one full section ===
1478            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1479                .await
1480                .unwrap();
1481
1482            // Append 10 items (positions 0-9), fills section 0
1483            for i in 0..10u64 {
1484                journal.append(i * 100).await.unwrap();
1485            }
1486            assert_eq!(journal.size(), 10);
1487            assert_eq!(journal.oldest_retained_pos(), Some(0));
1488
1489            // === Phase 2: Prune to create empty journal ===
1490            journal.prune(10).await.unwrap();
1491            assert_eq!(journal.size(), 10);
1492            assert_eq!(journal.oldest_retained_pos(), None); // Empty!
1493
1494            // === Phase 3: Append directly to data journal to simulate crash ===
1495            // Manually append to data journal only (bypassing Variable's append logic)
1496            // This simulates the case where data was synced but offsets wasn't
1497            for i in 10..20u64 {
1498                journal.data.append(1, i * 100).await.unwrap();
1499            }
1500            // Sync the data journal (section 1)
1501            journal.data.sync(1).await.unwrap();
1502            // Do NOT sync offsets journal - simulates crash before offsets.sync()
1503
1504            // Close without syncing offsets
1505            drop(journal);
1506
1507            // === Phase 4: Verify recovery succeeds ===
1508            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1509                .await
1510                .expect("Should recover from crash after data sync but before offsets sync");
1511
1512            // All data should be recovered
1513            assert_eq!(journal.size(), 20);
1514            assert_eq!(journal.oldest_retained_pos(), Some(10));
1515
1516            // All items from position 10-19 should be readable
1517            for i in 10..20u64 {
1518                assert_eq!(journal.read(i).await.unwrap(), i * 100);
1519            }
1520
1521            // Items 0-9 should be pruned
1522            for i in 0..10 {
1523                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
1524            }
1525
1526            journal.destroy().await.unwrap();
1527        });
1528    }
1529
1530    /// Test that offsets index is rebuilt from data after sync writes data but not offsets.
1531    #[test_traced]
1532    fn test_variable_concurrent_sync_recovery() {
1533        let executor = deterministic::Runner::default();
1534        executor.start(|context| async move {
1535            let cfg = Config {
1536                partition: "concurrent_sync_recovery".to_string(),
1537                items_per_section: NZU64!(10),
1538                compression: None,
1539                codec_config: (),
1540                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1541                write_buffer: NZUsize!(1024),
1542            };
1543
1544            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1545                .await
1546                .unwrap();
1547
1548            // Append items across a section boundary
1549            for i in 0..15u64 {
1550                journal.append(i * 100).await.unwrap();
1551            }
1552
1553            // Manually sync only data to simulate crash during concurrent sync
1554            journal.commit().await.unwrap();
1555
1556            // Simulate a crash (offsets not synced)
1557            drop(journal);
1558
1559            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1560                .await
1561                .unwrap();
1562
1563            // Data should be intact and offsets rebuilt
1564            assert_eq!(journal.size(), 15);
1565            for i in 0..15u64 {
1566                assert_eq!(journal.read(i).await.unwrap(), i * 100);
1567            }
1568
1569            journal.destroy().await.unwrap();
1570        });
1571    }
1572
1573    #[test_traced]
1574    fn test_init_at_size_zero() {
1575        let executor = deterministic::Runner::default();
1576        executor.start(|context| async move {
1577            let cfg = Config {
1578                partition: "init_at_size_zero".to_string(),
1579                items_per_section: NZU64!(5),
1580                compression: None,
1581                codec_config: (),
1582                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1583                write_buffer: NZUsize!(1024),
1584            };
1585
1586            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1587                .await
1588                .unwrap();
1589
1590            // Size should be 0
1591            assert_eq!(journal.size(), 0);
1592
1593            // No oldest retained position (empty journal)
1594            assert_eq!(journal.oldest_retained_pos(), None);
1595
1596            // Next append should get position 0
1597            let pos = journal.append(100).await.unwrap();
1598            assert_eq!(pos, 0);
1599            assert_eq!(journal.size(), 1);
1600            assert_eq!(journal.read(0).await.unwrap(), 100);
1601
1602            journal.destroy().await.unwrap();
1603        });
1604    }
1605
1606    #[test_traced]
1607    fn test_init_at_size_section_boundary() {
1608        let executor = deterministic::Runner::default();
1609        executor.start(|context| async move {
1610            let cfg = Config {
1611                partition: "init_at_size_boundary".to_string(),
1612                items_per_section: NZU64!(5),
1613                compression: None,
1614                codec_config: (),
1615                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1616                write_buffer: NZUsize!(1024),
1617            };
1618
1619            // Initialize at position 10 (exactly at section 1 boundary with items_per_section=5)
1620            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1621                .await
1622                .unwrap();
1623
1624            // Size should be 10
1625            assert_eq!(journal.size(), 10);
1626
1627            // No data yet, so no oldest retained position
1628            assert_eq!(journal.oldest_retained_pos(), None);
1629
1630            // Next append should get position 10
1631            let pos = journal.append(1000).await.unwrap();
1632            assert_eq!(pos, 10);
1633            assert_eq!(journal.size(), 11);
1634            assert_eq!(journal.read(10).await.unwrap(), 1000);
1635
1636            // Can continue appending
1637            let pos = journal.append(1001).await.unwrap();
1638            assert_eq!(pos, 11);
1639            assert_eq!(journal.read(11).await.unwrap(), 1001);
1640
1641            journal.destroy().await.unwrap();
1642        });
1643    }
1644
1645    #[test_traced]
1646    fn test_init_at_size_mid_section() {
1647        let executor = deterministic::Runner::default();
1648        executor.start(|context| async move {
1649            let cfg = Config {
1650                partition: "init_at_size_mid".to_string(),
1651                items_per_section: NZU64!(5),
1652                compression: None,
1653                codec_config: (),
1654                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1655                write_buffer: NZUsize!(1024),
1656            };
1657
1658            // Initialize at position 7 (middle of section 1 with items_per_section=5)
1659            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1660                .await
1661                .unwrap();
1662
1663            // Size should be 7
1664            assert_eq!(journal.size(), 7);
1665
1666            // No data yet, so no oldest retained position
1667            assert_eq!(journal.oldest_retained_pos(), None);
1668
1669            // Next append should get position 7
1670            let pos = journal.append(700).await.unwrap();
1671            assert_eq!(pos, 7);
1672            assert_eq!(journal.size(), 8);
1673            assert_eq!(journal.read(7).await.unwrap(), 700);
1674
1675            journal.destroy().await.unwrap();
1676        });
1677    }
1678
1679    #[test_traced]
1680    fn test_init_at_size_persistence() {
1681        let executor = deterministic::Runner::default();
1682        executor.start(|context| async move {
1683            let cfg = Config {
1684                partition: "init_at_size_persist".to_string(),
1685                items_per_section: NZU64!(5),
1686                compression: None,
1687                codec_config: (),
1688                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1689                write_buffer: NZUsize!(1024),
1690            };
1691
1692            // Initialize at position 15
1693            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 15)
1694                .await
1695                .unwrap();
1696
1697            // Append some items
1698            for i in 0..5u64 {
1699                let pos = journal.append(1500 + i).await.unwrap();
1700                assert_eq!(pos, 15 + i);
1701            }
1702
1703            assert_eq!(journal.size(), 20);
1704
1705            // Close and reopen
1706            journal.close().await.unwrap();
1707
1708            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1709                .await
1710                .unwrap();
1711
1712            // Size and data should be preserved
1713            assert_eq!(journal.size(), 20);
1714            assert_eq!(journal.oldest_retained_pos(), Some(15));
1715
1716            // Verify data
1717            for i in 0..5u64 {
1718                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
1719            }
1720
1721            // Can continue appending
1722            let pos = journal.append(9999).await.unwrap();
1723            assert_eq!(pos, 20);
1724            assert_eq!(journal.read(20).await.unwrap(), 9999);
1725
1726            journal.destroy().await.unwrap();
1727        });
1728    }
1729
1730    #[test_traced]
1731    fn test_init_at_size_large_offset() {
1732        let executor = deterministic::Runner::default();
1733        executor.start(|context| async move {
1734            let cfg = Config {
1735                partition: "init_at_size_large".to_string(),
1736                items_per_section: NZU64!(5),
1737                compression: None,
1738                codec_config: (),
1739                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1740                write_buffer: NZUsize!(1024),
1741            };
1742
1743            // Initialize at a large position (position 1000)
1744            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
1745                .await
1746                .unwrap();
1747
1748            assert_eq!(journal.size(), 1000);
1749            // No data yet, so no oldest retained position
1750            assert_eq!(journal.oldest_retained_pos(), None);
1751
1752            // Next append should get position 1000
1753            let pos = journal.append(100000).await.unwrap();
1754            assert_eq!(pos, 1000);
1755            assert_eq!(journal.read(1000).await.unwrap(), 100000);
1756
1757            journal.destroy().await.unwrap();
1758        });
1759    }
1760
1761    #[test_traced]
1762    fn test_init_at_size_prune_and_append() {
1763        let executor = deterministic::Runner::default();
1764        executor.start(|context| async move {
1765            let cfg = Config {
1766                partition: "init_at_size_prune".to_string(),
1767                items_per_section: NZU64!(5),
1768                compression: None,
1769                codec_config: (),
1770                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1771                write_buffer: NZUsize!(1024),
1772            };
1773
1774            // Initialize at position 20
1775            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
1776                .await
1777                .unwrap();
1778
1779            // Append items 20-29
1780            for i in 0..10u64 {
1781                journal.append(2000 + i).await.unwrap();
1782            }
1783
1784            assert_eq!(journal.size(), 30);
1785
1786            // Prune to position 25
1787            journal.prune(25).await.unwrap();
1788
1789            assert_eq!(journal.size(), 30);
1790            assert_eq!(journal.oldest_retained_pos(), Some(25));
1791
1792            // Verify remaining items are readable
1793            for i in 25..30u64 {
1794                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
1795            }
1796
1797            // Continue appending
1798            let pos = journal.append(3000).await.unwrap();
1799            assert_eq!(pos, 30);
1800
1801            journal.destroy().await.unwrap();
1802        });
1803    }
1804
1805    /// Test `init_sync` when there is no existing data on disk.
1806    #[test_traced]
1807    fn test_init_sync_no_existing_data() {
1808        let executor = deterministic::Runner::default();
1809        executor.start(|context| async move {
1810            let cfg = Config {
1811                partition: "test_fresh_start".into(),
1812                items_per_section: NZU64!(5),
1813                compression: None,
1814                codec_config: (),
1815                write_buffer: NZUsize!(1024),
1816                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1817            };
1818
1819            // Initialize journal with sync boundaries when no existing data exists
1820            let lower_bound = 10;
1821            let upper_bound = 26;
1822            let mut journal =
1823                Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
1824                    .await
1825                    .expect("Failed to initialize journal with sync boundaries");
1826
1827            assert_eq!(journal.size(), lower_bound);
1828            assert_eq!(journal.oldest_retained_pos(), None);
1829
1830            // Append items using the contiguous API
1831            let pos1 = journal.append(42u64).await.unwrap();
1832            assert_eq!(pos1, lower_bound);
1833            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
1834
1835            let pos2 = journal.append(43u64).await.unwrap();
1836            assert_eq!(pos2, lower_bound + 1);
1837            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
1838
1839            journal.destroy().await.unwrap();
1840        });
1841    }
1842
1843    /// Test `init_sync` when there is existing data that overlaps with the sync target range.
1844    #[test_traced]
1845    fn test_init_sync_existing_data_overlap() {
1846        let executor = deterministic::Runner::default();
1847        executor.start(|context| async move {
1848            let cfg = Config {
1849                partition: "test_overlap".into(),
1850                items_per_section: NZU64!(5),
1851                compression: None,
1852                codec_config: (),
1853                write_buffer: NZUsize!(1024),
1854                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1855            };
1856
1857            // Create initial journal with data in multiple sections
1858            let mut journal =
1859                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
1860                    .await
1861                    .expect("Failed to create initial journal");
1862
1863            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
1864            for i in 0..20u64 {
1865                journal.append(i * 100).await.unwrap();
1866            }
1867            journal.close().await.unwrap();
1868
1869            // Initialize with sync boundaries that overlap with existing data
1870            // lower_bound: 8 (section 1), upper_bound: 31 (last location 30, section 6)
1871            let lower_bound = 8;
1872            let upper_bound = 31;
1873            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
1874                context.clone(),
1875                cfg.clone(),
1876                lower_bound..upper_bound,
1877            )
1878            .await
1879            .expect("Failed to initialize journal with overlap");
1880
1881            assert_eq!(journal.size(), 20);
1882
1883            // Verify oldest retained is pruned to lower_bound's section boundary (5)
1884            let oldest = journal.oldest_retained_pos();
1885            assert_eq!(oldest, Some(5)); // Section 1 starts at position 5
1886
1887            // Verify data integrity: positions before 5 are pruned
1888            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
1889            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
1890
1891            // Positions 5-19 should be accessible
1892            assert_eq!(journal.read(5).await.unwrap(), 500);
1893            assert_eq!(journal.read(8).await.unwrap(), 800);
1894            assert_eq!(journal.read(19).await.unwrap(), 1900);
1895
1896            // Position 20+ should not exist yet
1897            assert!(matches!(
1898                journal.read(20).await,
1899                Err(Error::ItemOutOfRange(_))
1900            ));
1901
1902            // Assert journal can accept new items
1903            let pos = journal.append(999).await.unwrap();
1904            assert_eq!(pos, 20);
1905            assert_eq!(journal.read(20).await.unwrap(), 999);
1906
1907            journal.destroy().await.unwrap();
1908        });
1909    }
1910
1911    /// Test `init_sync` with invalid parameters.
1912    #[should_panic]
1913    #[test_traced]
1914    fn test_init_sync_invalid_parameters() {
1915        let executor = deterministic::Runner::default();
1916        executor.start(|context| async move {
1917            let cfg = Config {
1918                partition: "test_invalid".into(),
1919                items_per_section: NZU64!(5),
1920                compression: None,
1921                codec_config: (),
1922                write_buffer: NZUsize!(1024),
1923                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1924            };
1925
1926            #[allow(clippy::reversed_empty_ranges)]
1927            let _result = Journal::<deterministic::Context, u64>::init_sync(
1928                context.clone(),
1929                cfg,
1930                10..5, // invalid range: lower > upper
1931            )
1932            .await;
1933        });
1934    }
1935
1936    /// Test `init_sync` when existing data exactly matches the sync range.
1937    #[test_traced]
1938    fn test_init_sync_existing_data_exact_match() {
1939        let executor = deterministic::Runner::default();
1940        executor.start(|context| async move {
1941            let items_per_section = NZU64!(5);
1942            let cfg = Config {
1943                partition: "test_exact_match".to_string(),
1944                items_per_section,
1945                compression: None,
1946                codec_config: (),
1947                write_buffer: NZUsize!(1024),
1948                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1949            };
1950
1951            // Create initial journal with data exactly matching sync range
1952            let mut journal =
1953                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
1954                    .await
1955                    .expect("Failed to create initial journal");
1956
1957            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
1958            for i in 0..20u64 {
1959                journal.append(i * 100).await.unwrap();
1960            }
1961            journal.close().await.unwrap();
1962
1963            // Initialize with sync boundaries that exactly match existing data
1964            let lower_bound = 5; // section 1
1965            let upper_bound = 20; // section 3
1966            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
1967                context.clone(),
1968                cfg.clone(),
1969                lower_bound..upper_bound,
1970            )
1971            .await
1972            .expect("Failed to initialize journal with exact match");
1973
1974            assert_eq!(journal.size(), 20);
1975
1976            // Verify pruning to lower bound (section 1 boundary = position 5)
1977            let oldest = journal.oldest_retained_pos();
1978            assert_eq!(oldest, Some(5)); // Section 1 starts at position 5
1979
1980            // Verify positions before 5 are pruned
1981            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
1982            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
1983
1984            // Positions 5-19 should be accessible
1985            assert_eq!(journal.read(5).await.unwrap(), 500);
1986            assert_eq!(journal.read(10).await.unwrap(), 1000);
1987            assert_eq!(journal.read(19).await.unwrap(), 1900);
1988
1989            // Position 20+ should not exist yet
1990            assert!(matches!(
1991                journal.read(20).await,
1992                Err(Error::ItemOutOfRange(_))
1993            ));
1994
1995            // Assert journal can accept new operations
1996            let pos = journal.append(999).await.unwrap();
1997            assert_eq!(pos, 20);
1998            assert_eq!(journal.read(20).await.unwrap(), 999);
1999
2000            journal.destroy().await.unwrap();
2001        });
2002    }
2003
2004    /// Test `init_sync` when existing data exceeds the sync target range.
2005    /// This tests that UnexpectedData error is returned when existing data goes beyond the upper bound.
2006    #[test_traced]
2007    fn test_init_sync_existing_data_exceeds_upper_bound() {
2008        let executor = deterministic::Runner::default();
2009        executor.start(|context| async move {
2010            let items_per_section = NZU64!(5);
2011            let cfg = Config {
2012                partition: "test_unexpected_data".into(),
2013                items_per_section,
2014                compression: None,
2015                codec_config: (),
2016                write_buffer: NZUsize!(1024),
2017                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2018            };
2019
2020            // Create initial journal with data beyond sync range
2021            let mut journal =
2022                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2023                    .await
2024                    .expect("Failed to create initial journal");
2025
2026            // Add data at positions 0-29 (sections 0-5 with items_per_section=5)
2027            for i in 0..30u64 {
2028                journal.append(i * 1000).await.unwrap();
2029            }
2030            journal.close().await.unwrap();
2031
2032            // Initialize with sync boundaries that are exceeded by existing data
2033            let lower_bound = 8; // section 1
2034            for upper_bound in 9..29 {
2035                let result = Journal::<deterministic::Context, u64>::init_sync(
2036                    context.clone(),
2037                    cfg.clone(),
2038                    lower_bound..upper_bound,
2039                )
2040                .await;
2041
2042                // Should return UnexpectedData error since data exists beyond upper_bound
2043                assert!(matches!(result, Err(crate::qmdb::Error::UnexpectedData(_))));
2044            }
2045        });
2046    }
2047
2048    /// Test `init_sync` when all existing data is stale (before lower bound).
2049    #[test_traced]
2050    fn test_init_sync_existing_data_stale() {
2051        let executor = deterministic::Runner::default();
2052        executor.start(|context| async move {
2053            let items_per_section = NZU64!(5);
2054            let cfg = Config {
2055                partition: "test_stale".into(),
2056                items_per_section,
2057                compression: None,
2058                codec_config: (),
2059                write_buffer: NZUsize!(1024),
2060                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2061            };
2062
2063            // Create initial journal with stale data
2064            let mut journal =
2065                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2066                    .await
2067                    .expect("Failed to create initial journal");
2068
2069            // Add data at positions 0-9 (sections 0-1 with items_per_section=5)
2070            for i in 0..10u64 {
2071                journal.append(i * 100).await.unwrap();
2072            }
2073            journal.close().await.unwrap();
2074
2075            // Initialize with sync boundaries beyond all existing data
2076            let lower_bound = 15; // section 3
2077            let upper_bound = 26; // last element in section 5
2078            let journal = Journal::<deterministic::Context, u64>::init_sync(
2079                context.clone(),
2080                cfg.clone(),
2081                lower_bound..upper_bound,
2082            )
2083            .await
2084            .expect("Failed to initialize journal with stale data");
2085
2086            assert_eq!(journal.size(), 15);
2087
2088            // Verify fresh journal (all old data destroyed, starts at position 15)
2089            assert_eq!(journal.oldest_retained_pos(), None);
2090
2091            // Verify old positions don't exist
2092            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2093            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2094            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2095
2096            journal.destroy().await.unwrap();
2097        });
2098    }
2099
2100    /// Test `init_sync` with section boundary edge cases.
2101    #[test_traced]
2102    fn test_init_sync_section_boundaries() {
2103        let executor = deterministic::Runner::default();
2104        executor.start(|context| async move {
2105            let items_per_section = NZU64!(5);
2106            let cfg = Config {
2107                partition: "test_boundaries".into(),
2108                items_per_section,
2109                compression: None,
2110                codec_config: (),
2111                write_buffer: NZUsize!(1024),
2112                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2113            };
2114
2115            // Create journal with data at section boundaries
2116            let mut journal =
2117                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2118                    .await
2119                    .expect("Failed to create initial journal");
2120
2121            // Add data at positions 0-24 (sections 0-4 with items_per_section=5)
2122            for i in 0..25u64 {
2123                journal.append(i * 100).await.unwrap();
2124            }
2125            journal.close().await.unwrap();
2126
2127            // Test sync boundaries exactly at section boundaries
2128            let lower_bound = 15; // Exactly at section boundary (15/5 = 3)
2129            let upper_bound = 25; // Last element exactly at section boundary (24/5 = 4)
2130            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2131                context.clone(),
2132                cfg.clone(),
2133                lower_bound..upper_bound,
2134            )
2135            .await
2136            .expect("Failed to initialize journal at boundaries");
2137
2138            assert_eq!(journal.size(), 25);
2139
2140            // Verify oldest retained is at section 3 boundary (position 15)
2141            let oldest = journal.oldest_retained_pos();
2142            assert_eq!(oldest, Some(15));
2143
2144            // Verify positions before 15 are pruned
2145            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2146            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2147
2148            // Verify positions 15-24 are accessible
2149            assert_eq!(journal.read(15).await.unwrap(), 1500);
2150            assert_eq!(journal.read(20).await.unwrap(), 2000);
2151            assert_eq!(journal.read(24).await.unwrap(), 2400);
2152
2153            // Position 25+ should not exist yet
2154            assert!(matches!(
2155                journal.read(25).await,
2156                Err(Error::ItemOutOfRange(_))
2157            ));
2158
2159            // Assert journal can accept new operations
2160            let pos = journal.append(999).await.unwrap();
2161            assert_eq!(pos, 25);
2162            assert_eq!(journal.read(25).await.unwrap(), 999);
2163
2164            journal.destroy().await.unwrap();
2165        });
2166    }
2167
2168    /// Test `init_sync` when range.start and range.end-1 are in the same section.
2169    #[test_traced]
2170    fn test_init_sync_same_section_bounds() {
2171        let executor = deterministic::Runner::default();
2172        executor.start(|context| async move {
2173            let items_per_section = NZU64!(5);
2174            let cfg = Config {
2175                partition: "test_same_section".into(),
2176                items_per_section,
2177                compression: None,
2178                codec_config: (),
2179                write_buffer: NZUsize!(1024),
2180                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2181            };
2182
2183            // Create journal with data in multiple sections
2184            let mut journal =
2185                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2186                    .await
2187                    .expect("Failed to create initial journal");
2188
2189            // Add data at positions 0-14 (sections 0-2 with items_per_section=5)
2190            for i in 0..15u64 {
2191                journal.append(i * 100).await.unwrap();
2192            }
2193            journal.close().await.unwrap();
2194
2195            // Test sync boundaries within the same section
2196            let lower_bound = 10; // operation 10 (section 2: 10/5 = 2)
2197            let upper_bound = 15; // Last operation 14 (section 2: 14/5 = 2)
2198            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2199                context.clone(),
2200                cfg.clone(),
2201                lower_bound..upper_bound,
2202            )
2203            .await
2204            .expect("Failed to initialize journal with same-section bounds");
2205
2206            assert_eq!(journal.size(), 15);
2207
2208            // Both operations are in section 2, so sections 0, 1 should be pruned, section 2 retained
2209            // Oldest retained position should be at section 2 boundary (position 10)
2210            let oldest = journal.oldest_retained_pos();
2211            assert_eq!(oldest, Some(10));
2212
2213            // Verify positions before 10 are pruned
2214            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2215            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2216
2217            // Verify positions 10-14 are accessible
2218            assert_eq!(journal.read(10).await.unwrap(), 1000);
2219            assert_eq!(journal.read(11).await.unwrap(), 1100);
2220            assert_eq!(journal.read(14).await.unwrap(), 1400);
2221
2222            // Position 15+ should not exist yet
2223            assert!(matches!(
2224                journal.read(15).await,
2225                Err(Error::ItemOutOfRange(_))
2226            ));
2227
2228            // Assert journal can accept new operations
2229            let pos = journal.append(999).await.unwrap();
2230            assert_eq!(pos, 15);
2231            assert_eq!(journal.read(15).await.unwrap(), 999);
2232
2233            journal.destroy().await.unwrap();
2234        });
2235    }
2236}