Skip to main content

commonware_storage/journal/contiguous/
fixed.rs

1//! An append-only log for storing fixed length _items_ on disk.
2//!
3//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
4//! where position is defined as the item's order of insertion starting from 0, unaffected by
5//! pruning.
6//!
7//! _See [super::variable] for a journal that supports variable length items._
8//!
9//! # Format
10//!
11//! Data stored in a `fixed::Journal` is persisted in one of many Blobs. Each `Blob` contains a
12//! configurable maximum of `items_per_blob`, with page-level data integrity provided by a buffer
13//! pool.
14//!
15//! ```text
//! +--------+--------+-----+----------+
//! | item_0 | item_1 | ... | item_n-1 |
//! +--------+--------+-----+----------+
19//!
20//! n = config.items_per_blob
21//! ```
22//!
23//! The most recent blob may not necessarily be full, in which case it will contain fewer than the
24//! maximum number of items.
25//!
26//! Data fetched from disk is always checked for integrity before being returned. If the data is
27//! found to be invalid, an error is returned instead.
28//!
29//! # Open Blobs
30//!
31//! All `Blobs` in a given `partition` are kept open during the lifetime of `Journal`. You can limit
32//! the number of open blobs by using a higher number of `items_per_blob` and/or pruning old items.
33//!
34//! # Partition
35//!
36//! Blobs are stored in the legacy partition (`cfg.partition`) if it already contains data;
37//! otherwise they are stored in `{cfg.partition}-blobs`.
38//!
39//! Metadata is stored in `{cfg.partition}-metadata`.
40//!
41//! # Consistency
42//!
43//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
44//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
45//! calling `close`, all pending data is automatically synced and any open blobs are closed.
46//!
47//! # Pruning
48//!
49//! The `prune` method allows the `Journal` to prune blobs consisting entirely of items prior to a
50//! given point in history.
51//!
52//! # Replay
53//!
54//! The `replay` method supports fast reading of all unpruned items into memory.
55
56#[cfg(test)]
57use super::Reader as _;
58use crate::{
59    journal::{
60        contiguous::Mutable,
61        segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
62        Error,
63    },
64    metadata::{Config as MetadataConfig, Metadata},
65    Persistable,
66};
67use commonware_codec::CodecFixedShared;
68use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
69use commonware_utils::sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock};
70use futures::{stream::Stream, StreamExt};
71use std::num::{NonZeroU64, NonZeroUsize};
72use tracing::warn;
73
/// Metadata key for storing the pruning boundary.
///
/// The stored value is the boundary encoded as big-endian `u64` bytes.
const PRUNING_BOUNDARY_KEY: u64 = 1;

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config {
    /// Prefix for the journal partitions.
    ///
    /// Blobs are stored in `partition` (legacy) if it contains data, otherwise in
    /// `{partition}-blobs`. Metadata is stored in `{partition}-metadata`.
    pub partition: String,

    /// The maximum number of journal items to store in each blob.
    ///
    /// Any unpruned historical blobs will contain exactly this number of items.
    /// Only the newest blob may contain fewer items.
    pub items_per_blob: NonZeroU64,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}
98
/// Inner state protected by a single RwLock.
struct Inner<E: Clock + Storage + Metrics, A: CodecFixedShared> {
    /// The underlying segmented journal.
    journal: SegmentedJournal<E, A>,

    /// Total number of items appended (not affected by pruning).
    ///
    /// The next appended item is assigned this value as its position.
    size: u64,

    /// If the journal's pruning boundary is mid-section (that is, the oldest retained item's
    /// position is not a multiple of `items_per_blob`), then the metadata stores the pruning
    /// boundary. Otherwise, the metadata is empty.
    ///
    /// When the journal is pruned, `metadata` must be persisted AFTER the inner journal is
    /// persisted to ensure that its pruning boundary is never after the inner journal's size.
    // TODO(#2939): Remove metadata
    metadata: Metadata<E, u64, Vec<u8>>,

    /// The position before which all items have been pruned.
    ///
    /// Never exceeds `size` (see the persistence-ordering note on `metadata`).
    pruning_boundary: u64,
}
119
120impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Inner<E, A> {
121    /// Read the item at position `pos` in the journal.
122    ///
123    /// # Errors
124    ///
125    ///  - [Error::ItemPruned] if the item at position `pos` is pruned.
126    ///  - [Error::ItemOutOfRange] if the item at position `pos` does not exist.
127    async fn read(&self, pos: u64, items_per_blob: u64) -> Result<A, Error> {
128        if pos >= self.size {
129            return Err(Error::ItemOutOfRange(pos));
130        }
131        if pos < self.pruning_boundary {
132            return Err(Error::ItemPruned(pos));
133        }
134
135        let section = pos / items_per_blob;
136        let section_start = section * items_per_blob;
137
138        // Calculate position within the blob.
139        // This accounts for sections that begin mid-section (pruning_boundary > section_start).
140        let first_in_section = self.pruning_boundary.max(section_start);
141        let pos_in_section = pos - first_in_section;
142
143        self.journal
144            .get(section, pos_in_section)
145            .await
146            .map_err(|e| {
147                // Since we check bounds above, any failure here is unexpected.
148                match e {
149                    Error::SectionOutOfRange(e)
150                    | Error::AlreadyPrunedToSection(e)
151                    | Error::ItemOutOfRange(e) => {
152                        Error::Corruption(format!("section/item should be found, but got: {e}"))
153                    }
154                    other => other,
155                }
156            })
157    }
158}
159
/// Implementation of `Journal` storage.
///
/// This is implemented as a wrapper around [SegmentedJournal] that provides position-based access
/// where positions are automatically mapped to (section, position_in_section) pairs.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying blob will be truncated to the last valid item). Repair is performed
/// by the underlying [SegmentedJournal] during init.
pub struct Journal<E: Clock + Storage + Metrics, A: CodecFixedShared> {
    /// Inner state with segmented journal and size.
    ///
    /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent
    /// race conditions while allowing concurrent reads during sync.
    inner: UpgradableAsyncRwLock<Inner<E, A>>,

    /// The maximum number of items per blob (section).
    ///
    /// Cached copy of [Config::items_per_blob] taken at init.
    items_per_blob: u64,
}
184
/// A reader guard that holds a consistent snapshot of the journal's bounds.
pub struct Reader<'a, E: Clock + Storage + Metrics, A: CodecFixedShared> {
    /// Read guard over the journal state; held for the reader's lifetime so the observed
    /// `size` and `pruning_boundary` cannot change underneath it.
    guard: AsyncRwLockReadGuard<'a, Inner<E, A>>,

    /// Copy of the journal's configured items-per-blob, used for position mapping.
    items_per_blob: u64,
}
190
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
    type Item = A;

    /// Return the readable range of positions: `pruning_boundary..size`.
    fn bounds(&self) -> std::ops::Range<u64> {
        self.guard.pruning_boundary..self.guard.size
    }

    /// Read the item at position `pos` (see [Inner::read] for error conditions).
    async fn read(&self, pos: u64) -> Result<A, Error> {
        self.guard.read(pos, self.items_per_blob).await
    }

    /// Stream unpruned items starting at `start_pos`, yielding `(global_position, item)` pairs.
    /// `buffer` is forwarded to the underlying journal's `replay`.
    ///
    /// # Errors
    ///
    ///  - [Error::ItemOutOfRange] if `start_pos` exceeds the journal's size.
    ///  - [Error::ItemPruned] if `start_pos` precedes the pruning boundary.
    ///  - [Error::Corruption] if a middle (non-oldest, non-tail) section is not full.
    async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + Send, Error> {
        let items_per_blob = self.items_per_blob;
        let pruning_boundary = self.guard.pruning_boundary;

        // Validate bounds (`start_pos == size` is permitted — note the strict `>`).
        if start_pos > self.guard.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }
        if start_pos < pruning_boundary {
            return Err(Error::ItemPruned(start_pos));
        }

        let start_section = start_pos / items_per_blob;
        let section_start = start_section * items_per_blob;

        // Calculate start position within the section.
        // The oldest section may begin mid-section (pruning_boundary > section_start).
        let first_in_section = pruning_boundary.max(section_start);
        let start_pos_in_section = start_pos - first_in_section;

        // Check all middle sections (not oldest, not tail) in range are complete.
        // Global positions are reconstructed below assuming non-oldest sections begin at
        // `section * items_per_blob`; a short middle section would mis-map every later
        // position, so fail fast instead.
        let journal = &self.guard.journal;
        if let (Some(oldest), Some(newest)) = (journal.oldest_section(), journal.newest_section()) {
            let first_to_check = start_section.max(oldest + 1);
            for section in first_to_check..newest {
                let len = journal.section_len(section).await?;
                if len < items_per_blob {
                    return Err(Error::Corruption(format!(
                        "section {section} incomplete: expected {items_per_blob} items, got {len}"
                    )));
                }
            }
        }

        let inner_stream = journal
            .replay(start_section, start_pos_in_section, buffer)
            .await?;

        // Transform (section, pos_in_section, item) to (global_pos, item).
        let stream = inner_stream.map(move |result| {
            result.map(|(section, pos_in_section, item)| {
                let section_start = section * items_per_blob;
                let first_in_section = pruning_boundary.max(section_start);
                let global_pos = first_in_section + pos_in_section;
                (global_pos, item)
            })
        });

        Ok(stream)
    }
}
256
257impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Size of each entry in bytes.
    pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;

    /// Size of each entry in bytes (as u64).
    ///
    /// Convenience form of [Self::CHUNK_SIZE] for byte-offset arithmetic on `u64` positions.
    pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
