Skip to main content

commonware_storage/journal/contiguous/
fixed.rs

1//! An append-only log for storing fixed length _items_ on disk.
2//!
3//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
4//! where position is defined as the item's order of insertion starting from 0, unaffected by
5//! pruning.
6//!
7//! _See [super::variable] for a journal that supports variable length items._
8//!
9//! # Format
10//!
11//! Data stored in a `fixed::Journal` is persisted in one of many Blobs. Each `Blob` contains a
12//! configurable maximum of `items_per_blob`, with page-level data integrity provided by a buffer
13//! pool.
14//!
15//! ```text
//! +--------+--------+-----+----------+
//! | item_0 | item_1 | ... | item_n-1 |
//! +--------+--------+-----+----------+
19//!
20//! n = config.items_per_blob
21//! ```
22//!
23//! The most recent blob may not necessarily be full, in which case it will contain fewer than the
24//! maximum number of items.
25//!
26//! Data fetched from disk is always checked for integrity before being returned. If the data is
27//! found to be invalid, an error is returned instead.
28//!
29//! # Open Blobs
30//!
31//! All `Blobs` in a given `partition` are kept open during the lifetime of `Journal`. You can limit
32//! the number of open blobs by using a higher number of `items_per_blob` and/or pruning old items.
33//!
34//! # Partition
35//!
36//! Blobs are stored in the legacy partition (`cfg.partition`) if it already contains data;
37//! otherwise they are stored in `{cfg.partition}-blobs`.
38//!
39//! Metadata is stored in `{cfg.partition}-metadata`.
40//!
41//! # Consistency
42//!
43//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
44//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
45//! calling `close`, all pending data is automatically synced and any open blobs are closed.
46//!
47//! # Pruning
48//!
49//! The `prune` method allows the `Journal` to prune blobs consisting entirely of items prior to a
50//! given point in history.
51//!
52//! # Replay
53//!
54//! The `replay` method supports fast reading of all unpruned items into memory.
55
56#[cfg(test)]
57use super::Reader as _;
58use crate::{
59    journal::{
60        contiguous::{Many, Mutable},
61        segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
62        Error,
63    },
64    metadata::{Config as MetadataConfig, Metadata},
65    Context, Persistable,
66};
67use commonware_codec::CodecFixedShared;
68use commonware_runtime::buffer::paged::CacheRef;
69use commonware_utils::sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock};
70use futures::{stream::Stream, StreamExt};
71use std::num::{NonZeroU64, NonZeroUsize};
72use tracing::warn;
73
74/// Metadata key for storing the pruning boundary.
75const PRUNING_BOUNDARY_KEY: u64 = 1;
76
/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config {
    /// Prefix for the journal partitions.
    ///
    /// Blobs are stored in `partition` (legacy) if it contains data, otherwise in
    /// `{partition}-blobs`. Metadata is stored in `{partition}-metadata`.
    pub partition: String,

    /// The maximum number of journal items to store in each blob.
    ///
    /// Any unpruned historical blobs will contain exactly this number of items.
    /// Only the newest blob may contain fewer items.
    pub items_per_blob: NonZeroU64,

    /// The page cache to use for caching data.
    ///
    /// Shared page-level cache used by the underlying blobs (also provides page-level
    /// data integrity per the module docs).
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}
98
/// Inner state protected by a single RwLock.
///
/// Keeping all mutable state behind one lock serializes mutating operations while
/// letting readers share a consistent snapshot of (journal, size, pruning_boundary).
struct Inner<E: Context, A: CodecFixedShared> {
    /// The underlying segmented journal.
    journal: SegmentedJournal<E, A>,

    /// Total number of items appended (not affected by pruning).
    size: u64,

    /// If the journal's pruning boundary is mid-section (that is, the oldest retained item's
    /// position is not a multiple of `items_per_blob`), then the metadata stores the pruning
    /// boundary. Otherwise, the metadata is empty.
    ///
    /// When the journal is pruned, `metadata` must be persisted AFTER the inner journal is
    /// persisted to ensure that its pruning boundary is never after the inner journal's size.
    // TODO(#2939): Remove metadata
    metadata: Metadata<E, u64, Vec<u8>>,

    /// The position before which all items have been pruned.
    ///
    /// Invariant: `pruning_boundary <= size`.
    pruning_boundary: u64,
}
119
120impl<E: Context, A: CodecFixedShared> Inner<E, A> {
121    /// Read the item at position `pos` in the journal.
122    ///
123    /// # Errors
124    ///
125    ///  - [Error::ItemPruned] if the item at position `pos` is pruned.
126    ///  - [Error::ItemOutOfRange] if the item at position `pos` does not exist.
127    async fn read(&self, pos: u64, items_per_blob: u64) -> Result<A, Error> {
128        if pos >= self.size {
129            return Err(Error::ItemOutOfRange(pos));
130        }
131        if pos < self.pruning_boundary {
132            return Err(Error::ItemPruned(pos));
133        }
134
135        let section = pos / items_per_blob;
136        let section_start = section * items_per_blob;
137
138        // Calculate position within the blob.
139        // This accounts for sections that begin mid-section (pruning_boundary > section_start).
140        let first_in_section = self.pruning_boundary.max(section_start);
141        let pos_in_section = pos - first_in_section;
142
143        self.journal
144            .get(section, pos_in_section)
145            .await
146            .map_err(|e| {
147                // Since we check bounds above, any failure here is unexpected.
148                match e {
149                    Error::SectionOutOfRange(e)
150                    | Error::AlreadyPrunedToSection(e)
151                    | Error::ItemOutOfRange(e) => {
152                        Error::Corruption(format!("section/item should be found, but got: {e}"))
153                    }
154                    other => other,
155                }
156            })
157    }
158
159    /// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
160    fn try_read_sync(&self, pos: u64, items_per_blob: u64) -> Option<A> {
161        if pos >= self.size || pos < self.pruning_boundary {
162            return None;
163        }
164        let section = pos / items_per_blob;
165        let section_start = section * items_per_blob;
166        let first_in_section = self.pruning_boundary.max(section_start);
167        let pos_in_section = pos - first_in_section;
168        self.journal.try_get_sync(section, pos_in_section)
169    }
170}
171
/// Implementation of `Journal` storage.
///
/// This is implemented as a wrapper around [SegmentedJournal] that provides position-based access
/// where positions are automatically mapped to (section, position_in_section) pairs.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying blob will be truncated to the last valid item). Repair is performed
/// by the underlying [SegmentedJournal] during init.
pub struct Journal<E: Context, A: CodecFixedShared> {
    /// Inner state with segmented journal and size.
    ///
    /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent
    /// race conditions while allowing concurrent reads during sync.
    inner: UpgradableAsyncRwLock<Inner<E, A>>,

    /// The maximum number of items per blob (section).
    ///
    /// Copy of `cfg.items_per_blob` as a plain `u64` for arithmetic.
    items_per_blob: u64,
}
196
/// A reader guard that holds a consistent snapshot of the journal's bounds.
///
/// While the guard is held, mutating operations (which take the write lock) are excluded,
/// so the observed bounds cannot change under the reader.
pub struct Reader<'a, E: Context, A: CodecFixedShared> {
    /// Read guard over the journal's inner state.
    guard: AsyncRwLockReadGuard<'a, Inner<E, A>>,

    /// Copy of the journal's `items_per_blob` for position-to-section arithmetic.
    items_per_blob: u64,
}
202
impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
    type Item = A;

    /// Half-open range of readable positions: `pruning_boundary..size`.
    fn bounds(&self) -> std::ops::Range<u64> {
        self.guard.pruning_boundary..self.guard.size
    }

    /// Read the item at position `pos` (see [Inner::read] for error conditions).
    async fn read(&self, pos: u64) -> Result<A, Error> {
        self.guard.read(pos, self.items_per_blob).await
    }

    /// Read an item without I/O if possible, returning `None` otherwise.
    fn try_read_sync(&self, pos: u64) -> Option<A> {
        self.guard.try_read_sync(pos, self.items_per_blob)
    }

    /// Stream all items from `start_pos` to the end of the journal as
    /// `(global_position, item)` pairs.
    ///
    /// Verifies that all middle sections in range are full before streaming; an
    /// incomplete middle section would silently skip items, so it is reported as
    /// corruption instead.
    async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + Send, Error> {
        let items_per_blob = self.items_per_blob;
        let pruning_boundary = self.guard.pruning_boundary;

        // Validate bounds. (start_pos == size yields an empty replay.)
        if start_pos > self.guard.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }
        if start_pos < pruning_boundary {
            return Err(Error::ItemPruned(start_pos));
        }

        let start_section = start_pos / items_per_blob;
        let section_start = start_section * items_per_blob;

        // Calculate start position within the section.
        // Accounts for an oldest section that begins mid-section after pruning.
        let first_in_section = pruning_boundary.max(section_start);
        let start_pos_in_section = start_pos - first_in_section;

        // Check all middle sections (not oldest, not tail) in range are complete.
        let journal = &self.guard.journal;
        if let (Some(oldest), Some(newest)) = (journal.oldest_section(), journal.newest_section()) {
            let first_to_check = start_section.max(oldest + 1);
            for section in first_to_check..newest {
                let len = journal.section_len(section).await?;
                if len < items_per_blob {
                    return Err(Error::Corruption(format!(
                        "section {section} incomplete: expected {items_per_blob} items, got {len}"
                    )));
                }
            }
        }

        let inner_stream = journal
            .replay(start_section, start_pos_in_section, buffer)
            .await?;

        // Transform (section, pos_in_section, item) to (global_pos, item).
        // The per-item mapping mirrors the section math above so positions reported to
        // the caller are global journal positions.
        let stream = inner_stream.map(move |result| {
            result.map(|(section, pos_in_section, item)| {
                let section_start = section * items_per_blob;
                let first_in_section = pruning_boundary.max(section_start);
                let global_pos = first_in_section + pos_in_section;
                (global_pos, item)
            })
        });

        Ok(stream)
    }
}
272
273impl<E: Context, A: CodecFixedShared> Journal<E, A> {
    /// Size of each entry in bytes.
    pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;

    /// Size of each entry in bytes (as u64).
    ///
    /// Used to convert item offsets within a blob into byte offsets (e.g. in `rewind`).
    pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
279
280    /// Scan a partition and return blob names, treating a missing partition as empty.
281    async fn scan_partition(context: &E, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
282        match context.scan(partition).await {
283            Ok(blobs) => Ok(blobs),
284            Err(commonware_runtime::Error::PartitionMissing(_)) => Ok(Vec::new()),
285            Err(err) => Err(Error::Runtime(err)),
286        }
287    }
288
289    /// Select the blobs partition using legacy-first compatibility rules.
290    ///
291    /// If both legacy and new blobs partitions contain data, returns corruption.
292    /// If neither contains data, defaults to the new blobs partition.
293    // TODO(#2941): Remove legacy partition support
294    async fn select_blob_partition(context: &E, cfg: &Config) -> Result<String, Error> {
295        let legacy_partition = cfg.partition.as_str();
296        let new_partition = format!("{}-blobs", cfg.partition);
297
298        let legacy_blobs = Self::scan_partition(context, legacy_partition).await?;
299        let new_blobs = Self::scan_partition(context, &new_partition).await?;
300
301        if !legacy_blobs.is_empty() && !new_blobs.is_empty() {
302            return Err(Error::Corruption(format!(
303                "both legacy and blobs partitions contain data: legacy={} blobs={}",
304                legacy_partition, new_partition
305            )));
306        }
307
308        if !legacy_blobs.is_empty() {
309            Ok(legacy_partition.into())
310        } else {
311            Ok(new_partition)
312        }
313    }
314
    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. The `replay` method can be
    /// used to iterate over all items in the `Journal`.
    ///
    /// # Errors
    ///
    /// Returns [Error::Corruption] if both legacy and new blob partitions contain data, if the
    /// stored pruning-boundary metadata cannot be decoded as a `u64`, or if the recovered blob
    /// state is inconsistent (see [Self::recover_bounds]).
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();

        // Resolve legacy vs. new blob partition before opening the segmented journal.
        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let mut journal =
            SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;
        // Initialize metadata store
        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };

        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;

        // Parse metadata if present. The stored value is a big-endian u64 pruning boundary;
        // any other width is corruption.
        let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) {
            Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err(
                |_| Error::Corruption("invalid pruning_boundary metadata".into()),
            )?)),
            None => None,
        };

        // Recover bounds from metadata and/or blobs
        let (pruning_boundary, size, needs_metadata_update) =
            Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?;

        // Persist metadata if needed: the entry must exist iff the recovered boundary is
        // mid-section (see the `metadata` field docs).
        if needs_metadata_update {
            if pruning_boundary.is_multiple_of(items_per_blob) {
                metadata.remove(&PRUNING_BOUNDARY_KEY);
            } else {
                metadata.put(
                    PRUNING_BOUNDARY_KEY,
                    pruning_boundary.to_be_bytes().to_vec(),
                );
            }
            metadata.sync().await?;
        }

        // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on
        // reopen even after pruning all items. The tail blob is at `size / items_per_blob` (where
        // the next append would go).
        let tail_section = size / items_per_blob;
        journal.ensure_section_exists(tail_section).await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                journal,
                size,
                metadata,
                pruning_boundary,
            }),
            items_per_blob,
        })
    }
