commonware_storage/journal/contiguous/
variable.rs

1//! Position-based journal for variable-length items.
2//!
3//! This journal enforces section fullness: all non-final sections are full and synced.
4//! On init, only the last section needs to be replayed to determine the exact size.
5
6use crate::{
7    journal::{
8        contiguous::{fixed, Contiguous},
9        segmented::variable,
10        Error,
11    },
12    mmr::Location,
13};
14use commonware_codec::Codec;
15use commonware_runtime::{buffer::PoolRef, Metrics, Storage};
16use commonware_utils::NZUsize;
17use core::ops::Range;
18use futures::{stream, Stream, StreamExt as _};
19use std::{
20    num::{NonZeroU64, NonZeroUsize},
21    pin::Pin,
22};
23use tracing::{debug, info};
24
/// Buffer size used when replaying journal sections (during init and crash repair).
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to the base partition name for the data journal.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to the base partition name for the offsets journal.
const OFFSETS_SUFFIX: &str = "_offsets";
32
/// Map an absolute journal position to the section that stores it.
///
/// Positions are assigned consecutively, so section membership is simple
/// integer division by `items_per_section`.
///
/// # Arguments
///
/// * `position` - The absolute position in the journal
/// * `items_per_section` - The number of items stored in each section
///
/// # Returns
///
/// The section number where the item at `position` should be stored.
///
/// # Examples
///
/// ```ignore
/// // With 10 items per section, sections hold positions [0..10), [10..20), ...
/// assert_eq!(position_to_section(0, 10), 0);
/// assert_eq!(position_to_section(9, 10), 0);
/// assert_eq!(position_to_section(10, 10), 1);
/// assert_eq!(position_to_section(25, 10), 2);
/// assert_eq!(position_to_section(30, 10), 3);
/// ```
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    position / items_per_section
}
57
/// Configuration for a [Journal].
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name. Sub-partitions will be created by appending DATA_SUFFIX
    /// (item bytes) and OFFSETS_SUFFIX (position -> byte-offset index).
    pub partition: String,

    /// The number of items to store in each section.
    ///
    /// Once set, this value cannot be changed across restarts.
    /// All non-final sections will be full and synced.
    pub items_per_section: NonZeroU64,

    /// Optional compression level for stored items (passed to the data journal).
    pub compression: Option<u8>,

    /// [Codec] configuration for encoding/decoding items.
    pub codec_config: C,

    /// Buffer pool for caching data, shared by both sub-journals.
    pub buffer_pool: PoolRef,

    /// Write buffer size for each section.
    pub write_buffer: NonZeroUsize,
}
82
83impl<C> Config<C> {
84    /// Returns the partition name for the data journal.
85    fn data_partition(&self) -> String {
86        format!("{}{}", self.partition, DATA_SUFFIX)
87    }
88
89    /// Returns the partition name for the offsets journal.
90    fn offsets_partition(&self) -> String {
91        format!("{}{}", self.partition, OFFSETS_SUFFIX)
92    }
93}
94
/// A position-based journal for variable-length items.
///
/// This journal manages section assignment automatically, allowing callers to append items
/// sequentially without manually tracking section numbers.
///
/// # Invariants
///
/// ## 1. Section Fullness
///
/// All non-final sections are full (`items_per_section` items) and synced. This ensures
/// that on `init()`, we only need to replay the last section to determine the exact size.
///
/// ## 2. Data Journal is Source of Truth
///
/// The data journal is always the source of truth. The offsets journal is an index
/// that may temporarily diverge during crashes. Divergences are automatically
/// aligned during init():
/// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data.
///   (This can happen if we crash after writing data journal but before writing offsets journal)
/// * If offsets.size() > data.size(): Rewind offsets to match data size.
///   (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
/// * If offsets.oldest_retained_pos() < data.oldest_retained_pos(): Prune offsets to match
///   (This can happen if we crash after pruning data journal but before pruning offsets journal)
///
/// Note that we don't recover from the case where offsets.oldest_retained_pos() >
/// data.oldest_retained_pos(). This should never occur because we always prune the data journal
/// before the offsets journal.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// The underlying variable-length data journal (holds the encoded items).
    data: variable::Journal<E, V>,

    /// Index mapping positions to byte offsets within the data journal.
    /// The section can be calculated from the position using items_per_section.
    offsets: fixed::Journal<E, u32>,

    /// The number of items per section.
    ///
    /// # Invariant
    ///
    /// This value is immutable after initialization and must remain consistent
    /// across restarts. Changing this value will result in data loss or corruption.
    items_per_section: u64,

    /// The next position to be assigned on append (total items appended).
    ///
    /// # Invariant
    ///
    /// Always >= `oldest_retained_pos`. Equal when data journal is empty or fully pruned.
    size: u64,

    /// The position of the first item that remains after pruning.
    ///
    /// # Invariant
    ///
    /// Always section-aligned: `oldest_retained_pos % items_per_section == 0`.
    /// Never decreases (pruning only moves forward).
    oldest_retained_pos: u64,
}
153
154impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
155    /// Initialize a contiguous variable journal.
156    ///
157    /// # Crash Recovery
158    ///
159    /// The data journal is the source of truth. If the offsets journal is inconsistent
160    /// it will be updated to match the data journal.
161    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
162        let items_per_section = cfg.items_per_section.get();
163        let data_partition = cfg.data_partition();
164        let offsets_partition = cfg.offsets_partition();
165
166        // Initialize underlying variable data journal
167        let mut data = variable::Journal::init(
168            context.clone(),
169            variable::Config {
170                partition: data_partition,
171                compression: cfg.compression,
172                codec_config: cfg.codec_config,
173                buffer_pool: cfg.buffer_pool.clone(),
174                write_buffer: cfg.write_buffer,
175            },
176        )
177        .await?;
178
179        // Initialize offsets journal
180        let mut offsets = fixed::Journal::init(
181            context,
182            fixed::Config {
183                partition: offsets_partition,
184                items_per_blob: cfg.items_per_section,
185                buffer_pool: cfg.buffer_pool,
186                write_buffer: cfg.write_buffer,
187            },
188        )
189        .await?;
190
191        // Validate and align offsets journal to match data journal
192        let (oldest_retained_pos, size) =
193            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
194        assert!(
195            oldest_retained_pos.is_multiple_of(items_per_section),
196            "oldest_retained_pos is not section-aligned"
197        );
198
199        Ok(Self {
200            data,
201            offsets,
202            items_per_section,
203            size,
204            oldest_retained_pos,
205        })
206    }
207
208    /// Initialize a [Journal] in a fully pruned state at a specific logical size.
209    ///
210    /// This creates a journal that reports `size()` as `size` but contains no data.
211    /// The `oldest_retained_pos()` will return `None`, indicating all positions before
212    /// `size` have been pruned. This is useful for state sync when starting from
213    /// a non-zero position without historical data.
214    ///
215    /// # Arguments
216    ///
217    /// * `size` - The logical size to initialize at.
218    ///
219    /// # Post-conditions
220    ///
221    /// * `size()` returns `size`
222    /// * `oldest_retained_pos()` returns `None` (fully pruned)
223    /// * Next append receives position `size`
224    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
225        // Initialize empty data journal
226        let data = variable::Journal::init(
227            context.clone(),
228            variable::Config {
229                partition: cfg.data_partition(),
230                compression: cfg.compression,
231                codec_config: cfg.codec_config.clone(),
232                buffer_pool: cfg.buffer_pool.clone(),
233                write_buffer: cfg.write_buffer,
234            },
235        )
236        .await?;
237
238        // Initialize offsets journal at the target size
239        let offsets = crate::adb::any::fixed::sync::init_journal_at_size(
240            context,
241            fixed::Config {
242                partition: cfg.offsets_partition(),
243                items_per_blob: cfg.items_per_section,
244                buffer_pool: cfg.buffer_pool,
245                write_buffer: cfg.write_buffer,
246            },
247            size,
248        )
249        .await?;
250
251        Ok(Self {
252            data,
253            offsets,
254            items_per_section: cfg.items_per_section.get(),
255            size,
256            oldest_retained_pos: size,
257        })
258    }
259
    /// Initialize a [Journal] for use in state sync.
    ///
    /// The bounds are item locations (not section numbers). This function prepares the
    /// on-disk journal so that subsequent appends go to the correct physical location for the
    /// requested range.
    ///
    /// Behavior by existing on-disk state:
    /// - Fresh (no data): returns an empty journal.
    /// - Stale (all data strictly before `range.start`): destroys existing data and returns an
    ///   empty journal positioned at `range.start`.
    /// - Overlap within [`range.start`, `range.end`]:
    ///   - Prunes to `range.start`
    /// - Unexpected data beyond `range.end`: returns [crate::adb::Error::UnexpectedData].
    ///
    /// # Arguments
    /// - `context`: storage context
    /// - `cfg`: journal configuration
    /// - `range`: range of item locations to retain
    ///
    /// # Returns
    /// A contiguous journal ready for sync operations. The journal's size will be within the range.
    ///
    /// # Errors
    /// Returns [crate::adb::Error::UnexpectedData] if existing data extends beyond `range.end`.
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Journal<E, V>, crate::adb::Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        // Open (and crash-repair) whatever journal state exists on disk.
        let mut journal = Journal::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size();

        // No existing data - initialize at the start of the sync range if needed
        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                // Destroy first so init_at_size starts from a clean partition.
                journal.destroy().await?;
                return Ok(Journal::init_at_size(context, cfg, range.start).await?);
            }
        }

        // Check if data exceeds the sync range (caller's range must cover all data).
        if size > range.end {
            return Err(crate::adb::Error::UnexpectedData(Location::new_unchecked(
                size,
            )));
        }

        // If all existing data is before our sync range, destroy and recreate fresh
        if size <= range.start {
            // All data is stale (ends at or before range.start)
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Ok(Journal::init_at_size(context, cfg, range.start).await?);
        }

        // Overlap case: prune to the lower bound if anything older is retained.
        let oldest = journal.oldest_retained_pos();
        if let Some(oldest_pos) = oldest {
            if oldest_pos < range.start {
                debug!(
                    oldest_pos,
                    range.start, "pruning journal to sync range start"
                );
                journal.prune(range.start).await?;
            }
        }

        Ok(journal)
    }