263
264    /// Scan a partition and return blob names, treating a missing partition as empty.
265    async fn scan_partition(context: &E, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
266        match context.scan(partition).await {
267            Ok(blobs) => Ok(blobs),
268            Err(commonware_runtime::Error::PartitionMissing(_)) => Ok(Vec::new()),
269            Err(err) => Err(Error::Runtime(err)),
270        }
271    }
272
273    /// Select the blobs partition using legacy-first compatibility rules.
274    ///
275    /// If both legacy and new blobs partitions contain data, returns corruption.
276    /// If neither contains data, defaults to the new blobs partition.
277    // TODO(#2941): Remove legacy partition support
278    async fn select_blob_partition(context: &E, cfg: &Config) -> Result<String, Error> {
279        let legacy_partition = cfg.partition.as_str();
280        let new_partition = format!("{}-blobs", cfg.partition);
281
282        let legacy_blobs = Self::scan_partition(context, legacy_partition).await?;
283        let new_blobs = Self::scan_partition(context, &new_partition).await?;
284
285        if !legacy_blobs.is_empty() && !new_blobs.is_empty() {
286            return Err(Error::Corruption(format!(
287                "both legacy and blobs partitions contain data: legacy={} blobs={}",
288                legacy_partition, new_partition
289            )));
290        }
291
292        if !legacy_blobs.is_empty() {
293            Ok(legacy_partition.into())
294        } else {
295            Ok(new_partition)
296        }
297    }
298
    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. The `replay` method can be
    /// used to iterate over all items in the `Journal`.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();

        // Open the segmented journal in whichever blobs partition holds existing data.
        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let mut journal =
            SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;
        // Initialize metadata store
        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };

        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;

        // Parse metadata if present. The stored value is the pruning boundary as big-endian
        // u64 bytes; any other length is corruption.
        let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) {
            Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err(
                |_| Error::Corruption("invalid pruning_boundary metadata".into()),
            )?)),
            None => None,
        };

        // Recover bounds from metadata and/or blobs (handles crash repair; see
        // [Self::recover_bounds] for the reconciliation rules).
        let (pruning_boundary, size, needs_metadata_update) =
            Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?;

        // Persist metadata if needed: keep the boundary only while it is mid-section;
        // a section-aligned boundary is fully encoded by the blob layout.
        if needs_metadata_update {
            if pruning_boundary.is_multiple_of(items_per_blob) {
                metadata.remove(&PRUNING_BOUNDARY_KEY);
            } else {
                metadata.put(
                    PRUNING_BOUNDARY_KEY,
                    pruning_boundary.to_be_bytes().to_vec(),
                );
            }
            metadata.sync().await?;
        }

        // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on
        // reopen even after pruning all items. The tail blob is at `size / items_per_blob` (where
        // the next append would go).
        let tail_section = size / items_per_blob;
        journal.ensure_section_exists(tail_section).await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                journal,
                size,
                metadata,
                pruning_boundary,
            }),
            items_per_blob,
        })
    }
365
    /// Returns (pruning_boundary, size, needs_metadata_update) based on metadata and blobs.
    ///
    /// The returned boolean tells the caller to rewrite (or remove) the persisted
    /// pruning-boundary metadata so it matches the recovered boundary.
    ///
    /// If `meta_pruning_boundary` is `Some`, validates it against the physical blob state:
    /// - If metadata is section-aligned, it's unnecessary and we use blob-based boundary
    /// - If metadata refers to a pruned section, it's stale and we use blob-based boundary
    /// - If metadata refers to a future section, it must have been written by [Self::clear_to_size]
    ///   or [Self::init_at_size] and crashed before writing the blobs. Fall back to blobs.
    /// - Otherwise, metadata is valid and we use it
    ///
    /// If `meta_pruning_boundary` is `None`, computes bounds purely from blobs.
    async fn recover_bounds(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        meta_pruning_boundary: Option<u64>,
    ) -> Result<(u64, u64, bool), Error> {
        // Blob-based boundary is always section-aligned
        let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob);

        let (pruning_boundary, needs_update) = match meta_pruning_boundary {
            // Mid-section metadata: validate against blobs
            Some(meta_pruning_boundary)
                if !meta_pruning_boundary.is_multiple_of(items_per_blob) =>
            {
                let meta_oldest_section = meta_pruning_boundary / items_per_blob;
                match inner.oldest_section() {
                    None => {
                        // No blobs exist but metadata claims mid-section boundary.
                        // This can happen if we crash after inner.clear() but before
                        // ensure_section_exists(). Ignore stale metadata.
                        warn!(
                            meta_oldest_section,
                            "crash repair: no blobs exist, ignoring stale metadata"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section < oldest_section => {
                        // Metadata points into an already-pruned section: stale.
                        warn!(
                            meta_oldest_section,
                            oldest_section, "crash repair: metadata stale, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section > oldest_section => {
                        // Metadata references a section ahead of the oldest blob. This can happen
                        // if we crash during clear_to_size/init_at_size after blobs update but
                        // before metadata update. Fall back to blob state.
                        warn!(
                            meta_oldest_section,
                            oldest_section,
                            "crash repair: metadata ahead of blobs, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(_) => (meta_pruning_boundary, false), // valid mid-section metadata
                }
            }
            // Section-aligned metadata: unnecessary, use blob-based
            Some(_) => (blob_boundary, true),
            // No metadata: use blob-based, no update needed
            None => (blob_boundary, false),
        };

        // Validate oldest section before computing size.
        Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?;

        let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?;
        Ok((pruning_boundary, size, needs_update))
    }
434
435    /// Validate that the oldest section has the expected number of items.
436    ///
437    /// Non-tail sections must be full from their logical start. The tail section
438    /// (oldest == newest) can be partially filled.
439    async fn validate_oldest_section(
440        inner: &SegmentedJournal<E, A>,
441        items_per_blob: u64,
442        pruning_boundary: u64,
443    ) -> Result<(), Error> {
444        let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else {
445            return Ok(()); // No sections to validate
446        };
447
448        if oldest == newest {
449            return Ok(()); // Tail section, can be partial
450        }
451
452        let oldest_len = inner.section_len(oldest).await?;
453        let oldest_start = oldest * items_per_blob;
454
455        let expected = if pruning_boundary > oldest_start {
456            // Mid-section boundary: items from pruning_boundary to section end
457            items_per_blob - (pruning_boundary - oldest_start)
458        } else {
459            // Section-aligned boundary: full section
460            items_per_blob
461        };
462
463        if oldest_len != expected {
464            return Err(Error::Corruption(format!(
465                "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}"
466            )));
467        }
468
469        Ok(())
470    }
471
472    /// Returns the total number of items ever appended (size), computed from the blobs.
473    async fn compute_size(
474        inner: &SegmentedJournal<E, A>,
475        items_per_blob: u64,
476        pruning_boundary: u64,
477    ) -> Result<u64, Error> {
478        let oldest = inner.oldest_section();
479        let newest = inner.newest_section();
480
481        let (Some(oldest), Some(newest)) = (oldest, newest) else {
482            return Ok(pruning_boundary);
483        };
484
485        if oldest == newest {
486            // Single section: count from pruning boundary
487            let tail_len = inner.section_len(newest).await?;
488            return Ok(pruning_boundary + tail_len);
489        }
490
491        // Multiple sections: sum actual item counts
492        let oldest_len = inner.section_len(oldest).await?;
493        let tail_len = inner.section_len(newest).await?;
494
495        // Middle sections are assumed full
496        let middle_sections = newest - oldest - 1;
497        let middle_items = middle_sections * items_per_blob;
498
499        Ok(pruning_boundary + oldest_len + middle_items + tail_len)
500    }
501
    /// Initialize a new `Journal` instance in a pruned state at a given size.
    ///
    /// This is used for state sync to create a journal that appears to have had `size` items
    /// appended and then pruned up to that point.
    ///
    /// # Arguments
    /// * `context` - The storage context
    /// * `cfg` - Configuration for the journal
    /// * `size` - The number of operations that have been "pruned"
    ///
    /// # Behavior
    /// - Clears any existing data in the partition
    /// - Creates an empty tail blob where the next append (at position `size`) will go
    /// - `bounds().is_empty()` returns `true` (fully pruned state)
    /// - The next `append()` will write to position `size`
    ///
    /// # Post-conditions
    /// - `bounds().end` returns `size`
    /// - `bounds().is_empty()` returns `true`
    /// - `bounds.start` equals `size` (no data exists)
    ///
    /// # Crash Safety
    /// If a crash occurs during this operation, `init()` will recover to a consistent state
    /// (though possibly different from the intended `size`).
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();
        // Tail section where the next append (position `size`) will land.
        let tail_section = size / items_per_blob;

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        // Initialize both stores.
        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;
        let mut journal =
            SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;

        // Clear blobs before updating metadata.
        // This ordering is critical for crash safety:
        // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored
        // - Crash after create: old metadata triggers "metadata ahead" warning,
        //   recovery falls back to blob state.
        journal.clear().await?;
        journal.ensure_section_exists(tail_section).await?;

        // Persist metadata if pruning_boundary is mid-section.
        if !size.is_multiple_of(items_per_blob) {
            metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec());
            metadata.sync().await?;
        } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            // Section-aligned boundary: the blob layout encodes it, so drop any stale entry.
            metadata.remove(&PRUNING_BOUNDARY_KEY);
            metadata.sync().await?;
        }

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                journal,
                size,
                metadata,
                pruning_boundary: size, // No data exists yet
            }),
            items_per_blob,
        })
    }
575
576    /// Convert a global position to (section, position_in_section).
577    #[inline]
578    const fn position_to_section(&self, position: u64) -> (u64, u64) {
579        let section = position / self.items_per_blob;
580        let pos_in_section = position % self.items_per_blob;
581        (section, pos_in_section)
582    }
583
    /// Sync any pending updates to disk.
    ///
    /// Only the tail section can have pending updates since historical sections are synced
    /// when they become full.
    ///
    /// Also reconciles the persisted pruning-boundary metadata: the boundary is stored only
    /// while it is mid-section; a section-aligned boundary is encoded by the blob layout and
    /// any stale metadata entry is removed.
    pub async fn sync(&self) -> Result<(), Error> {
        // Serialize with append/prune/rewind to ensure section selection is stable, while still allowing
        // concurrent readers.
        let inner = self.inner.upgradable_read().await;

        // Sync the tail section
        let tail_section = inner.size / self.items_per_blob;

        // The tail section may not exist yet if the previous section was just filled, but syncing a
        // non-existent section is safe (returns Ok).
        inner.journal.sync(tail_section).await?;

        // Decide whether metadata needs a write (`put == true`), a removal (`put == false`),
        // or nothing at all (early return, skipping the write-lock upgrade entirely).
        let pruning_boundary = inner.pruning_boundary;
        let pruning_boundary_from_metadata = inner.metadata.get(&PRUNING_BOUNDARY_KEY).cloned();
        let put = if !pruning_boundary.is_multiple_of(self.items_per_blob) {
            // Mid-section boundary: write it unless the stored bytes already match.
            let needs_update = pruning_boundary_from_metadata
                .is_none_or(|bytes| bytes.as_slice() != pruning_boundary.to_be_bytes());

            if needs_update {
                true
            } else {
                return Ok(());
            }
        } else if pruning_boundary_from_metadata.is_some() {
            // Section-aligned boundary with a lingering entry: remove it.
            false
        } else {
            return Ok(());
        };

        // Upgrade only for the metadata mutation/sync step; reads were allowed while syncing
        // the tail section above.
        let mut inner = inner.upgrade().await;
        if put {
            inner.metadata.put(
                PRUNING_BOUNDARY_KEY,
                pruning_boundary.to_be_bytes().to_vec(),
            );
        } else {
            inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
        }
        inner.metadata.sync().await?;

        Ok(())
    }