381
    /// Returns (pruning_boundary, size, needs_metadata_update) based on metadata and blobs.
    ///
    /// If `meta_pruning_boundary` is `Some`, validates it against the physical blob state:
    /// - If metadata is section-aligned, it's unnecessary and we use blob-based boundary
    /// - If metadata refers to a pruned section, it's stale and we use blob-based boundary
    /// - If metadata refers to a future section, it must have been written by [Self::clear_to_size]
    ///   or [Self::init_at_size] and crashed before writing the blobs. Fall back to blobs.
    /// - Otherwise, metadata is valid and we use it
    ///
    /// If `meta_pruning_boundary` is `None`, computes bounds purely from blobs.
    async fn recover_bounds(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        meta_pruning_boundary: Option<u64>,
    ) -> Result<(u64, u64, bool), Error> {
        // Blob-based boundary is always section-aligned
        let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob);

        // `needs_update` tells the caller to rewrite/remove the metadata entry so it
        // agrees with the recovered boundary.
        let (pruning_boundary, needs_update) = match meta_pruning_boundary {
            // Mid-section metadata: validate against blobs
            Some(meta_pruning_boundary)
                if !meta_pruning_boundary.is_multiple_of(items_per_blob) =>
            {
                let meta_oldest_section = meta_pruning_boundary / items_per_blob;
                match inner.oldest_section() {
                    None => {
                        // No blobs exist but metadata claims mid-section boundary.
                        // This can happen if we crash after inner.clear() but before
                        // ensure_section_exists(). Ignore stale metadata.
                        warn!(
                            meta_oldest_section,
                            "crash repair: no blobs exist, ignoring stale metadata"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section < oldest_section => {
                        // Metadata points into an already-pruned section: stale.
                        warn!(
                            meta_oldest_section,
                            oldest_section, "crash repair: metadata stale, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section > oldest_section => {
                        // Metadata references a section ahead of the oldest blob. This can happen
                        // if we crash during clear_to_size/init_at_size after blobs update but
                        // before metadata update. Fall back to blob state.
                        warn!(
                            meta_oldest_section,
                            oldest_section,
                            "crash repair: metadata ahead of blobs, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(_) => (meta_pruning_boundary, false), // valid mid-section metadata
                }
            }
            // Section-aligned metadata: unnecessary, use blob-based
            Some(_) => (blob_boundary, true),
            // No metadata: use blob-based, no update needed
            None => (blob_boundary, false),
        };

        // Validate oldest section before computing size.
        Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?;

        let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?;
        Ok((pruning_boundary, size, needs_update))
    }
450
451    /// Validate that the oldest section has the expected number of items.
452    ///
453    /// Non-tail sections must be full from their logical start. The tail section
454    /// (oldest == newest) can be partially filled.
455    async fn validate_oldest_section(
456        inner: &SegmentedJournal<E, A>,
457        items_per_blob: u64,
458        pruning_boundary: u64,
459    ) -> Result<(), Error> {
460        let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else {
461            return Ok(()); // No sections to validate
462        };
463
464        if oldest == newest {
465            return Ok(()); // Tail section, can be partial
466        }
467
468        let oldest_len = inner.section_len(oldest).await?;
469        let oldest_start = oldest * items_per_blob;
470
471        let expected = if pruning_boundary > oldest_start {
472            // Mid-section boundary: items from pruning_boundary to section end
473            items_per_blob - (pruning_boundary - oldest_start)
474        } else {
475            // Section-aligned boundary: full section
476            items_per_blob
477        };
478
479        if oldest_len != expected {
480            return Err(Error::Corruption(format!(
481                "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}"
482            )));
483        }
484
485        Ok(())
486    }
487
488    /// Returns the total number of items ever appended (size), computed from the blobs.
489    async fn compute_size(
490        inner: &SegmentedJournal<E, A>,
491        items_per_blob: u64,
492        pruning_boundary: u64,
493    ) -> Result<u64, Error> {
494        let oldest = inner.oldest_section();
495        let newest = inner.newest_section();
496
497        let (Some(oldest), Some(newest)) = (oldest, newest) else {
498            return Ok(pruning_boundary);
499        };
500
501        if oldest == newest {
502            // Single section: count from pruning boundary
503            let tail_len = inner.section_len(newest).await?;
504            return Ok(pruning_boundary + tail_len);
505        }
506
507        // Multiple sections: sum actual item counts
508        let oldest_len = inner.section_len(oldest).await?;
509        let tail_len = inner.section_len(newest).await?;
510
511        // Middle sections are assumed full
512        let middle_sections = newest - oldest - 1;
513        let middle_items = middle_sections * items_per_blob;
514
515        Ok(pruning_boundary + oldest_len + middle_items + tail_len)
516    }
517
    /// Initialize a new `Journal` instance in a pruned state at a given size.
    ///
    /// This is used for state sync to create a journal that appears to have had `size` items
    /// appended and then pruned up to that point.
    ///
    /// # Arguments
    /// * `context` - The storage context
    /// * `cfg` - Configuration for the journal
    /// * `size` - The number of operations that have been "pruned"
    ///
    /// # Behavior
    /// - Clears any existing data in the partition
    /// - Creates an empty tail blob where the next append (at position `size`) will go
    /// - `bounds().is_empty()` returns `true` (fully pruned state)
    /// - The next `append()` will write to position `size`
    ///
    /// # Post-conditions
    /// - `bounds().end` returns `size`
    /// - `bounds().is_empty()` returns `true`
    /// - `bounds.start` equals `size` (no data exists)
    ///
    /// # Crash Safety
    /// If a crash occurs during this operation, `init()` will recover to a consistent state
    /// (though possibly different from the intended `size`).
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();
        // The (empty) tail blob where the next append (at position `size`) will land.
        let tail_section = size / items_per_blob;

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        // Initialize both stores.
        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;
        let mut journal =
            SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;

        // Clear blobs before updating metadata.
        // This ordering is critical for crash safety:
        // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored
        // - Crash after create: old metadata triggers "metadata ahead" warning,
        //   recovery falls back to blob state.
        journal.clear().await?;
        journal.ensure_section_exists(tail_section).await?;

        // Persist metadata if pruning_boundary is mid-section; otherwise remove any
        // lingering entry so the metadata store matches the aligned-boundary convention.
        if !size.is_multiple_of(items_per_blob) {
            metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec());
            metadata.sync().await?;
        } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            metadata.remove(&PRUNING_BOUNDARY_KEY);
            metadata.sync().await?;
        }

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                journal,
                size,
                metadata,
                pruning_boundary: size, // No data exists yet
            }),
            items_per_blob,
        })
    }
591
592    /// Convert a global position to (section, position_in_section).
593    #[inline]
594    const fn position_to_section(&self, position: u64) -> (u64, u64) {
595        let section = position / self.items_per_blob;
596        let pos_in_section = position % self.items_per_blob;
597        (section, pos_in_section)
598    }
599
    /// Sync any pending updates to disk.
    ///
    /// Only the tail section can have pending updates since historical sections are synced
    /// when they become full.
    ///
    /// Also reconciles the pruning-boundary metadata: the entry must exist iff the boundary
    /// is mid-section (see the `metadata` field docs on `Inner`).
    pub async fn sync(&self) -> Result<(), Error> {
        // Serialize with append/prune/rewind to ensure section selection is stable, while still allowing
        // concurrent readers.
        let inner = self.inner.upgradable_read().await;

        // Sync the tail section
        let tail_section = inner.size / self.items_per_blob;

        // The tail section may not exist yet if the previous section was just filled, but syncing a
        // non-existent section is safe (returns Ok).
        inner.journal.sync(tail_section).await?;

        // Persist metadata only when pruning_boundary is mid-section.
        // `put == true` -> write the boundary; `put == false` -> remove a stale entry.
        // If the stored metadata already matches the required state, return early
        // without ever taking the write lock.
        let pruning_boundary = inner.pruning_boundary;
        let pruning_boundary_from_metadata = inner.metadata.get(&PRUNING_BOUNDARY_KEY).cloned();
        let put = if !pruning_boundary.is_multiple_of(self.items_per_blob) {
            // Mid-section boundary: metadata must record it exactly.
            let needs_update = pruning_boundary_from_metadata
                .is_none_or(|bytes| bytes.as_slice() != pruning_boundary.to_be_bytes());

            if needs_update {
                true
            } else {
                return Ok(());
            }
        } else if pruning_boundary_from_metadata.is_some() {
            // Aligned boundary: any lingering entry is stale and must be removed.
            false
        } else {
            return Ok(());
        };

        // Upgrade only for the metadata mutation/sync step; reads were allowed while syncing
        // the tail section above.
        let mut inner = inner.upgrade().await;
        if put {
            inner.metadata.put(
                PRUNING_BOUNDARY_KEY,
                pruning_boundary.to_be_bytes().to_vec(),
            );
        } else {
            inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
        }
        inner.metadata.sync().await?;

        Ok(())
    }
