commonware_storage/journal/contiguous/fixed.rs

//! An append-only log for storing fixed-length _items_ on disk.
//!
//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
//! where position is defined as the item's order of insertion starting from 0, unaffected by
//! pruning.
//!
//! _See [super::variable] for a journal that supports variable-length items._
//!
//! # Format
//!
//! Data stored in a `fixed::Journal` is persisted across one or more `Blob`s. Each `Blob` contains
//! at most `items_per_blob` items, with page-level data integrity provided by a buffer pool.
//!
//! ```text
//! +--------+--------+-----+----------+
//! | item_0 | item_1 | ... | item_n-1 |
//! +--------+--------+-----+----------+
//!
//! n = config.items_per_blob
//! ```
//!
//! The most recent blob may not be full, in which case it will contain fewer than the maximum
//! number of items.
//!
//! Data fetched from disk is always checked for integrity before being returned. If the data is
//! found to be invalid, an error is returned instead.
//!
//! # Open Blobs
//!
//! All `Blobs` in a given `partition` are kept open during the lifetime of `Journal`. You can limit
//! the number of open blobs by using a larger `items_per_blob` and/or by pruning old items.
//!
//! # Partition
//!
//! Blobs are stored in the legacy partition (`cfg.partition`) if it already contains data;
//! otherwise they are stored in `{cfg.partition}-blobs`.
//!
//! Metadata is stored in `{cfg.partition}-metadata`.
//!
//! # Consistency
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are closed.
//!
//! # Pruning
//!
//! The `prune` method allows the `Journal` to prune blobs consisting entirely of items prior to a
//! given point in history.
//!
//! # Replay
//!
//! The `replay` method supports fast reading of all unpruned items into memory.
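//!
//! # Example
//!
//! A minimal usage sketch (illustrative; assumes a runtime `context` and a populated [Config]
//! `cfg` are in scope, and that `Item` stands in for any [CodecFixedShared] type):
//!
//! ```ignore
//! let mut journal = Journal::<_, Item>::init(context, cfg).await?;
//! let pos = journal.append(item).await?; // position of the appended item
//! journal.sync().await?;                 // force pending data to storage
//! let value = journal.read(pos).await?;  // fetch it back by position
//! ```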

use crate::{
    journal::{
        contiguous::MutableContiguous,
        segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
        Error,
    },
    metadata::{Config as MetadataConfig, Metadata},
    Persistable,
};
use commonware_codec::CodecFixedShared;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
use futures::{stream::Stream, StreamExt};
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::warn;

/// Metadata key for storing the pruning boundary.
const PRUNING_BOUNDARY_KEY: u64 = 1;

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config {
    /// Prefix for the journal partitions.
    ///
    /// Blobs are stored in `partition` (legacy) if it contains data, otherwise in
    /// `{partition}-blobs`. Metadata is stored in `{partition}-metadata`.
    pub partition: String,

    /// The maximum number of journal items to store in each blob.
    ///
    /// Any unpruned historical blobs will contain exactly this number of items.
    /// Only the newest blob may contain fewer items.
    pub items_per_blob: NonZeroU64,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// Implementation of `Journal` storage.
///
/// This is implemented as a wrapper around [SegmentedJournal] that provides position-based access
/// where positions are automatically mapped to (section, position_in_section) pairs.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying blob will be truncated to the last valid item). Repair is performed
/// by the underlying [SegmentedJournal] during init.
pub struct Journal<E: Clock + Storage + Metrics, A: CodecFixedShared> {
    inner: SegmentedJournal<E, A>,

    /// The maximum number of items per blob (section).
    items_per_blob: u64,

    /// Total number of items appended (not affected by pruning).
    size: u64,

    /// If the journal's pruning boundary is mid-section (that is, the oldest retained item's
    /// position is not a multiple of `items_per_blob`), then the metadata stores the pruning
    /// boundary. Otherwise, the metadata is empty.
    ///
    /// When the journal is pruned, `metadata` must be persisted AFTER the inner journal is
    /// persisted to ensure that its pruning boundary is never after the inner journal's size.
    // TODO(#2939): Remove metadata
    metadata: Metadata<E, u64, Vec<u8>>,

    /// The position before which all items have been pruned.
    pruning_boundary: u64,
}

impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Size of each entry in bytes.
    pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;

    /// Size of each entry in bytes (as u64).
    pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    /// Scan a partition and return blob names, treating a missing partition as empty.
    async fn scan_partition(context: &E, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        match context.scan(partition).await {
            Ok(blobs) => Ok(blobs),
            Err(commonware_runtime::Error::PartitionMissing(_)) => Ok(Vec::new()),
            Err(err) => Err(Error::Runtime(err)),
        }
    }

    /// Select the blobs partition using legacy-first compatibility rules.
    ///
    /// If both the legacy and new blobs partitions contain data, returns [Error::Corruption].
    /// If neither contains data, defaults to the new blobs partition.
    // TODO(#2941): Remove legacy partition support
    async fn select_blob_partition(context: &E, cfg: &Config) -> Result<String, Error> {
        let legacy_partition = cfg.partition.as_str();
        let new_partition = format!("{}-blobs", cfg.partition);

        let legacy_blobs = Self::scan_partition(context, legacy_partition).await?;
        let new_blobs = Self::scan_partition(context, &new_partition).await?;

        if !legacy_blobs.is_empty() && !new_blobs.is_empty() {
            return Err(Error::Corruption(format!(
                "both legacy and blobs partitions contain data: legacy={} blobs={}",
                legacy_partition, new_partition
            )));
        }

        if !legacy_blobs.is_empty() {
            Ok(legacy_partition.to_string())
        } else {
            Ok(new_partition)
        }
    }

    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. The `replay` method can be
    /// used to iterate over all items in the `Journal`.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let mut inner = SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;

        // Initialize metadata store
        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;

        // Parse metadata if present
        let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) {
            Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err(
                |_| Error::Corruption("invalid pruning_boundary metadata".into()),
            )?)),
            None => None,
        };

        // Recover bounds from metadata and/or blobs
        let (pruning_boundary, size, needs_metadata_update) =
            Self::recover_bounds(&inner, items_per_blob, meta_pruning_boundary).await?;

        // Persist metadata if needed
        if needs_metadata_update {
            if pruning_boundary.is_multiple_of(items_per_blob) {
                metadata.remove(&PRUNING_BOUNDARY_KEY);
            } else {
                metadata.put(
                    PRUNING_BOUNDARY_KEY,
                    pruning_boundary.to_be_bytes().to_vec(),
                );
            }
            metadata.sync().await?;
        }

        // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on
        // reopen even after pruning all items. The tail blob is at `size / items_per_blob` (where
        // the next append would go).
        let tail_section = size / items_per_blob;
        inner.ensure_section_exists(tail_section).await?;

        Ok(Self {
            inner,
            items_per_blob,
            size,
            pruning_boundary,
            metadata,
        })
    }

    /// Returns (pruning_boundary, size, needs_metadata_update) based on metadata and blobs.
    ///
    /// If `meta_pruning_boundary` is `Some`, validates it against the physical blob state:
    /// - If metadata is section-aligned, it's unnecessary and we use the blob-based boundary
    /// - If metadata refers to a pruned section, it's stale and we use the blob-based boundary
    /// - If metadata refers to a future section, it must have been written by [Self::clear_to_size]
    ///   or [Self::init_at_size] and crashed before writing the blobs. Fall back to blobs.
    /// - Otherwise, metadata is valid and we use it
    ///
    /// If `meta_pruning_boundary` is `None`, computes bounds purely from blobs.
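    ///
    /// For example (illustrative): with `items_per_blob = 7` and section 3 as the oldest surviving
    /// blob, a metadata boundary of 23 is mid-section (23 % 7 != 0) and falls within section 3
    /// (23 / 7 = 3), so it is kept; a metadata boundary of 16 (section 2) is stale, so the
    /// blob-based boundary of 21 is used instead.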
    async fn recover_bounds(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        meta_pruning_boundary: Option<u64>,
    ) -> Result<(u64, u64, bool), Error> {
        // Blob-based boundary is always section-aligned
        let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob);

        let (pruning_boundary, needs_update) = match meta_pruning_boundary {
            // Mid-section metadata: validate against blobs
            Some(meta_pruning_boundary)
                if !meta_pruning_boundary.is_multiple_of(items_per_blob) =>
            {
                let meta_oldest_section = meta_pruning_boundary / items_per_blob;
                match inner.oldest_section() {
                    None => {
                        // No blobs exist but metadata claims mid-section boundary.
                        // This can happen if we crash after inner.clear() but before
                        // ensure_section_exists(). Ignore stale metadata.
                        warn!(
                            meta_oldest_section,
                            "crash repair: no blobs exist, ignoring stale metadata"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section < oldest_section => {
                        warn!(
                            meta_oldest_section,
                            oldest_section, "crash repair: metadata stale, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section > oldest_section => {
                        // Metadata references a section ahead of the oldest blob. This can happen
                        // if we crash during clear_to_size/init_at_size after blobs update but
                        // before metadata update. Fall back to blob state.
                        warn!(
                            meta_oldest_section,
                            oldest_section,
                            "crash repair: metadata ahead of blobs, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(_) => (meta_pruning_boundary, false), // valid mid-section metadata
                }
            }
            // Section-aligned metadata: unnecessary, use blob-based
            Some(_) => (blob_boundary, true),
            // No metadata: use blob-based, no update needed
            None => (blob_boundary, false),
        };

        // Validate oldest section before computing size.
        Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?;

        let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?;
        Ok((pruning_boundary, size, needs_update))
    }

    /// Validate that the oldest section has the expected number of items.
    ///
    /// Non-tail sections must be full from their logical start. The tail section
    /// (oldest == newest) can be partially filled.
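    ///
    /// For example (illustrative): with `items_per_blob = 7`, an oldest (non-tail) section 3
    /// starting at position 21, and a mid-section pruning boundary of 23, the oldest section must
    /// contain exactly 7 - (23 - 21) = 5 items.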
    async fn validate_oldest_section(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        pruning_boundary: u64,
    ) -> Result<(), Error> {
        let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else {
            return Ok(()); // No sections to validate
        };

        if oldest == newest {
            return Ok(()); // Tail section, can be partial
        }

        let oldest_len = inner.section_len(oldest).await?;
        let oldest_start = oldest * items_per_blob;

        let expected = if pruning_boundary > oldest_start {
            // Mid-section boundary: items from pruning_boundary to section end
            items_per_blob - (pruning_boundary - oldest_start)
        } else {
            // Section-aligned boundary: full section
            items_per_blob
        };

        if oldest_len != expected {
            return Err(Error::Corruption(format!(
                "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}"
            )));
        }

        Ok(())
    }

    /// Returns the total number of items ever appended (size), computed from the blobs.
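    ///
    /// For example (illustrative): with `items_per_blob = 7`, a pruning boundary of 21, an oldest
    /// section holding 7 items, one full middle section, and a tail holding 3 items, the computed
    /// size is 21 + 7 + 7 + 3 = 38.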
    async fn compute_size(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        pruning_boundary: u64,
    ) -> Result<u64, Error> {
        let oldest = inner.oldest_section();
        let newest = inner.newest_section();

        let (Some(oldest), Some(newest)) = (oldest, newest) else {
            return Ok(pruning_boundary);
        };

        if oldest == newest {
            // Single section: count from pruning boundary
            let tail_len = inner.section_len(newest).await?;
            return Ok(pruning_boundary + tail_len);
        }

        // Multiple sections: sum actual item counts
        let oldest_len = inner.section_len(oldest).await?;
        let tail_len = inner.section_len(newest).await?;

        // Middle sections are assumed full
        let middle_sections = newest - oldest - 1;
        let middle_items = middle_sections * items_per_blob;

        Ok(pruning_boundary + oldest_len + middle_items + tail_len)
    }

    /// Initialize a new `Journal` instance in a pruned state at a given size.
    ///
    /// This is used for state sync to create a journal that appears to have had `size` items
    /// appended and then pruned up to that point.
    ///
    /// # Arguments
    /// * `context` - The storage context
    /// * `cfg` - Configuration for the journal
    /// * `size` - The number of operations that have been "pruned"
    ///
    /// # Behavior
    /// - Clears any existing data in the partition
    /// - Creates an empty tail blob where the next append (at position `size`) will go
    /// - `bounds().is_empty()` returns `true` (fully pruned state)
    /// - The next `append()` will write to position `size`
    ///
    /// # Post-conditions
    /// - `bounds().end` returns `size`
    /// - `bounds().is_empty()` returns `true`
    /// - `bounds().start` equals `size` (no data exists)
    ///
    /// # Crash Safety
    /// If a crash occurs during this operation, `init()` will recover to a consistent state
    /// (though possibly different from the intended `size`).
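    ///
    /// For example (illustrative): `init_at_size(context, cfg, 100)` with `items_per_blob = 7`
    /// creates only the empty tail blob for section 14 (100 / 7) and records the mid-section
    /// boundary 100 (100 % 7 != 0) in metadata; the next append is assigned position 100.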
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();
        let tail_section = size / items_per_blob;

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        // Initialize both stores.
        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;
        let mut inner = SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;

        // Clear blobs before updating metadata.
        // This ordering is critical for crash safety:
        // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored
        // - Crash after create: old metadata triggers "metadata ahead" warning,
        //   recovery falls back to blob state.
        inner.clear().await?;
        inner.ensure_section_exists(tail_section).await?;

        // Persist metadata if pruning_boundary is mid-section.
        if !size.is_multiple_of(items_per_blob) {
            metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec());
            metadata.sync().await?;
        } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            metadata.remove(&PRUNING_BOUNDARY_KEY);
            metadata.sync().await?;
        }

        Ok(Self {
            inner,
            items_per_blob,
            size,
            pruning_boundary: size, // No data exists yet
            metadata,
        })
    }

    /// Convert a global position to (section, position_in_section).
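    ///
    /// For example (illustrative): with `items_per_blob = 2`, position 5 maps to
    /// (section 5 / 2 = 2, pos_in_section 5 % 2 = 1).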
    #[inline]
    const fn position_to_section(&self, position: u64) -> (u64, u64) {
        let section = position / self.items_per_blob;
        let pos_in_section = position % self.items_per_blob;
        (section, pos_in_section)
    }

    /// Sync any pending updates to disk.
    ///
    /// Only the tail section can have pending updates since historical sections are synced
    /// when they become full.
    pub async fn sync(&mut self) -> Result<(), Error> {
        // Sync inner journal
        let tail_section = self.size / self.items_per_blob;
        self.inner.sync(tail_section).await?;

        // Persist metadata only when pruning_boundary is mid-section.
        if !self.pruning_boundary.is_multiple_of(self.items_per_blob) {
            let needs_update = self
                .metadata
                .get(&PRUNING_BOUNDARY_KEY)
                .is_none_or(|bytes| bytes.as_slice() != self.pruning_boundary.to_be_bytes());
            if needs_update {
                self.metadata.put(
                    PRUNING_BOUNDARY_KEY,
                    self.pruning_boundary.to_be_bytes().to_vec(),
                );
                self.metadata.sync().await?;
            }
        } else if self.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            self.metadata.remove(&PRUNING_BOUNDARY_KEY);
            self.metadata.sync().await?;
        }

        Ok(())
    }

    /// Return the total number of items ever appended to the journal, irrespective of pruning.
    /// The next value appended to the journal will be at this position.
    pub const fn size(&self) -> u64 {
        self.size
    }

    /// Returns [start, end) where `start` and `end - 1` are the indices of the oldest and newest
    /// retained operations respectively.
    pub const fn bounds(&self) -> std::ops::Range<u64> {
        self.pruning_boundary..self.size
    }

    /// Append a new item to the journal. Return the item's position in the journal, or an error
    /// if the operation fails.
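    ///
    /// For example (illustrative): with `items_per_blob = 2`, the append that brings the size to 2
    /// fills blob 0; that blob is synced and an empty blob 1 is created, maintaining the invariant
    /// that the tail blob always exists.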
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        let position = self.size;
        let (section, _pos_in_section) = self.position_to_section(position);

        self.inner.append(section, item).await?;
        self.size += 1;

        // If we just filled up a section, sync it and create the next tail blob. This maintains the
        // invariant that the tail blob always exists.
        if self.size.is_multiple_of(self.items_per_blob) {
            self.inner.sync(section).await?;
            // Create the new tail blob.
            self.inner.ensure_section_exists(section + 1).await?;
        }

        Ok(position)
    }

    /// Rewind the journal to the given `size`. Returns [Error::InvalidRewind] if `size` exceeds
    /// the current size or precedes the oldest retained element. The journal is not synced after
    /// rewinding.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed from newest to oldest.
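    ///
    /// For example (illustrative): with `items_per_blob = 7` and a section-aligned pruning
    /// boundary, `rewind(8)` truncates section 1 to a single item (8 / 7 = 1 with offset
    /// 8 - 7 = 1) and removes any later blobs.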
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }

        if size < self.pruning_boundary {
            return Err(Error::InvalidRewind(size));
        }

        let section = size / self.items_per_blob;
        let section_start = section * self.items_per_blob;

        // Calculate offset within section for rewind
        let first_in_section = self.pruning_boundary.max(section_start);
        let pos_in_section = size - first_in_section;
        let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64;

        self.inner.rewind(section, byte_offset).await?;
        self.size = size;

        Ok(())
    }

    /// Read the item at position `pos` in the journal.
    ///
    /// # Errors
    ///
    ///  - [Error::ItemPruned] if the item at position `pos` is pruned.
    ///  - [Error::ItemOutOfRange] if the item at position `pos` does not exist.
    pub async fn read(&self, pos: u64) -> Result<A, Error> {
        let bounds = self.bounds();
        if pos >= bounds.end {
            return Err(Error::ItemOutOfRange(pos));
        }
        if pos < bounds.start {
            return Err(Error::ItemPruned(pos));
        }

        let section = pos / self.items_per_blob;
        let section_start = section * self.items_per_blob;

        // Calculate position within the blob. This accounts for an oldest section whose first
        // retained item lies past the section start (bounds.start > section_start).
        let first_in_section = bounds.start.max(section_start);
        let pos_in_section = pos - first_in_section;
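        // For example (illustrative): with items_per_blob = 7 and bounds.start = 9, position 10
        // lives in section 1, whose first retained item is 9, so pos_in_section = 10 - 9 = 1.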

        self.inner.get(section, pos_in_section).await.map_err(|e| {
            // Since we check bounds above, any failure here is unexpected.
            match e {
                Error::SectionOutOfRange(e)
                | Error::AlreadyPrunedToSection(e)
                | Error::ItemOutOfRange(e) => {
                    Error::Corruption(format!("section/item should be found, but got: {e}"))
                }
                other => other,
            }
        })
    }

    /// Returns an ordered stream of all items in the journal with position >= `start_pos`.
    ///
    /// # Errors
    ///
    /// - [Error::ItemOutOfRange] if `start_pos > size`
    /// - [Error::ItemPruned] if `start_pos < pruning_boundary`
    /// - [Error::Corruption] if a middle section is incomplete
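    ///
    /// A minimal consumption sketch (illustrative):
    /// ```ignore
    /// let stream = journal.replay(buffer, journal.bounds().start).await?;
    /// pin_mut!(stream);
    /// while let Some(next) = stream.next().await {
    ///     let (pos, item) = next?;
    ///     // process (pos, item)
    /// }
    /// ```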
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        let bounds = self.bounds();
        if start_pos > bounds.end {
            return Err(Error::ItemOutOfRange(start_pos));
        }
        if start_pos < bounds.start {
            return Err(Error::ItemPruned(start_pos));
        }

        let start_section = start_pos / self.items_per_blob;
        let section_start = start_section * self.items_per_blob;

        // Calculate start position within the section
        let first_in_section = bounds.start.max(section_start);
        let start_pos_in_section = start_pos - first_in_section;

        let items_per_blob = self.items_per_blob;
        let pruning_boundary = bounds.start;

        // Check all middle sections (not oldest, not tail) in range are complete.
        // The oldest section may be partial due to a mid-section pruning boundary.
        // The tail section may be partial because it has not yet been fully filled.
        if let (Some(oldest), Some(newest)) =
            (self.inner.oldest_section(), self.inner.newest_section())
        {
            // Start from max(start_section, oldest+1) to skip oldest which may be partial
            let first_to_check = start_section.max(oldest + 1);
            for section in first_to_check..newest {
                let len = self.inner.section_len(section).await?;
                if len < items_per_blob {
                    return Err(Error::Corruption(format!(
                        "section {section} incomplete: expected {items_per_blob} items, got {len}"
                    )));
                }
            }
        }

        let inner_stream = self
            .inner
            .replay(start_section, start_pos_in_section, buffer)
            .await?;

        // Transform (section, pos_in_section, item) to (global_pos, item).
        let stream = inner_stream.map(move |result| {
            result.map(|(section, pos_in_section, item)| {
                let section_start = section * items_per_blob;
                let first_in_section = pruning_boundary.max(section_start);
                let global_pos = first_in_section + pos_in_section;
                (global_pos, item)
            })
        });

        Ok(stream)
    }

    /// Allow the journal to prune items older than `min_item_pos`. The journal may not prune all
    /// such items in order to preserve blob boundaries, but the number of such items left
    /// unpruned will always be less than the configured number of items per blob. Returns true
    /// if any items were pruned.
    ///
    /// Note that this operation may NOT be atomic; however, it is guaranteed not to leave gaps in
    /// the event of failure, as items are always pruned in order from oldest to newest.
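    ///
    /// For example (illustrative): with `items_per_blob = 2` and items 0..10 appended, `prune(3)`
    /// removes only blob 0 (items 0 and 1), leaving the pruning boundary at 2 rather than 3.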
    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
        // Calculate the section that would contain min_item_pos
        let target_section = min_item_pos / self.items_per_blob;

        // Calculate the tail section.
        let tail_section = self.size / self.items_per_blob;

        // Cap to tail section. The tail section is guaranteed to exist by our invariant.
        let min_section = std::cmp::min(target_section, tail_section);

        let pruned = self.inner.prune(min_section).await?;

        // After pruning, update pruning_boundary to the start of the oldest remaining section
        if pruned {
            let new_oldest = self
                .inner
                .oldest_section()
                .expect("all sections pruned - violates tail section invariant");
            // Pruning boundary only moves forward
            assert!(self.pruning_boundary < new_oldest * self.items_per_blob);
            self.pruning_boundary = new_oldest * self.items_per_blob;
        }

        Ok(pruned)
    }

    /// Remove any persisted data created by the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        // Destroy inner journal
        self.inner.destroy().await?;

        // Destroy metadata
        self.metadata.destroy().await?;

        Ok(())
    }

    /// Clear all data and reset the journal to a new starting position.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
    ///
    /// # Crash Safety
    /// If a crash occurs during this operation, `init()` will recover to a consistent state
    /// (though possibly different from the intended `new_size`).
    pub(crate) async fn clear_to_size(&mut self, new_size: u64) -> Result<(), Error> {
        // Clear blobs before updating metadata.
        // This ordering is critical for crash safety:
        // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored
        // - Crash after create: old metadata triggers "metadata ahead" warning,
        //   recovery falls back to blob state
        self.inner.clear().await?;
        let tail_section = new_size / self.items_per_blob;
        self.inner.ensure_section_exists(tail_section).await?;

        self.size = new_size;
        self.pruning_boundary = new_size; // No data exists

        // Persist metadata only when pruning_boundary is mid-section.
        if !self.pruning_boundary.is_multiple_of(self.items_per_blob) {
            self.metadata.put(
                PRUNING_BOUNDARY_KEY,
                self.pruning_boundary.to_be_bytes().to_vec(),
            );
            self.metadata.sync().await?;
        } else if self.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            self.metadata.remove(&PRUNING_BOUNDARY_KEY);
            self.metadata.sync().await?;
        }

        Ok(())
    }
}

// Implement Contiguous trait for fixed-length journals
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
    type Item = A;

    fn bounds(&self) -> std::ops::Range<u64> {
        Self::bounds(self)
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, buffer, start_pos).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}

impl<E: Clock + Storage + Metrics, A: CodecFixedShared> MutableContiguous for Journal<E, A> {
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}

impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Persistable for Journal<E, A> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self, Context},
        Blob, Error as RuntimeError, Metrics, Runner, Storage,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use futures::{pin_mut, StreamExt};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    /// Generate a SHA-256 digest for the given value.
    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test_partition".into(),
            items_per_blob,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    fn blob_partition(cfg: &Config) -> String {
        format!("{}-blobs", cfg.partition)
    }

    async fn scan_partition(context: &Context, partition: &str) -> Vec<Vec<u8>> {
        match context.scan(partition).await {
            Ok(blobs) => blobs,
            Err(RuntimeError::PartitionMissing(_)) => Vec::new(),
            Err(err) => panic!("Failed to scan partition {partition}: {err}"),
        }
    }

    #[test_traced]
    fn test_fixed_journal_init_conflicting_partitions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            let (new_blob, _) = context
                .open(&blobs_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open new blob");
            new_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write new blob");
            new_blob.sync().await.expect("Failed to sync new blob");

            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_prefers_legacy_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            // Seed legacy partition so it is selected.
            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            let mut journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(!legacy_blobs.is_empty());
            assert!(new_blobs.is_empty());
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_defaults_to_blobs_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            let mut journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(legacy_blobs.is_empty());
            assert!(!new_blobs.is_empty());
        });
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append an item to the journal
            let mut pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            // Drop the journal and re-initialize it to simulate a restart
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), 1);

            // Append two more items to the journal to trigger a new blob creation
            pos = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);

            // Read the items back
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            // Sync the journal
            journal.sync().await.expect("failed to sync journal");

            // Pruning to 1 should be a no-op because there's no blob with only older items.
            journal.prune(1).await.expect("failed to prune journal 1");

            // Pruning to 2 should allow the first blob to be pruned.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.bounds().start, 2);

            // Reading from the first blob should fail since it's now pruned
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // Third item should still be readable
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Should be able to continue to append items
            for i in 3..10 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Check no-op pruning
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.inner.oldest_section(), Some(1));
            assert_eq!(journal.inner.newest_section(), Some(5));
            assert_eq!(journal.bounds().start, 2);

            // Prune first 3 blobs (6 items)
            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune first 3 blobs");
            assert_eq!(journal.inner.oldest_section(), Some(3));
            assert_eq!(journal.inner.newest_section(), Some(5));
            assert_eq!(journal.bounds().start, 6);

            // Try pruning (more than) everything in the journal.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let size = journal.size();
            assert_eq!(size, 10);
            assert_eq!(journal.inner.oldest_section(), Some(5));
            assert_eq!(journal.inner.newest_section(), Some(5));
            // Since the size of the journal is currently a multiple of items_per_blob, the newest blob
            // will be empty, and there will be no retained items.
            assert!(journal.bounds().is_empty());
            // bounds.start should equal bounds.end when empty.
            assert_eq!(journal.bounds().start, size);

            // Replaying from 0 should fail since all items before bounds.start are pruned
            {
                let result = journal.replay(NZUsize!(1024), 0).await;
                assert!(matches!(result, Err(Error::ItemPruned(0))));
            }

            // Replaying from pruning_boundary should return an empty stream
            {
                let stream = journal
                    .replay(NZUsize!(1024), journal.bounds().start)
                    .await
                    .expect("failed to replay journal from pruning boundary");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }

    /// Append a lot of data to make sure we exercise page cache paging boundaries.
    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // Append just under 2 blobs worth of items.
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            // Sync, reopen, then read back.
            journal.sync().await.expect("failed to sync journal");
            drop(journal);
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Read them back the usual way.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay should return all items
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Corrupt one of the bytes and make sure it's detected.
            let (blob, _) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Write junk bytes.
            let bad_bytes = 123456789u32;
            blob.write_at(1, bad_bytes.to_be_bytes().to_vec())
                .await
                .expect("Failed to write bad bytes");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Make sure reading an item that resides in the corrupted page fails.
            let err = journal
                .read(40 * ITEMS_PER_BLOB.get() + 1)
                .await
                .unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            // Replay all items.
            {
                let mut error_found = false;
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_found = true;
                            assert!(matches!(err, Error::Runtime(_)));
                            break;
                        }
                    }
                }
                assert!(error_found); // error should abort replay
            }
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Manually truncate a non-tail blob. The segmented journal will trim the incomplete
            // blob on init, leaving the blob one item short.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Because all non-tail blobs must be full, the missing item should be detected as
            // corruption during replay.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Journal size is computed from the tail section, so it's unchanged
            // despite the corruption in section 40.
            let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
            assert_eq!(journal.size(), expected_size);

            // Replay should detect corruption (incomplete section) in section 40
            match journal.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(msg)) => {
                    assert!(
                        msg.contains("section 40"),
                        "Error should mention section 40, got: {msg}"
                    );
                }
                Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
                Ok(_) => panic!("Expected replay to fail with corruption"),
            };
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay_with_missing_historical_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0u64..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            drop(journal);

            context
                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
                .await
                .expect("failed to remove blob");

            // Init won't detect the corruption.
            let result = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("init shouldn't fail");

            // But replay will.
            match result.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };

            // As will trying to read an item that was in the deleted blob.
            match result.read(2).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };
        });
    }
1305
1306    #[test_traced]
1307    fn test_fixed_journal_trim_blob() {
1308        // Initialize the deterministic context
1309        let executor = deterministic::Runner::default();
1310        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1311        // Start the test within the executor
1312        executor.start(|context| async move {
1313            // Initialize the journal, allowing a max of 7 items per blob.
1314            let cfg = test_cfg(ITEMS_PER_BLOB);
1315            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1316                .await
1317                .expect("failed to initialize journal");
1318
1319            // Fill one blob and put 3 items in the second.
1320            let item_count = ITEMS_PER_BLOB.get() + 3;
1321            for i in 0u64..item_count {
1322                journal
1323                    .append(test_digest(i))
1324                    .await
1325                    .expect("failed to append data");
1326            }
1327            assert_eq!(journal.size(), item_count);
1328            journal.sync().await.expect("Failed to sync journal");
1329            drop(journal);
1330
1331            // Truncate the tail blob by one byte, which should result in the last item being
1332            // discarded during recovery on init (the truncated page fails its checksum).
1333            let (blob, size) = context
1334                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1335                .await
1336                .expect("Failed to open blob");
1337            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1338            blob.sync().await.expect("Failed to sync blob");
1339
1340            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1341                .await
1342                .unwrap();
1343
1344            // The truncation invalidates the last page (bad checksum), which is removed.
1345            // This loses one item.
1346            assert_eq!(journal.size(), item_count - 1);
1347
1348            // Cleanup.
1349            journal.destroy().await.expect("Failed to destroy journal");
1350        });
1351    }
1352
1353    #[test_traced]
1354    fn test_fixed_journal_partial_replay() {
1355        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1356        // 53 % 7 = 4, which will trigger a non-trivial seek in the starting blob to reach the
1357        // starting position.
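        // (53 / 7 = 7, so replay begins in blob index 7, at item offset 4 within it.)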
1358        const START_POS: u64 = 53;
1359
1360        // Initialize the deterministic context
1361        let executor = deterministic::Runner::default();
1362        // Start the test within the executor
1363        executor.start(|context| async move {
1364            // Initialize the journal, allowing a max of 7 items per blob.
1365            let cfg = test_cfg(ITEMS_PER_BLOB);
1366            let mut journal = Journal::init(context.clone(), cfg.clone())
1367                .await
1368                .expect("failed to initialize journal");
1369
1370            // Append many items, filling 100 blobs and part of the 101st
1371            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1372                let pos = journal
1373                    .append(test_digest(i))
1374                    .await
1375                    .expect("failed to append data");
1376                assert_eq!(pos, i);
1377            }
1378
1379            // Replay should return all items except the first `START_POS`.
1380            {
1381                let stream = journal
1382                    .replay(NZUsize!(1024), START_POS)
1383                    .await
1384                    .expect("failed to replay journal");
1385                let mut items = Vec::new();
1386                pin_mut!(stream);
1387                while let Some(result) = stream.next().await {
1388                    match result {
1389                        Ok((pos, item)) => {
1390                            assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
1391                            assert_eq!(
1392                                test_digest(pos),
1393                                item,
1394                                "Item at position {pos} did not match expected digest"
1395                            );
1396                            items.push(pos);
1397                        }
1398                        Err(err) => panic!("Failed to read item: {err}"),
1399                    }
1400                }
1401
1402                // Make sure all items were replayed
1403                assert_eq!(
1404                    items.len(),
1405                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
1406                        - START_POS as usize
1407                );
1408                items.sort();
1409                for (i, pos) in items.iter().enumerate() {
1410                    assert_eq!(i as u64, *pos - START_POS);
1411                }
1412            }
1413
1414            journal.destroy().await.unwrap();
1415        });
1416    }
1417
1418    #[test_traced]
1419    fn test_fixed_journal_recover_from_partial_write() {
1420        // Initialize the deterministic context
1421        let executor = deterministic::Runner::default();
1422
1423        // Start the test within the executor
1424        executor.start(|context| async move {
1425            // Initialize the journal, allowing a max of 3 items per blob.
1426            let cfg = test_cfg(NZU64!(3));
1427            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1428                .await
1429                .expect("failed to initialize journal");
1430            for i in 0..5 {
1431                journal
1432                    .append(test_digest(i))
1433                    .await
1434                    .expect("failed to append data");
1435            }
1436            assert_eq!(journal.size(), 5);
1437            journal.sync().await.expect("Failed to sync journal");
1438            drop(journal);
1439
1440            // Manually truncate most recent blob to simulate a partial write.
1441            let (blob, size) = context
1442                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1443                .await
1444                .expect("Failed to open blob");
1445            // Truncate the most recent blob by 1 byte, corrupting the most recent item.
1446            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1447            blob.sync().await.expect("Failed to sync blob");
1448
1449            // Re-initialize the journal to simulate a restart
1450            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1451                .await
1452                .expect("Failed to re-initialize journal");
1453            // The truncation invalidates the last page, which is removed. This loses one item.
1454            assert_eq!(journal.pruning_boundary, 0);
1455            assert_eq!(journal.size(), 4);
1456            drop(journal);
1457
1458            // Delete the second blob and re-init
1459            context
1460                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1461                .await
1462                .expect("Failed to remove blob");
1463
1464            let journal = Journal::<_, Digest>::init(context.with_label("third"), cfg.clone())
1465                .await
1466                .expect("Failed to re-initialize journal");
1467            // Only the first blob remains
1468            assert_eq!(journal.size(), 3);
1469
1470            journal.destroy().await.unwrap();
1471        });
1472    }
1473
1474    #[test_traced]
1475    fn test_fixed_journal_recover_detects_oldest_section_too_short() {
1476        let executor = deterministic::Runner::default();
1477        executor.start(|context| async move {
1478            let cfg = test_cfg(NZU64!(5));
1479            let mut journal =
1480                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
1481                    .await
1482                    .expect("failed to initialize journal at size");
1483
1484            // Append items so section 1 has exactly the expected minimum (3 items).
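            // With items_per_blob = 5, section 1 covers positions 5-9, so it holds positions 7-9.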
1485            for i in 0..8u64 {
1486                journal
1487                    .append(test_digest(100 + i))
1488                    .await
1489                    .expect("failed to append data");
1490            }
1491            journal.sync().await.expect("failed to sync journal");
1492            assert_eq!(journal.pruning_boundary, 7);
1493            assert_eq!(journal.size(), 15);
1494            drop(journal);
1495
1496            // Corrupt the oldest section by truncating one byte (drops one item on recovery).
1497            let (blob, size) = context
1498                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1499                .await
1500                .expect("failed to open oldest blob");
1501            blob.resize(size - 1).await.expect("failed to corrupt blob");
1502            blob.sync().await.expect("failed to sync blob");
1503
1504            let result =
1505                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
1506            assert!(matches!(result, Err(Error::Corruption(_))));
1507        });
1508    }
1509
1510    #[test_traced]
1511    fn test_fixed_journal_recover_to_empty_from_partial_write() {
1512        let executor = deterministic::Runner::default();
1513        executor.start(|context| async move {
1514            // Initialize the journal, allowing a max of 10 items per blob.
1515            let cfg = test_cfg(NZU64!(10));
1516            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1517                .await
1518                .expect("failed to initialize journal");
1519            // Add only a single item
1520            journal
1521                .append(test_digest(0))
1522                .await
1523                .expect("failed to append data");
1524            assert_eq!(journal.size(), 1);
1525            journal.sync().await.expect("Failed to sync journal");
1526            drop(journal);
1527
1528            // Manually truncate most recent blob to simulate a partial write.
1529            let (blob, size) = context
1530                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1531                .await
1532                .expect("Failed to open blob");
1533            // Truncate the most recent blob by 1 byte, corrupting the one appended item.
1534            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1535            blob.sync().await.expect("Failed to sync blob");
1536
1537            // Re-initialize the journal to simulate a restart
1538            let mut journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1539                .await
1540                .expect("Failed to re-initialize journal");
1541
1542            // Since the only appended item was corrupted, recovery should leave us with an
1543            // empty journal.
1544            assert_eq!(journal.bounds().end, 0);
1545            assert!(journal.bounds().is_empty());
1546            // Make sure journal still works for appending.
1547            journal
1548                .append(test_digest(0))
1549                .await
1550                .expect("failed to append data");
1551            assert_eq!(journal.size(), 1);
1552
1553            journal.destroy().await.unwrap();
1554        });
1555    }
1556
1557    #[test_traced("DEBUG")]
1558    fn test_fixed_journal_recover_from_unwritten_data() {
1559        let executor = deterministic::Runner::default();
1560        executor.start(|context| async move {
1561            // Initialize the journal, allowing a max of 10 items per blob.
1562            let cfg = test_cfg(NZU64!(10));
1563            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1564                .await
1565                .expect("failed to initialize journal");
1566
1567            // Add only a single item
1568            journal
1569                .append(test_digest(0))
1570                .await
1571                .expect("failed to append data");
1572            assert_eq!(journal.size(), 1);
1573            journal.sync().await.expect("Failed to sync journal");
1574            drop(journal);
1575
1576            // Manually extend the blob to simulate a crash where the file was extended but the
1577            // new bytes were never written.
1578            let (blob, size) = context
1579                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1580                .await
1581                .expect("Failed to open blob");
1582            blob.write_at(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
1583                .await
1584                .expect("Failed to extend blob");
1585            blob.sync().await.expect("Failed to sync blob");
1586
1587            // Re-initialize the journal to simulate a restart
1588            let mut journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1589                .await
1590                .expect("Failed to re-initialize journal");
1591
1592            // The zero-filled pages are detected as invalid (bad checksum) and truncated.
1593            // No items should be lost since we called sync before the corruption.
1594            assert_eq!(journal.size(), 1);
1595
1596            // Make sure journal still works for appending.
1597            journal
1598                .append(test_digest(1))
1599                .await
1600                .expect("failed to append data");
1601
1602            journal.destroy().await.unwrap();
1603        });
1604    }
1605
1606    #[test_traced]
1607    fn test_fixed_journal_rewinding() {
1608        let executor = deterministic::Runner::default();
1609        executor.start(|context| async move {
1610            // Initialize the journal, allowing a max of 2 items per blob.
1611            let cfg = test_cfg(NZU64!(2));
1612            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1613                .await
1614                .expect("failed to initialize journal");
1615            assert!(matches!(journal.rewind(0).await, Ok(())));
1616            assert!(matches!(
1617                journal.rewind(1).await,
1618                Err(Error::InvalidRewind(1))
1619            ));
1620
1621            // Append an item to the journal
1622            journal
1623                .append(test_digest(0))
1624                .await
1625                .expect("failed to append data 0");
1626            assert_eq!(journal.size(), 1);
1627            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be no-op
1628            assert!(matches!(journal.rewind(0).await, Ok(())));
1629            assert_eq!(journal.size(), 0);
1630
1631            // append 7 items
1632            for i in 0..7 {
1633                let pos = journal
1634                    .append(test_digest(i))
1635                    .await
1636                    .expect("failed to append data");
1637                assert_eq!(pos, i);
1638            }
1639            assert_eq!(journal.size(), 7);
1640
1641            // Rewind back to a size of 4, which should drop the 2 trailing blobs.
1642            assert!(matches!(journal.rewind(4).await, Ok(())));
1643            assert_eq!(journal.size(), 4);
1644
1645            // Rewind back to empty and ensure all blobs are removed.
1646            assert!(matches!(journal.rewind(0).await, Ok(())));
1647            assert_eq!(journal.size(), 0);
1648
1649            // stress test: add 100 items, rewind 49, repeat x10.
1650            for _ in 0..10 {
1651                for i in 0..100 {
1652                    journal
1653                        .append(test_digest(i))
1654                        .await
1655                        .expect("failed to append data");
1656                }
1657                journal.rewind(journal.size() - 49).await.unwrap();
1658            }
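            // Each round nets 100 - 49 = 51 items, so 10 rounds leave 510.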
1659            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
1660            assert_eq!(journal.size(), ITEMS_REMAINING);
1661
1662            journal.sync().await.expect("Failed to sync journal");
1663            drop(journal);
1664
1665            // Repeat with a different blob size (3 items per blob)
1666            let mut cfg = test_cfg(NZU64!(3));
1667            cfg.partition = "test_partition_2".into();
1668            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
1669                .await
1670                .expect("failed to initialize journal");
1671            for _ in 0..10 {
1672                for i in 0..100 {
1673                    journal
1674                        .append(test_digest(i))
1675                        .await
1676                        .expect("failed to append data");
1677                }
1678                journal.rewind(journal.size() - 49).await.unwrap();
1679            }
1680            assert_eq!(journal.size(), ITEMS_REMAINING);
1681
1682            journal.sync().await.expect("Failed to sync journal");
1683            drop(journal);
1684
1685            // Make sure re-opened journal is as expected
1686            let mut journal: Journal<_, Digest> =
1687                Journal::init(context.with_label("third"), cfg.clone())
1688                    .await
1689                    .expect("failed to re-initialize journal");
1690            assert_eq!(journal.size(), ITEMS_REMAINING);
1691
1692            // Make sure rewinding works after pruning
1693            journal.prune(300).await.expect("pruning failed");
1694            assert_eq!(journal.size(), ITEMS_REMAINING);
1695            // Rewinding prior to our prune point should fail.
1696            assert!(matches!(
1697                journal.rewind(299).await,
1698                Err(Error::InvalidRewind(299))
1699            ));
1700            // Rewinding to the prune point should work, leaving an empty journal whose
1701            // bounds end at the prune point.
1702            assert!(matches!(journal.rewind(300).await, Ok(())));
1703            assert_eq!(journal.bounds().end, 300);
1704            assert!(journal.bounds().is_empty());
1705
1706            journal.destroy().await.unwrap();
1707        });
1708    }
1709
1710    /// Test recovery when blob is truncated to a page boundary with item size not dividing page size.
1711    ///
1712    /// This tests the scenario where:
1713    /// 1. Items (32 bytes) don't divide evenly into page size (44 bytes)
1714    /// 2. Data spans multiple pages
1715    /// 3. Blob is truncated to a page boundary (simulating crash before last page was written)
1716    /// 4. Journal should recover correctly on reopen
1717    #[test_traced]
1718    fn test_fixed_journal_recover_from_page_boundary_truncation() {
1719        let executor = deterministic::Runner::default();
1720        executor.start(|context: Context| async move {
1721            // Use a small items_per_blob to keep the test focused on a single blob
1722            let cfg = test_cfg(NZU64!(100));
1723            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1724                .await
1725                .expect("failed to initialize journal");
1726
1727            // Item size is 32 bytes (Digest), page size is 44 bytes.
1728            // 32 doesn't divide 44, so items will cross page boundaries.
1729            // Physical page size = 44 + 12 (CRC) = 56 bytes.
1730            //
1731            // Write enough items to span multiple pages:
1732            // - 10 items = 320 logical bytes
1733            // - This spans ceil(320/44) = 8 logical pages
1734            for i in 0u64..10 {
1735                journal
1736                    .append(test_digest(i))
1737                    .await
1738                    .expect("failed to append data");
1739            }
1740            assert_eq!(journal.size(), 10);
1741            journal.sync().await.expect("Failed to sync journal");
1742            drop(journal);
1743
1744            // Open the blob directly and truncate to a page boundary.
1745            // Physical page size = PAGE_SIZE + CHECKSUM_SIZE = 44 + 12 = 56
1746            let physical_page_size = PAGE_SIZE.get() as u64 + 12;
1747            let (blob, size) = context
1748                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1749                .await
1750                .expect("Failed to open blob");
1751
1752            // Calculate how many full physical pages we have and truncate to lose the last one.
1753            let full_pages = size / physical_page_size;
1754            assert!(full_pages >= 2, "need at least 2 pages for this test");
1755            let truncate_to = (full_pages - 1) * physical_page_size;
1756
1757            blob.resize(truncate_to)
1758                .await
1759                .expect("Failed to truncate blob");
1760            blob.sync().await.expect("Failed to sync blob");
1761
1762            // Re-initialize the journal - it should recover by truncating to valid data
1763            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1764                .await
1765                .expect("Failed to re-initialize journal after page truncation");
1766
1767            // The journal should have fewer items now (those that fit in the remaining pages).
1768            // With logical page size 44 and item size 32:
1769            // - After truncating to (full_pages-1) physical pages, we have (full_pages-1)*44 logical bytes
1770            // - Number of complete items = floor(logical_bytes / 32)
1771            let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
1772            let expected_items = remaining_logical_bytes / 32; // 32 = Digest::SIZE
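            // For instance, if the tail page were stored unpadded, 10 items (320 bytes) would
            // occupy 7 full physical pages plus a partial one; truncating to 6 full pages would
            // leave 6 * 44 = 264 logical bytes, i.e. 264 / 32 = 8 recoverable items.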
1773            assert_eq!(
1774                journal.size(),
1775                expected_items,
1776                "Journal should recover to {} items after truncation",
1777                expected_items
1778            );
1779
1780            // Verify we can still read the remaining items
1781            for i in 0..expected_items {
1782                let item = journal
1783                    .read(i)
1784                    .await
1785                    .expect("failed to read recovered item");
1786                assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
1787            }
1788
1789            journal.destroy().await.expect("Failed to destroy journal");
1790        });
1791    }
1792
1793    /// Test the contiguous fixed journal with items_per_blob: 1.
1794    ///
1795    /// This is an edge case where each item creates its own blob, and the
1796    /// tail blob is always empty after sync (because the item fills the blob
1797    /// and a new empty one is created).
1798    #[test_traced]
1799    fn test_single_item_per_blob() {
1800        let executor = deterministic::Runner::default();
1801        executor.start(|context| async move {
1802            let cfg = Config {
1803                partition: "single_item_per_blob".into(),
1804                items_per_blob: NZU64!(1),
1805                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1806                write_buffer: NZUsize!(2048),
1807            };
1808
1809            // === Test 1: Basic single item operation ===
1810            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1811                .await
1812                .expect("failed to initialize journal");
1813
1814            // Verify empty state
1815            assert_eq!(journal.bounds().end, 0);
1816            assert!(journal.bounds().is_empty());
1817
1818            // Append 1 item
1819            let pos = journal
1820                .append(test_digest(0))
1821                .await
1822                .expect("failed to append");
1823            assert_eq!(pos, 0);
1824            assert_eq!(journal.size(), 1);
1825
1826            // Sync
1827            journal.sync().await.expect("failed to sync");
1828
1829            // Read from size() - 1
1830            let value = journal
1831                .read(journal.size() - 1)
1832                .await
1833                .expect("failed to read");
1834            assert_eq!(value, test_digest(0));
1835
1836            // === Test 2: Multiple items with single item per blob ===
1837            for i in 1..10u64 {
1838                let pos = journal
1839                    .append(test_digest(i))
1840                    .await
1841                    .expect("failed to append");
1842                assert_eq!(pos, i);
1843                assert_eq!(journal.size(), i + 1);
1844
1845                // Verify we can read the just-appended item at size() - 1
1846                let value = journal
1847                    .read(journal.size() - 1)
1848                    .await
1849                    .expect("failed to read");
1850                assert_eq!(value, test_digest(i));
1851            }
1852
1853            // Verify all items can be read
1854            for i in 0..10u64 {
1855                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
1856            }
1857
1858            journal.sync().await.expect("failed to sync");
1859
1860            // === Test 3: Pruning with single item per blob ===
1861            // Prune to position 5 (removes positions 0-4)
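            // With one item per blob, each pruned position corresponds to a whole deleted blob.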
1862            journal.prune(5).await.expect("failed to prune");
1863
1864            // Size should still be 10
1865            assert_eq!(journal.size(), 10);
1866
1867            // bounds.start should be 5
1868            assert_eq!(journal.bounds().start, 5);
1869
1870            // Reading from size() - 1 (position 9) should still work
1871            let value = journal
1872                .read(journal.size() - 1)
1873                .await
1874                .expect("failed to read");
1875            assert_eq!(value, test_digest(9));
1876
1877            // Reading from pruned positions should return ItemPruned
1878            for i in 0..5 {
1879                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
1880            }
1881
1882            // Reading from retained positions should work
1883            for i in 5..10u64 {
1884                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
1885            }
1886
1887            // Append more items after pruning
1888            for i in 10..15u64 {
1889                let pos = journal
1890                    .append(test_digest(i))
1891                    .await
1892                    .expect("failed to append");
1893                assert_eq!(pos, i);
1894
1895                // Verify we can read from size() - 1
1896                let value = journal
1897                    .read(journal.size() - 1)
1898                    .await
1899                    .expect("failed to read");
1900                assert_eq!(value, test_digest(i));
1901            }
1902
1903            journal.sync().await.expect("failed to sync");
1904            drop(journal);
1905
1906            // === Test 4: Restart persistence with single item per blob ===
1907            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1908                .await
1909                .expect("failed to re-initialize journal");
1910
1911            // Verify size is preserved
1912            assert_eq!(journal.size(), 15);
1913
1914            // Verify bounds.start is preserved
1915            assert_eq!(journal.bounds().start, 5);
1916
1917            // Reading from size() - 1 should work after restart
1918            let value = journal
1919                .read(journal.size() - 1)
1920                .await
1921                .expect("failed to read");
1922            assert_eq!(value, test_digest(14));
1923
1924            // Reading all retained positions should work
1925            for i in 5..15u64 {
1926                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
1927            }
1928
1929            journal.destroy().await.expect("failed to destroy journal");
1930
1931            // === Test 5: Restart after pruning with non-zero index ===
1932            // Fresh journal for this test
1933            let mut journal = Journal::init(context.with_label("third"), cfg.clone())
1934                .await
1935                .expect("failed to initialize journal");
1936
1937            // Append 10 items (positions 0-9)
1938            for i in 0..10u64 {
1939                journal.append(test_digest(i + 100)).await.unwrap();
1940            }
1941
1942            // Prune to position 5 (removes positions 0-4)
1943            journal.prune(5).await.unwrap();
1944            assert_eq!(journal.bounds().end, 10);
1945            assert_eq!(journal.bounds().start, 5);
1946
1947            // Sync and restart
1948            journal.sync().await.unwrap();
1949            drop(journal);
1950
1951            // Re-open journal
1952            let journal = Journal::<_, Digest>::init(context.with_label("fourth"), cfg.clone())
1953                .await
1954                .expect("failed to re-initialize journal");
1955
1956            // Verify state after restart
1957            assert_eq!(journal.bounds().end, 10);
1958            assert_eq!(journal.bounds().start, 5);
1959
1960            // Reading from size() - 1 (position 9) should work
1961            let value = journal.read(journal.size() - 1).await.unwrap();
1962            assert_eq!(value, test_digest(109));
1963
1964            // Verify all retained positions (5-9) work
1965            for i in 5..10u64 {
1966                assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
1967            }
1968
1969            journal.destroy().await.expect("failed to destroy journal");
1970
1971            // === Test 6: Prune all items (edge case) ===
1972            let mut journal = Journal::init(context.clone(), cfg.clone())
1973                .await
1974                .expect("failed to initialize journal");
1975
1976            for i in 0..5u64 {
1977                journal.append(test_digest(i + 200)).await.unwrap();
1978            }
1979            journal.sync().await.unwrap();
1980
1981            // Prune all items
1982            journal.prune(5).await.unwrap();
1983            assert_eq!(journal.bounds().end, 5); // Size unchanged
1984            assert!(journal.bounds().is_empty()); // All pruned
1985
1986            // size() - 1 = 4, but position 4 is pruned
1987            let result = journal.read(journal.size() - 1).await;
1988            assert!(matches!(result, Err(Error::ItemPruned(4))));
1989
1990            // After appending, reading works again
1991            journal.append(test_digest(205)).await.unwrap();
1992            assert_eq!(journal.bounds().start, 5);
1993            assert_eq!(
1994                journal.read(journal.size() - 1).await.unwrap(),
1995                test_digest(205)
1996            );
1997
1998            journal.destroy().await.expect("failed to destroy journal");
1999        });
2000    }
2001
2002    #[test_traced]
2003    fn test_fixed_journal_init_at_size_zero() {
2004        let executor = deterministic::Runner::default();
2005        executor.start(|context| async move {
2006            let cfg = test_cfg(NZU64!(5));
2007            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 0)
2008                .await
2009                .unwrap();
2010
2011            assert_eq!(journal.bounds().end, 0);
2012            assert!(journal.bounds().is_empty());
2013
2014            // Next append should get position 0
2015            let pos = journal.append(test_digest(100)).await.unwrap();
2016            assert_eq!(pos, 0);
2017            assert_eq!(journal.size(), 1);
2018            assert_eq!(journal.read(0).await.unwrap(), test_digest(100));
2019
2020            journal.destroy().await.unwrap();
2021        });
2022    }
2023
2024    #[test_traced]
2025    fn test_fixed_journal_init_at_size_section_boundary() {
2026        let executor = deterministic::Runner::default();
2027        executor.start(|context| async move {
2028            let cfg = test_cfg(NZU64!(5));
2029
2030            // Initialize at position 10 (exactly at section 2 boundary with items_per_blob=5)
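            // (Sections cover positions [0, 5), [5, 10), [10, 15), ..., so 10 starts section 2.)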
2031            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2032                .await
2033                .unwrap();
2034
2035            assert_eq!(journal.bounds().end, 10);
2036            assert!(journal.bounds().is_empty());
2037
2038            // Next append should get position 10
2039            let pos = journal.append(test_digest(1000)).await.unwrap();
2040            assert_eq!(pos, 10);
2041            assert_eq!(journal.size(), 11);
2042            assert_eq!(journal.read(10).await.unwrap(), test_digest(1000));
2043
2044            // Can continue appending
2045            let pos = journal.append(test_digest(1001)).await.unwrap();
2046            assert_eq!(pos, 11);
2047            assert_eq!(journal.read(11).await.unwrap(), test_digest(1001));
2048
2049            journal.destroy().await.unwrap();
2050        });
2051    }
2052
2053    #[test_traced]
2054    fn test_fixed_journal_init_at_size_mid_section() {
2055        let executor = deterministic::Runner::default();
2056        executor.start(|context| async move {
2057            let cfg = test_cfg(NZU64!(5));
2058
2059            // Initialize at position 7 (middle of section 1 with items_per_blob=5)
2060            // No data exists yet, so oldest_retained_pos is None
2061            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
2062                .await
2063                .unwrap();
2064
2065            assert_eq!(journal.bounds().end, 7);
2066            // No data exists yet after init_at_size
2067            assert!(journal.bounds().is_empty());
2068
2069            // Reading before bounds.start should return ItemPruned
2070            assert!(matches!(journal.read(5).await, Err(Error::ItemPruned(5))));
2071            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2072
2073            // Next append should get position 7
2074            let pos = journal.append(test_digest(700)).await.unwrap();
2075            assert_eq!(pos, 7);
2076            assert_eq!(journal.size(), 8);
2077            assert_eq!(journal.read(7).await.unwrap(), test_digest(700));
2078            // Now bounds.start should be 7 (first data position)
2079            assert_eq!(journal.bounds().start, 7);
2080
2081            journal.destroy().await.unwrap();
2082        });
2083    }
2084
2085    #[test_traced]
2086    fn test_fixed_journal_init_at_size_persistence() {
2087        let executor = deterministic::Runner::default();
2088        executor.start(|context| async move {
2089            let cfg = test_cfg(NZU64!(5));
2090
2091            // Initialize at position 15
2092            let mut journal =
2093                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2094                    .await
2095                    .unwrap();
2096
2097            // Append some items
2098            for i in 0..5u64 {
2099                let pos = journal.append(test_digest(1500 + i)).await.unwrap();
2100                assert_eq!(pos, 15 + i);
2101            }
2102
2103            assert_eq!(journal.size(), 20);
2104
2105            // Sync and reopen
2106            journal.sync().await.unwrap();
2107            drop(journal);
2108
2109            let mut journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2110                .await
2111                .unwrap();
2112
2113            // Size and data should be preserved
2114            assert_eq!(journal.bounds().end, 20);
2115            assert_eq!(journal.bounds().start, 15);
2116
2117            // Verify data
2118            for i in 0..5u64 {
2119                assert_eq!(journal.read(15 + i).await.unwrap(), test_digest(1500 + i));
2120            }
2121
2122            // Can continue appending
2123            let pos = journal.append(test_digest(9999)).await.unwrap();
2124            assert_eq!(pos, 20);
2125            assert_eq!(journal.read(20).await.unwrap(), test_digest(9999));
2126
2127            journal.destroy().await.unwrap();
2128        });
2129    }
2130
2131    #[test_traced]
2132    fn test_fixed_journal_init_at_size_persistence_without_data() {
2133        let executor = deterministic::Runner::default();
2134        executor.start(|context| async move {
2135            let cfg = test_cfg(NZU64!(5));
2136
2137            // Initialize at position 15
2138            let journal =
2139                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2140                    .await
2141                    .unwrap();
2142
2143            assert_eq!(journal.bounds().end, 15);
2144            assert!(journal.bounds().is_empty());
2145
2146            // Drop without writing any data
2147            drop(journal);
2148
2149            // Reopen and verify size persisted
2150            let mut journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2151                .await
2152                .unwrap();
2153
2154            assert_eq!(journal.bounds().end, 15);
2155            assert!(journal.bounds().is_empty());
2156
2157            // Can append starting at position 15
2158            let pos = journal.append(test_digest(1500)).await.unwrap();
2159            assert_eq!(pos, 15);
2160            assert_eq!(journal.read(15).await.unwrap(), test_digest(1500));
2161
2162            journal.destroy().await.unwrap();
2163        });
2164    }
2165
2166    #[test_traced]
2167    fn test_fixed_journal_init_at_size_large_offset() {
2168        let executor = deterministic::Runner::default();
2169        executor.start(|context| async move {
2170            let cfg = test_cfg(NZU64!(5));
2171
2172            // Initialize at a large position (position 1000)
2173            let mut journal =
2174                Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 1000)
2175                    .await
2176                    .unwrap();
2177
2178            assert_eq!(journal.bounds().end, 1000);
2179            assert!(journal.bounds().is_empty());
2180
2181            // Next append should get position 1000
2182            let pos = journal.append(test_digest(100000)).await.unwrap();
2183            assert_eq!(pos, 1000);
2184            assert_eq!(journal.read(1000).await.unwrap(), test_digest(100000));
2185
2186            journal.destroy().await.unwrap();
2187        });
2188    }
2189
2190    #[test_traced]
2191    fn test_fixed_journal_init_at_size_prune_and_append() {
2192        let executor = deterministic::Runner::default();
2193        executor.start(|context| async move {
2194            let cfg = test_cfg(NZU64!(5));
2195
2196            // Initialize at position 20
2197            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 20)
2198                .await
2199                .unwrap();
2200
2201            // Append items 20-29
2202            for i in 0..10u64 {
2203                journal.append(test_digest(2000 + i)).await.unwrap();
2204            }
2205
2206            assert_eq!(journal.size(), 30);
2207
2208            // Prune to position 25
2209            journal.prune(25).await.unwrap();
2210
2211            assert_eq!(journal.bounds().end, 30);
2212            assert_eq!(journal.bounds().start, 25);
2213
2214            // Verify remaining items are readable
2215            for i in 25..30u64 {
2216                assert_eq!(journal.read(i).await.unwrap(), test_digest(2000 + (i - 20)));
2217            }
2218
2219            // Continue appending
2220            let pos = journal.append(test_digest(3000)).await.unwrap();
2221            assert_eq!(pos, 30);
2222
2223            journal.destroy().await.unwrap();
2224        });
2225    }
2226
2227    #[test_traced]
2228    fn test_fixed_journal_clear_to_size() {
2229        let executor = deterministic::Runner::default();
2230        executor.start(|context| async move {
2231            let cfg = test_cfg(NZU64!(10));
2232            let mut journal = Journal::init(context.with_label("journal"), cfg.clone())
2233                .await
2234                .expect("failed to initialize journal");
2235
2236            // Append 25 items (positions 0-24, spanning 3 blobs)
2237            for i in 0..25u64 {
2238                journal.append(test_digest(i)).await.unwrap();
2239            }
2240            assert_eq!(journal.size(), 25);
2241            journal.sync().await.unwrap();
2242
2243            // Clear to position 100, effectively resetting the journal
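            // Afterwards size() == 100, positions 0-24 read as pruned, and the size persists.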
2244            journal.clear_to_size(100).await.unwrap();
2245            assert_eq!(journal.size(), 100);
2246
2247            // Old positions should fail
2248            for i in 0..25 {
2249                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2250            }
2251
2252            // Verify size persists after restart without writing any data
2253            drop(journal);
2254            let mut journal =
2255                Journal::<_, Digest>::init(context.with_label("journal_after_clear"), cfg.clone())
2256                    .await
2257                    .expect("failed to re-initialize journal after clear");
2258            assert_eq!(journal.size(), 100);
2259
2260            // Append new data starting at position 100
2261            for i in 100..105u64 {
2262                let pos = journal.append(test_digest(i)).await.unwrap();
2263                assert_eq!(pos, i);
2264            }
2265            assert_eq!(journal.size(), 105);
2266
2267            // New positions should be readable
2268            for i in 100..105u64 {
2269                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2270            }
2271
2272            // Sync and re-init to verify persistence
2273            journal.sync().await.unwrap();
2274            drop(journal);
2275
2276            let journal = Journal::<_, Digest>::init(context.with_label("journal_reopened"), cfg)
2277                .await
2278                .expect("failed to re-initialize journal");
2279
2280            assert_eq!(journal.size(), 105);
2281            for i in 100..105u64 {
2282                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2283            }
2284
2285            journal.destroy().await.unwrap();
2286        });
2287    }
2288
2289    #[test_traced]
2290    fn test_fixed_journal_sync_crash_meta_none_boundary_aligned() {
2291        // Old meta = None (aligned), new boundary = aligned.
2292        let executor = deterministic::Runner::default();
2293        executor.start(|context| async move {
2294            let cfg = test_cfg(NZU64!(5));
2295            let mut journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
2296                .await
2297                .unwrap();
2298
2299            for i in 0..5u64 {
2300                journal.append(test_digest(i)).await.unwrap();
2301            }
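            // Sync only the inner segmented journal, skipping the metadata update that a full
            // `sync` would also perform, to simulate a crash between the two steps.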
2302            let tail_section = journal.size / journal.items_per_blob;
2303            journal.inner.sync(tail_section).await.unwrap();
2304            drop(journal);
2305
2306            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2307                .await
2308                .unwrap();
2309            assert_eq!(journal.bounds().start, 0);
2310            assert_eq!(journal.bounds().end, 5);
2311            journal.destroy().await.unwrap();
2312        });
2313    }
2314
2315    #[test_traced]
2316    fn test_fixed_journal_oldest_section_invalid_len() {
2317        // Old meta = None (aligned), new boundary = mid-section.
2318        let executor = deterministic::Runner::default();
2319        executor.start(|context| async move {
2320            let cfg = test_cfg(NZU64!(5));
2321            let mut journal =
2322                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2323                    .await
2324                    .unwrap();
2325            for i in 0..3u64 {
2326                journal.append(test_digest(i)).await.unwrap();
2327            }
2328            assert_eq!(journal.inner.newest_section(), Some(2));
2329            journal.sync().await.unwrap();
2330
2331            // Simulate metadata deletion (corruption).
2332            journal.metadata.clear();
2333            journal.metadata.sync().await.unwrap();
2334            drop(journal);
2335
2336            // Section 1 holds only items 7-9, but with the metadata missing, recovery falls
2337            // back to a blob-derived boundary and forgets that section 1 starts at logical
2338            // position 7. It therefore expects a full section of 5 items and reports corruption.
2339            let result =
2340                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
2341            assert!(matches!(result, Err(Error::Corruption(_))));
2342            context.remove(&blob_partition(&cfg), None).await.unwrap();
2343            context
2344                .remove(&format!("{}-metadata", cfg.partition), None)
2345                .await
2346                .unwrap();
2347        });
2348    }
2349
2350    #[test_traced]
2351    fn test_fixed_journal_sync_crash_meta_mid_boundary_unchanged() {
2352        // Old meta = Some(mid), new boundary = mid-section (same value).
2353        let executor = deterministic::Runner::default();
2354        executor.start(|context| async move {
2355            let cfg = test_cfg(NZU64!(5));
2356            let mut journal =
2357                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2358                    .await
2359                    .unwrap();
2360            for i in 0..3u64 {
2361                journal.append(test_digest(i)).await.unwrap();
2362            }
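            // Crash simulation: sync only the inner journal. The stored metadata already holds
            // the mid-section boundary (7), so recovery should reproduce it unchanged.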
2363            let tail_section = journal.size / journal.items_per_blob;
2364            journal.inner.sync(tail_section).await.unwrap();
2365            drop(journal);
2366
2367            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2368                .await
2369                .unwrap();
2370            assert_eq!(journal.bounds().start, 7);
2371            assert_eq!(journal.bounds().end, 10);
2372            journal.destroy().await.unwrap();
2373        });
2374    }

2375    #[test_traced]
2376    fn test_fixed_journal_sync_crash_meta_mid_to_aligned_becomes_stale() {
2377        // Old meta = Some(mid), new boundary = aligned.
2378        let executor = deterministic::Runner::default();
2379        executor.start(|context| async move {
2380            let cfg = test_cfg(NZU64!(5));
2381            let mut journal =
2382                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2383                    .await
2384                    .unwrap();
2385            for i in 0..10u64 {
2386                journal.append(test_digest(i)).await.unwrap();
2387            }
2388            assert_eq!(journal.size(), 17);
2389            journal.prune(10).await.unwrap();
2390
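            // Crash simulation: after pruning to the aligned boundary 10, sync only the inner
            // journal so the metadata still holds the stale mid-section value (7). Recovery
            // should prefer the blob-derived boundary, as asserted below.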
2391            let tail_section = journal.size / journal.items_per_blob;
2392            journal.inner.sync(tail_section).await.unwrap();
2393            drop(journal);
2394
2395            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2396                .await
2397                .unwrap();
2398            assert_eq!(journal.bounds().start, 10);
2399            assert_eq!(journal.bounds().end, 17);
2400            journal.destroy().await.unwrap();
2401        });
2402    }
2403
2404    #[test_traced]
2405    fn test_fixed_journal_prune_does_not_move_boundary_backwards() {
2406        // Pruning to a position earlier than pruning_boundary (within the same section)
2407        // should not move the boundary backwards.
2408        let executor = deterministic::Runner::default();
2409        executor.start(|context| async move {
2410            let cfg = test_cfg(NZU64!(5));
2411            // init_at_size(7) sets pruning_boundary = 7 (mid-section in section 1)
2412            let mut journal =
2413                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2414                    .await
2415                    .unwrap();
2416            // Append 5 items at positions 7-11, filling section 1 and part of section 2
2417            for i in 0..5u64 {
2418                journal.append(test_digest(i)).await.unwrap();
2419            }
2420            // Prune to position 5 (section 1 start) should NOT move boundary back from 7 to 5
2421            journal.prune(5).await.unwrap();
2422            assert_eq!(journal.bounds().start, 7);
2423            journal.destroy().await.unwrap();
2424        });
2425    }
2426
2427    #[test_traced]
2428    fn test_fixed_journal_replay_after_init_at_size_spanning_sections() {
2429        // Test replay when first section begins mid-section: init_at_size creates a journal
2430        // where pruning_boundary is mid-section, then we append across multiple sections.
2431        let executor = deterministic::Runner::default();
2432        executor.start(|context| async move {
2433            let cfg = test_cfg(NZU64!(5));
2434
2435            // Initialize at position 7 (mid-section with items_per_blob=5)
2436            // Section 1 (positions 5-9) begins mid-section: only positions 7, 8, 9 have data
2437            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
2438                .await
2439                .unwrap();
2440
2441            // Append 13 items (positions 7-19), spanning sections 1, 2, 3
2442            for i in 0..13u64 {
2443                let pos = journal.append(test_digest(100 + i)).await.unwrap();
2444                assert_eq!(pos, 7 + i);
2445            }
2446            assert_eq!(journal.size(), 20);
2447            journal.sync().await.unwrap();
2448
2449            // Replay from pruning_boundary
2450            {
2451                let stream = journal
2452                    .replay(NZUsize!(1024), 7)
2453                    .await
2454                    .expect("failed to replay");
2455                pin_mut!(stream);
2456                let mut items: Vec<(u64, Digest)> = Vec::new();
2457                while let Some(result) = stream.next().await {
2458                    items.push(result.expect("replay item failed"));
2459                }
2460
2461                // Should get all 13 items with correct logical positions
2462                assert_eq!(items.len(), 13);
2463                for (i, (pos, item)) in items.iter().enumerate() {
2464                    assert_eq!(*pos, 7 + i as u64);
2465                    assert_eq!(*item, test_digest(100 + i as u64));
2466                }
2467            }
2468
2469            // Replay from mid-stream (position 12)
2470            {
2471                let stream = journal
2472                    .replay(NZUsize!(1024), 12)
2473                    .await
2474                    .expect("failed to replay from mid-stream");
2475                pin_mut!(stream);
2476                let mut items: Vec<(u64, Digest)> = Vec::new();
2477                while let Some(result) = stream.next().await {
2478                    items.push(result.expect("replay item failed"));
2479                }
2480
2481                // Should get items from position 12 onwards
2482                assert_eq!(items.len(), 8);
2483                for (i, (pos, item)) in items.iter().enumerate() {
2484                    assert_eq!(*pos, 12 + i as u64);
2485                    assert_eq!(*item, test_digest(100 + 5 + i as u64));
2486                }
2487            }
2488
2489            journal.destroy().await.unwrap();
2490        });
2491    }
2492
2493    #[test_traced]
2494    fn test_fixed_journal_rewind_error_before_bounds_start() {
2495        // Test that rewind returns error when trying to rewind before bounds.start
2496        let executor = deterministic::Runner::default();
2497        executor.start(|context| async move {
2498            let cfg = test_cfg(NZU64!(5));
2499
2500            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2501                .await
2502                .unwrap();
2503
2504            // Append a few items (positions 10, 11, 12)
2505            for i in 0..3u64 {
2506                journal.append(test_digest(i)).await.unwrap();
2507            }
2508            assert_eq!(journal.size(), 13);
2509
2510            // Rewind to position 11 should work
2511            journal.rewind(11).await.unwrap();
2512            assert_eq!(journal.size(), 11);
2513
2514            // Rewind to position 10 (pruning_boundary) should work
2515            journal.rewind(10).await.unwrap();
2516            assert_eq!(journal.size(), 10);
2517
2518            // Rewind to before pruning_boundary should fail
2519            let result = journal.rewind(9).await;
2520            assert!(matches!(result, Err(Error::InvalidRewind(9))));
2521
2522            journal.destroy().await.unwrap();
2523        });
2524    }
2525
2526    #[test_traced]
2527    fn test_fixed_journal_init_at_size_crash_scenarios() {
2528        let executor = deterministic::Runner::default();
2529        executor.start(|context| async move {
2530            let cfg = test_cfg(NZU64!(5));
2531
2532            // Setup: Create a journal with some data and mid-section metadata
2533            let mut journal =
2534                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2535                    .await
2536                    .unwrap();
2537            for i in 0..5u64 {
2538                journal.append(test_digest(i)).await.unwrap();
2539            }
2540            journal.sync().await.unwrap();
2541            drop(journal);
2542
2543            // Crash Scenario 1: After clear(), before blob creation
2544            // Simulate by manually removing all blobs but leaving metadata
2545            let blob_part = blob_partition(&cfg);
2546            context.remove(&blob_part, None).await.unwrap();
2547
2548            // Recovery should see no blobs and return empty journal, ignoring metadata
2549            let journal = Journal::<_, Digest>::init(context.with_label("crash1"), cfg.clone())
2550                .await
2551                .expect("init failed after clear crash");
2552            assert_eq!(journal.bounds().end, 0);
2553            assert_eq!(journal.bounds().start, 0);
2554            drop(journal);
2555
2556            // Restore metadata for next scenario (it might have been removed by init)
2557            let meta_cfg = MetadataConfig {
2558                partition: format!("{}-metadata", cfg.partition),
2559                codec_config: ((0..).into(), ()),
2560            };
2561            let mut metadata = Metadata::<_, u64, Vec<u8>>::init(
2562                context.with_label("restore_meta"),
2563                meta_cfg.clone(),
2564            )
2565            .await
2566            .unwrap();
2567            metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec());
2568            metadata.sync().await.unwrap();
2569
2570            // Crash Scenario 2: After ensure_section_exists(), before metadata update.
2571            // Note: if the new tail blob were ahead of the old metadata (e.g. init_at_size(12)
2572            // with metadata still at 7), recover_bounds would treat the metadata as stale and
2573            // simply use the blob, so that direction is uninteresting here.
2574
2575            // Instead, simulate init_at_size(2), which targets section 0 while the old
2576            // metadata still says 7 (section 1):
2577            // State: Blob at section 0, Metadata says 7 (section 1).
2578            // recover_bounds sees "meta (1) > blob (0)" -> metadata ahead -> falls back to
2579            // the blob.
2580
2581            // Simulate: Create blob at section 0 (tail for init_at_size(2))
2582            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
2583            blob.sync().await.unwrap(); // Ensure it exists
2584
2585            // Recovery should warn "metadata ahead" and use blob state (0, 0)
2586            let journal = Journal::<_, Digest>::init(context.with_label("crash2"), cfg.clone())
2587                .await
2588                .expect("init failed after create crash");
2589
2590            // Should recover to blob state (section 0 aligned)
2591            assert_eq!(journal.bounds().start, 0);
2592            // Size is 0 because blob is empty
2593            assert_eq!(journal.bounds().end, 0);
2594            journal.destroy().await.unwrap();
2595        });
2596    }
2597
2598    #[test_traced]
2599    fn test_fixed_journal_clear_to_size_crash_scenarios() {
2600        let executor = deterministic::Runner::default();
2601        executor.start(|context| async move {
2602            let cfg = test_cfg(NZU64!(5));
2603
2604            // Setup: Init at 12 (Section 2, offset 2)
2605            // Metadata = 12
2606            let mut journal =
2607                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 12)
2608                    .await
2609                    .unwrap();
2610            journal.sync().await.unwrap();
2611            drop(journal);
2612
2613            // Crash Scenario: clear_to_size(2) [Section 0]
2614            // We want to simulate crash after blob 0 created, but metadata still 12.
2615
2616            // manually clear blobs
2617            let blob_part = blob_partition(&cfg);
2618            context.remove(&blob_part, None).await.unwrap();
2619
2620            // manually create section 0
2621            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
2622            blob.sync().await.unwrap();
2623
2624            // Metadata is still 12 (from setup)
2625            // Blob is Section 0
2626            // Metadata (12 -> sec 2) > Blob (sec 0) -> Ahead warning
2627
2628            let journal =
2629                Journal::<_, Digest>::init(context.with_label("crash_clear"), cfg.clone())
2630                    .await
2631                    .expect("init failed after clear_to_size crash");
2632
2633            // Should fallback to blobs
2634            assert_eq!(journal.bounds().start, 0);
2635            assert_eq!(journal.bounds().end, 0);
2636            journal.destroy().await.unwrap();
2637        });
2638    }
2639}