633
634    /// Acquire a reader guard that holds a consistent view of the journal.
635    pub async fn reader(&self) -> Reader<'_, E, A> {
636        Reader {
637            guard: self.inner.read().await,
638            items_per_blob: self.items_per_blob,
639        }
640    }
641
642    /// Return the total number of items in the journal, irrespective of pruning. The next value
643    /// appended to the journal will be at this position.
644    pub async fn size(&self) -> u64 {
645        self.inner.read().await.size
646    }
647
    /// Append a new item to the journal. Return the item's position in the journal, or error if the
    /// operation fails.
    ///
    /// When the append fills the current section, the section is synced to disk and the next
    /// (empty) tail section is created before this call returns.
    pub async fn append(&self, item: &A) -> Result<u64, Error> {
        // Mutating operations are serialized by taking the write guard.
        let mut inner = self.inner.write().await;

        // Append the item to the journal.
        let position = inner.size;
        let (section, _pos_in_section) = self.position_to_section(position);
        inner.journal.append(section, item).await?;
        inner.size += 1;

        // Return early if no sync is needed (section not full).
        if !inner.size.is_multiple_of(self.items_per_blob) {
            return Ok(position);
        }

        // The section was filled and must be synced. Downgrade so readers can continue during the
        // sync, but keep mutators blocked. After sync, upgrade again to create the next tail
        // section before any append can proceed.
        let inner = inner.downgrade_to_upgradable();
        inner.journal.sync(section).await?;

        // Ensure the new tail section exists, as required to maintain the invariant. This must
        // happen after the previous section is synced.
        let mut inner = inner.upgrade().await;
        inner.journal.ensure_section_exists(section + 1).await?;

        Ok(position)
    }
678
679    /// Rewind the journal to the given `size`. Returns [Error::InvalidRewind] if the rewind point
680    /// precedes the oldest retained element. The journal is not synced after rewinding.
681    ///
682    /// # Warnings
683    ///
684    /// * This operation is not guaranteed to survive restarts until sync is called.
685    /// * This operation is not atomic, but it will always leave the journal in a consistent state
686    ///   in the event of failure since blobs are always removed from newest to oldest.
687    pub async fn rewind(&self, size: u64) -> Result<(), Error> {
688        let mut inner = self.inner.write().await;
689
690        match size.cmp(&inner.size) {
691            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
692            std::cmp::Ordering::Equal => return Ok(()),
693            std::cmp::Ordering::Less => {}
694        }
695
696        if size < inner.pruning_boundary {
697            return Err(Error::InvalidRewind(size));
698        }
699
700        let section = size / self.items_per_blob;
701        let section_start = section * self.items_per_blob;
702
703        // Calculate offset within section for rewind
704        let first_in_section = inner.pruning_boundary.max(section_start);
705        let pos_in_section = size - first_in_section;
706        let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64;
707
708        inner.journal.rewind(section, byte_offset).await?;
709        inner.size = size;
710
711        Ok(())
712    }
713
714    /// Return the location before which all items have been pruned.
715    pub async fn pruning_boundary(&self) -> u64 {
716        let inner = self.inner.read().await;
717        inner.pruning_boundary
718    }
719
720    /// Allow the journal to prune items older than `min_item_pos`. The journal may not prune all
721    /// such items in order to preserve blob boundaries, but the amount of such items will always be
722    /// less than the configured number of items per blob. Returns true if any items were pruned.
723    ///
724    /// Note that this operation may NOT be atomic, however it's guaranteed not to leave gaps in the
725    /// event of failure as items are always pruned in order from oldest to newest.
726    pub async fn prune(&self, min_item_pos: u64) -> Result<bool, Error> {
727        let mut inner = self.inner.write().await;
728
729        // Calculate the section that would contain min_item_pos
730        let target_section = min_item_pos / self.items_per_blob;
731
732        // Calculate the tail section.
733        let tail_section = inner.size / self.items_per_blob;
734
735        // Cap to tail section. The tail section is guaranteed to exist by our invariant.
736        let min_section = std::cmp::min(target_section, tail_section);
737
738        let pruned = inner.journal.prune(min_section).await?;
739
740        // After pruning, update pruning_boundary to the start of the oldest remaining section
741        if pruned {
742            let new_oldest = inner
743                .journal
744                .oldest_section()
745                .expect("all sections pruned - violates tail section invariant");
746            // Pruning boundary only moves forward
747            assert!(inner.pruning_boundary < new_oldest * self.items_per_blob);
748            inner.pruning_boundary = new_oldest * self.items_per_blob;
749        }
750
751        Ok(pruned)
752    }
753
754    /// Remove any persisted data created by the journal.
755    pub async fn destroy(self) -> Result<(), Error> {
756        // Destroy inner journal
757        let inner = self.inner.into_inner();
758        inner.journal.destroy().await?;
759
760        // Destroy metadata
761        inner.metadata.destroy().await?;
762
763        Ok(())
764    }
765
    /// Clear all data and reset the journal to a new starting position.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
    ///
    /// # Crash Safety
    /// If a crash occurs during this operation, `init()` will recover to a consistent state
    /// (though possibly different from the intended `new_size`).
    pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
        // Clear blobs before updating metadata.
        // This ordering is critical for crash safety:
        // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored
        // - Crash after create: old metadata triggers "metadata ahead" warning,
        //   recovery falls back to blob state
        let mut inner = self.inner.write().await;
        inner.journal.clear().await?;
        // Recreate the tail section so the "tail section always exists" invariant holds.
        let tail_section = new_size / self.items_per_blob;
        inner.journal.ensure_section_exists(tail_section).await?;

        inner.size = new_size;
        inner.pruning_boundary = new_size; // No data exists

        // Persist metadata only when pruning_boundary is mid-section.
        // A boundary on a section edge is recoverable from the blobs alone, so any
        // stale metadata entry is removed instead to avoid misleading recovery.
        if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) {
            let value = inner.pruning_boundary.to_be_bytes().to_vec();
            inner.metadata.put(PRUNING_BOUNDARY_KEY, value);
            inner.metadata.sync().await?;
        } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
            inner.metadata.sync().await?;
        }

        Ok(())
    }
800
801    /// Test helper: Read the item at the given position.
802    #[cfg(test)]
803    pub(crate) async fn read(&self, pos: u64) -> Result<A, Error> {
804        self.reader().await.read(pos).await
805    }
806
807    /// Test helper: Return the bounds of the journal.
808    #[cfg(test)]
809    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
810        self.reader().await.bounds()
811    }
812
813    /// Test helper: Get the oldest section from the internal segmented journal.
814    #[cfg(test)]
815    pub(crate) async fn test_oldest_section(&self) -> Option<u64> {
816        let inner = self.inner.read().await;
817        inner.journal.oldest_section()
818    }
819
820    /// Test helper: Get the newest section from the internal segmented journal.
821    #[cfg(test)]
822    pub(crate) async fn test_newest_section(&self) -> Option<u64> {
823        let inner = self.inner.read().await;
824        inner.journal.newest_section()
825    }
826}
827
// Implement Contiguous trait for fixed-length journals.
//
// Each trait method forwards to the inherent method of the same name. The
// `Self::method(self)` call shape makes the inherent target explicit so the
// delegation cannot accidentally resolve back to the trait method being defined.
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
    type Item = A;

    async fn reader(&self) -> impl super::Reader<Item = A> + '_ {
        Self::reader(self).await
    }

    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}