649
650    /// Acquire a reader guard that holds a consistent view of the journal.
651    pub async fn reader(&self) -> Reader<'_, E, A> {
652        Reader {
653            guard: self.inner.read().await,
654            items_per_blob: self.items_per_blob,
655        }
656    }
657
658    /// Return the total number of items in the journal, irrespective of pruning. The next value
659    /// appended to the journal will be at this position.
660    pub async fn size(&self) -> u64 {
661        self.inner.read().await.size
662    }
663
664    /// Append a new item to the journal. Return the item's position in the journal, or error if the
665    /// operation fails.
666    pub async fn append(&self, item: &A) -> Result<u64, Error> {
667        self.append_many(Many::Flat(std::slice::from_ref(item)))
668            .await
669    }
670
    /// Append items to the journal, returning the position of the last item appended.
    ///
    /// Acquires the write lock once for all items instead of per-item.
    /// Returns [Error::EmptyAppend] if items is empty.
    pub async fn append_many<'a>(&'a self, items: Many<'a, A>) -> Result<u64, Error> {
        if items.is_empty() {
            return Err(Error::EmptyAppend);
        }

        // Encode all items into a single contiguous buffer before taking the write guard.
        // Uses Write::write directly to avoid per-item Bytes allocations from Encode::encode.
        let items_count = match &items {
            Many::Flat(items) => items.len(),
            Many::Nested(nested_items) => nested_items.iter().map(|s| s.len()).sum(),
        };
        let mut items_buf = Vec::with_capacity(items_count * A::SIZE);
        match &items {
            Many::Flat(items) => {
                for item in *items {
                    item.write(&mut items_buf);
                }
            }
            Many::Nested(nested_items) => {
                for items in *nested_items {
                    for item in *items {
                        item.write(&mut items_buf);
                    }
                }
            }
        }

        // Mutating operations are serialized by taking the write guard.
        let mut inner = self.inner.write().await;
        let mut written = 0;
        while written < items_count {
            // Each iteration writes at most up to the end of the current tail section.
            let (section, pos_in_section) = self.position_to_section(inner.size);
            let remaining_space = (self.items_per_blob - pos_in_section) as usize;
            let batch_count = remaining_space.min(items_count - written);
            // Slice the pre-encoded buffer for this batch (entries are fixed-size).
            let start = written * A::SIZE;
            let end = start + batch_count * A::SIZE;

            inner
                .journal
                .append_raw(section, &items_buf[start..end])
                .await?;
            inner.size += batch_count as u64;
            written += batch_count;

            if inner.size.is_multiple_of(self.items_per_blob) {
                // The section was filled and must be synced. Downgrade so readers can continue
                // during the sync, but keep mutators blocked. After sync, upgrade again to
                // create the next tail section before any append can proceed.
                let inner_ref = inner.downgrade_to_upgradable();
                inner_ref.journal.sync(section).await?;
                inner = inner_ref.upgrade().await;
                inner.journal.ensure_section_exists(section + 1).await?;
            }
        }

        // Position of the last item appended.
        Ok(inner.size - 1)
    }
732
    /// Rewind the journal to the given `size`. Returns [Error::InvalidRewind] if the rewind point
    /// precedes the oldest retained element. The journal is not synced after rewinding.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed from newest to oldest.
    pub async fn rewind(&self, size: u64) -> Result<(), Error> {
        let mut inner = self.inner.write().await;

        // Growing via rewind is invalid; rewinding to the current size is a no-op.
        match size.cmp(&inner.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }

        // Cannot rewind into the pruned region.
        if size < inner.pruning_boundary {
            return Err(Error::InvalidRewind(size));
        }

        let section = size / self.items_per_blob;
        let section_start = section * self.items_per_blob;

        // Calculate offset within section for rewind.
        // Accounts for an oldest section that begins mid-section after pruning.
        let first_in_section = inner.pruning_boundary.max(section_start);
        let pos_in_section = size - first_in_section;
        // Items are fixed-size, so the byte offset is a simple multiple.
        let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64;

        inner.journal.rewind(section, byte_offset).await?;
        inner.size = size;

        Ok(())
    }
767
768    /// Return the location before which all items have been pruned.
769    pub async fn pruning_boundary(&self) -> u64 {
770        let inner = self.inner.read().await;
771        inner.pruning_boundary
772    }
773
774    /// Allow the journal to prune items older than `min_item_pos`. The journal may not prune all
775    /// such items in order to preserve blob boundaries, but the amount of such items will always be
776    /// less than the configured number of items per blob. Returns true if any items were pruned.
777    ///
778    /// Note that this operation may NOT be atomic, however it's guaranteed not to leave gaps in the
779    /// event of failure as items are always pruned in order from oldest to newest.
780    pub async fn prune(&self, min_item_pos: u64) -> Result<bool, Error> {
781        let mut inner = self.inner.write().await;
782
783        // Calculate the section that would contain min_item_pos
784        let target_section = min_item_pos / self.items_per_blob;
785
786        // Calculate the tail section.
787        let tail_section = inner.size / self.items_per_blob;
788
789        // Cap to tail section. The tail section is guaranteed to exist by our invariant.
790        let min_section = std::cmp::min(target_section, tail_section);
791
792        let pruned = inner.journal.prune(min_section).await?;
793
794        // After pruning, update pruning_boundary to the start of the oldest remaining section
795        if pruned {
796            let new_oldest = inner
797                .journal
798                .oldest_section()
799                .expect("all sections pruned - violates tail section invariant");
800            // Pruning boundary only moves forward
801            assert!(inner.pruning_boundary < new_oldest * self.items_per_blob);
802            inner.pruning_boundary = new_oldest * self.items_per_blob;
803        }
804
805        Ok(pruned)
806    }
807
808    /// Remove any persisted data created by the journal.
809    pub async fn destroy(self) -> Result<(), Error> {
810        // Destroy inner journal
811        let inner = self.inner.into_inner();
812        inner.journal.destroy().await?;
813
814        // Destroy metadata
815        inner.metadata.destroy().await?;
816
817        Ok(())
818    }
819
    /// Clear all data and reset the journal to a new starting position.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
    ///
    /// # Crash Safety
    /// If a crash occurs during this operation, `init()` will recover to a consistent state
    /// (though possibly different from the intended `new_size`).
    pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
        // Clear blobs before updating metadata.
        // This ordering is critical for crash safety:
        // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored
        // - Crash after create: old metadata triggers "metadata ahead" warning,
        //   recovery falls back to blob state
        let mut inner = self.inner.write().await;
        inner.journal.clear().await?;
        // Recreate the tail section so the "tail section always exists" invariant
        // holds for subsequent appends.
        let tail_section = new_size / self.items_per_blob;
        inner.journal.ensure_section_exists(tail_section).await?;

        inner.size = new_size;
        inner.pruning_boundary = new_size; // No data exists

        // Persist metadata only when pruning_boundary is mid-section: a boundary
        // that falls on a section edge is recoverable from the blobs alone, so in
        // that case any stale key is removed instead.
        if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) {
            let value = inner.pruning_boundary.to_be_bytes().to_vec();
            inner.metadata.put(PRUNING_BOUNDARY_KEY, value);
            inner.metadata.sync().await?;
        } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
            inner.metadata.sync().await?;
        }

        Ok(())
    }
854
855    /// Test helper: Read the item at the given position.
856    #[cfg(test)]
857    pub(crate) async fn read(&self, pos: u64) -> Result<A, Error> {
858        self.reader().await.read(pos).await
859    }
860
861    /// Test helper: Return the bounds of the journal.
862    #[cfg(test)]
863    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
864        self.reader().await.bounds()
865    }
866
867    /// Test helper: Get the oldest section from the internal segmented journal.
868    #[cfg(test)]
869    pub(crate) async fn test_oldest_section(&self) -> Option<u64> {
870        let inner = self.inner.read().await;
871        inner.journal.oldest_section()
872    }
873
874    /// Test helper: Get the newest section from the internal segmented journal.
875    #[cfg(test)]
876    pub(crate) async fn test_newest_section(&self) -> Option<u64> {
877        let inner = self.inner.read().await;
878        inner.journal.newest_section()
879    }
880}
881
882// Implement Contiguous trait for fixed-length journals
883impl<E: Context, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
884    type Item = A;
885
886    async fn reader(&self) -> impl super::Reader<Item = A> + '_ {
887        Self::reader(self).await
888    }
889
890    async fn size(&self) -> u64 {
891        Self::size(self).await
892    }
893}
894
895impl<E: Context, A: CodecFixedShared> Mutable for Journal<E, A> {
896    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
897        Self::append(self, item).await
898    }
899
900    async fn append_many<'a>(&'a mut self, items: Many<'a, Self::Item>) -> Result<u64, Error> {
901        Self::append_many(self, items).await
902    }
903
904    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
905        Self::prune(self, min_position).await
906    }
907
908    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
909        Self::rewind(self, size).await
910    }
911}
912
913impl<E: Context, A: CodecFixedShared> Persistable for Journal<E, A> {
914    type Error = Error;
915
916    async fn commit(&self) -> Result<(), Error> {
917        self.sync().await
918    }
919
920    async fn sync(&self) -> Result<(), Error> {
921        self.sync().await
922    }
923
924    async fn destroy(self) -> Result<(), Error> {
925        self.destroy().await
926    }
927}
928
#[commonware_macros::stability(ALPHA)]
impl<E: Context, A: CodecFixedShared> crate::journal::authenticated::Inner<E> for Journal<E, A> {
    type Config = Config;