350
351    /// Rewind the journal to the given size, discarding items from the end.
352    ///
353    /// After rewinding to size N, the journal will contain exactly N items,
354    /// and the next append will receive position N.
355    ///
356    /// # Errors
357    ///
358    /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
359    ///
360    /// Underlying storage operations may leave the journal in an inconsistent state.
361    /// The journal should be closed and reopened to trigger alignment in [Journal::init].
362    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
363        // Validate rewind target
364        match size.cmp(&self.size) {
365            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
366            std::cmp::Ordering::Equal => return Ok(()), // No-op
367            std::cmp::Ordering::Less => {}
368        }
369
370        // Rewind never updates oldest_retained_pos.
371        if size < self.oldest_retained_pos {
372            return Err(Error::ItemPruned(size));
373        }
374
375        // Read the offset of the first item to discard (at position 'size').
376        let discard_offset = self.offsets.read(size).await?;
377        let discard_section = position_to_section(size, self.items_per_section);
378
379        self.data
380            .rewind_to_offset(discard_section, discard_offset)
381            .await?;
382        self.offsets.rewind(size).await?;
383
384        // Update our size
385        self.size = size;
386
387        Ok(())
388    }
389
390    /// Append a new item to the journal, returning its position.
391    ///
392    /// The position returned is a stable, consecutively increasing value starting from 0.
393    /// This position remains constant after pruning.
394    ///
395    /// When a section becomes full, both the data journal and offsets journal are synced
396    /// to maintain the invariant that all non-final sections are full and consistent.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the underlying storage operation fails or if the item cannot
401    /// be encoded.
402    ///
403    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
404    /// reopened to trigger alignment in [Journal::init].
405    pub async fn append(&mut self, item: V) -> Result<u64, Error> {
406        // Calculate which section this position belongs to
407        let section = self.current_section();
408
409        // Append to data journal, get offset
410        let (offset, _size) = self.data.append(section, item).await?;
411
412        // Append offset to offsets journal
413        let offsets_pos = self.offsets.append(offset).await?;
414        assert_eq!(offsets_pos, self.size);
415
416        // Return the current position
417        let position = self.size;
418        self.size += 1;
419
420        // Maintain invariant that all full sections are synced.
421        if self.size.is_multiple_of(self.items_per_section) {
422            futures::try_join!(self.data.sync(section), self.offsets.sync())?;
423        }
424
425        Ok(position)
426    }
427
    /// Return the total number of items that have been appended to the journal.
    ///
    /// This count is NOT affected by pruning. The next appended item will receive this
    /// position as its value.
    pub fn size(&self) -> u64 {
        self.size
    }