840
// Implement the Mutable trait by delegating to the inherent methods. Note that
// the trait methods take `&mut self` while the inherent methods only need `&self`
// (internal state is guarded by the inner lock); the stricter receiver is fine.
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Mutable for Journal<E, A> {
    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
854
// Implement the Persistable trait by delegating to the inherent methods.
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Persistable for Journal<E, A> {
    type Error = Error;

    // `commit` has no distinct semantics for this journal; it is equivalent to `sync`.
    async fn commit(&self) -> Result<(), Error> {
        self.sync().await
    }

    // `self.sync()` resolves to the inherent `sync` (inherent methods take
    // precedence over trait methods in method resolution), so this does not recurse.
    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
870
871#[cfg(test)]
872mod tests {
873    use super::*;
874    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
875    use commonware_macros::test_traced;
876    use commonware_runtime::{
877        deterministic::{self, Context},
878        Blob, BufferPooler, Error as RuntimeError, Metrics, Runner, Storage,
879    };
880    use commonware_utils::{NZUsize, NZU16, NZU64};
881    use futures::{pin_mut, StreamExt};
882    use std::num::NonZeroU16;
883
884    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
885    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
886
887    /// Generate a SHA-256 digest for the given value.
888    fn test_digest(value: u64) -> Digest {
889        Sha256::hash(&value.to_be_bytes())
890    }
891
892    fn test_cfg(pooler: &impl BufferPooler, items_per_blob: NonZeroU64) -> Config {
893        Config {
894            partition: "test-partition".into(),
895            items_per_blob,
896            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
897            write_buffer: NZUsize!(2048),
898        }
899    }
900
901    fn blob_partition(cfg: &Config) -> String {
902        format!("{}-blobs", cfg.partition)
903    }
904
905    async fn scan_partition(context: &Context, partition: &str) -> Vec<Vec<u8>> {
906        match context.scan(partition).await {
907            Ok(blobs) => blobs,
908            Err(RuntimeError::PartitionMissing(_)) => Vec::new(),
909            Err(err) => panic!("Failed to scan partition {partition}: {err}"),
910        }
911    }
912
    // Init must refuse to proceed when BOTH the legacy partition and the `-blobs`
    // partition contain data, since it cannot tell which one is authoritative.
    #[test_traced]
    fn test_fixed_journal_init_conflicting_partitions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            // Seed the legacy partition with a non-empty blob.
            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            // Seed the `-blobs` partition as well, creating the conflict.
            let (new_blob, _) = context
                .open(&blobs_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open new blob");
            new_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write new blob");
            new_blob.sync().await.expect("Failed to sync new blob");

            // With both partitions populated, init must report corruption.
            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
949
    // When only the legacy partition contains data, init must keep using it
    // rather than switching to the `-blobs` partition.
    #[test_traced]
    fn test_fixed_journal_init_prefers_legacy_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            // Seed legacy partition so it is selected.
            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(&test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            // New writes must have landed in the legacy partition; `-blobs` stays empty.
            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(!legacy_blobs.is_empty());
            assert!(new_blobs.is_empty());
        });
    }
985
986    #[test_traced]
987    fn test_fixed_journal_init_defaults_to_blobs_partition() {
988        let executor = deterministic::Runner::default();
989        executor.start(|context| async move {
990            let cfg = test_cfg(&context, NZU64!(2));
991            let legacy_partition = cfg.partition.clone();
992            let blobs_partition = blob_partition(&cfg);
993
994            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
995                .await
996                .expect("failed to initialize journal");
997            journal.append(&test_digest(1)).await.unwrap();
998            journal.sync().await.unwrap();
999            drop(journal);
1000
1001            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
1002            let new_blobs = scan_partition(&context, &blobs_partition).await;
1003            assert!(legacy_blobs.is_empty());
1004            assert!(!new_blobs.is_empty());
1005        });
1006    }
1007
    // End-to-end exercise of append, restart recovery, direct reads, pruning
    // (including no-op and over-pruning cases), and replay over a pruned journal.
    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append an item to the journal
            let mut pos = journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            // Drop the journal and re-initialize it to simulate a restart
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await, 1);

            // Append two more items to the journal to trigger a new blob creation
            pos = journal
                .append(&test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(&test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);

            // Read the items back
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            // Reading beyond the current size must fail with ItemOutOfRange.
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            // Sync the journal
            journal.sync().await.expect("failed to sync journal");

            // Pruning to 1 should be a no-op because there's no blob with only older items.
            journal.prune(1).await.expect("failed to prune journal 1");

            // Pruning to 2 should allow the first blob to be pruned.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.bounds().await.start, 2);

            // Reading from the first blob should fail since it's now pruned
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // Third item should still be readable
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Should be able to continue to append items
            for i in 3..10 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Check no-op pruning
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(1));
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
            assert_eq!(journal.bounds().await.start, 2);

            // Prune first 3 blobs (6 items)
            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(3));
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
            assert_eq!(journal.bounds().await.start, 6);

            // Try pruning (more than) everything in the journal.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let size = journal.size().await;
            assert_eq!(size, 10);
            // The tail section (5) is always retained, capping the prune.
            assert_eq!(journal.test_oldest_section().await, Some(5));
            assert_eq!(journal.test_newest_section().await, Some(5));
            // Since the size of the journal is currently a multiple of items_per_blob, the newest blob
            // will be empty, and there will be no retained items.
            let bounds = journal.bounds().await;
            assert!(bounds.is_empty());
            // bounds.start should equal bounds.end when empty.
            assert_eq!(bounds.start, size);

            // Replaying from 0 should fail since all items before bounds.start are pruned
            {
                let reader = journal.reader().await;
                let result = reader.replay(NZUsize!(1024), 0).await;
                assert!(matches!(result, Err(Error::ItemPruned(0))));
            }

            // Replaying from pruning_boundary should return empty stream
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(1024), 0).await;
                assert!(matches!(res, Err(Error::ItemPruned(_))));

                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), journal.bounds().await.start)
                    .await
                    .expect("failed to replay journal from pruning boundary");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                // No items expected: everything below `size` has been pruned.
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }
1155
1156    /// Append a lot of data to make sure we exercise page cache paging boundaries.
1157    #[test_traced]
1158    fn test_fixed_journal_append_a_lot_of_data() {
1159        // Initialize the deterministic context
1160        let executor = deterministic::Runner::default();
1161        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
1162        executor.start(|context| async move {
1163            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1164            let journal = Journal::init(context.with_label("first"), cfg.clone())
1165                .await
1166                .expect("failed to initialize journal");
1167            // Append 2 blobs worth of items.
1168            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
1169                journal
1170                    .append(&test_digest(i))
1171                    .await
1172                    .expect("failed to append data");
1173            }
1174            // Sync, reopen, then read back.
1175            journal.sync().await.expect("failed to sync journal");
1176            drop(journal);
1177            let journal = Journal::init(context.with_label("second"), cfg.clone())
1178                .await
1179                .expect("failed to re-initialize journal");
1180            for i in 0u64..10000 {
1181                let item: Digest = journal.read(i).await.expect("failed to read data");
1182                assert_eq!(item, test_digest(i));
1183            }
1184            journal.destroy().await.expect("failed to destroy journal");
1185        });
1186    }
1187
    // Replay a multi-blob journal, then corrupt a byte in a historical blob and
    // verify both direct reads and replay surface the integrity failure.
    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Read them back the usual way.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay should return all items
            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Corrupt one of the bytes (in section 40's blob) and make sure it's detected.
            let (blob, _) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Write junk bytes.
            let bad_bytes = 123456789u32;
            blob.write_at(1, bad_bytes.to_be_bytes().to_vec())
                .await
                .expect("Failed to write bad bytes");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Make sure reading an item that resides in the corrupted page fails.
            let err = journal
                .read(40 * ITEMS_PER_BLOB.get() + 1)
                .await
                .unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            // Replay all items; the stream must yield an error when it reaches
            // the corrupted page rather than returning bad data.
            {
                let mut error_found = false;
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_found = true;
                            assert!(matches!(err, Error::Runtime(_)));
                            break;
                        }
                    }
                }
                assert!(error_found); // error should abort replay
            }
        });
    }
1301
    // A truncated non-tail blob is NOT caught at init (size is derived from the
    // tail section alone) but must be reported as corruption during replay.
    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Manually truncate a non-tail blob (section 40) by one byte.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // The segmented journal will trim the incomplete blob on init, resulting in the blob
            // missing one item. Init itself succeeds; the corruption must be detected
            // during replay, since all non-tail blobs are required to be full.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Journal size is computed from the tail section, so it's unchanged
            // despite the corruption in section 40.
            let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
            assert_eq!(journal.size().await, expected_size);

            // Replay should detect corruption (incomplete section) in section 40
            let reader = journal.reader().await;
            match reader.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(msg)) => {
                    assert!(
                        msg.contains("section 40"),
                        "Error should mention section 40, got: {msg}"
                    );
                }
                Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
                Ok(_) => panic!("Expected replay to fail with corruption"),
            };
        });
    }
1362
    // A deleted historical blob passes init silently but must surface as
    // corruption on replay and on direct reads into the missing section.
    #[test_traced]
    fn test_fixed_journal_replay_with_missing_historical_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Two items per blob and five items -> three blobs (sections 0..=2).
            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0u64..5 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            drop(journal);

            // Delete the middle blob (section 1) out from under the journal.
            context
                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
                .await
                .expect("failed to remove blob");

            // Init won't detect the corruption.
            let result = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("init shouldn't fail");

            // But replay will.
            let reader = result.reader().await;
            match reader.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };

            // As will trying to read an item that was in the deleted blob.
            match result.read(2).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };
        });
    }