    /// Construct an authenticated journal backed by this fixed-length journal.
    ///
    /// `rewind_predicate` is evaluated per item; its exact semantics are defined
    /// by the authenticated journal (NOTE(review): see
    /// `crate::journal::authenticated` — confirm there).
    async fn init<F: crate::merkle::Family, H: commonware_cryptography::Hasher>(
        context: E,
        merkle_cfg: crate::merkle::journaled::Config,
        journal_cfg: Self::Config,
        rewind_predicate: fn(&A) -> bool,
    ) -> Result<
        crate::journal::authenticated::Journal<F, E, Self, H>,
        crate::journal::authenticated::Error<F>,
    > {
        // Delegate construction entirely to the authenticated journal.
        crate::journal::authenticated::Journal::<F, E, Self, H>::new(
            context,
            merkle_cfg,
            journal_cfg,
            rewind_predicate,
        )
        .await
    }
}
951
952#[cfg(test)]
953mod tests {
954    use super::*;
955    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
956    use commonware_macros::test_traced;
957    use commonware_runtime::{
958        deterministic::{self, Context},
959        Blob, BufferPooler, Error as RuntimeError, Metrics, Runner, Storage,
960    };
961    use commonware_utils::{NZUsize, NZU16, NZU64};
962    use futures::{pin_mut, StreamExt};
963    use std::num::NonZeroU16;
964
    // Deliberately small page size for the test buffer pool so items frequently
    // straddle page boundaries (presumably bytes — TODO confirm the unit used by
    // `CacheRef::from_pooler`).
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    // Maximum number of pages retained by the test page cache.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
967
968    /// Generate a SHA-256 digest for the given value.
969    fn test_digest(value: u64) -> Digest {
970        Sha256::hash(&value.to_be_bytes())
971    }
972
973    fn test_cfg(pooler: &impl BufferPooler, items_per_blob: NonZeroU64) -> Config {
974        Config {
975            partition: "test-partition".into(),
976            items_per_blob,
977            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
978            write_buffer: NZUsize!(2048),
979        }
980    }
981
982    fn blob_partition(cfg: &Config) -> String {
983        format!("{}-blobs", cfg.partition)
984    }
985
986    async fn scan_partition(context: &Context, partition: &str) -> Vec<Vec<u8>> {
987        match context.scan(partition).await {
988            Ok(blobs) => blobs,
989            Err(RuntimeError::PartitionMissing(_)) => Vec::new(),
990            Err(err) => panic!("Failed to scan partition {partition}: {err}"),
991        }
992    }
993
994    #[test_traced]
995    fn test_fixed_journal_init_conflicting_partitions() {
996        let executor = deterministic::Runner::default();
997        executor.start(|context| async move {
998            let cfg = test_cfg(&context, NZU64!(2));
999            let legacy_partition = cfg.partition.clone();
1000            let blobs_partition = blob_partition(&cfg);
1001
1002            let (legacy_blob, _) = context
1003                .open(&legacy_partition, &0u64.to_be_bytes())
1004                .await
1005                .expect("Failed to open legacy blob");
1006            legacy_blob
1007                .write_at(0, vec![0u8; 1])
1008                .await
1009                .expect("Failed to write legacy blob");
1010            legacy_blob
1011                .sync()
1012                .await
1013                .expect("Failed to sync legacy blob");
1014
1015            let (new_blob, _) = context
1016                .open(&blobs_partition, &0u64.to_be_bytes())
1017                .await
1018                .expect("Failed to open new blob");
1019            new_blob
1020                .write_at(0, vec![0u8; 1])
1021                .await
1022                .expect("Failed to write new blob");
1023            new_blob.sync().await.expect("Failed to sync new blob");
1024
1025            let result =
1026                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
1027            assert!(matches!(result, Err(Error::Corruption(_))));
1028        });
1029    }
1030
1031    #[test_traced]
1032    fn test_fixed_journal_init_prefers_legacy_partition() {
1033        let executor = deterministic::Runner::default();
1034        executor.start(|context| async move {
1035            let cfg = test_cfg(&context, NZU64!(2));
1036            let legacy_partition = cfg.partition.clone();
1037            let blobs_partition = blob_partition(&cfg);
1038
1039            // Seed legacy partition so it is selected.
1040            let (legacy_blob, _) = context
1041                .open(&legacy_partition, &0u64.to_be_bytes())
1042                .await
1043                .expect("Failed to open legacy blob");
1044            legacy_blob
1045                .write_at(0, vec![0u8; 1])
1046                .await
1047                .expect("Failed to write legacy blob");
1048            legacy_blob
1049                .sync()
1050                .await
1051                .expect("Failed to sync legacy blob");
1052
1053            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
1054                .await
1055                .expect("failed to initialize journal");
1056            journal.append(&test_digest(1)).await.unwrap();
1057            journal.sync().await.unwrap();
1058            drop(journal);
1059
1060            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
1061            let new_blobs = scan_partition(&context, &blobs_partition).await;
1062            assert!(!legacy_blobs.is_empty());
1063            assert!(new_blobs.is_empty());
1064        });
1065    }
1066
1067    #[test_traced]
1068    fn test_fixed_journal_init_defaults_to_blobs_partition() {
1069        let executor = deterministic::Runner::default();
1070        executor.start(|context| async move {
1071            let cfg = test_cfg(&context, NZU64!(2));
1072            let legacy_partition = cfg.partition.clone();
1073            let blobs_partition = blob_partition(&cfg);
1074
1075            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
1076                .await
1077                .expect("failed to initialize journal");
1078            journal.append(&test_digest(1)).await.unwrap();
1079            journal.sync().await.unwrap();
1080            drop(journal);
1081
1082            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
1083            let new_blobs = scan_partition(&context, &blobs_partition).await;
1084            assert!(legacy_blobs.is_empty());
1085            assert!(!new_blobs.is_empty());
1086        });
1087    }
1088
    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append an item to the journal
            let mut pos = journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            // Drop the journal and re-initialize it to simulate a restart
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await, 1);

            // Append two more items to the journal to trigger a new blob creation
            // (item 2 lands in the second blob, positions being 2-per-blob here).
            pos = journal
                .append(&test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(&test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);

            // Read the items back
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            // Position 3 was never written, so reads past the end must fail.
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            // Sync the journal
            journal.sync().await.expect("failed to sync journal");

            // Pruning to 1 should be a no-op because there's no blob with only older items.
            journal.prune(1).await.expect("failed to prune journal 1");

            // Pruning to 2 should allow the first blob to be pruned.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.bounds().await.start, 2);

            // Reading from the first blob should fail since it's now pruned
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // Third item should still be readable
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Should be able to continue to append items
            for i in 3..10 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Check no-op pruning (position 0 is already pruned, so section state
            // must be unchanged: sections 1..=5 with 2 items each, boundary at 2).
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(1));
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
            assert_eq!(journal.bounds().await.start, 2);

            // Prune first 3 blobs (6 items)
            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(3));
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
            assert_eq!(journal.bounds().await.start, 6);

            // Try pruning (more than) everything in the journal.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let size = journal.size().await;
            assert_eq!(size, 10);
            assert_eq!(journal.test_oldest_section().await, Some(5));
            assert_eq!(journal.test_newest_section().await, Some(5));
            // Since the size of the journal is currently a multiple of items_per_blob, the newest blob
            // will be empty, and there will be no retained items.
            let bounds = journal.bounds().await;
            assert!(bounds.is_empty());
            // bounds.start should equal bounds.end when empty.
            assert_eq!(bounds.start, size);

            // Replaying from 0 should fail since all items before bounds.start are pruned
            {
                let reader = journal.reader().await;
                let result = reader.replay(NZUsize!(1024), 0).await;
                assert!(matches!(result, Err(Error::ItemPruned(0))));
            }

            // Replaying from pruning_boundary should return empty stream
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(1024), 0).await;
                assert!(matches!(res, Err(Error::ItemPruned(_))));

                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), journal.bounds().await.start)
                    .await
                    .expect("failed to replay journal from pruning boundary");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                // Everything retained was pruned away, so the stream yields nothing.
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }
1236
1237    /// Append a lot of data to make sure we exercise page cache paging boundaries.
1238    #[test_traced]
1239    fn test_fixed_journal_append_a_lot_of_data() {
1240        // Initialize the deterministic context
1241        let executor = deterministic::Runner::default();
1242        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
1243        executor.start(|context| async move {
1244            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1245            let journal = Journal::init(context.with_label("first"), cfg.clone())
1246                .await
1247                .expect("failed to initialize journal");
1248            // Append 2 blobs worth of items.
1249            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
1250                journal
1251                    .append(&test_digest(i))
1252                    .await
1253                    .expect("failed to append data");
1254            }
1255            // Sync, reopen, then read back.
1256            journal.sync().await.expect("failed to sync journal");
1257            drop(journal);
1258            let journal = Journal::init(context.with_label("second"), cfg.clone())
1259                .await
1260                .expect("failed to re-initialize journal");
1261            for i in 0u64..10000 {
1262                let item: Digest = journal.read(i).await.expect("failed to read data");
1263                assert_eq!(item, test_digest(i));
1264            }
1265            journal.destroy().await.expect("failed to destroy journal");
1266        });
1267    }
1268
    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Read them back the usual way.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay should return all items
            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                // Sort before verifying: replay order is not asserted here, only
                // that each position appears exactly once.
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Corrupt one of the bytes and make sure it's detected. Section 40 is
            // a historical (non-tail) blob; offset 1 lands inside its first page.
            let (blob, _) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Write junk bytes.
            let bad_bytes = 123456789u32;
            blob.write_at(1, bad_bytes.to_be_bytes().to_vec())
                .await
                .expect("Failed to write bad bytes");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Make sure reading an item that resides in the corrupted page fails.
            let err = journal
                .read(40 * ITEMS_PER_BLOB.get() + 1)
                .await
                .unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            // Replay all items.
            {
                let mut error_found = false;
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_found = true;
                            assert!(matches!(err, Error::Runtime(_)));
                            break;
                        }
                    }
                }
                assert!(error_found); // error should abort replay
            }
        });
    }
1382
    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Manually truncate a non-tail blob. Note that init itself tolerates
            // this (below it succeeds); the missing item is detected later, at
            // replay time, because all non-tail blobs must be full.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // The segmented journal will trim the incomplete blob on init, resulting in the blob
            // missing one item. This should be detected as corruption during replay.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Journal size is computed from the tail section, so it's unchanged
            // despite the corruption in section 40.
            let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
            assert_eq!(journal.size().await, expected_size);

            // Replay should detect corruption (incomplete section) in section 40
            let reader = journal.reader().await;
            match reader.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(msg)) => {
                    assert!(
                        msg.contains("section 40"),
                        "Error should mention section 40, got: {msg}"
                    );
                }
                Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
                Ok(_) => panic!("Expected replay to fail with corruption"),
            };
        });
    }