435
436    /// Return the position of the oldest item still retained in the journal.
437    ///
438    /// Returns `None` if the journal is empty or if all items have been pruned.
439    // TODO(#2056): Disambiguate between pruning boundary and oldest retained position.
440    pub fn oldest_retained_pos(&self) -> Option<u64> {
441        if self.size == self.oldest_retained_pos {
442            // No items retained: either never had data or fully pruned
443            None
444        } else {
445            Some(self.oldest_retained_pos)
446        }
447    }
448
449    /// Prune items at positions strictly less than `min_position`.
450    ///
451    /// Returns `true` if any data was pruned, `false` otherwise.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if the underlying storage operation fails.
456    ///
457    /// Errors may leave the journal in an inconsistent state. The journal should be closed and
458    /// reopened to trigger alignment in [Journal::init].
459    pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
460        if min_position <= self.oldest_retained_pos {
461            return Ok(false);
462        }
463
464        // Cap min_position to size to maintain the invariant oldest_retained_pos <= size
465        let min_position = min_position.min(self.size);
466
467        // Calculate section number
468        let min_section = position_to_section(min_position, self.items_per_section);
469
470        let pruned = self.data.prune(min_section).await?;
471        if pruned {
472            self.oldest_retained_pos = min_section * self.items_per_section;
473            self.offsets.prune(self.oldest_retained_pos).await?;
474        }
475        Ok(pruned)
476    }
477
478    /// Read the item at the given position.
479    ///
480    /// # Errors
481    ///
482    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
483    /// - Returns [Error::ItemOutOfRange] if `position` is beyond the journal size.
484    /// - Returns other errors if storage or decoding fails.
485    pub async fn read(&self, position: u64) -> Result<V, Error> {
486        // Check bounds
487        if position >= self.size {
488            return Err(Error::ItemOutOfRange(position));
489        }
490
491        if position < self.oldest_retained_pos {
492            return Err(Error::ItemPruned(position));
493        }
494
495        // Read offset from journal and calculate section from position
496        let offset = self.offsets.read(position).await?;
497        let section = position_to_section(position, self.items_per_section);
498
499        // Read item from data journal
500        self.data.get(section, offset).await
501    }
502
503    /// Sync only the data journal to storage, without syncing the offsets journal.
504    ///
505    /// This is faster than `sync()` and can be used to ensure data durability without
506    /// the overhead of syncing the offsets journal.
507    ///
508    /// We call `sync` when appending to a section, so the offsets journal will eventually be
509    /// synced as well, maintaining the invariant that all nonfinal sections are fully synced.
510    /// In other words, the journal will remain in a consistent, recoverable state even if a crash
511    /// occurs after calling this method but before calling `sync`.
512    pub async fn sync_data(&mut self) -> Result<(), Error> {
513        let section = self.current_section();
514        self.data.sync(section).await
515    }
516
517    /// Sync all pending writes to storage.
518    ///
519    /// This syncs both the data journal and the offsets journal concurrently.
520    pub async fn sync(&mut self) -> Result<(), Error> {
521        // Sync only the current (final) section of the data journal.
522        // All non-final sections are already synced per Invariant #1.
523        let section = self.current_section();
524
525        // Sync both journals concurrently
526        futures::try_join!(self.data.sync(section), self.offsets.sync())?;
527
528        Ok(())
529    }
530
531    /// Close the journal, syncing all pending writes.
532    ///
533    /// This closes both the data journal and the offsets journal.
534    pub async fn close(mut self) -> Result<(), Error> {
535        self.sync().await?;
536        self.data.close().await?;
537        self.offsets.close().await
538    }
539
540    /// Remove any underlying blobs created by the journal.
541    ///
542    /// This destroys both the data journal and the offsets journal.
543    pub async fn destroy(self) -> Result<(), Error> {
544        self.data.destroy().await?;
545        self.offsets.destroy().await
546    }
547
548    /// Return the section number where the next append will write.
549    const fn current_section(&self) -> u64 {
550        position_to_section(self.size, self.items_per_section)
551    }
552
    /// Align the offsets journal and data journal to be consistent in case a crash occurred
    /// on a previous run and left the journals in an inconsistent state.
    ///
    /// The data journal is the source of truth. This function scans it to determine
    /// what SHOULD be in the offsets journal, then fixes any mismatches.
    ///
    /// # Returns
    ///
    /// Returns `(oldest_retained_pos, size)` for the contiguous journal.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u32>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // === Handle empty data journal case ===
        // Count items in the last data section. Per Invariant #1 all earlier sections
        // are full, so this single count suffices to derive the journal size later.
        let items_in_last_section = match data.blobs.last_key_value() {
            Some((last_section, _)) => {
                let stream = data.replay(*last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; // Propagate replay errors (corruption, etc.)
                    count += 1;
                }
                count
            }
            None => 0,
        };

        // Data journal is empty if there are no sections or if there is one section and it has no items.
        // The latter should only occur if a crash occurred after opening a data journal blob but
        // before writing to it.
        let data_empty =
            data.blobs.is_empty() || (data.blobs.len() == 1 && items_in_last_section == 0);
        if data_empty {
            let size = offsets.size().await;

            if !data.blobs.is_empty() {
                // A section exists but contains 0 items. This can happen in two cases:
                // 1. Rewind crash: we rewound the data journal but crashed before rewinding offsets
                // 2. First append crash: we opened the first section blob but crashed before writing to it
                // In both cases, calculate target position from the first remaining section
                let first_section = *data.blobs.first_key_value().unwrap().0;
                let target_pos = first_section * items_per_section;

                info!("crash repair: rewinding offsets from {size} to {target_pos}");
                offsets.rewind(target_pos).await?;
                offsets.sync().await?;
                return Ok((target_pos, target_pos));
            }

            // data.blobs is empty. This can happen in two cases:
            // 1. We completely pruned the data journal but crashed before pruning
            //    the offsets journal.
            // 2. The data journal was never opened.
            if let Some(oldest) = offsets.oldest_retained_pos().await? {
                if oldest < size {
                    // Offsets has unpruned entries but data is gone - align by pruning
                    info!("crash repair: pruning offsets to {size} (prune-all crash)");
                    offsets.prune(size).await?;
                    offsets.sync().await?;
                }
            }

            return Ok((size, size));
        }

        // === Handle non-empty data journal case ===
        let (data_oldest_pos, data_size) = {
            // Data exists -- count items
            let first_section = *data.blobs.first_key_value().unwrap().0;
            let last_section = *data.blobs.last_key_value().unwrap().0;

            let oldest_pos = first_section * items_per_section;

            // Invariant 1 on `Variable` guarantees that all sections except possibly the last
            // are full. Therefore, the size of the journal is the number of items in the last
            // section plus the number of items in the other sections.
            let size = (last_section * items_per_section) + items_in_last_section;
            (oldest_pos, size)
        };
        assert_ne!(
            data_oldest_pos, data_size,
            "data journal expected to be non-empty"
        );

        // Align pruning state. We always prune the data journal before the offsets journal,
        // so we validate that invariant and repair crash faults or detect corruption.
        match offsets.oldest_retained_pos().await? {
            Some(oldest_retained_pos) if oldest_retained_pos < data_oldest_pos => {
                // Offsets behind on pruning -- prune to catch up
                info!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            Some(oldest_retained_pos) if oldest_retained_pos > data_oldest_pos => {
                // Offsets pruned further than data: the data-before-offsets prune ordering
                // was violated, which we never do -- treat as corruption.
                return Err(Error::Corruption(format!(
                    "offsets oldest pos ({oldest_retained_pos}) > data oldest pos ({data_oldest_pos})"
                )));
            }
            Some(_) => {
                // Both journals are pruned to the same position.
            }
            None if data_oldest_pos > 0 => {
                // Offsets journal is empty (size == oldest_retained_pos).
                // This can happen if we pruned all data, then appended new data, synced the
                // data journal, but crashed before syncing the offsets journal.
                // We can recover if offsets.size() matches data_oldest_pos (proper pruning).
                let offsets_size = offsets.size().await;
                if offsets_size != data_oldest_pos {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty: size ({offsets_size}) != data oldest pos ({data_oldest_pos})"
                    )));
                }
                info!("crash repair: offsets journal empty at {data_oldest_pos}");
            }
            None => {
                // Both journals are empty/fully pruned.
            }
        }

        let offsets_size = offsets.size().await;
        if offsets_size > data_size {
            // We must have crashed after rewinding the data journal but before rewinding
            // the offsets journal (data is always rewound first) -- rewind offsets to match.
            info!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            // We must have crashed after writing the data journal but before writing the offsets
            // journal.
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        assert_eq!(offsets.size().await, data_size);
        // Oldest retained position is always Some because the data journal is non-empty.
        assert_eq!(offsets.oldest_retained_pos().await?, Some(data_oldest_pos));

        offsets.sync().await?;
        Ok((data_oldest_pos, data_size))
    }