1406
    // A truncated TAIL blob (unlike a historical one) is trimmed at init: the
    // invalid trailing page is dropped and the journal size shrinks by one item.
    #[test_traced]
    fn test_fixed_journal_test_trim_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill one blob and put 3 items in the second.
            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, item_count);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Truncate the tail blob by one byte, which should result in the last item being
            // discarded when the journal is re-initialized (detected via bad page checksum).
            let (blob, size) = context
                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // The truncation invalidates the last page (bad checksum), which is removed.
            // This loses one item.
            assert_eq!(journal.size().await, item_count - 1);

            // Cleanup.
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
1453
1454    #[test_traced]
1455    fn test_fixed_journal_partial_replay() {
1456        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1457        // 53 % 7 = 4, which will trigger a non-trivial seek in the starting blob to reach the
1458        // starting position.
1459        const START_POS: u64 = 53;
1460
1461        // Initialize the deterministic context
1462        let executor = deterministic::Runner::default();
1463        // Start the test within the executor
1464        executor.start(|context| async move {
1465            // Initialize the journal, allowing a max of 7 items per blob.
1466            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1467            let journal = Journal::init(context.clone(), cfg.clone())
1468                .await
1469                .expect("failed to initialize journal");
1470
1471            // Append many items, filling 100 blobs and part of the 101st
1472            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1473                let pos = journal
1474                    .append(&test_digest(i))
1475                    .await
1476                    .expect("failed to append data");
1477                assert_eq!(pos, i);
1478            }
1479
1480            // Replay should return all items except the first `START_POS`.
1481            {
1482                let reader = journal.reader().await;
1483                let stream = reader
1484                    .replay(NZUsize!(1024), START_POS)
1485                    .await
1486                    .expect("failed to replay journal");
1487                let mut items = Vec::new();
1488                pin_mut!(stream);
1489                while let Some(result) = stream.next().await {
1490                    match result {
1491                        Ok((pos, item)) => {
1492                            assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
1493                            assert_eq!(
1494                                test_digest(pos),
1495                                item,
1496                                "Item at position {pos} did not match expected digest"
1497                            );
1498                            items.push(pos);
1499                        }
1500                        Err(err) => panic!("Failed to read item: {err}"),
1501                    }
1502                }
1503
1504                // Make sure all items were replayed
1505                assert_eq!(
1506                    items.len(),
1507                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
1508                        - START_POS as usize
1509                );
1510                items.sort();
1511                for (i, pos) in items.iter().enumerate() {
1512                    assert_eq!(i as u64, *pos - START_POS);
1513                }
1514            }
1515
1516            journal.destroy().await.unwrap();
1517        });
1518    }
1519
1520    #[test_traced]
1521    fn test_fixed_journal_recover_from_partial_write() {
1522        // Initialize the deterministic context
1523        let executor = deterministic::Runner::default();
1524
1525        // Start the test within the executor
1526        executor.start(|context| async move {
1527            // Initialize the journal, allowing a max of 3 items per blob.
1528            let cfg = test_cfg(&context, NZU64!(3));
1529            let journal = Journal::init(context.with_label("first"), cfg.clone())
1530                .await
1531                .expect("failed to initialize journal");
1532            for i in 0..5 {
1533                journal
1534                    .append(&test_digest(i))
1535                    .await
1536                    .expect("failed to append data");
1537            }
1538            assert_eq!(journal.size().await, 5);
1539            journal.sync().await.expect("Failed to sync journal");
1540            drop(journal);
1541
1542            // Manually truncate most recent blob to simulate a partial write.
1543            let (blob, size) = context
1544                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1545                .await
1546                .expect("Failed to open blob");
1547            // truncate the most recent blob by 1 byte which corrupts the most recent item
1548            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1549            blob.sync().await.expect("Failed to sync blob");
1550
1551            // Re-initialize the journal to simulate a restart
1552            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1553                .await
1554                .expect("Failed to re-initialize journal");
1555            // The truncation invalidates the last page, which is removed. This loses one item.
1556            assert_eq!(journal.pruning_boundary().await, 0);
1557            assert_eq!(journal.size().await, 4);
1558            drop(journal);
1559
1560            // Delete the second blob and re-init
1561            context
1562                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1563                .await
1564                .expect("Failed to remove blob");
1565
1566            let journal = Journal::<_, Digest>::init(context.with_label("third"), cfg.clone())
1567                .await
1568                .expect("Failed to re-initialize journal");
1569            // Only the first blob remains
1570            assert_eq!(journal.size().await, 3);
1571
1572            journal.destroy().await.unwrap();
1573        });
1574    }
1575
1576    #[test_traced]
1577    fn test_fixed_journal_recover_detects_oldest_section_too_short() {
1578        let executor = deterministic::Runner::default();
1579        executor.start(|context| async move {
1580            let cfg = test_cfg(&context, NZU64!(5));
1581            let journal =
1582                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
1583                    .await
1584                    .expect("failed to initialize journal at size");
1585
1586            // Append items so section 1 has exactly the expected minimum (3 items).
1587            for i in 0..8u64 {
1588                journal
1589                    .append(&test_digest(100 + i))
1590                    .await
1591                    .expect("failed to append data");
1592            }
1593            journal.sync().await.expect("failed to sync journal");
1594            assert_eq!(journal.pruning_boundary().await, 7);
1595            assert_eq!(journal.size().await, 15);
1596            drop(journal);
1597
1598            // Corrupt the oldest section by truncating one byte (drops one item on recovery).
1599            let (blob, size) = context
1600                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1601                .await
1602                .expect("failed to open oldest blob");
1603            blob.resize(size - 1).await.expect("failed to corrupt blob");
1604            blob.sync().await.expect("failed to sync blob");
1605
1606            let result =
1607                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
1608            assert!(matches!(result, Err(Error::Corruption(_))));
1609        });
1610    }
1611
1612    #[test_traced]
1613    fn test_fixed_journal_recover_to_empty_from_partial_write() {
1614        let executor = deterministic::Runner::default();
1615        executor.start(|context| async move {
1616            // Initialize the journal, allowing a max of 10 items per blob.
1617            let cfg = test_cfg(&context, NZU64!(10));
1618            let journal = Journal::init(context.with_label("first"), cfg.clone())
1619                .await
1620                .expect("failed to initialize journal");
1621            // Add only a single item
1622            journal
1623                .append(&test_digest(0))
1624                .await
1625                .expect("failed to append data");
1626            assert_eq!(journal.size().await, 1);
1627            journal.sync().await.expect("Failed to sync journal");
1628            drop(journal);
1629
1630            // Manually truncate most recent blob to simulate a partial write.
1631            let (blob, size) = context
1632                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1633                .await
1634                .expect("Failed to open blob");
1635            // Truncate the most recent blob by 1 byte which corrupts the one appended item
1636            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1637            blob.sync().await.expect("Failed to sync blob");
1638
1639            // Re-initialize the journal to simulate a restart
1640            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1641                .await
1642                .expect("Failed to re-initialize journal");
1643
1644            // Since there was only a single item appended which we then corrupted, recovery should
1645            // leave us in the state of an empty journal.
1646            let bounds = journal.bounds().await;
1647            assert_eq!(bounds.end, 0);
1648            assert!(bounds.is_empty());
1649            // Make sure journal still works for appending.
1650            journal
1651                .append(&test_digest(0))
1652                .await
1653                .expect("failed to append data");
1654            assert_eq!(journal.size().await, 1);
1655
1656            journal.destroy().await.unwrap();
1657        });
1658    }
1659
1660    #[test_traced("DEBUG")]
1661    fn test_fixed_journal_recover_from_unwritten_data() {
1662        let executor = deterministic::Runner::default();
1663        executor.start(|context| async move {
1664            // Initialize the journal, allowing a max of 10 items per blob.
1665            let cfg = test_cfg(&context, NZU64!(10));
1666            let journal = Journal::init(context.with_label("first"), cfg.clone())
1667                .await
1668                .expect("failed to initialize journal");
1669
1670            // Add only a single item
1671            journal
1672                .append(&test_digest(0))
1673                .await
1674                .expect("failed to append data");
1675            assert_eq!(journal.size().await, 1);
1676            journal.sync().await.expect("Failed to sync journal");
1677            drop(journal);
1678
1679            // Manually extend the blob to simulate a failure where the file was extended, but no
1680            // bytes were written due to failure.
1681            let (blob, size) = context
1682                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1683                .await
1684                .expect("Failed to open blob");
1685            blob.write_at(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
1686                .await
1687                .expect("Failed to extend blob");
1688            blob.sync().await.expect("Failed to sync blob");
1689
1690            // Re-initialize the journal to simulate a restart
1691            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1692                .await
1693                .expect("Failed to re-initialize journal");
1694
1695            // The zero-filled pages are detected as invalid (bad checksum) and truncated.
1696            // No items should be lost since we called sync before the corruption.
1697            assert_eq!(journal.size().await, 1);
1698
1699            // Make sure journal still works for appending.
1700            journal
1701                .append(&test_digest(1))
1702                .await
1703                .expect("failed to append data");
1704
1705            journal.destroy().await.unwrap();
1706        });
1707    }
1708
1709    #[test_traced]
1710    fn test_fixed_journal_rewinding() {
1711        let executor = deterministic::Runner::default();
1712        executor.start(|context| async move {
1713            // Initialize the journal, allowing a max of 2 items per blob.
1714            let cfg = test_cfg(&context, NZU64!(2));
1715            let journal = Journal::init(context.with_label("first"), cfg.clone())
1716                .await
1717                .expect("failed to initialize journal");
1718            assert!(matches!(journal.rewind(0).await, Ok(())));
1719            assert!(matches!(
1720                journal.rewind(1).await,
1721                Err(Error::InvalidRewind(1))
1722            ));
1723
1724            // Append an item to the journal
1725            journal
1726                .append(&test_digest(0))
1727                .await
1728                .expect("failed to append data 0");
1729            assert_eq!(journal.size().await, 1);
1730            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be no-op
1731            assert!(matches!(journal.rewind(0).await, Ok(())));
1732            assert_eq!(journal.size().await, 0);
1733
1734            // append 7 items
1735            for i in 0..7 {
1736                let pos = journal
1737                    .append(&test_digest(i))
1738                    .await
1739                    .expect("failed to append data");
1740                assert_eq!(pos, i);
1741            }
1742            assert_eq!(journal.size().await, 7);
1743
1744            // rewind back to item #4, which should prune 2 blobs
1745            assert!(matches!(journal.rewind(4).await, Ok(())));
1746            assert_eq!(journal.size().await, 4);
1747
1748            // rewind back to empty and ensure all blobs are rewound over
1749            assert!(matches!(journal.rewind(0).await, Ok(())));
1750            assert_eq!(journal.size().await, 0);
1751
1752            // stress test: add 100 items, rewind 49, repeat x10.
1753            for _ in 0..10 {
1754                for i in 0..100 {
1755                    journal
1756                        .append(&test_digest(i))
1757                        .await
1758                        .expect("failed to append data");
1759                }
1760                journal.rewind(journal.size().await - 49).await.unwrap();
1761            }
1762            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
1763            assert_eq!(journal.size().await, ITEMS_REMAINING);
1764
1765            journal.sync().await.expect("Failed to sync journal");
1766            drop(journal);
1767
1768            // Repeat with a different blob size (3 items per blob)
1769            let mut cfg = test_cfg(&context, NZU64!(3));
1770            cfg.partition = "test-partition-2".into();
1771            let journal = Journal::init(context.with_label("second"), cfg.clone())
1772                .await
1773                .expect("failed to initialize journal");
1774            for _ in 0..10 {
1775                for i in 0..100 {
1776                    journal
1777                        .append(&test_digest(i))
1778                        .await
1779                        .expect("failed to append data");
1780                }
1781                journal.rewind(journal.size().await - 49).await.unwrap();
1782            }
1783            assert_eq!(journal.size().await, ITEMS_REMAINING);
1784
1785            journal.sync().await.expect("Failed to sync journal");
1786            drop(journal);
1787
1788            // Make sure re-opened journal is as expected
1789            let journal: Journal<_, Digest> =
1790                Journal::init(context.with_label("third"), cfg.clone())
1791                    .await
1792                    .expect("failed to re-initialize journal");
1793            assert_eq!(journal.size().await, 10 * (100 - 49));
1794
1795            // Make sure rewinding works after pruning
1796            journal.prune(300).await.expect("pruning failed");
1797            assert_eq!(journal.size().await, ITEMS_REMAINING);
1798            // Rewinding prior to our prune point should fail.
1799            assert!(matches!(
1800                journal.rewind(299).await,
1801                Err(Error::InvalidRewind(299))
1802            ));
1803            // Rewinding to the prune point should work.
1804            // always remain in the journal.
1805            assert!(matches!(journal.rewind(300).await, Ok(())));
1806            let bounds = journal.bounds().await;
1807            assert_eq!(bounds.end, 300);
1808            assert!(bounds.is_empty());
1809
1810            journal.destroy().await.unwrap();
1811        });
1812    }
1813
1814    /// Test recovery when blob is truncated to a page boundary with item size not dividing page size.
1815    ///
1816    /// This tests the scenario where:
1817    /// 1. Items (32 bytes) don't divide evenly into page size (44 bytes)
1818    /// 2. Data spans multiple pages
1819    /// 3. Blob is truncated to a page boundary (simulating crash before last page was written)
1820    /// 4. Journal should recover correctly on reopen
1821    #[test_traced]
1822    fn test_fixed_journal_recover_from_page_boundary_truncation() {
1823        let executor = deterministic::Runner::default();
1824        executor.start(|context: Context| async move {
1825            // Use a small items_per_blob to keep the test focused on a single blob
1826            let cfg = test_cfg(&context, NZU64!(100));
1827            let journal = Journal::init(context.with_label("first"), cfg.clone())
1828                .await
1829                .expect("failed to initialize journal");
1830
1831            // Item size is 32 bytes (Digest), page size is 44 bytes.
1832            // 32 doesn't divide 44, so items will cross page boundaries.
1833            // Physical page size = 44 + 12 (CRC) = 56 bytes.
1834            //
1835            // Write enough items to span multiple pages:
1836            // - 10 items = 320 logical bytes
1837            // - This spans ceil(320/44) = 8 logical pages
1838            for i in 0u64..10 {
1839                journal
1840                    .append(&test_digest(i))
1841                    .await
1842                    .expect("failed to append data");
1843            }
1844            assert_eq!(journal.size().await, 10);
1845            journal.sync().await.expect("Failed to sync journal");
1846            drop(journal);
1847
1848            // Open the blob directly and truncate to a page boundary.
1849            // Physical page size = PAGE_SIZE + CHECKSUM_SIZE = 44 + 12 = 56
1850            let physical_page_size = PAGE_SIZE.get() as u64 + 12;
1851            let (blob, size) = context
1852                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1853                .await
1854                .expect("Failed to open blob");
1855
1856            // Calculate how many full physical pages we have and truncate to lose the last one.
1857            let full_pages = size / physical_page_size;
1858            assert!(full_pages >= 2, "need at least 2 pages for this test");
1859            let truncate_to = (full_pages - 1) * physical_page_size;
1860
1861            blob.resize(truncate_to)
1862                .await
1863                .expect("Failed to truncate blob");
1864            blob.sync().await.expect("Failed to sync blob");
1865
1866            // Re-initialize the journal - it should recover by truncating to valid data
1867            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1868                .await
1869                .expect("Failed to re-initialize journal after page truncation");
1870
1871            // The journal should have fewer items now (those that fit in the remaining pages).
1872            // With logical page size 44 and item size 32:
1873            // - After truncating to (full_pages-1) physical pages, we have (full_pages-1)*44 logical bytes
1874            // - Number of complete items = floor(logical_bytes / 32)
1875            let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
1876            let expected_items = remaining_logical_bytes / 32; // 32 = Digest::SIZE
1877            assert_eq!(
1878                journal.size().await,
1879                expected_items,
1880                "Journal should recover to {} items after truncation",
1881                expected_items
1882            );
1883
1884            // Verify we can still read the remaining items
1885            for i in 0..expected_items {
1886                let item = journal
1887                    .read(i)
1888                    .await
1889                    .expect("failed to read recovered item");
1890                assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
1891            }
1892
1893            journal.destroy().await.expect("Failed to destroy journal");
1894        });
1895    }
1896
1897    /// Test the contiguous fixed journal with items_per_blob: 1.
1898    ///
1899    /// This is an edge case where each item creates its own blob, and the
1900    /// tail blob is always empty after sync (because the item fills the blob
1901    /// and a new empty one is created).
1902    #[test_traced]
1903    fn test_single_item_per_blob() {
1904        let executor = deterministic::Runner::default();
1905        executor.start(|context| async move {
1906            let cfg = Config {
1907                partition: "single-item-per-blob".into(),
1908                items_per_blob: NZU64!(1),
1909                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1910                write_buffer: NZUsize!(2048),
1911            };
1912
1913            // === Test 1: Basic single item operation ===
1914            let journal = Journal::init(context.with_label("first"), cfg.clone())
1915                .await
1916                .expect("failed to initialize journal");
1917
1918            // Verify empty state
1919            let bounds = journal.bounds().await;
1920            assert_eq!(bounds.end, 0);
1921            assert!(bounds.is_empty());
1922
1923            // Append 1 item
1924            let pos = journal
1925                .append(&test_digest(0))
1926                .await
1927                .expect("failed to append");
1928            assert_eq!(pos, 0);
1929            assert_eq!(journal.size().await, 1);
1930
1931            // Sync
1932            journal.sync().await.expect("failed to sync");
1933
1934            // Read from size() - 1
1935            let value = journal
1936                .read(journal.size().await - 1)
1937                .await
1938                .expect("failed to read");
1939            assert_eq!(value, test_digest(0));
1940
1941            // === Test 2: Multiple items with single item per blob ===
1942            for i in 1..10u64 {
1943                let pos = journal
1944                    .append(&test_digest(i))
1945                    .await
1946                    .expect("failed to append");
1947                assert_eq!(pos, i);
1948                assert_eq!(journal.size().await, i + 1);
1949
1950                // Verify we can read the just-appended item at size() - 1
1951                let value = journal
1952                    .read(journal.size().await - 1)
1953                    .await
1954                    .expect("failed to read");
1955                assert_eq!(value, test_digest(i));
1956            }
1957
1958            // Verify all items can be read
1959            for i in 0..10u64 {
1960                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
1961            }
1962
1963            journal.sync().await.expect("failed to sync");
1964
1965            // === Test 3: Pruning with single item per blob ===
1966            // Prune to position 5 (removes positions 0-4)
1967            journal.prune(5).await.expect("failed to prune");
1968
1969            // Size should still be 10
1970            assert_eq!(journal.size().await, 10);
1971
1972            // bounds.start should be 5
1973            assert_eq!(journal.bounds().await.start, 5);
1974
1975            // Reading from size() - 1 (position 9) should still work
1976            let value = journal
1977                .read(journal.size().await - 1)
1978                .await
1979                .expect("failed to read");
1980            assert_eq!(value, test_digest(9));
1981
1982            // Reading from pruned positions should return ItemPruned
1983            for i in 0..5 {
1984                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
1985            }
1986
1987            // Reading from retained positions should work
1988            for i in 5..10u64 {
1989                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
1990            }
1991
1992            // Append more items after pruning
1993            for i in 10..15u64 {
1994                let pos = journal
1995                    .append(&test_digest(i))
1996                    .await
1997                    .expect("failed to append");
1998                assert_eq!(pos, i);
1999
2000                // Verify we can read from size() - 1
2001                let value = journal
2002                    .read(journal.size().await - 1)
2003                    .await
2004                    .expect("failed to read");
2005                assert_eq!(value, test_digest(i));
2006            }
2007
2008            journal.sync().await.expect("failed to sync");
2009            drop(journal);
2010
2011            // === Test 4: Restart persistence with single item per blob ===
2012            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2013                .await
2014                .expect("failed to re-initialize journal");
2015
2016            // Verify size is preserved
2017            assert_eq!(journal.size().await, 15);
2018
2019            // Verify bounds.start is preserved
2020            assert_eq!(journal.bounds().await.start, 5);
2021
2022            // Reading from size() - 1 should work after restart
2023            let value = journal
2024                .read(journal.size().await - 1)
2025                .await
2026                .expect("failed to read");
2027            assert_eq!(value, test_digest(14));
2028
2029            // Reading all retained positions should work
2030            for i in 5..15u64 {
2031                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2032            }
2033
2034            journal.destroy().await.expect("failed to destroy journal");
2035
2036            // === Test 5: Restart after pruning with non-zero index ===
2037            // Fresh journal for this test
2038            let journal = Journal::init(context.with_label("third"), cfg.clone())
2039                .await
2040                .expect("failed to initialize journal");
2041
2042            // Append 10 items (positions 0-9)
2043            for i in 0..10u64 {
2044                journal.append(&test_digest(i + 100)).await.unwrap();
2045            }
2046
2047            // Prune to position 5 (removes positions 0-4)
2048            journal.prune(5).await.unwrap();
2049            let bounds = journal.bounds().await;
2050            assert_eq!(bounds.end, 10);
2051            assert_eq!(bounds.start, 5);
2052
2053            // Sync and restart
2054            journal.sync().await.unwrap();
2055            drop(journal);
2056
2057            // Re-open journal
2058            let journal = Journal::<_, Digest>::init(context.with_label("fourth"), cfg.clone())
2059                .await
2060                .expect("failed to re-initialize journal");
2061
2062            // Verify state after restart
2063            let bounds = journal.bounds().await;
2064            assert_eq!(bounds.end, 10);
2065            assert_eq!(bounds.start, 5);
2066
2067            // Reading from size() - 1 (position 9) should work
2068            let value = journal.read(journal.size().await - 1).await.unwrap();
2069            assert_eq!(value, test_digest(109));
2070
2071            // Verify all retained positions (5-9) work
2072            for i in 5..10u64 {
2073                assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
2074            }
2075
2076            journal.destroy().await.expect("failed to destroy journal");
2077
2078            // === Test 6: Prune all items (edge case) ===
2079            let journal = Journal::init(context.clone(), cfg.clone())
2080                .await
2081                .expect("failed to initialize journal");
2082
2083            for i in 0..5u64 {
2084                journal.append(&test_digest(i + 200)).await.unwrap();
2085            }
2086            journal.sync().await.unwrap();
2087
2088            // Prune all items
2089            journal.prune(5).await.unwrap();
2090            let bounds = journal.bounds().await;
2091            assert_eq!(bounds.end, 5); // Size unchanged
2092            assert!(bounds.is_empty()); // All pruned
2093
2094            // size() - 1 = 4, but position 4 is pruned
2095            let result = journal.read(journal.size().await - 1).await;
2096            assert!(matches!(result, Err(Error::ItemPruned(4))));
2097
2098            // After appending, reading works again
2099            journal.append(&test_digest(205)).await.unwrap();
2100            assert_eq!(journal.bounds().await.start, 5);
2101            assert_eq!(
2102                journal.read(journal.size().await - 1).await.unwrap(),
2103                test_digest(205)
2104            );
2105
2106            journal.destroy().await.expect("failed to destroy journal");
2107        });
2108    }
2109
2110    #[test_traced]
2111    fn test_fixed_journal_init_at_size_zero() {
2112        let executor = deterministic::Runner::default();
2113        executor.start(|context| async move {
2114            let cfg = test_cfg(&context, NZU64!(5));
2115            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 0)
2116                .await
2117                .unwrap();
2118
2119            let bounds = journal.bounds().await;
2120            assert_eq!(bounds.end, 0);
2121            assert!(bounds.is_empty());
2122
2123            // Next append should get position 0
2124            let pos = journal.append(&test_digest(100)).await.unwrap();
2125            assert_eq!(pos, 0);
2126            assert_eq!(journal.size().await, 1);
2127            assert_eq!(journal.read(0).await.unwrap(), test_digest(100));
2128
2129            journal.destroy().await.unwrap();
2130        });
2131    }
2132
2133    #[test_traced]
2134    fn test_fixed_journal_init_at_size_section_boundary() {
2135        let executor = deterministic::Runner::default();
2136        executor.start(|context| async move {
2137            let cfg = test_cfg(&context, NZU64!(5));
2138
2139            // Initialize at position 10 (exactly at section 2 boundary with items_per_blob=5)
2140            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2141                .await
2142                .unwrap();
2143
2144            let bounds = journal.bounds().await;
2145            assert_eq!(bounds.end, 10);
2146            assert!(bounds.is_empty());
2147
2148            // Next append should get position 10
2149            let pos = journal.append(&test_digest(1000)).await.unwrap();
2150            assert_eq!(pos, 10);
2151            assert_eq!(journal.size().await, 11);
2152            assert_eq!(journal.read(10).await.unwrap(), test_digest(1000));
2153
2154            // Can continue appending
2155            let pos = journal.append(&test_digest(1001)).await.unwrap();
2156            assert_eq!(pos, 11);
2157            assert_eq!(journal.read(11).await.unwrap(), test_digest(1001));
2158
2159            journal.destroy().await.unwrap();
2160        });
2161    }
2162
2163    #[test_traced]
2164    fn test_fixed_journal_init_at_size_mid_section() {
2165        let executor = deterministic::Runner::default();
2166        executor.start(|context| async move {
2167            let cfg = test_cfg(&context, NZU64!(5));
2168
2169            // Initialize at position 7 (middle of section 1 with items_per_blob=5)
2170            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
2171                .await
2172                .unwrap();
2173
2174            let bounds = journal.bounds().await;
2175            assert_eq!(bounds.end, 7);
2176            // No data exists yet after init_at_size
2177            assert!(bounds.is_empty());
2178
2179            // Reading before bounds.start should return ItemPruned
2180            assert!(matches!(journal.read(5).await, Err(Error::ItemPruned(5))));
2181            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2182
2183            // Next append should get position 7
2184            let pos = journal.append(&test_digest(700)).await.unwrap();
2185            assert_eq!(pos, 7);
2186            assert_eq!(journal.size().await, 8);
2187            assert_eq!(journal.read(7).await.unwrap(), test_digest(700));
2188            // Now bounds.start should be 7 (first data position)
2189            assert_eq!(journal.bounds().await.start, 7);
2190
2191            journal.destroy().await.unwrap();
2192        });
2193    }
2194
2195    #[test_traced]
2196    fn test_fixed_journal_init_at_size_persistence() {
2197        let executor = deterministic::Runner::default();
2198        executor.start(|context| async move {
2199            let cfg = test_cfg(&context, NZU64!(5));
2200
2201            // Initialize at position 15
2202            let journal =
2203                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2204                    .await
2205                    .unwrap();
2206
2207            // Append some items
2208            for i in 0..5u64 {
2209                let pos = journal.append(&test_digest(1500 + i)).await.unwrap();
2210                assert_eq!(pos, 15 + i);
2211            }
2212
2213            assert_eq!(journal.size().await, 20);
2214
2215            // Sync and reopen
2216            journal.sync().await.unwrap();
2217            drop(journal);
2218
2219            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2220                .await
2221                .unwrap();
2222
2223            // Size and data should be preserved
2224            let bounds = journal.bounds().await;
2225            assert_eq!(bounds.end, 20);
2226            assert_eq!(bounds.start, 15);
2227
2228            // Verify data
2229            for i in 0..5u64 {
2230                assert_eq!(journal.read(15 + i).await.unwrap(), test_digest(1500 + i));
2231            }
2232
2233            // Can continue appending
2234            let pos = journal.append(&test_digest(9999)).await.unwrap();
2235            assert_eq!(pos, 20);
2236            assert_eq!(journal.read(20).await.unwrap(), test_digest(9999));
2237
2238            journal.destroy().await.unwrap();
2239        });
2240    }
2241
2242    #[test_traced]
2243    fn test_fixed_journal_init_at_size_persistence_without_data() {
2244        let executor = deterministic::Runner::default();
2245        executor.start(|context| async move {
2246            let cfg = test_cfg(&context, NZU64!(5));
2247
2248            // Initialize at position 15
2249            let journal =
2250                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2251                    .await
2252                    .unwrap();
2253
2254            let bounds = journal.bounds().await;
2255            assert_eq!(bounds.end, 15);
2256            assert!(bounds.is_empty());
2257
2258            // Drop without writing any data
2259            drop(journal);
2260
2261            // Reopen and verify size persisted
2262            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2263                .await
2264                .unwrap();
2265
2266            let bounds = journal.bounds().await;
2267            assert_eq!(bounds.end, 15);
2268            assert!(bounds.is_empty());
2269
2270            // Can append starting at position 15
2271            let pos = journal.append(&test_digest(1500)).await.unwrap();
2272            assert_eq!(pos, 15);
2273            assert_eq!(journal.read(15).await.unwrap(), test_digest(1500));
2274
2275            journal.destroy().await.unwrap();
2276        });
2277    }
2278
2279    #[test_traced]
2280    fn test_fixed_journal_init_at_size_large_offset() {
2281        let executor = deterministic::Runner::default();
2282        executor.start(|context| async move {
2283            let cfg = test_cfg(&context, NZU64!(5));
2284
2285            // Initialize at a large position (position 1000)
2286            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 1000)
2287                .await
2288                .unwrap();
2289
2290            let bounds = journal.bounds().await;
2291            assert_eq!(bounds.end, 1000);
2292            assert!(bounds.is_empty());
2293
2294            // Next append should get position 1000
2295            let pos = journal.append(&test_digest(100000)).await.unwrap();
2296            assert_eq!(pos, 1000);
2297            assert_eq!(journal.read(1000).await.unwrap(), test_digest(100000));
2298
2299            journal.destroy().await.unwrap();
2300        });
2301    }
2302
2303    #[test_traced]
2304    fn test_fixed_journal_init_at_size_prune_and_append() {
2305        let executor = deterministic::Runner::default();
2306        executor.start(|context| async move {
2307            let cfg = test_cfg(&context, NZU64!(5));
2308
2309            // Initialize at position 20
2310            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 20)
2311                .await
2312                .unwrap();
2313
2314            // Append items 20-29
2315            for i in 0..10u64 {
2316                journal.append(&test_digest(2000 + i)).await.unwrap();
2317            }
2318
2319            assert_eq!(journal.size().await, 30);
2320
2321            // Prune to position 25
2322            journal.prune(25).await.unwrap();
2323
2324            let bounds = journal.bounds().await;
2325            assert_eq!(bounds.end, 30);
2326            assert_eq!(bounds.start, 25);
2327
2328            // Verify remaining items are readable
2329            for i in 25..30u64 {
2330                assert_eq!(journal.read(i).await.unwrap(), test_digest(2000 + (i - 20)));
2331            }
2332
2333            // Continue appending
2334            let pos = journal.append(&test_digest(3000)).await.unwrap();
2335            assert_eq!(pos, 30);
2336
2337            journal.destroy().await.unwrap();
2338        });
2339    }
2340
2341    #[test_traced]
2342    fn test_fixed_journal_clear_to_size() {
2343        let executor = deterministic::Runner::default();
2344        executor.start(|context| async move {
2345            let cfg = test_cfg(&context, NZU64!(10));
2346            let journal = Journal::init(context.with_label("journal"), cfg.clone())
2347                .await
2348                .expect("failed to initialize journal");
2349
2350            // Append 25 items (positions 0-24, spanning 3 blobs)
2351            for i in 0..25u64 {
2352                journal.append(&test_digest(i)).await.unwrap();
2353            }
2354            assert_eq!(journal.size().await, 25);
2355            journal.sync().await.unwrap();
2356
2357            // Clear to position 100, effectively resetting the journal
2358            journal.clear_to_size(100).await.unwrap();
2359            assert_eq!(journal.size().await, 100);
2360
2361            // Old positions should fail
2362            for i in 0..25 {
2363                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2364            }
2365
2366            // Verify size persists after restart without writing any data
2367            drop(journal);
2368            let journal =
2369                Journal::<_, Digest>::init(context.with_label("journal_after_clear"), cfg.clone())
2370                    .await
2371                    .expect("failed to re-initialize journal after clear");
2372            assert_eq!(journal.size().await, 100);
2373
2374            // Append new data starting at position 100
2375            for i in 100..105u64 {
2376                let pos = journal.append(&test_digest(i)).await.unwrap();
2377                assert_eq!(pos, i);
2378            }
2379            assert_eq!(journal.size().await, 105);
2380
2381            // New positions should be readable
2382            for i in 100..105u64 {
2383                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2384            }
2385
2386            // Sync and re-init to verify persistence
2387            journal.sync().await.unwrap();
2388            drop(journal);
2389
2390            let journal = Journal::<_, Digest>::init(context.with_label("journal_reopened"), cfg)
2391                .await
2392                .expect("failed to re-initialize journal");
2393
2394            assert_eq!(journal.size().await, 105);
2395            for i in 100..105u64 {
2396                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2397            }
2398
2399            journal.destroy().await.unwrap();
2400        });
2401    }
2402
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_none_boundary_aligned() {
        // Old meta = None (aligned), new boundary = aligned.
        //
        // Simulates a crash mid-sync: the tail section's blob is synced
        // directly through the inner journal, bypassing `Journal::sync`, so
        // any metadata write the wrapper would perform is skipped. Recovery
        // must still land on the section-aligned bounds [0, 5).
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Fill section 0 exactly (items_per_blob = 5).
            for i in 0..5u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // Crash simulation: sync only the tail section's blob, not the
            // wrapper (and thus not the metadata partition).
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            // Recovery derives bounds [0, 5) from the blob contents alone.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 5);
            journal.destroy().await.unwrap();
        });
    }