1443
1444    #[test_traced]
1445    fn test_fixed_journal_replay_with_missing_historical_blob() {
1446        let executor = deterministic::Runner::default();
1447        executor.start(|context| async move {
1448            let cfg = test_cfg(&context, NZU64!(2));
1449            let journal = Journal::init(context.with_label("first"), cfg.clone())
1450                .await
1451                .expect("failed to initialize journal");
1452            for i in 0u64..5 {
1453                journal
1454                    .append(&test_digest(i))
1455                    .await
1456                    .expect("failed to append data");
1457            }
1458            journal.sync().await.expect("failed to sync journal");
1459            drop(journal);
1460
1461            context
1462                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1463                .await
1464                .expect("failed to remove blob");
1465
1466            // Init won't detect the corruption.
1467            let result = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1468                .await
1469                .expect("init shouldn't fail");
1470
1471            // But replay will.
1472            let reader = result.reader().await;
1473            match reader.replay(NZUsize!(1024), 0).await {
1474                Err(Error::Corruption(_)) => {}
1475                Err(err) => panic!("expected Corruption, got: {err}"),
1476                Ok(_) => panic!("expected Corruption, got ok"),
1477            };
1478
1479            // As will trying to read an item that was in the deleted blob.
1480            match result.read(2).await {
1481                Err(Error::Corruption(_)) => {}
1482                Err(err) => panic!("expected Corruption, got: {err}"),
1483                Ok(_) => panic!("expected Corruption, got ok"),
1484            };
1485        });
1486    }
1487
1488    #[test_traced]
1489    fn test_fixed_journal_test_trim_blob() {
1490        // Initialize the deterministic context
1491        let executor = deterministic::Runner::default();
1492        // Start the test within the executor
1493        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1494        executor.start(|context| async move {
1495            // Initialize the journal, allowing a max of 7 items per blob.
1496            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1497            let journal = Journal::init(context.with_label("first"), cfg.clone())
1498                .await
1499                .expect("failed to initialize journal");
1500
1501            // Fill one blob and put 3 items in the second.
1502            let item_count = ITEMS_PER_BLOB.get() + 3;
1503            for i in 0u64..item_count {
1504                journal
1505                    .append(&test_digest(i))
1506                    .await
1507                    .expect("failed to append data");
1508            }
1509            assert_eq!(journal.size().await, item_count);
1510            journal.sync().await.expect("Failed to sync journal");
1511            drop(journal);
1512
1513            // Truncate the tail blob by one byte, which should result in the last item being
1514            // discarded during replay (detected via corruption).
1515            let (blob, size) = context
1516                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1517                .await
1518                .expect("Failed to open blob");
1519            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1520            blob.sync().await.expect("Failed to sync blob");
1521
1522            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1523                .await
1524                .unwrap();
1525
1526            // The truncation invalidates the last page (bad checksum), which is removed.
1527            // This loses one item.
1528            assert_eq!(journal.size().await, item_count - 1);
1529
1530            // Cleanup.
1531            journal.destroy().await.expect("Failed to destroy journal");
1532        });
1533    }
1534
1535    #[test_traced]
1536    fn test_fixed_journal_partial_replay() {
1537        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1538        // 53 % 7 = 4, which will trigger a non-trivial seek in the starting blob to reach the
1539        // starting position.
1540        const START_POS: u64 = 53;
1541
1542        // Initialize the deterministic context
1543        let executor = deterministic::Runner::default();
1544        // Start the test within the executor
1545        executor.start(|context| async move {
1546            // Initialize the journal, allowing a max of 7 items per blob.
1547            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1548            let journal = Journal::init(context.clone(), cfg.clone())
1549                .await
1550                .expect("failed to initialize journal");
1551
1552            // Append many items, filling 100 blobs and part of the 101st
1553            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1554                let pos = journal
1555                    .append(&test_digest(i))
1556                    .await
1557                    .expect("failed to append data");
1558                assert_eq!(pos, i);
1559            }
1560
1561            // Replay should return all items except the first `START_POS`.
1562            {
1563                let reader = journal.reader().await;
1564                let stream = reader
1565                    .replay(NZUsize!(1024), START_POS)
1566                    .await
1567                    .expect("failed to replay journal");
1568                let mut items = Vec::new();
1569                pin_mut!(stream);
1570                while let Some(result) = stream.next().await {
1571                    match result {
1572                        Ok((pos, item)) => {
1573                            assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
1574                            assert_eq!(
1575                                test_digest(pos),
1576                                item,
1577                                "Item at position {pos} did not match expected digest"
1578                            );
1579                            items.push(pos);
1580                        }
1581                        Err(err) => panic!("Failed to read item: {err}"),
1582                    }
1583                }
1584
1585                // Make sure all items were replayed
1586                assert_eq!(
1587                    items.len(),
1588                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
1589                        - START_POS as usize
1590                );
1591                items.sort();
1592                for (i, pos) in items.iter().enumerate() {
1593                    assert_eq!(i as u64, *pos - START_POS);
1594                }
1595            }
1596
1597            journal.destroy().await.unwrap();
1598        });
1599    }
1600
1601    #[test_traced]
1602    fn test_fixed_journal_recover_from_partial_write() {
1603        // Initialize the deterministic context
1604        let executor = deterministic::Runner::default();
1605
1606        // Start the test within the executor
1607        executor.start(|context| async move {
1608            // Initialize the journal, allowing a max of 3 items per blob.
1609            let cfg = test_cfg(&context, NZU64!(3));
1610            let journal = Journal::init(context.with_label("first"), cfg.clone())
1611                .await
1612                .expect("failed to initialize journal");
1613            for i in 0..5 {
1614                journal
1615                    .append(&test_digest(i))
1616                    .await
1617                    .expect("failed to append data");
1618            }
1619            assert_eq!(journal.size().await, 5);
1620            journal.sync().await.expect("Failed to sync journal");
1621            drop(journal);
1622
1623            // Manually truncate most recent blob to simulate a partial write.
1624            let (blob, size) = context
1625                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1626                .await
1627                .expect("Failed to open blob");
1628            // truncate the most recent blob by 1 byte which corrupts the most recent item
1629            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1630            blob.sync().await.expect("Failed to sync blob");
1631
1632            // Re-initialize the journal to simulate a restart
1633            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1634                .await
1635                .expect("Failed to re-initialize journal");
1636            // The truncation invalidates the last page, which is removed. This loses one item.
1637            assert_eq!(journal.pruning_boundary().await, 0);
1638            assert_eq!(journal.size().await, 4);
1639            drop(journal);
1640
1641            // Delete the second blob and re-init
1642            context
1643                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1644                .await
1645                .expect("Failed to remove blob");
1646
1647            let journal = Journal::<_, Digest>::init(context.with_label("third"), cfg.clone())
1648                .await
1649                .expect("Failed to re-initialize journal");
1650            // Only the first blob remains
1651            assert_eq!(journal.size().await, 3);
1652
1653            journal.destroy().await.unwrap();
1654        });
1655    }
1656
1657    #[test_traced]
1658    fn test_fixed_journal_recover_detects_oldest_section_too_short() {
1659        let executor = deterministic::Runner::default();
1660        executor.start(|context| async move {
1661            let cfg = test_cfg(&context, NZU64!(5));
1662            let journal =
1663                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
1664                    .await
1665                    .expect("failed to initialize journal at size");
1666
1667            // Append items so section 1 has exactly the expected minimum (3 items).
1668            for i in 0..8u64 {
1669                journal
1670                    .append(&test_digest(100 + i))
1671                    .await
1672                    .expect("failed to append data");
1673            }
1674            journal.sync().await.expect("failed to sync journal");
1675            assert_eq!(journal.pruning_boundary().await, 7);
1676            assert_eq!(journal.size().await, 15);
1677            drop(journal);
1678
1679            // Corrupt the oldest section by truncating one byte (drops one item on recovery).
1680            let (blob, size) = context
1681                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1682                .await
1683                .expect("failed to open oldest blob");
1684            blob.resize(size - 1).await.expect("failed to corrupt blob");
1685            blob.sync().await.expect("failed to sync blob");
1686
1687            let result =
1688                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
1689            assert!(matches!(result, Err(Error::Corruption(_))));
1690        });
1691    }
1692
1693    #[test_traced]
1694    fn test_fixed_journal_recover_to_empty_from_partial_write() {
1695        let executor = deterministic::Runner::default();
1696        executor.start(|context| async move {
1697            // Initialize the journal, allowing a max of 10 items per blob.
1698            let cfg = test_cfg(&context, NZU64!(10));
1699            let journal = Journal::init(context.with_label("first"), cfg.clone())
1700                .await
1701                .expect("failed to initialize journal");
1702            // Add only a single item
1703            journal
1704                .append(&test_digest(0))
1705                .await
1706                .expect("failed to append data");
1707            assert_eq!(journal.size().await, 1);
1708            journal.sync().await.expect("Failed to sync journal");
1709            drop(journal);
1710
1711            // Manually truncate most recent blob to simulate a partial write.
1712            let (blob, size) = context
1713                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1714                .await
1715                .expect("Failed to open blob");
1716            // Truncate the most recent blob by 1 byte which corrupts the one appended item
1717            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1718            blob.sync().await.expect("Failed to sync blob");
1719
1720            // Re-initialize the journal to simulate a restart
1721            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1722                .await
1723                .expect("Failed to re-initialize journal");
1724
1725            // Since there was only a single item appended which we then corrupted, recovery should
1726            // leave us in the state of an empty journal.
1727            let bounds = journal.bounds().await;
1728            assert_eq!(bounds.end, 0);
1729            assert!(bounds.is_empty());
1730            // Make sure journal still works for appending.
1731            journal
1732                .append(&test_digest(0))
1733                .await
1734                .expect("failed to append data");
1735            assert_eq!(journal.size().await, 1);
1736
1737            journal.destroy().await.unwrap();
1738        });
1739    }
1740
1741    #[test_traced("DEBUG")]
1742    fn test_fixed_journal_recover_from_unwritten_data() {
1743        let executor = deterministic::Runner::default();
1744        executor.start(|context| async move {
1745            // Initialize the journal, allowing a max of 10 items per blob.
1746            let cfg = test_cfg(&context, NZU64!(10));
1747            let journal = Journal::init(context.with_label("first"), cfg.clone())
1748                .await
1749                .expect("failed to initialize journal");
1750
1751            // Add only a single item
1752            journal
1753                .append(&test_digest(0))
1754                .await
1755                .expect("failed to append data");
1756            assert_eq!(journal.size().await, 1);
1757            journal.sync().await.expect("Failed to sync journal");
1758            drop(journal);
1759
1760            // Manually extend the blob to simulate a failure where the file was extended, but no
1761            // bytes were written due to failure.
1762            let (blob, size) = context
1763                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1764                .await
1765                .expect("Failed to open blob");
1766            blob.write_at(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
1767                .await
1768                .expect("Failed to extend blob");
1769            blob.sync().await.expect("Failed to sync blob");
1770
1771            // Re-initialize the journal to simulate a restart
1772            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1773                .await
1774                .expect("Failed to re-initialize journal");
1775
1776            // The zero-filled pages are detected as invalid (bad checksum) and truncated.
1777            // No items should be lost since we called sync before the corruption.
1778            assert_eq!(journal.size().await, 1);
1779
1780            // Make sure journal still works for appending.
1781            journal
1782                .append(&test_digest(1))
1783                .await
1784                .expect("failed to append data");
1785
1786            journal.destroy().await.unwrap();
1787        });
1788    }
1789
1790    #[test_traced]
1791    fn test_fixed_journal_rewinding() {
1792        let executor = deterministic::Runner::default();
1793        executor.start(|context| async move {
1794            // Initialize the journal, allowing a max of 2 items per blob.
1795            let cfg = test_cfg(&context, NZU64!(2));
1796            let journal = Journal::init(context.with_label("first"), cfg.clone())
1797                .await
1798                .expect("failed to initialize journal");
1799            assert!(matches!(journal.rewind(0).await, Ok(())));
1800            assert!(matches!(
1801                journal.rewind(1).await,
1802                Err(Error::InvalidRewind(1))
1803            ));
1804
1805            // Append an item to the journal
1806            journal
1807                .append(&test_digest(0))
1808                .await
1809                .expect("failed to append data 0");
1810            assert_eq!(journal.size().await, 1);
1811            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be no-op
1812            assert!(matches!(journal.rewind(0).await, Ok(())));
1813            assert_eq!(journal.size().await, 0);
1814
1815            // append 7 items
1816            for i in 0..7 {
1817                let pos = journal
1818                    .append(&test_digest(i))
1819                    .await
1820                    .expect("failed to append data");
1821                assert_eq!(pos, i);
1822            }
1823            assert_eq!(journal.size().await, 7);
1824
1825            // rewind back to item #4, which should prune 2 blobs
1826            assert!(matches!(journal.rewind(4).await, Ok(())));
1827            assert_eq!(journal.size().await, 4);
1828
1829            // rewind back to empty and ensure all blobs are rewound over
1830            assert!(matches!(journal.rewind(0).await, Ok(())));
1831            assert_eq!(journal.size().await, 0);
1832
1833            // stress test: add 100 items, rewind 49, repeat x10.
1834            for _ in 0..10 {
1835                for i in 0..100 {
1836                    journal
1837                        .append(&test_digest(i))
1838                        .await
1839                        .expect("failed to append data");
1840                }
1841                journal.rewind(journal.size().await - 49).await.unwrap();
1842            }
1843            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
1844            assert_eq!(journal.size().await, ITEMS_REMAINING);
1845
1846            journal.sync().await.expect("Failed to sync journal");
1847            drop(journal);
1848
1849            // Repeat with a different blob size (3 items per blob)
1850            let mut cfg = test_cfg(&context, NZU64!(3));
1851            cfg.partition = "test-partition-2".into();
1852            let journal = Journal::init(context.with_label("second"), cfg.clone())
1853                .await
1854                .expect("failed to initialize journal");
1855            for _ in 0..10 {
1856                for i in 0..100 {
1857                    journal
1858                        .append(&test_digest(i))
1859                        .await
1860                        .expect("failed to append data");
1861                }
1862                journal.rewind(journal.size().await - 49).await.unwrap();
1863            }
1864            assert_eq!(journal.size().await, ITEMS_REMAINING);
1865
1866            journal.sync().await.expect("Failed to sync journal");
1867            drop(journal);
1868
1869            // Make sure re-opened journal is as expected
1870            let journal: Journal<_, Digest> =
1871                Journal::init(context.with_label("third"), cfg.clone())
1872                    .await
1873                    .expect("failed to re-initialize journal");
1874            assert_eq!(journal.size().await, 10 * (100 - 49));
1875
1876            // Make sure rewinding works after pruning
1877            journal.prune(300).await.expect("pruning failed");
1878            assert_eq!(journal.size().await, ITEMS_REMAINING);
1879            // Rewinding prior to our prune point should fail.
1880            assert!(matches!(
1881                journal.rewind(299).await,
1882                Err(Error::InvalidRewind(299))
1883            ));
1884            // Rewinding to the prune point should work.
1885            // always remain in the journal.
1886            assert!(matches!(journal.rewind(300).await, Ok(())));
1887            let bounds = journal.bounds().await;
1888            assert_eq!(bounds.end, 300);
1889            assert!(bounds.is_empty());
1890
1891            journal.destroy().await.unwrap();
1892        });
1893    }
1894
    /// Test recovery when blob is truncated to a page boundary with item size not dividing page size.
    ///
    /// This tests the scenario where:
    /// 1. Items (32 bytes) don't divide evenly into page size (44 bytes)
    /// 2. Data spans multiple pages
    /// 3. Blob is truncated to a page boundary (simulating crash before last page was written)
    /// 4. Journal should recover correctly on reopen
    #[test_traced]
    fn test_fixed_journal_recover_from_page_boundary_truncation() {
        let executor = deterministic::Runner::default();
        executor.start(|context: Context| async move {
            // Use a small items_per_blob to keep the test focused on a single blob
            let cfg = test_cfg(&context, NZU64!(100));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Item size is 32 bytes (Digest), page size is 44 bytes.
            // 32 doesn't divide 44, so items will cross page boundaries.
            // Physical page size = 44 + 12 (CRC) = 56 bytes.
            //
            // Write enough items to span multiple pages:
            // - 10 items = 320 logical bytes
            // - This spans ceil(320/44) = 8 logical pages
            for i in 0u64..10 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, 10);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Open the blob directly and truncate to a page boundary.
            // Physical page size = PAGE_SIZE + CHECKSUM_SIZE = 44 + 12 = 56
            // NOTE(review): the 12-byte per-page checksum overhead is hard-coded
            // here — confirm it stays in sync with the buffer pool's actual
            // per-page overhead if that constant ever changes.
            let physical_page_size = PAGE_SIZE.get() as u64 + 12;
            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");

            // Calculate how many full physical pages we have and truncate to lose the last one.
            let full_pages = size / physical_page_size;
            assert!(full_pages >= 2, "need at least 2 pages for this test");
            let truncate_to = (full_pages - 1) * physical_page_size;

            blob.resize(truncate_to)
                .await
                .expect("Failed to truncate blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal - it should recover by truncating to valid data
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal after page truncation");

            // The journal should have fewer items now (those that fit in the remaining pages).
            // With logical page size 44 and item size 32:
            // - After truncating to (full_pages-1) physical pages, we have (full_pages-1)*44 logical bytes
            // - Number of complete items = floor(logical_bytes / 32)
            let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
            let expected_items = remaining_logical_bytes / 32; // 32 = Digest::SIZE
            assert_eq!(
                journal.size().await,
                expected_items,
                "Journal should recover to {} items after truncation",
                expected_items
            );

            // Verify we can still read the remaining items (no partial/corrupt reads).
            for i in 0..expected_items {
                let item = journal
                    .read(i)
                    .await
                    .expect("failed to read recovered item");
                assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
1977
    /// Test the contiguous fixed journal with items_per_blob: 1.
    ///
    /// This is an edge case where each item creates its own blob, and the
    /// tail blob is always empty after sync (because the item fills the blob
    /// and a new empty one is created).
    #[test_traced]
    fn test_single_item_per_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single-item-per-blob".into(),
                items_per_blob: NZU64!(1),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(2048),
            };

            // === Test 1: Basic single item operation ===
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Verify empty state
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 0);
            assert!(bounds.is_empty());

            // Append 1 item
            let pos = journal
                .append(&test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos, 0);
            assert_eq!(journal.size().await, 1);

            // Sync
            journal.sync().await.expect("failed to sync");

            // Read from size() - 1 (the newest item, which lives in its own blob)
            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(0));

            // === Test 2: Multiple items with single item per blob ===
            for i in 1..10u64 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);
                assert_eq!(journal.size().await, i + 1);

                // Verify we can read the just-appended item at size() - 1
                let value = journal
                    .read(journal.size().await - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            // Verify all items can be read
            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.sync().await.expect("failed to sync");

            // === Test 3: Pruning with single item per blob ===
            // Prune to position 5 (removes positions 0-4)
            journal.prune(5).await.expect("failed to prune");

            // Size should still be 10 (size counts all appended items, pruned or not)
            assert_eq!(journal.size().await, 10);

            // bounds.start should be 5
            assert_eq!(journal.bounds().await.start, 5);

            // Reading from size() - 1 (position 9) should still work
            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(9));

            // Reading from pruned positions should return ItemPruned
            for i in 0..5 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            // Reading from retained positions should work
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            // Append more items after pruning
            for i in 10..15u64 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);

                // Verify we can read from size() - 1
                let value = journal
                    .read(journal.size().await - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            journal.sync().await.expect("failed to sync");
            drop(journal);

            // === Test 4: Restart persistence with single item per blob ===
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Verify size is preserved
            assert_eq!(journal.size().await, 15);

            // Verify bounds.start is preserved (prune point survives restart)
            assert_eq!(journal.bounds().await.start, 5);

            // Reading from size() - 1 should work after restart
            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(14));

            // Reading all retained positions should work
            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy journal");

            // === Test 5: Restart after pruning with non-zero index ===
            // Fresh journal for this test
            let journal = Journal::init(context.with_label("third"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append 10 items (positions 0-9)
            for i in 0..10u64 {
                journal.append(&test_digest(i + 100)).await.unwrap();
            }

            // Prune to position 5 (removes positions 0-4)
            journal.prune(5).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 5);

            // Sync and restart
            journal.sync().await.unwrap();
            drop(journal);

            // Re-open journal
            let journal = Journal::<_, Digest>::init(context.with_label("fourth"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Verify state after restart
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 5);

            // Reading from size() - 1 (position 9) should work
            let value = journal.read(journal.size().await - 1).await.unwrap();
            assert_eq!(value, test_digest(109));

            // Verify all retained positions (5-9) work
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
            }

            journal.destroy().await.expect("failed to destroy journal");

            // === Test 6: Prune all items (edge case) ===
            // With items_per_blob = 1, pruning everything removes every data blob.
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..5u64 {
                journal.append(&test_digest(i + 200)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Prune all items
            journal.prune(5).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 5); // Size unchanged
            assert!(bounds.is_empty()); // All pruned

            // size() - 1 = 4, but position 4 is pruned
            let result = journal.read(journal.size().await - 1).await;
            assert!(matches!(result, Err(Error::ItemPruned(4))));

            // After appending, reading works again
            journal.append(&test_digest(205)).await.unwrap();
            assert_eq!(journal.bounds().await.start, 5);
            assert_eq!(
                journal.read(journal.size().await - 1).await.unwrap(),
                test_digest(205)
            );

            journal.destroy().await.expect("failed to destroy journal");
        });
    }