691
692    /// Rebuild missing offset entries by replaying the data journal and
693    /// appending the missing entries to the offsets journal.
694    ///
695    /// The data journal is the source of truth. This function brings the offsets
696    /// journal up to date by replaying data items and indexing their positions.
697    ///
698    /// # Warning
699    ///
700    /// - Panics if `data.blobs` is empty
701    /// - Panics if `offsets_size` >= `data.size()`
702    async fn add_missing_offsets(
703        data: &variable::Journal<E, V>,
704        offsets: &mut fixed::Journal<E, u32>,
705        offsets_size: u64,
706        items_per_section: u64,
707    ) -> Result<(), Error> {
708        assert!(
709            !data.blobs.is_empty(),
710            "rebuild_offsets called with empty data journal"
711        );
712
713        // Find where to start replaying
714        let (start_section, resume_offset, skip_first) =
715            if let Some(oldest) = offsets.oldest_retained_pos().await? {
716                if oldest < offsets_size {
717                    // Offsets has items -- resume from last indexed position
718                    let last_offset = offsets.read(offsets_size - 1).await?;
719                    let last_section = position_to_section(offsets_size - 1, items_per_section);
720                    (last_section, last_offset, true)
721                } else {
722                    // Offsets fully pruned but data has items -- start from first data section
723                    // SAFETY: data.blobs is non-empty (checked above)
724                    let first_section = *data.blobs.first_key_value().unwrap().0;
725                    (first_section, 0, false)
726                }
727            } else {
728                // Offsets empty -- start from first data section
729                // SAFETY: data.blobs is non-empty (checked above)
730                let first_section = *data.blobs.first_key_value().unwrap().0;
731                (first_section, 0, false)
732            };
733
734        // Replay data journal from start position through the end and index all items.
735        // The data journal is the source of truth, so we consume the entire stream.
736        // (replay streams from start_section onwards through all subsequent sections)
737        let stream = data
738            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
739            .await?;
740        futures::pin_mut!(stream);
741
742        let mut skipped_first = false;
743        while let Some(result) = stream.next().await {
744            let (_section, offset, _size, _item) = result?;
745
746            // Skip first item if resuming from last indexed offset
747            if skip_first && !skipped_first {
748                skipped_first = true;
749                continue;
750            }
751
752            offsets.append(offset).await?;
753        }
754
755        Ok(())
756    }
757}
758
759impl<E: Storage + Metrics, V: Codec + Send> Journal<E, V> {
760    /// Return a stream of all items in the journal starting from `start_pos`.
761    ///
762    /// Each item is yielded as a tuple `(position, item)` where position is the item's
763    /// position in the journal.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if `start_pos` exceeds the journal size or if any storage/decoding
768    /// errors occur during replay.
769    pub async fn replay(
770        &self,
771        start_pos: u64,
772        buffer_size: NonZeroUsize,
773    ) -> Result<Pin<Box<dyn Stream<Item = Result<(u64, V), Error>> + '_>>, Error> {
774        // Validate start position is within bounds.
775        if start_pos < self.oldest_retained_pos {
776            return Err(Error::ItemPruned(start_pos));
777        }
778        if start_pos > self.size {
779            return Err(Error::ItemOutOfRange(start_pos));
780        }
781
782        // If replaying at exactly size, return empty stream
783        if start_pos == self.size {
784            return Ok(Box::pin(stream::empty()));
785        }
786
787        // Use offsets index to find offset to start from, calculate section from position
788        let start_offset = self.offsets.read(start_pos).await?;
789        let start_section = position_to_section(start_pos, self.items_per_section);
790        let data_stream = self
791            .data
792            .replay(start_section, start_offset, buffer_size)
793            .await?;
794
795        // Transform the stream to include position information
796        let transformed = data_stream.enumerate().map(move |(idx, result)| {
797            result.map(|(_section, _offset, _size, item)| {
798                // Calculate position: start_pos + items read
799                let pos = start_pos + idx as u64;
800                (pos, item)
801            })
802        });
803
804        Ok(Box::pin(transformed))
805    }
806}
807
808// Implement Contiguous trait for variable-length items
809impl<E: Storage + Metrics, V: Codec + Send> Contiguous for Journal<E, V> {
810    type Item = V;
811
812    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
813        Journal::append(self, item).await
814    }
815
816    async fn size(&self) -> u64 {
817        Journal::size(self)
818    }
819
820    async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error> {
821        Ok(Journal::oldest_retained_pos(self))
822    }
823
824    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
825        Journal::prune(self, min_position).await
826    }
827
828    async fn replay(
829        &self,
830        start_pos: u64,
831        buffer: NonZeroUsize,
832    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
833        Journal::replay(self, start_pos, buffer).await
834    }
835
836    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
837        Journal::read(self, position).await
838    }
839
840    async fn sync(&mut self) -> Result<(), Error> {
841        Journal::sync(self).await
842    }
843
844    async fn close(self) -> Result<(), Error> {
845        Journal::close(self).await
846    }
847
848    async fn destroy(self) -> Result<(), Error> {
849        Journal::destroy(self).await
850    }
851
852    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
853        Journal::rewind(self, size).await
854    }
855}
856
857#[cfg(test)]
858mod tests {
859    use super::*;
860    use crate::journal::contiguous::tests::run_contiguous_tests;
861    use commonware_macros::test_traced;
862    use commonware_runtime::{buffer::PoolRef, deterministic, Runner};
863    use commonware_utils::{NZUsize, NZU64};
864    use futures::FutureExt as _;
865
866    // Use some jank sizes to exercise boundary conditions.
867    const PAGE_SIZE: usize = 101;
868    const PAGE_CACHE_SIZE: usize = 2;
869
870    /// Test that complete offsets partition loss after pruning is detected as unrecoverable.
871    ///
872    /// When the offsets partition is completely lost and the data has been pruned, we cannot
873    /// rebuild the index with correct position alignment (would require creating placeholder blobs).
874    /// This is a genuine external failure that should be detected and reported clearly.
875    #[test_traced]
876    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
877        let executor = deterministic::Runner::default();
878        executor.start(|context| async move {
879            let cfg = Config {
880                partition: "offsets_loss_after_prune".to_string(),
881                items_per_section: NZU64!(10),
882                compression: None,
883                codec_config: (),
884                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
885                write_buffer: NZUsize!(1024),
886            };
887
888            // === Phase 1: Create journal with data and prune ===
889            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
890                .await
891                .unwrap();
892
893            // Append 40 items across 4 sections (0-3)
894            for i in 0..40u64 {
895                journal.append(i * 100).await.unwrap();
896            }
897
898            // Prune to position 20 (removes sections 0-1, keeps sections 2-3)
899            journal.prune(20).await.unwrap();
900            assert_eq!(journal.oldest_retained_pos(), Some(20));
901            assert_eq!(journal.size(), 40);
902
903            journal.close().await.unwrap();
904
905            // === Phase 2: Simulate complete offsets partition loss ===
906            context
907                .remove(&cfg.offsets_partition(), None)
908                .await
909                .expect("Failed to remove offsets partition");
910
911            // === Phase 3: Verify this is detected as unrecoverable ===
912            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
913            assert!(matches!(result, Err(Error::Corruption(_))));
914        });
915    }
916
    /// Test that init aligns state when data is pruned/lost but offsets survives.
    ///
    /// This handles both:
    /// 1. Crash during prune-all (data pruned, offsets not yet)
    /// 2. External data partition loss
    ///
    /// In both cases, we align by pruning offsets to match. The position counter
    /// (size) survives, so new appends continue at the next position.
    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data_loss_test".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Setup: Create journal with data ===
            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Append 20 items across 2 sections (10 items per section)
            for i in 0..20u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.close().await.unwrap();

            // === Simulate data loss: Delete data partition but keep offsets ===
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            // === Verify init aligns the mismatch ===
            // Init should treat the surviving offsets journal as evidence of a
            // prune-all and bring both journals into a consistent pruned state.
            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            // Size should be preserved
            assert_eq!(journal.size(), 20);

            // But no items remain (both journals pruned)
            assert_eq!(journal.oldest_retained_pos(), None);

            // All reads should fail with ItemPruned
            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Can append new data starting at position 20
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
982
983    #[test_traced]
984    fn test_variable_contiguous() {
985        let executor = deterministic::Runner::default();
986        executor.start(|context| async move {
987            run_contiguous_tests(move |test_name: String| {
988                let context = context.clone();
989                async move {
990                    Journal::<_, u64>::init(
991                        context,
992                        Config {
993                            partition: format!("generic_test_{test_name}"),
994                            items_per_section: NZU64!(10),
995                            compression: None,
996                            codec_config: (),
997                            buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
998                            write_buffer: NZUsize!(1024),
999                        },
1000                    )
1001                    .await
1002                }
1003                .boxed()
1004            })
1005            .await;
1006        });
1007    }
1008
1009    /// Test multiple sequential prunes with Variable-specific guarantees.
1010    #[test_traced]
1011    fn test_variable_multiple_sequential_prunes() {
1012        let executor = deterministic::Runner::default();
1013        executor.start(|context| async move {
1014            let cfg = Config {
1015                partition: "sequential_prunes".to_string(),
1016                items_per_section: NZU64!(10),
1017                compression: None,
1018                codec_config: (),
1019                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1020                write_buffer: NZUsize!(1024),
1021            };
1022
1023            let mut journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1024
1025            // Append items across 4 sections: [0-9], [10-19], [20-29], [30-39]
1026            for i in 0..40u64 {
1027                journal.append(i * 100).await.unwrap();
1028            }
1029
1030            // Initial state: all items accessible
1031            assert_eq!(journal.oldest_retained_pos(), Some(0));
1032            assert_eq!(journal.size(), 40);
1033
1034            // First prune: remove section 0 (positions 0-9)
1035            let pruned = journal.prune(10).await.unwrap();
1036            assert!(pruned);
1037
1038            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1039            let oldest = journal.oldest_retained_pos().unwrap();
1040            assert_eq!(oldest, 10);
1041
1042            // Items 0-9 should be pruned, 10+ should be accessible
1043            assert!(matches!(
1044                journal.read(0).await,
1045                Err(crate::journal::Error::ItemPruned(_))
1046            ));
1047            assert_eq!(journal.read(10).await.unwrap(), 1000);
1048            assert_eq!(journal.read(19).await.unwrap(), 1900);
1049
1050            // Second prune: remove section 1 (positions 10-19)
1051            let pruned = journal.prune(20).await.unwrap();
1052            assert!(pruned);
1053
1054            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1055            let oldest = journal.oldest_retained_pos().unwrap();
1056            assert_eq!(oldest, 20);
1057
1058            // Items 0-19 should be pruned, 20+ should be accessible
1059            assert!(matches!(
1060                journal.read(10).await,
1061                Err(crate::journal::Error::ItemPruned(_))
1062            ));
1063            assert!(matches!(
1064                journal.read(19).await,
1065                Err(crate::journal::Error::ItemPruned(_))
1066            ));
1067            assert_eq!(journal.read(20).await.unwrap(), 2000);
1068            assert_eq!(journal.read(29).await.unwrap(), 2900);
1069
1070            // Third prune: remove section 2 (positions 20-29)
1071            let pruned = journal.prune(30).await.unwrap();
1072            assert!(pruned);
1073
1074            // Variable-specific guarantee: oldest is EXACTLY at section boundary
1075            let oldest = journal.oldest_retained_pos().unwrap();
1076            assert_eq!(oldest, 30);
1077
1078            // Items 0-29 should be pruned, 30+ should be accessible
1079            assert!(matches!(
1080                journal.read(20).await,
1081                Err(crate::journal::Error::ItemPruned(_))
1082            ));
1083            assert!(matches!(
1084                journal.read(29).await,
1085                Err(crate::journal::Error::ItemPruned(_))
1086            ));
1087            assert_eq!(journal.read(30).await.unwrap(), 3000);
1088            assert_eq!(journal.read(39).await.unwrap(), 3900);
1089
1090            // Size should still be 40 (pruning doesn't affect size)
1091            assert_eq!(journal.size(), 40);
1092
1093            journal.destroy().await.unwrap();
1094        });
1095    }
1096
1097    /// Test that pruning all data and re-initializing preserves positions.
1098    #[test_traced]
1099    fn test_variable_prune_all_then_reinit() {
1100        let executor = deterministic::Runner::default();
1101        executor.start(|context| async move {
1102            let cfg = Config {
1103                partition: "prune_all_reinit".to_string(),
1104                items_per_section: NZU64!(10),
1105                compression: None,
1106                codec_config: (),
1107                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1108                write_buffer: NZUsize!(1024),
1109            };
1110
1111            // === Phase 1: Create journal and append data ===
1112            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1113                .await
1114                .unwrap();
1115
1116            for i in 0..100u64 {
1117                journal.append(i * 100).await.unwrap();
1118            }
1119
1120            assert_eq!(journal.size(), 100);
1121            assert_eq!(journal.oldest_retained_pos(), Some(0));
1122
1123            // === Phase 2: Prune all data ===
1124            let pruned = journal.prune(100).await.unwrap();
1125            assert!(pruned);
1126
1127            // All data is pruned - no items remain
1128            assert_eq!(journal.size(), 100);
1129            assert_eq!(journal.oldest_retained_pos(), None);
1130
1131            // All reads should fail with ItemPruned
1132            for i in 0..100 {
1133                assert!(matches!(
1134                    journal.read(i).await,
1135                    Err(crate::journal::Error::ItemPruned(_))
1136                ));
1137            }
1138
1139            journal.close().await.unwrap();
1140
1141            // === Phase 3: Re-init and verify position preserved ===
1142            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1143                .await
1144                .unwrap();
1145
1146            // Size should be preserved, but no items remain
1147            assert_eq!(journal.size(), 100);
1148            assert_eq!(journal.oldest_retained_pos(), None);
1149
1150            // All reads should still fail
1151            for i in 0..100 {
1152                assert!(matches!(
1153                    journal.read(i).await,
1154                    Err(crate::journal::Error::ItemPruned(_))
1155                ));
1156            }
1157
1158            // === Phase 4: Append new data ===
1159            // Next append should get position 100
1160            journal.append(10000).await.unwrap();
1161            assert_eq!(journal.size(), 101);
1162            // Now we have one item at position 100
1163            assert_eq!(journal.oldest_retained_pos(), Some(100));
1164
1165            // Can read the new item
1166            assert_eq!(journal.read(100).await.unwrap(), 10000);
1167
1168            // Old positions still fail
1169            assert!(matches!(
1170                journal.read(99).await,
1171                Err(crate::journal::Error::ItemPruned(_))
1172            ));
1173
1174            journal.destroy().await.unwrap();
1175        });
1176    }
1177
    /// Test recovery from crash after data journal pruned but before offsets journal.
    ///
    /// The data journal is the source of truth, so init is expected to prune the
    /// offsets journal forward to match the data journal's oldest surviving section.
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery_prune_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections to both journals
            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            // Prune to position 10 normally (both data and offsets journals pruned)
            variable.prune(10).await.unwrap();
            assert_eq!(variable.oldest_retained_pos(), Some(10));

            // === Simulate crash: Prune data journal but not offsets journal ===
            // Manually prune data journal to section 2 (position 20), bypassing
            // the wrapper so the offsets journal is left behind
            variable.data.prune(2).await.unwrap();
            // Offsets journal still has data from position 10-19

            variable.close().await.unwrap();

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Init should auto-repair: offsets journal pruned to match data journal
            assert_eq!(variable.oldest_retained_pos(), Some(20));
            assert_eq!(variable.size(), 40);

            // Reads before position 20 should fail (pruned from both journals)
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Reads at position 20+ should succeed
            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }
1235
    /// Test recovery detects corruption when offsets journal pruned ahead of data journal.
    ///
    /// Simulates an impossible state (offsets journal pruned more than data journal) which
    /// should never happen due to write ordering. Verifies that init() returns corruption error.
    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery_offsets_ahead".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections to both journals
            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            // Prune offsets journal ahead of data journal (impossible state).
            // Note the different units: offsets is pruned by position, data by section.
            variable.offsets.prune(20).await.unwrap(); // Prune to position 20
            variable.data.prune(1).await.unwrap(); // Only prune data journal to section 1 (position 10)

            variable.close().await.unwrap();

            // === Verify corruption detected ===
            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1274
    /// Test recovery from crash after appending to data journal but before appending to offsets journal.
    ///
    /// Init rebuilds the offsets index by replaying the data journal, so items that
    /// reached only the data journal become fully indexed and readable again.
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with partial data ===
            let cfg = Config {
                partition: "recovery_append_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Append 15 items to both journals (fills section 0, partial section 1)
            for i in 0..15u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 15);

            // Manually append 5 more items directly to data journal only
            // (section 1), bypassing the wrapper's offsets bookkeeping
            for i in 15..20u64 {
                variable.data.append(1, i * 100).await.unwrap();
            }
            // Offsets journal still has only 15 entries

            variable.close().await.unwrap();

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Init should rebuild offsets journal from data journal replay
            assert_eq!(variable.size(), 20);
            assert_eq!(variable.oldest_retained_pos(), Some(0));

            // All items should be readable from both journals
            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Offsets journal should be fully rebuilt to match data journal
            assert_eq!(variable.offsets.size().await, 20);

            variable.destroy().await.unwrap();
        });
    }
1329
    /// Test recovery from multiple prune operations with crash.
    ///
    /// Like the single-prune crash test, but the data journal ends up several
    /// sections ahead; init must still prune the offsets journal forward to match.
    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data ===
            let cfg = Config {
                partition: "recovery_multiple_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Append 50 items across 5 sections to both journals
            for i in 0..50u64 {
                variable.append(i * 100).await.unwrap();
            }

            // Prune to position 10 normally (both data and offsets journals pruned)
            variable.prune(10).await.unwrap();
            assert_eq!(variable.oldest_retained_pos(), Some(10));

            // === Simulate crash: Multiple prunes on data journal, not on offsets journal ===
            // Manually prune data journal to section 3 (position 30), skipping
            // two section boundaries in one step
            variable.data.prune(3).await.unwrap();
            // Offsets journal still thinks oldest is position 10

            variable.close().await.unwrap();

            // === Verify recovery ===
            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Init should auto-repair: offsets journal pruned to match data journal
            assert_eq!(variable.oldest_retained_pos(), Some(30));
            assert_eq!(variable.size(), 50);

            // Reads before position 30 should fail (pruned from both journals)
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Reads at position 30+ should succeed
            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }
1391
    /// Test recovery from crash during rewind operation.
    ///
    /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes.
    /// This creates a situation where offsets journal has been rewound but data journal still
    /// contains items across multiple sections. Verifies that init() correctly rebuilds the
    /// offsets index across all sections to match the data journal.
    ///
    /// Because the data journal is the source of truth, the interrupted rewind is
    /// effectively rolled back: all 25 items are restored.
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // === Setup: Create Variable wrapper with data across multiple sections ===
            let cfg = Config {
                partition: "recovery_rewind_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Append 25 items across 3 sections (section 0: 0-9, section 1: 10-19, section 2: 20-24)
            for i in 0..25u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 25);

            // === Simulate crash during rewind(5) ===
            // Rewind offsets journal to size 5 (keeps positions 0-4)
            variable.offsets.rewind(5).await.unwrap();
            // CRASH before data.rewind() completes - data still has all 3 sections

            variable.close().await.unwrap();

            // === Verify recovery ===
            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Init should rebuild offsets[5-24] from data journal across all 3 sections
            assert_eq!(variable.size(), 25);
            assert_eq!(variable.oldest_retained_pos(), Some(0));

            // All items should be readable - offsets rebuilt correctly across all sections
            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Verify offsets journal fully rebuilt
            assert_eq!(variable.offsets.size().await, 25);

            // Verify next append gets position 25
            let pos = variable.append(2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }
1455
    /// Test recovery from crash after data sync but before offsets sync when journal was
    /// previously emptied by pruning.
    ///
    /// The synced-but-unindexed items in the data journal are recovered on init by
    /// replaying the data journal, even though the offsets journal was fully pruned.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_empty_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // === Phase 1: Create journal with one full section ===
            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Append 10 items (positions 0-9), fills section 0
            for i in 0..10u64 {
                journal.append(i * 100).await.unwrap();
            }
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            // === Phase 2: Prune to create empty journal ===
            journal.prune(10).await.unwrap();
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), None); // Empty!

            // === Phase 3: Append directly to data journal to simulate crash ===
            // Manually append to data journal only (bypassing Variable's append logic)
            // This simulates the case where data was synced but offsets wasn't
            for i in 10..20u64 {
                journal.data.append(1, i * 100).await.unwrap();
            }
            // Sync the data journal (section 1)
            journal.data.sync(1).await.unwrap();
            // Do NOT sync offsets journal - simulates crash before offsets.sync()

            // Close without syncing offsets
            drop(journal);

            // === Phase 4: Verify recovery succeeds ===
            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            // All data should be recovered
            assert_eq!(journal.size(), 20);
            assert_eq!(journal.oldest_retained_pos(), Some(10));

            // All items from position 10-19 should be readable
            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Items 0-9 should be pruned
            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1523