2431
    #[test_traced]
    fn test_fixed_journal_oldest_section_invalid_len() {
        // Deleting the metadata partition makes recovery forget that the
        // oldest section starts mid-section; the resulting "short" blob must
        // be reported as corruption rather than silently accepted.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            // Start mid-section: size 7 places the boundary inside section 1.
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            // Items land at positions 7, 8, 9 (section 1); the tail advances
            // to section 2.
            for i in 0..3u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(2));
            journal.sync().await.unwrap();

            // Simulate metadata deletion (corruption).
            let mut inner = journal.inner.write().await;
            inner.metadata.clear();
            inner.metadata.sync().await.unwrap();
            drop(inner);
            drop(journal);

            // Section 1 has items 7,8,9 but metadata is missing, so falls back to blob-based boundary.
            // Section 1 has 3 items, but recovery thinks it should have 5 because metadata deletion
            // causes us to forget that section 1 starts at logical position 7.
            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
            // Clean up manually: destroy() is unavailable because init failed.
            context.remove(&blob_partition(&cfg), None).await.unwrap();
            context
                .remove(&format!("{}-metadata", cfg.partition), None)
                .await
                .unwrap();
        });
    }
2468
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_boundary_unchanged() {
        // Old meta = Some(mid), new boundary = mid-section (same value).
        //
        // The mid-section boundary (7) was recorded at init_at_size time;
        // crashing before any further metadata write must leave recovery with
        // that same boundary.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            // Boundary starts mid-section at position 7.
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            // Append positions 7..10.
            for i in 0..3u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // Crash simulation: sync only the tail blob, skipping
            // `Journal::sync` so the metadata is untouched since init.
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            // Recovery combines the recorded boundary (7) with blob contents:
            // bounds are [7, 10).
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 7);
            assert_eq!(bounds.end, 10);
            journal.destroy().await.unwrap();
        });
    }
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_to_aligned_becomes_stale() {
        // Old meta = Some(mid), new boundary = aligned.
        //
        // Metadata still records the old mid-section boundary (7) while prune
        // has moved the real boundary to the section-aligned position 10.
        // After a simulated crash (blob sync only, no metadata update),
        // recovery must prefer the on-disk blob state over the stale metadata.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            // Fill positions 7..17.
            for i in 0..10u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.size().await, 17);
            // Prune to a section-aligned boundary (10 == 2 * items_per_blob).
            journal.prune(10).await.unwrap();

            // Crash simulation: sync only the tail blob, skipping
            // `Journal::sync` so no metadata write follows the prune.
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            // Recovery lands on [10, 17): pruned sections are gone from disk.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 10);
            assert_eq!(bounds.end, 17);
            journal.destroy().await.unwrap();
        });
    }