2190
2191    #[test_traced]
2192    fn test_fixed_journal_init_at_size_zero() {
2193        let executor = deterministic::Runner::default();
2194        executor.start(|context| async move {
2195            let cfg = test_cfg(&context, NZU64!(5));
2196            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 0)
2197                .await
2198                .unwrap();
2199
2200            let bounds = journal.bounds().await;
2201            assert_eq!(bounds.end, 0);
2202            assert!(bounds.is_empty());
2203
2204            // Next append should get position 0
2205            let pos = journal.append(&test_digest(100)).await.unwrap();
2206            assert_eq!(pos, 0);
2207            assert_eq!(journal.size().await, 1);
2208            assert_eq!(journal.read(0).await.unwrap(), test_digest(100));
2209
2210            journal.destroy().await.unwrap();
2211        });
2212    }
2213
2214    #[test_traced]
2215    fn test_fixed_journal_init_at_size_section_boundary() {
2216        let executor = deterministic::Runner::default();
2217        executor.start(|context| async move {
2218            let cfg = test_cfg(&context, NZU64!(5));
2219
2220            // Initialize at position 10 (exactly at section 2 boundary with items_per_blob=5)
2221            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2222                .await
2223                .unwrap();
2224
2225            let bounds = journal.bounds().await;
2226            assert_eq!(bounds.end, 10);
2227            assert!(bounds.is_empty());
2228
2229            // Next append should get position 10
2230            let pos = journal.append(&test_digest(1000)).await.unwrap();
2231            assert_eq!(pos, 10);
2232            assert_eq!(journal.size().await, 11);
2233            assert_eq!(journal.read(10).await.unwrap(), test_digest(1000));
2234
2235            // Can continue appending
2236            let pos = journal.append(&test_digest(1001)).await.unwrap();
2237            assert_eq!(pos, 11);
2238            assert_eq!(journal.read(11).await.unwrap(), test_digest(1001));
2239
2240            journal.destroy().await.unwrap();
2241        });
2242    }
2243
2244    #[test_traced]
2245    fn test_fixed_journal_init_at_size_mid_section() {
2246        let executor = deterministic::Runner::default();
2247        executor.start(|context| async move {
2248            let cfg = test_cfg(&context, NZU64!(5));
2249
2250            // Initialize at position 7 (middle of section 1 with items_per_blob=5)
2251            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
2252                .await
2253                .unwrap();
2254
2255            let bounds = journal.bounds().await;
2256            assert_eq!(bounds.end, 7);
2257            // No data exists yet after init_at_size
2258            assert!(bounds.is_empty());
2259
2260            // Reading before bounds.start should return ItemPruned
2261            assert!(matches!(journal.read(5).await, Err(Error::ItemPruned(5))));
2262            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2263
2264            // Next append should get position 7
2265            let pos = journal.append(&test_digest(700)).await.unwrap();
2266            assert_eq!(pos, 7);
2267            assert_eq!(journal.size().await, 8);
2268            assert_eq!(journal.read(7).await.unwrap(), test_digest(700));
2269            // Now bounds.start should be 7 (first data position)
2270            assert_eq!(journal.bounds().await.start, 7);
2271
2272            journal.destroy().await.unwrap();
2273        });
2274    }
2275
2276    #[test_traced]
2277    fn test_fixed_journal_init_at_size_persistence() {
2278        let executor = deterministic::Runner::default();
2279        executor.start(|context| async move {
2280            let cfg = test_cfg(&context, NZU64!(5));
2281
2282            // Initialize at position 15
2283            let journal =
2284                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2285                    .await
2286                    .unwrap();
2287
2288            // Append some items
2289            for i in 0..5u64 {
2290                let pos = journal.append(&test_digest(1500 + i)).await.unwrap();
2291                assert_eq!(pos, 15 + i);
2292            }
2293
2294            assert_eq!(journal.size().await, 20);
2295
2296            // Sync and reopen
2297            journal.sync().await.unwrap();
2298            drop(journal);
2299
2300            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2301                .await
2302                .unwrap();
2303
2304            // Size and data should be preserved
2305            let bounds = journal.bounds().await;
2306            assert_eq!(bounds.end, 20);
2307            assert_eq!(bounds.start, 15);
2308
2309            // Verify data
2310            for i in 0..5u64 {
2311                assert_eq!(journal.read(15 + i).await.unwrap(), test_digest(1500 + i));
2312            }
2313
2314            // Can continue appending
2315            let pos = journal.append(&test_digest(9999)).await.unwrap();
2316            assert_eq!(pos, 20);
2317            assert_eq!(journal.read(20).await.unwrap(), test_digest(9999));
2318
2319            journal.destroy().await.unwrap();
2320        });
2321    }
2322
2323    #[test_traced]
2324    fn test_fixed_journal_init_at_size_persistence_without_data() {
2325        let executor = deterministic::Runner::default();
2326        executor.start(|context| async move {
2327            let cfg = test_cfg(&context, NZU64!(5));
2328
2329            // Initialize at position 15
2330            let journal =
2331                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2332                    .await
2333                    .unwrap();
2334
2335            let bounds = journal.bounds().await;
2336            assert_eq!(bounds.end, 15);
2337            assert!(bounds.is_empty());
2338
2339            // Drop without writing any data
2340            drop(journal);
2341
2342            // Reopen and verify size persisted
2343            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2344                .await
2345                .unwrap();
2346
2347            let bounds = journal.bounds().await;
2348            assert_eq!(bounds.end, 15);
2349            assert!(bounds.is_empty());
2350
2351            // Can append starting at position 15
2352            let pos = journal.append(&test_digest(1500)).await.unwrap();
2353            assert_eq!(pos, 15);
2354            assert_eq!(journal.read(15).await.unwrap(), test_digest(1500));
2355
2356            journal.destroy().await.unwrap();
2357        });
2358    }
2359
2360    #[test_traced]
2361    fn test_fixed_journal_init_at_size_large_offset() {
2362        let executor = deterministic::Runner::default();
2363        executor.start(|context| async move {
2364            let cfg = test_cfg(&context, NZU64!(5));
2365
2366            // Initialize at a large position (position 1000)
2367            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 1000)
2368                .await
2369                .unwrap();
2370
2371            let bounds = journal.bounds().await;
2372            assert_eq!(bounds.end, 1000);
2373            assert!(bounds.is_empty());
2374
2375            // Next append should get position 1000
2376            let pos = journal.append(&test_digest(100000)).await.unwrap();
2377            assert_eq!(pos, 1000);
2378            assert_eq!(journal.read(1000).await.unwrap(), test_digest(100000));
2379
2380            journal.destroy().await.unwrap();
2381        });
2382    }
2383
2384    #[test_traced]
2385    fn test_fixed_journal_init_at_size_prune_and_append() {
2386        let executor = deterministic::Runner::default();
2387        executor.start(|context| async move {
2388            let cfg = test_cfg(&context, NZU64!(5));
2389
2390            // Initialize at position 20
2391            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 20)
2392                .await
2393                .unwrap();
2394
2395            // Append items 20-29
2396            for i in 0..10u64 {
2397                journal.append(&test_digest(2000 + i)).await.unwrap();
2398            }
2399
2400            assert_eq!(journal.size().await, 30);
2401
2402            // Prune to position 25
2403            journal.prune(25).await.unwrap();
2404
2405            let bounds = journal.bounds().await;
2406            assert_eq!(bounds.end, 30);
2407            assert_eq!(bounds.start, 25);
2408
2409            // Verify remaining items are readable
2410            for i in 25..30u64 {
2411                assert_eq!(journal.read(i).await.unwrap(), test_digest(2000 + (i - 20)));
2412            }
2413
2414            // Continue appending
2415            let pos = journal.append(&test_digest(3000)).await.unwrap();
2416            assert_eq!(pos, 30);
2417
2418            journal.destroy().await.unwrap();
2419        });
2420    }
2421
    #[test_traced]
    fn test_fixed_journal_clear_to_size() {
        // clear_to_size discards every existing item and restarts the journal at
        // the requested logical size; verify the reset and its persistence across
        // two restarts (one immediately after the clear, one after new appends).
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(10));
            let journal = Journal::init(context.with_label("journal"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append 25 items (positions 0-24, spanning 3 blobs)
            for i in 0..25u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.size().await, 25);
            journal.sync().await.unwrap();

            // Clear to position 100, effectively resetting the journal
            journal.clear_to_size(100).await.unwrap();
            assert_eq!(journal.size().await, 100);

            // Old positions should fail
            for i in 0..25 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            // Verify size persists after restart without writing any data
            // (no explicit sync here — clear_to_size itself appears to persist
            // the new size; TODO confirm against the implementation)
            drop(journal);
            let journal =
                Journal::<_, Digest>::init(context.with_label("journal_after_clear"), cfg.clone())
                    .await
                    .expect("failed to re-initialize journal after clear");
            assert_eq!(journal.size().await, 100);

            // Append new data starting at position 100
            for i in 100..105u64 {
                let pos = journal.append(&test_digest(i)).await.unwrap();
                assert_eq!(pos, i);
            }
            assert_eq!(journal.size().await, 105);

            // New positions should be readable
            for i in 100..105u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            // Sync and re-init to verify persistence
            journal.sync().await.unwrap();
            drop(journal);

            // Last re-open: `cfg` is moved here (no further clones needed).
            let journal = Journal::<_, Digest>::init(context.with_label("journal_reopened"), cfg)
                .await
                .expect("failed to re-initialize journal");

            assert_eq!(journal.size().await, 105);
            for i in 100..105u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.destroy().await.unwrap();
        });
    }