1524    /// Test that offsets index is rebuilt from data after sync writes data but not offsets.
1525    #[test_traced]
1526    fn test_variable_concurrent_sync_recovery() {
1527        let executor = deterministic::Runner::default();
1528        executor.start(|context| async move {
1529            let cfg = Config {
1530                partition: "concurrent_sync_recovery".to_string(),
1531                items_per_section: NZU64!(10),
1532                compression: None,
1533                codec_config: (),
1534                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
1535                write_buffer: NZUsize!(1024),
1536            };
1537
1538            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1539                .await
1540                .unwrap();
1541
1542            // Append items across a section boundary
1543            for i in 0..15u64 {
1544                journal.append(i * 100).await.unwrap();
1545            }
1546
1547            // Manually sync only data to simulate crash during concurrent sync
1548            journal.sync_data().await.unwrap();
1549
1550            // Simulate a crash (offsets not synced)
1551            drop(journal);
1552
1553            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1554                .await
1555                .unwrap();
1556
1557            // Data should be intact and offsets rebuilt
1558            assert_eq!(journal.size(), 15);
1559            for i in 0..15u64 {
1560                assert_eq!(journal.read(i).await.unwrap(), i * 100);
1561            }
1562
1563            journal.destroy().await.unwrap();
1564        });
1565    }
1566
1567    #[test_traced]
1568    fn test_init_at_size_zero() {
1569        let executor = deterministic::Runner::default();
1570        executor.start(|context| async move {
1571            let cfg = Config {
1572                partition: "init_at_size_zero".to_string(),
1573                items_per_section: NZU64!(5),
1574                compression: None,
1575                codec_config: (),
1576                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1577                write_buffer: NZUsize!(1024),
1578            };
1579
1580            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1581                .await
1582                .unwrap();
1583
1584            // Size should be 0
1585            assert_eq!(journal.size(), 0);
1586
1587            // No oldest retained position (empty journal)
1588            assert_eq!(journal.oldest_retained_pos(), None);
1589
1590            // Next append should get position 0
1591            let pos = journal.append(100).await.unwrap();
1592            assert_eq!(pos, 0);
1593            assert_eq!(journal.size(), 1);
1594            assert_eq!(journal.read(0).await.unwrap(), 100);
1595
1596            journal.destroy().await.unwrap();
1597        });
1598    }
1599
1600    #[test_traced]
1601    fn test_init_at_size_section_boundary() {
1602        let executor = deterministic::Runner::default();
1603        executor.start(|context| async move {
1604            let cfg = Config {
1605                partition: "init_a  t_size_boundary".to_string(),
1606                items_per_section: NZU64!(5),
1607                compression: None,
1608                codec_config: (),
1609                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1610                write_buffer: NZUsize!(1024),
1611            };
1612
1613            // Initialize at position 10 (exactly at section 1 boundary with items_per_section=5)
1614            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1615                .await
1616                .unwrap();
1617
1618            // Size should be 10
1619            assert_eq!(journal.size(), 10);
1620
1621            // No data yet, so no oldest retained position
1622            assert_eq!(journal.oldest_retained_pos(), None);
1623
1624            // Next append should get position 10
1625            let pos = journal.append(1000).await.unwrap();
1626            assert_eq!(pos, 10);
1627            assert_eq!(journal.size(), 11);
1628            assert_eq!(journal.read(10).await.unwrap(), 1000);
1629
1630            // Can continue appending
1631            let pos = journal.append(1001).await.unwrap();
1632            assert_eq!(pos, 11);
1633            assert_eq!(journal.read(11).await.unwrap(), 1001);
1634
1635            journal.destroy().await.unwrap();
1636        });
1637    }
1638
1639    #[test_traced]
1640    fn test_init_at_size_mid_section() {
1641        let executor = deterministic::Runner::default();
1642        executor.start(|context| async move {
1643            let cfg = Config {
1644                partition: "init_at_size_mid".to_string(),
1645                items_per_section: NZU64!(5),
1646                compression: None,
1647                codec_config: (),
1648                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1649                write_buffer: NZUsize!(1024),
1650            };
1651
1652            // Initialize at position 7 (middle of section 1 with items_per_section=5)
1653            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1654                .await
1655                .unwrap();
1656
1657            // Size should be 7
1658            assert_eq!(journal.size(), 7);
1659
1660            // No data yet, so no oldest retained position
1661            assert_eq!(journal.oldest_retained_pos(), None);
1662
1663            // Next append should get position 7
1664            let pos = journal.append(700).await.unwrap();
1665            assert_eq!(pos, 7);
1666            assert_eq!(journal.size(), 8);
1667            assert_eq!(journal.read(7).await.unwrap(), 700);
1668
1669            journal.destroy().await.unwrap();
1670        });
1671    }
1672
1673    #[test_traced]
1674    fn test_init_at_size_persistence() {
1675        let executor = deterministic::Runner::default();
1676        executor.start(|context| async move {
1677            let cfg = Config {
1678                partition: "init_at_size_persist".to_string(),
1679                items_per_section: NZU64!(5),
1680                compression: None,
1681                codec_config: (),
1682                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1683                write_buffer: NZUsize!(1024),
1684            };
1685
1686            // Initialize at position 15
1687            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 15)
1688                .await
1689                .unwrap();
1690
1691            // Append some items
1692            for i in 0..5u64 {
1693                let pos = journal.append(1500 + i).await.unwrap();
1694                assert_eq!(pos, 15 + i);
1695            }
1696
1697            assert_eq!(journal.size(), 20);
1698
1699            // Close and reopen
1700            journal.close().await.unwrap();
1701
1702            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
1703                .await
1704                .unwrap();
1705
1706            // Size and data should be preserved
1707            assert_eq!(journal.size(), 20);
1708            assert_eq!(journal.oldest_retained_pos(), Some(15));
1709
1710            // Verify data
1711            for i in 0..5u64 {
1712                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
1713            }
1714
1715            // Can continue appending
1716            let pos = journal.append(9999).await.unwrap();
1717            assert_eq!(pos, 20);
1718            assert_eq!(journal.read(20).await.unwrap(), 9999);
1719
1720            journal.destroy().await.unwrap();
1721        });
1722    }
1723
1724    #[test_traced]
1725    fn test_init_at_size_large_offset() {
1726        let executor = deterministic::Runner::default();
1727        executor.start(|context| async move {
1728            let cfg = Config {
1729                partition: "init_at_size_large".to_string(),
1730                items_per_section: NZU64!(5),
1731                compression: None,
1732                codec_config: (),
1733                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1734                write_buffer: NZUsize!(1024),
1735            };
1736
1737            // Initialize at a large position (position 1000)
1738            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
1739                .await
1740                .unwrap();
1741
1742            assert_eq!(journal.size(), 1000);
1743            // No data yet, so no oldest retained position
1744            assert_eq!(journal.oldest_retained_pos(), None);
1745
1746            // Next append should get position 1000
1747            let pos = journal.append(100000).await.unwrap();
1748            assert_eq!(pos, 1000);
1749            assert_eq!(journal.read(1000).await.unwrap(), 100000);
1750
1751            journal.destroy().await.unwrap();
1752        });
1753    }
1754
1755    #[test_traced]
1756    fn test_init_at_size_prune_and_append() {
1757        let executor = deterministic::Runner::default();
1758        executor.start(|context| async move {
1759            let cfg = Config {
1760                partition: "init_at_size_prune".to_string(),
1761                items_per_section: NZU64!(5),
1762                compression: None,
1763                codec_config: (),
1764                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1765                write_buffer: NZUsize!(1024),
1766            };
1767
1768            // Initialize at position 20
1769            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
1770                .await
1771                .unwrap();
1772
1773            // Append items 20-29
1774            for i in 0..10u64 {
1775                journal.append(2000 + i).await.unwrap();
1776            }
1777
1778            assert_eq!(journal.size(), 30);
1779
1780            // Prune to position 25
1781            journal.prune(25).await.unwrap();
1782
1783            assert_eq!(journal.size(), 30);
1784            assert_eq!(journal.oldest_retained_pos(), Some(25));
1785
1786            // Verify remaining items are readable
1787            for i in 25..30u64 {
1788                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
1789            }
1790
1791            // Continue appending
1792            let pos = journal.append(3000).await.unwrap();
1793            assert_eq!(pos, 30);
1794
1795            journal.destroy().await.unwrap();
1796        });
1797    }
1798
1799    /// Test `init_sync` when there is no existing data on disk.
1800    #[test_traced]
1801    fn test_init_sync_no_existing_data() {
1802        let executor = deterministic::Runner::default();
1803        executor.start(|context| async move {
1804            let cfg = Config {
1805                partition: "test_fresh_start".into(),
1806                items_per_section: NZU64!(5),
1807                compression: None,
1808                codec_config: (),
1809                write_buffer: NZUsize!(1024),
1810                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1811            };
1812
1813            // Initialize journal with sync boundaries when no existing data exists
1814            let lower_bound = 10;
1815            let upper_bound = 26;
1816            let mut journal =
1817                Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
1818                    .await
1819                    .expect("Failed to initialize journal with sync boundaries");
1820
1821            assert_eq!(journal.size(), lower_bound);
1822            assert_eq!(journal.oldest_retained_pos(), None);
1823
1824            // Append items using the contiguous API
1825            let pos1 = journal.append(42u64).await.unwrap();
1826            assert_eq!(pos1, lower_bound);
1827            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
1828
1829            let pos2 = journal.append(43u64).await.unwrap();
1830            assert_eq!(pos2, lower_bound + 1);
1831            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
1832
1833            journal.destroy().await.unwrap();
1834        });
1835    }
1836
1837    /// Test `init_sync` when there is existing data that overlaps with the sync target range.
1838    #[test_traced]
1839    fn test_init_sync_existing_data_overlap() {
1840        let executor = deterministic::Runner::default();
1841        executor.start(|context| async move {
1842            let cfg = Config {
1843                partition: "test_overlap".into(),
1844                items_per_section: NZU64!(5),
1845                compression: None,
1846                codec_config: (),
1847                write_buffer: NZUsize!(1024),
1848                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1849            };
1850
1851            // Create initial journal with data in multiple sections
1852            let mut journal =
1853                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
1854                    .await
1855                    .expect("Failed to create initial journal");
1856
1857            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
1858            for i in 0..20u64 {
1859                journal.append(i * 100).await.unwrap();
1860            }
1861            journal.close().await.unwrap();
1862
1863            // Initialize with sync boundaries that overlap with existing data
1864            // lower_bound: 8 (section 1), upper_bound: 31 (last location 30, section 6)
1865            let lower_bound = 8;
1866            let upper_bound = 31;
1867            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
1868                context.clone(),
1869                cfg.clone(),
1870                lower_bound..upper_bound,
1871            )
1872            .await
1873            .expect("Failed to initialize journal with overlap");
1874
1875            assert_eq!(journal.size(), 20);
1876
1877            // Verify oldest retained is pruned to lower_bound's section boundary (5)
1878            let oldest = journal.oldest_retained_pos();
1879            assert_eq!(oldest, Some(5)); // Section 1 starts at position 5
1880
1881            // Verify data integrity: positions before 5 are pruned
1882            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
1883            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
1884
1885            // Positions 5-19 should be accessible
1886            assert_eq!(journal.read(5).await.unwrap(), 500);
1887            assert_eq!(journal.read(8).await.unwrap(), 800);
1888            assert_eq!(journal.read(19).await.unwrap(), 1900);
1889
1890            // Position 20+ should not exist yet
1891            assert!(matches!(
1892                journal.read(20).await,
1893                Err(Error::ItemOutOfRange(_))
1894            ));
1895
1896            // Assert journal can accept new items
1897            let pos = journal.append(999).await.unwrap();
1898            assert_eq!(pos, 20);
1899            assert_eq!(journal.read(20).await.unwrap(), 999);
1900
1901            journal.destroy().await.unwrap();
1902        });
1903    }
1904
1905    /// Test `init_sync` with invalid parameters.
1906    #[should_panic]
1907    #[test_traced]
1908    fn test_init_sync_invalid_parameters() {
1909        let executor = deterministic::Runner::default();
1910        executor.start(|context| async move {
1911            let cfg = Config {
1912                partition: "test_invalid".into(),
1913                items_per_section: NZU64!(5),
1914                compression: None,
1915                codec_config: (),
1916                write_buffer: NZUsize!(1024),
1917                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1918            };
1919
1920            #[allow(clippy::reversed_empty_ranges)]
1921            let _result = Journal::<deterministic::Context, u64>::init_sync(
1922                context.clone(),
1923                cfg,
1924                10..5, // invalid range: lower > upper
1925            )
1926            .await;
1927        });
1928    }
1929
1930    /// Test `init_sync` when existing data exactly matches the sync range.
1931    #[test_traced]
1932    fn test_init_sync_existing_data_exact_match() {
1933        let executor = deterministic::Runner::default();
1934        executor.start(|context| async move {
1935            let items_per_section = NZU64!(5);
1936            let cfg = Config {
1937                partition: "test_exact_match".to_string(),
1938                items_per_section,
1939                compression: None,
1940                codec_config: (),
1941                write_buffer: NZUsize!(1024),
1942                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1943            };
1944
1945            // Create initial journal with data exactly matching sync range
1946            let mut journal =
1947                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
1948                    .await
1949                    .expect("Failed to create initial journal");
1950
1951            // Add data at positions 0-19 (sections 0-3 with items_per_section=5)
1952            for i in 0..20u64 {
1953                journal.append(i * 100).await.unwrap();
1954            }
1955            journal.close().await.unwrap();
1956
1957            // Initialize with sync boundaries that exactly match existing data
1958            let lower_bound = 5; // section 1
1959            let upper_bound = 20; // section 3
1960            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
1961                context.clone(),
1962                cfg.clone(),
1963                lower_bound..upper_bound,
1964            )
1965            .await
1966            .expect("Failed to initialize journal with exact match");
1967
1968            assert_eq!(journal.size(), 20);
1969
1970            // Verify pruning to lower bound (section 1 boundary = position 5)
1971            let oldest = journal.oldest_retained_pos();
1972            assert_eq!(oldest, Some(5)); // Section 1 starts at position 5
1973
1974            // Verify positions before 5 are pruned
1975            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
1976            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
1977
1978            // Positions 5-19 should be accessible
1979            assert_eq!(journal.read(5).await.unwrap(), 500);
1980            assert_eq!(journal.read(10).await.unwrap(), 1000);
1981            assert_eq!(journal.read(19).await.unwrap(), 1900);
1982
1983            // Position 20+ should not exist yet
1984            assert!(matches!(
1985                journal.read(20).await,
1986                Err(Error::ItemOutOfRange(_))
1987            ));
1988
1989            // Assert journal can accept new operations
1990            let pos = journal.append(999).await.unwrap();
1991            assert_eq!(pos, 20);
1992            assert_eq!(journal.read(20).await.unwrap(), 999);
1993
1994            journal.destroy().await.unwrap();
1995        });
1996    }
1997
1998    /// Test `init_sync` when existing data exceeds the sync target range.
1999    /// This tests that UnexpectedData error is returned when existing data goes beyond the upper bound.
2000    #[test_traced]
2001    fn test_init_sync_existing_data_exceeds_upper_bound() {
2002        let executor = deterministic::Runner::default();
2003        executor.start(|context| async move {
2004            let items_per_section = NZU64!(5);
2005            let cfg = Config {
2006                partition: "test_unexpected_data".into(),
2007                items_per_section,
2008                compression: None,
2009                codec_config: (),
2010                write_buffer: NZUsize!(1024),
2011                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2012            };
2013
2014            // Create initial journal with data beyond sync range
2015            let mut journal =
2016                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2017                    .await
2018                    .expect("Failed to create initial journal");
2019
2020            // Add data at positions 0-29 (sections 0-5 with items_per_section=5)
2021            for i in 0..30u64 {
2022                journal.append(i * 1000).await.unwrap();
2023            }
2024            journal.close().await.unwrap();
2025
2026            // Initialize with sync boundaries that are exceeded by existing data
2027            let lower_bound = 8; // section 1
2028            for upper_bound in 9..29 {
2029                let result = Journal::<deterministic::Context, u64>::init_sync(
2030                    context.clone(),
2031                    cfg.clone(),
2032                    lower_bound..upper_bound,
2033                )
2034                .await;
2035
2036                // Should return UnexpectedData error since data exists beyond upper_bound
2037                assert!(matches!(result, Err(crate::adb::Error::UnexpectedData(_))));
2038            }
2039        });
2040    }
2041
2042    /// Test `init_sync` when all existing data is stale (before lower bound).
2043    #[test_traced]
2044    fn test_init_sync_existing_data_stale() {
2045        let executor = deterministic::Runner::default();
2046        executor.start(|context| async move {
2047            let items_per_section = NZU64!(5);
2048            let cfg = Config {
2049                partition: "test_stale".into(),
2050                items_per_section,
2051                compression: None,
2052                codec_config: (),
2053                write_buffer: NZUsize!(1024),
2054                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2055            };
2056
2057            // Create initial journal with stale data
2058            let mut journal =
2059                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2060                    .await
2061                    .expect("Failed to create initial journal");
2062
2063            // Add data at positions 0-9 (sections 0-1 with items_per_section=5)
2064            for i in 0..10u64 {
2065                journal.append(i * 100).await.unwrap();
2066            }
2067            journal.close().await.unwrap();
2068
2069            // Initialize with sync boundaries beyond all existing data
2070            let lower_bound = 15; // section 3
2071            let upper_bound = 26; // last element in section 5
2072            let journal = Journal::<deterministic::Context, u64>::init_sync(
2073                context.clone(),
2074                cfg.clone(),
2075                lower_bound..upper_bound,
2076            )
2077            .await
2078            .expect("Failed to initialize journal with stale data");
2079
2080            assert_eq!(journal.size(), 15);
2081
2082            // Verify fresh journal (all old data destroyed, starts at position 15)
2083            assert_eq!(journal.oldest_retained_pos(), None);
2084
2085            // Verify old positions don't exist
2086            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2087            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2088            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2089
2090            journal.destroy().await.unwrap();
2091        });
2092    }
2093
2094    /// Test `init_sync` with section boundary edge cases.
2095    #[test_traced]
2096    fn test_init_sync_section_boundaries() {
2097        let executor = deterministic::Runner::default();
2098        executor.start(|context| async move {
2099            let items_per_section = NZU64!(5);
2100            let cfg = Config {
2101                partition: "test_boundaries".into(),
2102                items_per_section,
2103                compression: None,
2104                codec_config: (),
2105                write_buffer: NZUsize!(1024),
2106                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2107            };
2108
2109            // Create journal with data at section boundaries
2110            let mut journal =
2111                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2112                    .await
2113                    .expect("Failed to create initial journal");
2114
2115            // Add data at positions 0-24 (sections 0-4 with items_per_section=5)
2116            for i in 0..25u64 {
2117                journal.append(i * 100).await.unwrap();
2118            }
2119            journal.close().await.unwrap();
2120
2121            // Test sync boundaries exactly at section boundaries
2122            let lower_bound = 15; // Exactly at section boundary (15/5 = 3)
2123            let upper_bound = 25; // Last element exactly at section boundary (24/5 = 4)
2124            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2125                context.clone(),
2126                cfg.clone(),
2127                lower_bound..upper_bound,
2128            )
2129            .await
2130            .expect("Failed to initialize journal at boundaries");
2131
2132            assert_eq!(journal.size(), 25);
2133
2134            // Verify oldest retained is at section 3 boundary (position 15)
2135            let oldest = journal.oldest_retained_pos();
2136            assert_eq!(oldest, Some(15));
2137
2138            // Verify positions before 15 are pruned
2139            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2140            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2141
2142            // Verify positions 15-24 are accessible
2143            assert_eq!(journal.read(15).await.unwrap(), 1500);
2144            assert_eq!(journal.read(20).await.unwrap(), 2000);
2145            assert_eq!(journal.read(24).await.unwrap(), 2400);
2146
2147            // Position 25+ should not exist yet
2148            assert!(matches!(
2149                journal.read(25).await,
2150                Err(Error::ItemOutOfRange(_))
2151            ));
2152
2153            // Assert journal can accept new operations
2154            let pos = journal.append(999).await.unwrap();
2155            assert_eq!(pos, 25);
2156            assert_eq!(journal.read(25).await.unwrap(), 999);
2157
2158            journal.destroy().await.unwrap();
2159        });
2160    }
2161
2162    /// Test `init_sync` when range.start and range.end-1 are in the same section.
2163    #[test_traced]
2164    fn test_init_sync_same_section_bounds() {
2165        let executor = deterministic::Runner::default();
2166        executor.start(|context| async move {
2167            let items_per_section = NZU64!(5);
2168            let cfg = Config {
2169                partition: "test_same_section".into(),
2170                items_per_section,
2171                compression: None,
2172                codec_config: (),
2173                write_buffer: NZUsize!(1024),
2174                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2175            };
2176
2177            // Create journal with data in multiple sections
2178            let mut journal =
2179                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2180                    .await
2181                    .expect("Failed to create initial journal");
2182
2183            // Add data at positions 0-14 (sections 0-2 with items_per_section=5)
2184            for i in 0..15u64 {
2185                journal.append(i * 100).await.unwrap();
2186            }
2187            journal.close().await.unwrap();
2188
2189            // Test sync boundaries within the same section
2190            let lower_bound = 10; // operation 10 (section 2: 10/5 = 2)
2191            let upper_bound = 15; // Last operation 14 (section 2: 14/5 = 2)
2192            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2193                context.clone(),
2194                cfg.clone(),
2195                lower_bound..upper_bound,
2196            )
2197            .await
2198            .expect("Failed to initialize journal with same-section bounds");
2199
2200            assert_eq!(journal.size(), 15);
2201
2202            // Both operations are in section 2, so sections 0, 1 should be pruned, section 2 retained
2203            // Oldest retained position should be at section 2 boundary (position 10)
2204            let oldest = journal.oldest_retained_pos();
2205            assert_eq!(oldest, Some(10));
2206
2207            // Verify positions before 10 are pruned
2208            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2209            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2210
2211            // Verify positions 10-14 are accessible
2212            assert_eq!(journal.read(10).await.unwrap(), 1000);
2213            assert_eq!(journal.read(11).await.unwrap(), 1100);
2214            assert_eq!(journal.read(14).await.unwrap(), 1400);
2215
2216            // Position 15+ should not exist yet
2217            assert!(matches!(
2218                journal.read(15).await,
2219                Err(Error::ItemOutOfRange(_))
2220            ));
2221
2222            // Assert journal can accept new operations
2223            let pos = journal.append(999).await.unwrap();
2224            assert_eq!(pos, 15);
2225            assert_eq!(journal.read(15).await.unwrap(), 999);
2226
2227            journal.destroy().await.unwrap();
2228        });
2229    }
2230}