2528
2529    #[test_traced]
2530    fn test_fixed_journal_prune_does_not_move_boundary_backwards() {
2531        // Pruning to a position earlier than pruning_boundary (within the same section)
2532        // should not move the boundary backwards.
2533        let executor = deterministic::Runner::default();
2534        executor.start(|context| async move {
2535            let cfg = test_cfg(&context, NZU64!(5));
2536            // init_at_size(7) sets pruning_boundary = 7 (mid-section in section 1)
2537            let journal =
2538                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2539                    .await
2540                    .unwrap();
2541            // Append 5 items at positions 7-11, filling section 1 and part of section 2
2542            for i in 0..5u64 {
2543                journal.append(&test_digest(i)).await.unwrap();
2544            }
2545            // Prune to position 5 (section 1 start) should NOT move boundary back from 7 to 5
2546            journal.prune(5).await.unwrap();
2547            assert_eq!(journal.bounds().await.start, 7);
2548            journal.destroy().await.unwrap();
2549        });
2550    }
2551
2552    #[test_traced]
2553    fn test_fixed_journal_replay_after_init_at_size_spanning_sections() {
2554        // Test replay when first section begins mid-section: init_at_size creates a journal
2555        // where pruning_boundary is mid-section, then we append across multiple sections.
2556        let executor = deterministic::Runner::default();
2557        executor.start(|context| async move {
2558            let cfg = test_cfg(&context, NZU64!(5));
2559
2560            // Initialize at position 7 (mid-section with items_per_blob=5)
2561            // Section 1 (positions 5-9) begins mid-section: only positions 7, 8, 9 have data
2562            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
2563                .await
2564                .unwrap();
2565
2566            // Append 13 items (positions 7-19), spanning sections 1, 2, 3
2567            for i in 0..13u64 {
2568                let pos = journal.append(&test_digest(100 + i)).await.unwrap();
2569                assert_eq!(pos, 7 + i);
2570            }
2571            assert_eq!(journal.size().await, 20);
2572            journal.sync().await.unwrap();
2573
2574            // Replay from pruning_boundary
2575            {
2576                let reader = journal.reader().await;
2577                let stream = reader
2578                    .replay(NZUsize!(1024), 7)
2579                    .await
2580                    .expect("failed to replay");
2581                pin_mut!(stream);
2582                let mut items: Vec<(u64, Digest)> = Vec::new();
2583                while let Some(result) = stream.next().await {
2584                    items.push(result.expect("replay item failed"));
2585                }
2586
2587                // Should get all 13 items with correct logical positions
2588                assert_eq!(items.len(), 13);
2589                for (i, (pos, item)) in items.iter().enumerate() {
2590                    assert_eq!(*pos, 7 + i as u64);
2591                    assert_eq!(*item, test_digest(100 + i as u64));
2592                }
2593            }
2594
2595            // Replay from mid-stream (position 12)
2596            {
2597                let reader = journal.reader().await;
2598                let stream = reader
2599                    .replay(NZUsize!(1024), 12)
2600                    .await
2601                    .expect("failed to replay from mid-stream");
2602                pin_mut!(stream);
2603                let mut items: Vec<(u64, Digest)> = Vec::new();
2604                while let Some(result) = stream.next().await {
2605                    items.push(result.expect("replay item failed"));
2606                }
2607
2608                // Should get items from position 12 onwards
2609                assert_eq!(items.len(), 8);
2610                for (i, (pos, item)) in items.iter().enumerate() {
2611                    assert_eq!(*pos, 12 + i as u64);
2612                    assert_eq!(*item, test_digest(100 + 5 + i as u64));
2613                }
2614            }
2615
2616            journal.destroy().await.unwrap();
2617        });
2618    }
2619
2620    #[test_traced]
2621    fn test_fixed_journal_rewind_error_before_bounds_start() {
2622        // Test that rewind returns error when trying to rewind before bounds.start
2623        let executor = deterministic::Runner::default();
2624        executor.start(|context| async move {
2625            let cfg = test_cfg(&context, NZU64!(5));
2626
2627            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2628                .await
2629                .unwrap();
2630
2631            // Append a few items (positions 10, 11, 12)
2632            for i in 0..3u64 {
2633                journal.append(&test_digest(i)).await.unwrap();
2634            }
2635            assert_eq!(journal.size().await, 13);
2636
2637            // Rewind to position 11 should work
2638            journal.rewind(11).await.unwrap();
2639            assert_eq!(journal.size().await, 11);
2640
2641            // Rewind to position 10 (pruning_boundary) should work
2642            journal.rewind(10).await.unwrap();
2643            assert_eq!(journal.size().await, 10);
2644
2645            // Rewind to before pruning_boundary should fail
2646            let result = journal.rewind(9).await;
2647            assert!(matches!(result, Err(Error::InvalidRewind(9))));
2648
2649            journal.destroy().await.unwrap();
2650        });
2651    }
2652
    #[test_traced]
    fn test_fixed_journal_init_at_size_crash_scenarios() {
        // Exercises recovery when a crash leaves the blob partition and the
        // metadata partition disagreeing about the pruning boundary.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            // Setup: Create a journal with some data and mid-section metadata
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..5u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Crash Scenario 1: After clear(), before blob creation
            // Simulate by manually removing all blobs but leaving metadata
            let blob_part = blob_partition(&cfg);
            context.remove(&blob_part, None).await.unwrap();

            // Recovery should see no blobs and return empty journal, ignoring metadata
            let journal = Journal::<_, Digest>::init(context.with_label("crash1"), cfg.clone())
                .await
                .expect("init failed after clear crash");
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 0);
            assert_eq!(bounds.start, 0);
            drop(journal);

            // Restore metadata for next scenario (it might have been removed by init)
            let meta_cfg = MetadataConfig {
                partition: format!("{}-metadata", cfg.partition),
                codec_config: ((0..).into(), ()),
            };
            let mut metadata = Metadata::<_, u64, Vec<u8>>::init(
                context.with_label("restore_meta"),
                meta_cfg.clone(),
            )
            .await
            .unwrap();
            metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec());
            metadata.sync().await.unwrap();

            // Crash Scenario 2: After ensure_section_exists(), before metadata update.
            // Simulated target: init_at_size(2) -> tail blob at section 0, while the
            // restored metadata still says 7 (section 1). recover_bounds sees the
            // metadata (section 1) ahead of the blobs (section 0) and must fall
            // back to the blob state.
            // (A target like init_at_size(12) would instead put the blob AHEAD of
            // the metadata; recover_bounds treats that direction as stale metadata
            // and also resolves to the blob state.)

            // Simulate: Create blob at section 0 (tail for init_at_size(2))
            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
            blob.sync().await.unwrap(); // Ensure it exists

            // Recovery should warn "metadata ahead" and use blob state (0, 0)
            let journal = Journal::<_, Digest>::init(context.with_label("crash2"), cfg.clone())
                .await
                .expect("init failed after create crash");

            // Should recover to blob state (section 0 aligned)
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            // Size is 0 because blob is empty
            assert_eq!(bounds.end, 0);
            journal.destroy().await.unwrap();
        });
    }
2726
    #[test_traced]
    fn test_fixed_journal_clear_to_size_crash_scenarios() {
        // clear_to_size crash: the new tail blob exists on disk but metadata
        // still points at the old, larger size. Recovery must fall back to the
        // blob state rather than trust the "ahead" metadata.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            // Setup: Init at 12 (Section 2, offset 2)
            // Metadata = 12
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 12)
                    .await
                    .unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            // Crash Scenario: clear_to_size(2) [Section 0]
            // We want to simulate crash after blob 0 created, but metadata still 12.

            // Manually drop every blob, as clear_to_size would.
            let blob_part = blob_partition(&cfg);
            context.remove(&blob_part, None).await.unwrap();

            // Manually create the new tail blob for section 0.
            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
            blob.sync().await.unwrap();

            // Metadata is still 12 (from setup)
            // Blob is Section 0
            // Metadata (12 -> sec 2) > Blob (sec 0) -> Ahead warning

            let journal =
                Journal::<_, Digest>::init(context.with_label("crash_clear"), cfg.clone())
                    .await
                    .expect("init failed after clear_to_size crash");

            // Recovery ignores the ahead metadata and falls back to the blobs:
            // an empty journal at section 0.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 0);
            journal.destroy().await.unwrap();
        });
    }
2769}