2483
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_none_boundary_aligned() {
        // Old meta = None (aligned), new boundary = aligned.
        //
        // Simulates a crash where blob data was synced but the pruning-boundary
        // metadata was never written: we bypass `Journal::sync` and sync only
        // the underlying tail-section blob, then drop everything un-synced.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Fill exactly one section (positions 0-4 with items_per_blob=5).
            for i in 0..5u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // Sync the tail section's blob directly, skipping the higher-level
            // sync path (and therefore any metadata update).
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            // Release the read guard before dropping the journal itself.
            drop(inner);
            drop(journal);

            // Recovery must reconstruct the aligned bounds [0, 5) from the
            // blobs alone.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 5);
            journal.destroy().await.unwrap();
        });
    }
2512
    #[test_traced]
    fn test_fixed_journal_oldest_section_invalid_len() {
        // Deleting the metadata (simulated corruption) forces recovery to fall
        // back to blob-derived bounds, which cannot know that the oldest
        // section starts mid-section — init must fail with a corruption error.
        // (NOTE(review): the previous header comment here described a
        // meta-None/mid-boundary sync-crash case and did not match this test.)
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            // Append positions 7-9, completing section 1.
            for i in 0..3u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(2));
            journal.sync().await.unwrap();

            // Simulate metadata deletion (corruption).
            let mut inner = journal.inner.write().await;
            inner.metadata.clear();
            inner.metadata.sync().await.unwrap();
            drop(inner);
            drop(journal);

            // Section 1 has items 7,8,9 but metadata is missing, so falls back to blob-based boundary.
            // Section 1 has 3 items, but recovery thinks it should have 5 because metadata deletion
            // causes us to forget that section 1 starts at logical position 7.
            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
            // Manual cleanup: init failed, so there is no journal to destroy().
            context.remove(&blob_partition(&cfg), None).await.unwrap();
            context
                .remove(&format!("{}-metadata", cfg.partition), None)
                .await
                .unwrap();
        });
    }
2549
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_boundary_unchanged() {
        // Old meta = Some(mid), new boundary = mid-section (same value).
        //
        // init_at_size(7) records a mid-section boundary; we then simulate a
        // crash that syncs only the tail-section blob (no metadata write) and
        // verify the previously persisted boundary (7) survives recovery.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            // Append positions 7-9, filling out section 1.
            for i in 0..3u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // Crash simulation: sync the tail blob directly, bypassing the
            // higher-level sync (and hence the metadata update).
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            // Recovery keeps the mid-section boundary recorded at init_at_size.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 7);
            assert_eq!(bounds.end, 10);
            journal.destroy().await.unwrap();
        });
    }
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_to_aligned_becomes_stale() {
        // Old meta = Some(mid), new boundary = aligned.
        //
        // After pruning to a section-aligned position (10), the stored
        // mid-section metadata (7) is stale. Simulate a crash before the
        // metadata update and verify recovery reports the aligned boundary
        // (10), not the stale mid-section value.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            // Append positions 7-16.
            for i in 0..10u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.size().await, 17);
            // Prune to a section-aligned position (10 = 2 * items_per_blob).
            journal.prune(10).await.unwrap();

            // Crash simulation: sync only the tail-section blob, skipping the
            // metadata update.
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            // Recovery must land on the aligned boundary from the blob state.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 10);
            assert_eq!(bounds.end, 17);
            journal.destroy().await.unwrap();
        });
    }
2609
2610    #[test_traced]
2611    fn test_fixed_journal_prune_does_not_move_boundary_backwards() {
2612        // Pruning to a position earlier than pruning_boundary (within the same section)
2613        // should not move the boundary backwards.
2614        let executor = deterministic::Runner::default();
2615        executor.start(|context| async move {
2616            let cfg = test_cfg(&context, NZU64!(5));
2617            // init_at_size(7) sets pruning_boundary = 7 (mid-section in section 1)
2618            let journal =
2619                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2620                    .await
2621                    .unwrap();
2622            // Append 5 items at positions 7-11, filling section 1 and part of section 2
2623            for i in 0..5u64 {
2624                journal.append(&test_digest(i)).await.unwrap();
2625            }
2626            // Prune to position 5 (section 1 start) should NOT move boundary back from 7 to 5
2627            journal.prune(5).await.unwrap();
2628            assert_eq!(journal.bounds().await.start, 7);
2629            journal.destroy().await.unwrap();
2630        });
2631    }
2632
    #[test_traced]
    fn test_fixed_journal_replay_after_init_at_size_spanning_sections() {
        // Test replay when first section begins mid-section: init_at_size creates a journal
        // where pruning_boundary is mid-section, then we append across multiple sections.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            // Initialize at position 7 (mid-section with items_per_blob=5)
            // Section 1 (positions 5-9) begins mid-section: only positions 7, 8, 9 have data
            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
                .await
                .unwrap();

            // Append 13 items (positions 7-19), spanning sections 1, 2, 3
            for i in 0..13u64 {
                let pos = journal.append(&test_digest(100 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }
            assert_eq!(journal.size().await, 20);
            journal.sync().await.unwrap();

            // Replay from pruning_boundary
            // (first replay arg looks like a buffer/page size — TODO confirm)
            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 7)
                    .await
                    .expect("failed to replay");
                // The stream must be pinned before polling with `next()`.
                pin_mut!(stream);
                let mut items: Vec<(u64, Digest)> = Vec::new();
                while let Some(result) = stream.next().await {
                    items.push(result.expect("replay item failed"));
                }

                // Should get all 13 items with correct logical positions
                assert_eq!(items.len(), 13);
                for (i, (pos, item)) in items.iter().enumerate() {
                    assert_eq!(*pos, 7 + i as u64);
                    assert_eq!(*item, test_digest(100 + i as u64));
                }
            }

            // Replay from mid-stream (position 12)
            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 12)
                    .await
                    .expect("failed to replay from mid-stream");
                pin_mut!(stream);
                let mut items: Vec<(u64, Digest)> = Vec::new();
                while let Some(result) = stream.next().await {
                    items.push(result.expect("replay item failed"));
                }

                // Should get items from position 12 onwards
                // (position 12 holds test_digest(100 + 5) since data starts at 7)
                assert_eq!(items.len(), 8);
                for (i, (pos, item)) in items.iter().enumerate() {
                    assert_eq!(*pos, 12 + i as u64);
                    assert_eq!(*item, test_digest(100 + 5 + i as u64));
                }
            }

            journal.destroy().await.unwrap();
        });
    }
2700
2701    #[test_traced]
2702    fn test_fixed_journal_rewind_error_before_bounds_start() {
2703        // Test that rewind returns error when trying to rewind before bounds.start
2704        let executor = deterministic::Runner::default();
2705        executor.start(|context| async move {
2706            let cfg = test_cfg(&context, NZU64!(5));
2707
2708            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2709                .await
2710                .unwrap();
2711
2712            // Append a few items (positions 10, 11, 12)
2713            for i in 0..3u64 {
2714                journal.append(&test_digest(i)).await.unwrap();
2715            }
2716            assert_eq!(journal.size().await, 13);
2717
2718            // Rewind to position 11 should work
2719            journal.rewind(11).await.unwrap();
2720            assert_eq!(journal.size().await, 11);
2721
2722            // Rewind to position 10 (pruning_boundary) should work
2723            journal.rewind(10).await.unwrap();
2724            assert_eq!(journal.size().await, 10);
2725
2726            // Rewind to before pruning_boundary should fail
2727            let result = journal.rewind(9).await;
2728            assert!(matches!(result, Err(Error::InvalidRewind(9))));
2729
2730            journal.destroy().await.unwrap();
2731        });
2732    }
2733
    #[test_traced]
    fn test_fixed_journal_init_at_size_crash_scenarios() {
        // Simulates partially-completed init_at_size operations (crashes between
        // its individual storage steps) and checks recovery falls back sanely.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            // Setup: Create a journal with some data and mid-section metadata
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..5u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Crash Scenario 1: After clear(), before blob creation
            // Simulate by manually removing all blobs but leaving metadata
            let blob_part = blob_partition(&cfg);
            context.remove(&blob_part, None).await.unwrap();

            // Recovery should see no blobs and return empty journal, ignoring metadata
            let journal = Journal::<_, Digest>::init(context.with_label("crash1"), cfg.clone())
                .await
                .expect("init failed after clear crash");
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 0);
            assert_eq!(bounds.start, 0);
            drop(journal);

            // Restore metadata for next scenario (it might have been removed by init)
            let meta_cfg = MetadataConfig {
                partition: format!("{}-metadata", cfg.partition),
                codec_config: ((0..).into(), ()),
            };
            let mut metadata = Metadata::<_, u64, Vec<u8>>::init(
                context.with_label("restore_meta"),
                meta_cfg.clone(),
            )
            .await
            .unwrap();
            // Re-persist a pruning boundary of 7 (section 1).
            metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec());
            metadata.sync().await.unwrap();

            // Crash Scenario 2: After ensure_section_exists(), before metadata update.
            //
            // We want the old metadata (7 -> section 1) to be AHEAD of the blob
            // state, so we simulate init_at_size(2): the tail blob is section 0
            // while metadata still claims section 1. recover_bounds should see
            // "metadata ahead of blobs" and fall back to the blob state.
            // (Targeting init_at_size(12) instead would leave the metadata
            // BEHIND the blob — treated as stale, also resolved from blobs.)

            // Simulate: Create blob at section 0 (tail for init_at_size(2))
            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
            blob.sync().await.unwrap(); // Ensure it exists

            // Recovery should warn "metadata ahead" and use blob state (0, 0)
            let journal = Journal::<_, Digest>::init(context.with_label("crash2"), cfg.clone())
                .await
                .expect("init failed after create crash");

            // Should recover to blob state (section 0 aligned)
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            // Size is 0 because blob is empty
            assert_eq!(bounds.end, 0);
            journal.destroy().await.unwrap();
        });
    }
2807
    #[test_traced]
    fn test_fixed_journal_clear_to_size_crash_scenarios() {
        // Simulates a crash mid-way through clear_to_size: the new tail blob
        // exists but the metadata still holds the pre-clear boundary.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            // Setup: Init at 12 (Section 2, offset 2)
            // Metadata = 12
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 12)
                    .await
                    .unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            // Crash Scenario: clear_to_size(2) [Section 0]
            // We want to simulate crash after blob 0 created, but metadata still 12.

            // manually clear blobs
            let blob_part = blob_partition(&cfg);
            context.remove(&blob_part, None).await.unwrap();

            // manually create section 0
            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
            blob.sync().await.unwrap();

            // Metadata is still 12 (from setup)
            // Blob is Section 0
            // Metadata (12 -> sec 2) > Blob (sec 0) -> Ahead warning

            let journal =
                Journal::<_, Digest>::init(context.with_label("crash_clear"), cfg.clone())
                    .await
                    .expect("init failed after clear_to_size crash");

            // Should fallback to blobs: empty journal at section 0.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 0);
            journal.destroy().await.unwrap();
        });
    }
2850}