Skip to main content

commonware_storage/journal/contiguous/
fixed.rs

1//! An append-only log for storing fixed length _items_ on disk.
2//!
3//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
4//! where position is defined as the item's order of insertion starting from 0, unaffected by
5//! pruning.
6//!
7//! _See [super::variable] for a journal that supports variable length items._
8//!
9//! # Format
10//!
//! Data stored in a `fixed::Journal` is persisted across one or more Blobs. Each `Blob` holds up
//! to a configurable maximum of `items_per_blob` items, with page-level data integrity provided by
//! a buffer pool.
14//!
15//! ```text
//! +--------+--------+-----+----------+
//! | item_0 | item_1 | ... | item_n-1 |
//! +--------+--------+-----+----------+
19//!
20//! n = config.items_per_blob
21//! ```
22//!
23//! The most recent blob may not necessarily be full, in which case it will contain fewer than the
24//! maximum number of items.
25//!
26//! Data fetched from disk is always checked for integrity before being returned. If the data is
27//! found to be invalid, an error is returned instead.
28//!
29//! # Open Blobs
30//!
31//! All `Blobs` in a given `partition` are kept open during the lifetime of `Journal`. You can limit
32//! the number of open blobs by using a higher number of `items_per_blob` and/or pruning old items.
33//!
34//! # Partition
35//!
36//! Blobs are stored in the legacy partition (`cfg.partition`) if it already contains data;
37//! otherwise they are stored in `{cfg.partition}-blobs`.
38//!
39//! Metadata is stored in `{cfg.partition}-metadata`.
40//!
41//! # Consistency
42//!
43//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
44//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
45//! calling `close`, all pending data is automatically synced and any open blobs are closed.
46//!
47//! # Pruning
48//!
49//! The `prune` method allows the `Journal` to prune blobs consisting entirely of items prior to a
50//! given point in history.
51//!
52//! # Replay
53//!
//! The `replay` method returns a stream of all unpruned items from a given starting position
//! onward, reading them through a buffer whose size is chosen by the caller.
55
56#[cfg(test)]
57use super::Reader as _;
58use crate::{
59    journal::{
60        contiguous::{metrics::FixedMetrics as Metrics, Many, Mutable},
61        segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
62        Error,
63    },
64    metadata::{Config as MetadataConfig, Metadata},
65    Context, Persistable,
66};
67use commonware_codec::CodecFixedShared;
68use commonware_runtime::buffer::paged::CacheRef;
69use commonware_utils::sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock};
70use futures::{stream::Stream, StreamExt};
71use std::num::{NonZeroU64, NonZeroUsize};
72use tracing::warn;
73
/// Metadata key for storing the pruning boundary.
///
/// The stored value is the boundary encoded as 8 big-endian bytes. The entry is only kept while
/// the boundary is not a multiple of `items_per_blob` (i.e. it falls mid-section); a
/// section-aligned boundary is implied by the blobs themselves and the entry is removed.
const PRUNING_BOUNDARY_KEY: u64 = 1;
76
/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config {
    /// Prefix for the journal partitions.
    ///
    /// Blobs are stored in `partition` (legacy) if it contains data, otherwise in
    /// `{partition}-blobs`. Metadata is stored in `{partition}-metadata`. Initialization fails
    /// with a corruption error if both the legacy and the `-blobs` partition contain data.
    pub partition: String,

    /// The maximum number of journal items to store in each blob.
    ///
    /// Historical blobs contain exactly this number of items, with two exceptions: the newest
    /// blob may contain fewer, and the oldest retained blob contains fewer when the pruning
    /// boundary falls partway through its section.
    pub items_per_blob: NonZeroU64,

    /// The page cache to use for caching data (passed through to the underlying segmented
    /// journal).
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob (passed through to the underlying
    /// segmented journal).
    pub write_buffer: NonZeroUsize,
}
98
/// Inner state protected by a single RwLock.
struct Inner<E: Context, A: CodecFixedShared> {
    /// The underlying segmented journal.
    ///
    /// Global position `pos` lives in section `pos / items_per_blob`. Within a section, items are
    /// addressed relative to the section's first retained position: the section start, or
    /// `pruning_boundary` when the boundary falls inside that section.
    journal: SegmentedJournal<E, A>,

    /// Total number of items appended (not affected by pruning). This is also the position that
    /// the next append will occupy.
    size: u64,

    /// If the journal's pruning boundary is mid-section (that is, the oldest retained item's
    /// position is not a multiple of `items_per_blob`), then the metadata stores the pruning
    /// boundary. Otherwise, the metadata is empty.
    ///
    /// When the journal is pruned, `metadata` must be persisted AFTER the inner journal is
    /// persisted to ensure that its pruning boundary is never after the inner journal's size.
    // TODO(#2939): Remove metadata
    metadata: Metadata<E, u64, Vec<u8>>,

    /// The position before which all items have been pruned.
    ///
    /// Readable positions are `pruning_boundary..size`; when this equals `size`, no items are
    /// retained.
    pruning_boundary: u64,
}
119
120impl<E: Context, A: CodecFixedShared> Inner<E, A> {
121    /// Read the item at position `pos` in the journal.
122    ///
123    /// # Errors
124    ///
125    ///  - [Error::ItemPruned] if the item at position `pos` is pruned.
126    ///  - [Error::ItemOutOfRange] if the item at position `pos` does not exist.
127    async fn read(&self, pos: u64, items_per_blob: u64) -> Result<A, Error> {
128        if pos >= self.size {
129            return Err(Error::ItemOutOfRange(pos));
130        }
131        if pos < self.pruning_boundary {
132            return Err(Error::ItemPruned(pos));
133        }
134
135        let section = pos / items_per_blob;
136        let section_start = section * items_per_blob;
137
138        // Calculate position within the blob.
139        // This accounts for sections that begin mid-section (pruning_boundary > section_start).
140        let first_in_section = self.pruning_boundary.max(section_start);
141        let pos_in_section = pos - first_in_section;
142
143        self.journal
144            .get(section, pos_in_section)
145            .await
146            .map_err(|e| {
147                // Since we check bounds above, any failure here is unexpected.
148                match e {
149                    Error::SectionOutOfRange(e)
150                    | Error::AlreadyPrunedToSection(e)
151                    | Error::ItemOutOfRange(e) => {
152                        Error::Corruption(format!("section/item should be found, but got: {e}"))
153                    }
154                    other => other,
155                }
156            })
157    }
158
159    /// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
160    fn try_read_sync(&self, pos: u64, items_per_blob: u64) -> Option<A> {
161        let mut buf = vec![0u8; SegmentedJournal::<E, A>::CHUNK_SIZE];
162        self.try_read_sync_into(pos, items_per_blob, &mut buf)
163    }
164
165    /// Read an item synchronously using caller-provided buffer.
166    fn try_read_sync_into(&self, pos: u64, items_per_blob: u64, buf: &mut [u8]) -> Option<A> {
167        if pos >= self.size || pos < self.pruning_boundary {
168            return None;
169        }
170        let section = pos / items_per_blob;
171        let section_start = section * items_per_blob;
172        let first_in_section = self.pruning_boundary.max(section_start);
173        let pos_in_section = pos - first_in_section;
174        self.journal.try_get_sync_into(section, pos_in_section, buf)
175    }
176}
177
/// Implementation of `Journal` storage.
///
/// This is implemented as a wrapper around [SegmentedJournal] that provides position-based access
/// where positions are automatically mapped to (section, position_in_section) pairs. The section
/// is `position / items_per_blob`; the in-section offset is counted from the section's first
/// retained position (the section start, or the pruning boundary if it falls inside the section).
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying blob will be truncated to the last valid item). Repair is performed
/// by the underlying [SegmentedJournal] during init.
pub struct Journal<E: Context, A: CodecFixedShared> {
    /// Inner state with segmented journal and size.
    ///
    /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent
    /// race conditions while allowing concurrent reads during sync.
    inner: UpgradableAsyncRwLock<Inner<E, A>>,

    /// The maximum number of items per blob (section); also the divisor used to map a global
    /// position to its section.
    items_per_blob: u64,

    /// Metrics for monitoring journal state and activity.
    metrics: Metrics<E>,
}
205
/// A reader guard that holds a consistent snapshot of the journal's bounds.
pub struct Reader<'a, E: Context, A: CodecFixedShared> {
    /// Read guard over the journal state, held for the lifetime of this reader so the reported
    /// bounds stay fixed while it is alive.
    guard: AsyncRwLockReadGuard<'a, Inner<E, A>>,
    /// Copy of the journal's section width, used to map positions to sections.
    items_per_blob: u64,
    /// The journal's metrics, updated by every read through this guard.
    metrics: &'a Metrics<E>,
}
212
213impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
214    type Item = A;
215
216    fn bounds(&self) -> std::ops::Range<u64> {
217        self.guard.pruning_boundary..self.guard.size
218    }
219
220    async fn read(&self, pos: u64) -> Result<A, Error> {
221        let _timer = self.metrics.read_timer();
222        self.metrics.read_calls.inc();
223        let result = match self.guard.read(pos, self.items_per_blob).await {
224            Ok(item) => {
225                self.metrics.items_read.inc();
226                Ok(item)
227            }
228            Err(error) => Err(error),
229        };
230        result
231    }
232
233    async fn read_many(&self, positions: &[u64]) -> Result<Vec<A>, Error> {
234        if positions.is_empty() {
235            return Ok(Vec::new());
236        }
237        let _timer = self.metrics.read_many_timer();
238        self.metrics.read_many_calls.inc();
239        assert!(
240            positions.windows(2).all(|w| w[0] < w[1]),
241            "positions must be strictly increasing"
242        );
243        // Validate all positions.
244        for &pos in positions {
245            if pos >= self.guard.size {
246                return Err(Error::ItemOutOfRange(pos));
247            }
248            if pos < self.guard.pruning_boundary {
249                return Err(Error::ItemPruned(pos));
250            }
251        }
252
253        let items_per_blob = self.items_per_blob;
254        let pruning_boundary = self.guard.pruning_boundary;
255        let chunk_size = SegmentedJournal::<E, A>::CHUNK_SIZE;
256
257        // Phase 1: Drain page-cache hits synchronously.
258        let mut result: Vec<Option<A>> = Vec::with_capacity(positions.len());
259        let mut miss_indices: Vec<usize> = Vec::new();
260        let mut miss_positions: Vec<u64> = Vec::new();
261
262        let mut sync_buf = vec![0u8; chunk_size];
263        for (i, &pos) in positions.iter().enumerate() {
264            if let Some(item) = self
265                .guard
266                .try_read_sync_into(pos, items_per_blob, &mut sync_buf)
267            {
268                result.push(Some(item));
269            } else {
270                result.push(None);
271                miss_indices.push(i);
272                miss_positions.push(pos);
273            }
274        }
275
276        if miss_positions.is_empty() {
277            self.metrics.record_cache_hits(positions.len() as u64);
278            self.metrics.items_read.inc_by(positions.len() as u64);
279            return Ok(result.into_iter().map(|r| r.unwrap()).collect());
280        }
281        self.metrics
282            .record_cache_hits((positions.len() - miss_positions.len()) as u64);
283        self.metrics
284            .record_cache_misses(miss_positions.len() as u64);
285
286        // Phase 2: Read cache misses grouped by section (sequential).
287        let mut reusable_buf = vec![0u8; miss_positions.len() * chunk_size];
288        let mut disk_offset = 0;
289
290        let mut group_start = 0;
291        while group_start < miss_positions.len() {
292            let section = miss_positions[group_start] / items_per_blob;
293            let section_start = section * items_per_blob;
294            let first_in_section = pruning_boundary.max(section_start);
295
296            let mut group_end = group_start + 1;
297            while group_end < miss_positions.len()
298                && miss_positions[group_end] / items_per_blob == section
299            {
300                group_end += 1;
301            }
302
303            let group_len = group_end - group_start;
304            let section_positions: Vec<u64> = miss_positions[group_start..group_end]
305                .iter()
306                .map(|&pos| pos - first_in_section)
307                .collect();
308
309            let buf = &mut reusable_buf[..group_len * chunk_size];
310            let items = self
311                .guard
312                .journal
313                .get_many(section, &section_positions, buf)
314                .await
315                .map_err(|e| match e {
316                    Error::SectionOutOfRange(e)
317                    | Error::AlreadyPrunedToSection(e)
318                    | Error::ItemOutOfRange(e) => {
319                        Error::Corruption(format!("section/item should be found, but got: {e}"))
320                    }
321                    other => other,
322                })?;
323
324            for (item, &miss_idx) in items.into_iter().zip(&miss_indices[disk_offset..]) {
325                result[miss_idx] = Some(item);
326            }
327
328            disk_offset += group_len;
329            group_start = group_end;
330        }
331
332        self.metrics.items_read.inc_by(positions.len() as u64);
333        Ok(result.into_iter().map(|r| r.unwrap()).collect())
334    }
335
336    fn try_read_sync(&self, pos: u64) -> Option<A> {
337        self.guard
338            .try_read_sync(pos, self.items_per_blob)
339            .map_or_else(
340                || {
341                    self.metrics.record_cache_misses(1);
342                    None
343                },
344                |item| {
345                    self.metrics.record_cache_hits(1);
346                    self.metrics.try_read_sync_hits.inc();
347                    self.metrics.items_read.inc();
348                    Some(item)
349                },
350            )
351    }
352
353    async fn replay(
354        &self,
355        buffer: NonZeroUsize,
356        start_pos: u64,
357    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + Send, Error> {
358        let items_per_blob = self.items_per_blob;
359        let pruning_boundary = self.guard.pruning_boundary;
360
361        // Validate bounds.
362        if start_pos > self.guard.size {
363            return Err(Error::ItemOutOfRange(start_pos));
364        }
365        if start_pos < pruning_boundary {
366            return Err(Error::ItemPruned(start_pos));
367        }
368
369        let start_section = start_pos / items_per_blob;
370        let section_start = start_section * items_per_blob;
371
372        // Calculate start position within the section.
373        let first_in_section = pruning_boundary.max(section_start);
374        let start_pos_in_section = start_pos - first_in_section;
375
376        // Check all middle sections (not oldest, not tail) in range are complete.
377        let journal = &self.guard.journal;
378        if let (Some(oldest), Some(newest)) = (journal.oldest_section(), journal.newest_section()) {
379            let first_to_check = start_section.max(oldest + 1);
380            for section in first_to_check..newest {
381                let len = journal.section_len(section).await?;
382                if len < items_per_blob {
383                    return Err(Error::Corruption(format!(
384                        "section {section} incomplete: expected {items_per_blob} items, got {len}"
385                    )));
386                }
387            }
388        }
389
390        let inner_stream = journal
391            .replay(start_section, start_pos_in_section, buffer)
392            .await?;
393
394        // Transform (section, pos_in_section, item) to (global_pos, item).
395        let stream = inner_stream.map(move |result| {
396            result.map(|(section, pos_in_section, item)| {
397                let section_start = section * items_per_blob;
398                let first_in_section = pruning_boundary.max(section_start);
399                let global_pos = first_in_section + pos_in_section;
400                (global_pos, item)
401            })
402        });
403
404        Ok(stream)
405    }
406}
407
408impl<E: Context, A: CodecFixedShared> Journal<E, A> {
409    /// Size of each entry in bytes.
410    pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;
411
412    /// Size of each entry in bytes (as u64).
413    pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
414
415    /// Scan a partition and return blob names, treating a missing partition as empty.
416    async fn scan_partition(context: &E, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
417        match context.scan(partition).await {
418            Ok(blobs) => Ok(blobs),
419            Err(commonware_runtime::Error::PartitionMissing(_)) => Ok(Vec::new()),
420            Err(err) => Err(Error::Runtime(err)),
421        }
422    }
423
424    /// Select the blobs partition using legacy-first compatibility rules.
425    ///
426    /// If both legacy and new blobs partitions contain data, returns corruption.
427    /// If neither contains data, defaults to the new blobs partition.
428    // TODO(#2941): Remove legacy partition support
429    async fn select_blob_partition(context: &E, cfg: &Config) -> Result<String, Error> {
430        let legacy_partition = cfg.partition.as_str();
431        let new_partition = format!("{}-blobs", cfg.partition);
432
433        let legacy_blobs = Self::scan_partition(context, legacy_partition).await?;
434        let new_blobs = Self::scan_partition(context, &new_partition).await?;
435
436        if !legacy_blobs.is_empty() && !new_blobs.is_empty() {
437            return Err(Error::Corruption(format!(
438                "both legacy and blobs partitions contain data: legacy={} blobs={}",
439                legacy_partition, new_partition
440            )));
441        }
442
443        if !legacy_blobs.is_empty() {
444            Ok(legacy_partition.into())
445        } else {
446            Ok(new_partition)
447        }
448    }
449
450    /// Initialize a new `Journal` instance.
451    ///
452    /// All backing blobs are opened but not read during initialization. The `replay` method can be
453    /// used to iterate over all items in the `Journal`.
454    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
455        let items_per_blob = cfg.items_per_blob.get();
456
457        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
458        let segmented_cfg = SegmentedConfig {
459            partition: blob_partition,
460            page_cache: cfg.page_cache,
461            write_buffer: cfg.write_buffer,
462        };
463
464        let mut journal = SegmentedJournal::init(context.child("blobs"), segmented_cfg).await?;
465        // Initialize metadata store
466        let meta_cfg = MetadataConfig {
467            partition: format!("{}-metadata", cfg.partition),
468            codec_config: ((0..).into(), ()),
469        };
470
471        let mut metadata =
472            Metadata::<_, u64, Vec<u8>>::init(context.child("meta"), meta_cfg).await?;
473
474        // Parse metadata if present
475        let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) {
476            Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err(
477                |_| Error::Corruption("invalid pruning_boundary metadata".into()),
478            )?)),
479            None => None,
480        };
481
482        // Recover bounds from metadata and/or blobs
483        let (pruning_boundary, size, needs_metadata_update) =
484            Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?;
485
486        // Persist metadata if needed
487        if needs_metadata_update {
488            if pruning_boundary.is_multiple_of(items_per_blob) {
489                metadata.remove(&PRUNING_BOUNDARY_KEY);
490                metadata.sync().await?;
491            } else {
492                metadata
493                    .put_sync(
494                        PRUNING_BOUNDARY_KEY,
495                        pruning_boundary.to_be_bytes().to_vec(),
496                    )
497                    .await?;
498            }
499        }
500
501        // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on
502        // reopen even after pruning all items. The tail blob is at `size / items_per_blob` (where
503        // the next append would go).
504        let tail_section = size / items_per_blob;
505        journal.ensure_section_exists(tail_section).await?;
506
507        let metrics = Metrics::new(context);
508        metrics.update(size, pruning_boundary, items_per_blob);
509
510        Ok(Self {
511            inner: UpgradableAsyncRwLock::new(Inner {
512                journal,
513                size,
514                metadata,
515                pruning_boundary,
516            }),
517            items_per_blob,
518            metrics,
519        })
520    }
521
    /// Returns `(pruning_boundary, size, needs_metadata_update)` recovered from metadata and blobs.
    ///
    /// The blob-based boundary is the first position of the oldest blob's section (0 if there are
    /// no blobs), so it is always section-aligned.
    ///
    /// If `meta_pruning_boundary` is `Some`, validates it against the physical blob state:
    /// - If metadata is section-aligned, it's unnecessary and we use the blob-based boundary.
    /// - If no blobs exist, metadata is stale (e.g. a crash after the blobs were cleared but
    ///   before the tail blob was recreated) and we use the blob-based boundary.
    /// - If metadata refers to a section older than the oldest blob, it's stale and we use the
    ///   blob-based boundary.
    /// - If metadata refers to a section newer than the oldest blob, it was left behind by an
    ///   earlier state, e.g. a crash in [Self::clear_to_size] or [Self::init_at_size] after the
    ///   blobs were updated but before the metadata was. We fall back to the blob-based boundary.
    /// - Otherwise (same section as the oldest blob), metadata is a valid mid-section boundary and
    ///   we use it.
    ///
    /// If `meta_pruning_boundary` is `None`, computes bounds purely from blobs.
    ///
    /// `needs_metadata_update` is `true` exactly when metadata was present but discarded in favor
    /// of the blob-based boundary, so the caller should rewrite it.
    ///
    /// # Errors
    ///
    /// Returns [Error::Corruption] if the oldest section does not hold the number of items implied
    /// by the recovered boundary, and propagates any error from reading section lengths.
    async fn recover_bounds(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        meta_pruning_boundary: Option<u64>,
    ) -> Result<(u64, u64, bool), Error> {
        // Blob-based boundary is always section-aligned
        let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob);

        let (pruning_boundary, needs_update) = match meta_pruning_boundary {
            // Mid-section metadata: validate against blobs
            Some(meta_pruning_boundary)
                if !meta_pruning_boundary.is_multiple_of(items_per_blob) =>
            {
                let meta_oldest_section = meta_pruning_boundary / items_per_blob;
                match inner.oldest_section() {
                    None => {
                        // No blobs exist but metadata claims mid-section boundary.
                        // This can happen if we crash after inner.clear() but before
                        // ensure_section_exists(). Ignore stale metadata.
                        warn!(
                            meta_oldest_section,
                            "crash repair: no blobs exist, ignoring stale metadata"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section < oldest_section => {
                        warn!(
                            meta_oldest_section,
                            oldest_section, "crash repair: metadata stale, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section > oldest_section => {
                        // Metadata references a section ahead of the oldest blob. This can happen
                        // if we crash during clear_to_size/init_at_size after blobs update but
                        // before metadata update. Fall back to blob state.
                        warn!(
                            meta_oldest_section,
                            oldest_section,
                            "crash repair: metadata ahead of blobs, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(_) => (meta_pruning_boundary, false), // valid mid-section metadata
                }
            }
            // Section-aligned metadata: unnecessary, use blob-based
            Some(_) => (blob_boundary, true),
            // No metadata: use blob-based, no update needed
            None => (blob_boundary, false),
        };

        // Validate oldest section before computing size, since compute_size trusts the oldest
        // section's length as-is.
        Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?;

        let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?;
        Ok((pruning_boundary, size, needs_update))
    }
590
591    /// Validate that the oldest section has the expected number of items.
592    ///
593    /// Non-tail sections must be full from their logical start. The tail section
594    /// (oldest == newest) can be partially filled.
595    async fn validate_oldest_section(
596        inner: &SegmentedJournal<E, A>,
597        items_per_blob: u64,
598        pruning_boundary: u64,
599    ) -> Result<(), Error> {
600        let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else {
601            return Ok(()); // No sections to validate
602        };
603
604        if oldest == newest {
605            return Ok(()); // Tail section, can be partial
606        }
607
608        let oldest_len = inner.section_len(oldest).await?;
609        let oldest_start = oldest * items_per_blob;
610
611        let expected = if pruning_boundary > oldest_start {
612            // Mid-section boundary: items from pruning_boundary to section end
613            items_per_blob - (pruning_boundary - oldest_start)
614        } else {
615            // Section-aligned boundary: full section
616            items_per_blob
617        };
618
619        if oldest_len != expected {
620            return Err(Error::Corruption(format!(
621                "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}"
622            )));
623        }
624
625        Ok(())
626    }
627
628    /// Returns the total number of items ever appended (size), computed from the blobs.
629    async fn compute_size(
630        inner: &SegmentedJournal<E, A>,
631        items_per_blob: u64,
632        pruning_boundary: u64,
633    ) -> Result<u64, Error> {
634        let oldest = inner.oldest_section();
635        let newest = inner.newest_section();
636
637        let (Some(oldest), Some(newest)) = (oldest, newest) else {
638            return Ok(pruning_boundary);
639        };
640
641        if oldest == newest {
642            // Single section: count from pruning boundary
643            let tail_len = inner.section_len(newest).await?;
644            return Ok(pruning_boundary + tail_len);
645        }
646
647        // Multiple sections: sum actual item counts
648        let oldest_len = inner.section_len(oldest).await?;
649        let tail_len = inner.section_len(newest).await?;
650
651        // Middle sections are assumed full
652        let middle_sections = newest - oldest - 1;
653        let middle_items = middle_sections * items_per_blob;
654
655        Ok(pruning_boundary + oldest_len + middle_items + tail_len)
656    }
657
658    /// Initialize a new `Journal` instance in a pruned state at a given size.
659    ///
660    /// This is used for state sync to create a journal that appears to have had `size` items
661    /// appended and then pruned up to that point.
662    ///
663    /// # Arguments
664    /// * `context` - The storage context
665    /// * `cfg` - Configuration for the journal
666    /// * `size` - The number of operations that have been "pruned"
667    ///
668    /// # Behavior
669    /// - Clears any existing data in the partition
670    /// - Creates an empty tail blob where the next append (at position `size`) will go
671    /// - `bounds().is_empty()` returns `true` (fully pruned state)
672    /// - The next `append()` will write to position `size`
673    ///
674    /// # Post-conditions
675    /// - `bounds().end` returns `size`
676    /// - `bounds().is_empty()` returns `true`
677    /// - `bounds.start` equals `size` (no data exists)
678    ///
679    /// # Crash Safety
680    /// If a crash occurs during this operation, `init()` will recover to a consistent state
681    /// (though possibly different from the intended `size`).
682    #[commonware_macros::stability(ALPHA)]
683    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
684        let items_per_blob = cfg.items_per_blob.get();
685        let tail_section = size / items_per_blob;
686
687        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
688        let segmented_cfg = SegmentedConfig {
689            partition: blob_partition,
690            page_cache: cfg.page_cache,
691            write_buffer: cfg.write_buffer,
692        };
693
694        // Initialize both stores.
695        let meta_cfg = MetadataConfig {
696            partition: format!("{}-metadata", cfg.partition),
697            codec_config: ((0..).into(), ()),
698        };
699        let mut metadata =
700            Metadata::<_, u64, Vec<u8>>::init(context.child("meta"), meta_cfg).await?;
701        let mut journal = SegmentedJournal::init(context.child("blobs"), segmented_cfg).await?;
702
703        // Clear blobs before updating metadata.
704        // This ordering is critical for crash safety:
705        // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored
706        // - Crash after create: old metadata triggers "metadata ahead" warning,
707        //   recovery falls back to blob state.
708        journal.clear().await?;
709        journal.ensure_section_exists(tail_section).await?;
710
711        // Persist metadata if pruning_boundary is mid-section.
712        if !size.is_multiple_of(items_per_blob) {
713            metadata
714                .put_sync(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec())
715                .await?;
716        } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
717            metadata.remove(&PRUNING_BOUNDARY_KEY);
718            metadata.sync().await?;
719        }
720
721        let metrics = Metrics::new(context);
722        metrics.update(size, size, items_per_blob);
723
724        Ok(Self {
725            inner: UpgradableAsyncRwLock::new(Inner {
726                journal,
727                size,
728                metadata,
729                pruning_boundary: size, // No data exists yet
730            }),
731            items_per_blob,
732            metrics,
733        })
734    }
735
736    /// Convert a global position to (section, position_in_section).
737    #[inline]
738    const fn position_to_section(&self, position: u64) -> (u64, u64) {
739        let section = position / self.items_per_blob;
740        let pos_in_section = position % self.items_per_blob;
741        (section, pos_in_section)
742    }
743
744    /// Sync any pending updates to disk.
745    ///
746    /// Only the tail section can have pending updates since historical sections are synced
747    /// when they become full.
748    pub async fn sync(&self) -> Result<(), Error> {
749        let _timer = self.metrics.sync_timer();
750        self.metrics.sync_calls.inc();
751        // Serialize with append/prune/rewind to ensure section selection is stable, while still allowing
752        // concurrent readers.
753        let inner = self.inner.upgradable_read().await;
754
755        // Sync the tail section
756        let tail_section = inner.size / self.items_per_blob;
757
758        // The tail section may not exist yet if the previous section was just filled, but syncing a
759        // non-existent section is safe (returns Ok).
760        inner.journal.sync(tail_section).await?;
761
762        // Persist metadata only when pruning_boundary is mid-section.
763        let pruning_boundary = inner.pruning_boundary;
764        let pruning_boundary_from_metadata = inner.metadata.get(&PRUNING_BOUNDARY_KEY).cloned();
765        let put = if !pruning_boundary.is_multiple_of(self.items_per_blob) {
766            let needs_update = pruning_boundary_from_metadata
767                .is_none_or(|bytes| bytes.as_slice() != pruning_boundary.to_be_bytes());
768
769            if needs_update {
770                true
771            } else {
772                return Ok(());
773            }
774        } else if pruning_boundary_from_metadata.is_some() {
775            false
776        } else {
777            return Ok(());
778        };
779
780        // Upgrade only for the metadata mutation; reads were allowed while syncing
781        // the tail section above. Downgrade before the metadata fsync to unblock readers.
782        let mut inner = inner.upgrade().await;
783        if put {
784            inner.metadata.put(
785                PRUNING_BOUNDARY_KEY,
786                pruning_boundary.to_be_bytes().to_vec(),
787            );
788        } else {
789            inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
790        }
791        let inner = inner.downgrade_to_upgradable();
792        inner.metadata.sync().await?;
793
794        Ok(())
795    }
796
797    /// Acquire a reader guard that holds a consistent view of the journal.
798    pub async fn reader(&self) -> Reader<'_, E, A> {
799        Reader {
800            guard: self.inner.read().await,
801            items_per_blob: self.items_per_blob,
802            metrics: &self.metrics,
803        }
804    }
805
806    /// Return the total number of items in the journal, irrespective of pruning. The next value
807    /// appended to the journal will be at this position.
808    pub async fn size(&self) -> u64 {
809        self.inner.read().await.size
810    }
811
812    /// Append a new item to the journal. Return the item's position in the journal, or error if the
813    /// operation fails.
814    pub async fn append(&self, item: &A) -> Result<u64, Error> {
815        let _timer = self.metrics.append_timer();
816        self.metrics.append_calls.inc();
817        self.append_many_inner(Many::Flat(std::slice::from_ref(item)))
818            .await
819    }
820
821    /// Append items to the journal, returning the position of the last item appended.
822    ///
823    /// Acquires the write lock once for all items instead of per-item.
824    /// Returns [Error::EmptyAppend] if items is empty.
825    pub async fn append_many<'a>(&'a self, items: Many<'a, A>) -> Result<u64, Error> {
826        let _timer = self.metrics.append_many_timer();
827        self.metrics.append_many_calls.inc();
828        self.append_many_inner(items).await
829    }
830
831    // Shared implementation for `append` and `append_many`; public wrappers record metrics.
832    async fn append_many_inner<'a>(&'a self, items: Many<'a, A>) -> Result<u64, Error> {
833        if items.is_empty() {
834            return Err(Error::EmptyAppend);
835        }
836
837        // Encode all items into a single contiguous buffer before taking the write guard.
838        // Uses Write::write directly to avoid per-item Bytes allocations from Encode::encode.
839        let items_count = match &items {
840            Many::Flat(items) => items.len(),
841            Many::Nested(nested_items) => nested_items.iter().map(|s| s.len()).sum(),
842        };
843        let mut items_buf = Vec::with_capacity(items_count * A::SIZE);
844        match &items {
845            Many::Flat(items) => {
846                for item in *items {
847                    item.write(&mut items_buf);
848                }
849            }
850            Many::Nested(nested_items) => {
851                for items in *nested_items {
852                    for item in *items {
853                        item.write(&mut items_buf);
854                    }
855                }
856            }
857        }
858
859        // Mutating operations are serialized by taking the write guard.
860        let mut inner = self.inner.write().await;
861        let mut written = 0;
862        while written < items_count {
863            let (section, pos_in_section) = self.position_to_section(inner.size);
864            let remaining_space = (self.items_per_blob - pos_in_section) as usize;
865            let batch_count = remaining_space.min(items_count - written);
866            let start = written * A::SIZE;
867            let end = start + batch_count * A::SIZE;
868
869            inner
870                .journal
871                .append_raw(section, &items_buf[start..end])
872                .await?;
873            inner.size += batch_count as u64;
874            written += batch_count;
875
876            if inner.size.is_multiple_of(self.items_per_blob) {
877                // The section was filled and must be synced. Downgrade so readers can continue
878                // during the sync, but keep mutators blocked. After sync, upgrade again to
879                // create the next tail section before any append can proceed.
880                let inner_ref = inner.downgrade_to_upgradable();
881                inner_ref.journal.sync(section).await?;
882                inner = inner_ref.upgrade().await;
883                inner.journal.ensure_section_exists(section + 1).await?;
884            }
885        }
886
887        self.metrics
888            .update(inner.size, inner.pruning_boundary, self.items_per_blob);
889        Ok(inner.size - 1)
890    }
891
892    /// Rewind the journal to the given `size`. Returns [Error::InvalidRewind] if the rewind point
893    /// precedes the oldest retained element. The journal is not synced after rewinding.
894    ///
895    /// # Warnings
896    ///
897    /// * This operation is not guaranteed to survive restarts until sync is called.
898    /// * This operation is not atomic, but it will always leave the journal in a consistent state
899    ///   in the event of failure since blobs are always removed from newest to oldest.
900    pub async fn rewind(&self, size: u64) -> Result<(), Error> {
901        let mut inner = self.inner.write().await;
902
903        match size.cmp(&inner.size) {
904            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
905            std::cmp::Ordering::Equal => return Ok(()),
906            std::cmp::Ordering::Less => {}
907        }
908
909        if size < inner.pruning_boundary {
910            return Err(Error::InvalidRewind(size));
911        }
912
913        let section = size / self.items_per_blob;
914        let section_start = section * self.items_per_blob;
915
916        // Calculate offset within section for rewind
917        let first_in_section = inner.pruning_boundary.max(section_start);
918        let pos_in_section = size - first_in_section;
919        let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64;
920
921        inner.journal.rewind(section, byte_offset).await?;
922        inner.size = size;
923        self.metrics
924            .update(inner.size, inner.pruning_boundary, self.items_per_blob);
925
926        Ok(())
927    }
928
929    /// Return the location before which all items have been pruned.
930    pub async fn pruning_boundary(&self) -> u64 {
931        let inner = self.inner.read().await;
932        inner.pruning_boundary
933    }
934
935    /// Allow the journal to prune items older than `min_item_pos`. The journal may not prune all
936    /// such items in order to preserve blob boundaries, but the amount of such items will always be
937    /// less than the configured number of items per blob. Returns true if any items were pruned.
938    ///
939    /// Note that this operation may NOT be atomic, however it's guaranteed not to leave gaps in the
940    /// event of failure as items are always pruned in order from oldest to newest.
941    pub async fn prune(&self, min_item_pos: u64) -> Result<bool, Error> {
942        let mut inner = self.inner.write().await;
943
944        // Calculate the section that would contain min_item_pos
945        let target_section = min_item_pos / self.items_per_blob;
946
947        // Calculate the tail section.
948        let tail_section = inner.size / self.items_per_blob;
949
950        // Cap to tail section. The tail section is guaranteed to exist by our invariant.
951        let min_section = std::cmp::min(target_section, tail_section);
952
953        let pruned = inner.journal.prune(min_section).await?;
954
955        // After pruning, update pruning_boundary to the start of the oldest remaining section
956        if pruned {
957            let new_oldest = inner
958                .journal
959                .oldest_section()
960                .expect("all sections pruned - violates tail section invariant");
961            // Pruning boundary only moves forward
962            assert!(inner.pruning_boundary < new_oldest * self.items_per_blob);
963            inner.pruning_boundary = new_oldest * self.items_per_blob;
964            self.metrics
965                .update(inner.size, inner.pruning_boundary, self.items_per_blob);
966        }
967
968        Ok(pruned)
969    }
970
971    /// Remove any persisted data created by the journal.
972    pub async fn destroy(self) -> Result<(), Error> {
973        // Destroy inner journal
974        let inner = self.inner.into_inner();
975        inner.journal.destroy().await?;
976
977        // Destroy metadata
978        inner.metadata.destroy().await?;
979
980        Ok(())
981    }
982
983    /// Clear all data and reset the journal to a new starting position.
984    ///
985    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
986    /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`.
987    ///
988    /// # Crash Safety
989    /// If a crash occurs during this operation, `init()` will recover to a consistent state
990    /// (though possibly different from the intended `new_size`).
991    pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
992        // Clear blobs before updating metadata.
993        // This ordering is critical for crash safety:
994        // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored
995        // - Crash after create: old metadata triggers "metadata ahead" warning,
996        //   recovery falls back to blob state
997        let mut inner = self.inner.write().await;
998        inner.journal.clear().await?;
999        let tail_section = new_size / self.items_per_blob;
1000        inner.journal.ensure_section_exists(tail_section).await?;
1001
1002        inner.size = new_size;
1003        inner.pruning_boundary = new_size; // No data exists
1004
1005        // Persist metadata only when pruning_boundary is mid-section.
1006        if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) {
1007            let value = inner.pruning_boundary.to_be_bytes().to_vec();
1008            inner.metadata.put_sync(PRUNING_BOUNDARY_KEY, value).await?;
1009        } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
1010            inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
1011            inner.metadata.sync().await?;
1012        }
1013
1014        self.metrics
1015            .update(inner.size, inner.pruning_boundary, self.items_per_blob);
1016        Ok(())
1017    }
1018
1019    /// Test helper: Read the item at the given position.
1020    #[cfg(test)]
1021    pub(crate) async fn read(&self, pos: u64) -> Result<A, Error> {
1022        self.reader().await.read(pos).await
1023    }
1024
1025    /// Test helper: Return the bounds of the journal.
1026    #[cfg(test)]
1027    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
1028        self.reader().await.bounds()
1029    }
1030
1031    /// Test helper: Get the oldest section from the internal segmented journal.
1032    #[cfg(test)]
1033    pub(crate) async fn test_oldest_section(&self) -> Option<u64> {
1034        let inner = self.inner.read().await;
1035        inner.journal.oldest_section()
1036    }
1037
1038    /// Test helper: Get the newest section from the internal segmented journal.
1039    #[cfg(test)]
1040    pub(crate) async fn test_newest_section(&self) -> Option<u64> {
1041        let inner = self.inner.read().await;
1042        inner.journal.newest_section()
1043    }
1044}
1045
1046// Implement Contiguous trait for fixed-length journals
1047impl<E: Context, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
1048    type Item = A;
1049
1050    async fn reader(&self) -> impl super::Reader<Item = A> + '_ {
1051        Self::reader(self).await
1052    }
1053
1054    async fn size(&self) -> u64 {
1055        Self::size(self).await
1056    }
1057}
1058
1059impl<E: Context, A: CodecFixedShared> Mutable for Journal<E, A> {
1060    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
1061        Self::append(self, item).await
1062    }
1063
1064    async fn append_many<'a>(&'a mut self, items: Many<'a, Self::Item>) -> Result<u64, Error> {
1065        Self::append_many(self, items).await
1066    }
1067
1068    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
1069        Self::prune(self, min_position).await
1070    }
1071
1072    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
1073        Self::rewind(self, size).await
1074    }
1075}
1076
1077impl<E: Context, A: CodecFixedShared> Persistable for Journal<E, A> {
1078    type Error = Error;
1079
1080    async fn commit(&self) -> Result<(), Error> {
1081        self.sync().await
1082    }
1083
1084    async fn sync(&self) -> Result<(), Error> {
1085        self.sync().await
1086    }
1087
1088    async fn destroy(self) -> Result<(), Error> {
1089        self.destroy().await
1090    }
1091}
1092
1093#[commonware_macros::stability(ALPHA)]
1094impl<E: Context, A: CodecFixedShared> crate::journal::authenticated::Inner<E> for Journal<E, A> {
1095    type Config = Config;
1096
1097    async fn init<
1098        F: crate::merkle::Family,
1099        H: commonware_cryptography::Hasher,
1100        S: commonware_parallel::Strategy,
1101    >(
1102        context: E,
1103        merkle_cfg: crate::merkle::full::Config<S>,
1104        journal_cfg: Self::Config,
1105        rewind_predicate: fn(&A) -> bool,
1106        bagging: crate::merkle::Bagging,
1107    ) -> Result<
1108        crate::journal::authenticated::Journal<F, E, Self, H, S>,
1109        crate::journal::authenticated::Error<F>,
1110    > {
1111        crate::journal::authenticated::Journal::<F, E, Self, H, S>::new(
1112            context,
1113            merkle_cfg,
1114            journal_cfg,
1115            rewind_predicate,
1116            bagging,
1117        )
1118        .await
1119    }
1120}
1121
1122#[cfg(test)]
1123mod tests {
1124    use super::*;
1125    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
1126    use commonware_macros::test_traced;
1127    use commonware_runtime::{
1128        deterministic::{self, Context},
1129        Blob, BufferPooler, Error as RuntimeError, Metrics as _, Runner, Storage, Supervisor as _,
1130    };
1131    use commonware_utils::{NZUsize, NZU16, NZU64};
1132    use futures::{pin_mut, StreamExt};
1133    use std::num::NonZeroU16;
1134
1135    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
1136    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
1137
1138    /// Generate a SHA-256 digest for the given value.
1139    fn test_digest(value: u64) -> Digest {
1140        Sha256::hash(&value.to_be_bytes())
1141    }
1142
1143    fn test_cfg(pooler: &impl BufferPooler, items_per_blob: NonZeroU64) -> Config {
1144        Config {
1145            partition: "test-partition".into(),
1146            items_per_blob,
1147            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
1148            write_buffer: NZUsize!(2048),
1149        }
1150    }
1151
1152    fn blob_partition(cfg: &Config) -> String {
1153        format!("{}-blobs", cfg.partition)
1154    }
1155
1156    async fn scan_partition(context: &Context, partition: &str) -> Vec<Vec<u8>> {
1157        match context.scan(partition).await {
1158            Ok(blobs) => blobs,
1159            Err(RuntimeError::PartitionMissing(_)) => Vec::new(),
1160            Err(err) => panic!("Failed to scan partition {partition}: {err}"),
1161        }
1162    }
1163
1164    #[test_traced]
1165    fn test_fixed_journal_init_conflicting_partitions() {
1166        let executor = deterministic::Runner::default();
1167        executor.start(|context| async move {
1168            let cfg = test_cfg(&context, NZU64!(2));
1169            let legacy_partition = cfg.partition.clone();
1170            let blobs_partition = blob_partition(&cfg);
1171
1172            let (legacy_blob, _) = context
1173                .open(&legacy_partition, &0u64.to_be_bytes())
1174                .await
1175                .expect("Failed to open legacy blob");
1176            legacy_blob
1177                .write_at_sync(0, vec![0u8; 1])
1178                .await
1179                .expect("Failed to write legacy blob");
1180
1181            let (new_blob, _) = context
1182                .open(&blobs_partition, &0u64.to_be_bytes())
1183                .await
1184                .expect("Failed to open new blob");
1185            new_blob
1186                .write_at_sync(0, vec![0u8; 1])
1187                .await
1188                .expect("Failed to write new blob");
1189
1190            let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await;
1191            assert!(matches!(result, Err(Error::Corruption(_))));
1192        });
1193    }
1194
1195    #[test_traced]
1196    fn test_fixed_journal_init_prefers_legacy_partition() {
1197        let executor = deterministic::Runner::default();
1198        executor.start(|context| async move {
1199            let cfg = test_cfg(&context, NZU64!(2));
1200            let legacy_partition = cfg.partition.clone();
1201            let blobs_partition = blob_partition(&cfg);
1202
1203            // Seed legacy partition so it is selected.
1204            let (legacy_blob, _) = context
1205                .open(&legacy_partition, &0u64.to_be_bytes())
1206                .await
1207                .expect("Failed to open legacy blob");
1208            legacy_blob
1209                .write_at_sync(0, vec![0u8; 1])
1210                .await
1211                .expect("Failed to write legacy blob");
1212
1213            let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone())
1214                .await
1215                .expect("failed to initialize journal");
1216            journal.append(&test_digest(1)).await.unwrap();
1217            journal.sync().await.unwrap();
1218            drop(journal);
1219
1220            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
1221            let new_blobs = scan_partition(&context, &blobs_partition).await;
1222            assert!(!legacy_blobs.is_empty());
1223            assert!(new_blobs.is_empty());
1224        });
1225    }
1226
1227    #[test_traced]
1228    fn test_fixed_journal_init_defaults_to_blobs_partition() {
1229        let executor = deterministic::Runner::default();
1230        executor.start(|context| async move {
1231            let cfg = test_cfg(&context, NZU64!(2));
1232            let legacy_partition = cfg.partition.clone();
1233            let blobs_partition = blob_partition(&cfg);
1234
1235            let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone())
1236                .await
1237                .expect("failed to initialize journal");
1238            journal.append(&test_digest(1)).await.unwrap();
1239            journal.sync().await.unwrap();
1240            drop(journal);
1241
1242            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
1243            let new_blobs = scan_partition(&context, &blobs_partition).await;
1244            assert!(legacy_blobs.is_empty());
1245            assert!(!new_blobs.is_empty());
1246        });
1247    }
1248
1249    #[test_traced]
1250    fn test_fixed_journal_append_and_prune() {
1251        // Initialize the deterministic context
1252        let executor = deterministic::Runner::default();
1253
1254        // Start the test within the executor
1255        executor.start(|context| async move {
1256            // Initialize the journal, allowing a max of 2 items per blob.
1257            let cfg = test_cfg(&context, NZU64!(2));
1258            let journal = Journal::init(context.child("first"), cfg.clone())
1259                .await
1260                .expect("failed to initialize journal");
1261
1262            // Append an item to the journal
1263            let mut pos = journal
1264                .append(&test_digest(0))
1265                .await
1266                .expect("failed to append data 0");
1267            assert_eq!(pos, 0);
1268
1269            // Drop the journal and re-initialize it to simulate a restart
1270            journal.sync().await.expect("Failed to sync journal");
1271            drop(journal);
1272
1273            let cfg = test_cfg(&context, NZU64!(2));
1274            let journal = Journal::init(context.child("second"), cfg.clone())
1275                .await
1276                .expect("failed to re-initialize journal");
1277            assert_eq!(journal.size().await, 1);
1278
1279            // Append two more items to the journal to trigger a new blob creation
1280            pos = journal
1281                .append(&test_digest(1))
1282                .await
1283                .expect("failed to append data 1");
1284            assert_eq!(pos, 1);
1285            pos = journal
1286                .append(&test_digest(2))
1287                .await
1288                .expect("failed to append data 2");
1289            assert_eq!(pos, 2);
1290
1291            // Read the items back
1292            let item0 = journal.read(0).await.expect("failed to read data 0");
1293            assert_eq!(item0, test_digest(0));
1294            let item1 = journal.read(1).await.expect("failed to read data 1");
1295            assert_eq!(item1, test_digest(1));
1296            let item2 = journal.read(2).await.expect("failed to read data 2");
1297            assert_eq!(item2, test_digest(2));
1298            let err = journal.read(3).await.expect_err("expected read to fail");
1299            assert!(matches!(err, Error::ItemOutOfRange(3)));
1300
1301            // Sync the journal
1302            journal.sync().await.expect("failed to sync journal");
1303
1304            // Pruning to 1 should be a no-op because there's no blob with only older items.
1305            journal.prune(1).await.expect("failed to prune journal 1");
1306
1307            // Pruning to 2 should allow the first blob to be pruned.
1308            journal.prune(2).await.expect("failed to prune journal 2");
1309            assert_eq!(journal.bounds().await.start, 2);
1310
1311            // Reading from the first blob should fail since it's now pruned
1312            let result0 = journal.read(0).await;
1313            assert!(matches!(result0, Err(Error::ItemPruned(0))));
1314            let result1 = journal.read(1).await;
1315            assert!(matches!(result1, Err(Error::ItemPruned(1))));
1316
1317            // Third item should still be readable
1318            let result2 = journal.read(2).await.unwrap();
1319            assert_eq!(result2, test_digest(2));
1320
1321            // Should be able to continue to append items
1322            for i in 3..10 {
1323                let pos = journal
1324                    .append(&test_digest(i))
1325                    .await
1326                    .expect("failed to append data");
1327                assert_eq!(pos, i);
1328            }
1329
1330            // Check no-op pruning
1331            journal.prune(0).await.expect("no-op pruning failed");
1332            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(1));
1333            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
1334            assert_eq!(journal.bounds().await.start, 2);
1335
1336            // Prune first 3 blobs (6 items)
1337            journal
1338                .prune(3 * cfg.items_per_blob.get())
1339                .await
1340                .expect("failed to prune journal 2");
1341            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(3));
1342            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
1343            assert_eq!(journal.bounds().await.start, 6);
1344
1345            // Try pruning (more than) everything in the journal.
1346            journal
1347                .prune(10000)
1348                .await
1349                .expect("failed to max-prune journal");
1350            let size = journal.size().await;
1351            assert_eq!(size, 10);
1352            assert_eq!(journal.test_oldest_section().await, Some(5));
1353            assert_eq!(journal.test_newest_section().await, Some(5));
1354            // Since the size of the journal is currently a multiple of items_per_blob, the newest blob
1355            // will be empty, and there will be no retained items.
1356            let bounds = journal.bounds().await;
1357            assert!(bounds.is_empty());
1358            // bounds.start should equal bounds.end when empty.
1359            assert_eq!(bounds.start, size);
1360
1361            // Replaying from 0 should fail since all items before bounds.start are pruned
1362            {
1363                let reader = journal.reader().await;
1364                let result = reader.replay(NZUsize!(1024), 0).await;
1365                assert!(matches!(result, Err(Error::ItemPruned(0))));
1366            }
1367
1368            // Replaying from pruning_boundary should return empty stream
1369            {
1370                let reader = journal.reader().await;
1371                let res = reader.replay(NZUsize!(1024), 0).await;
1372                assert!(matches!(res, Err(Error::ItemPruned(_))));
1373
1374                let reader = journal.reader().await;
1375                let stream = reader
1376                    .replay(NZUsize!(1024), journal.bounds().await.start)
1377                    .await
1378                    .expect("failed to replay journal from pruning boundary");
1379                pin_mut!(stream);
1380                let mut items = Vec::new();
1381                while let Some(result) = stream.next().await {
1382                    match result {
1383                        Ok((pos, item)) => {
1384                            assert_eq!(test_digest(pos), item);
1385                            items.push(pos);
1386                        }
1387                        Err(err) => panic!("Failed to read item: {err}"),
1388                    }
1389                }
1390                assert_eq!(items, Vec::<u64>::new());
1391            }
1392
1393            journal.destroy().await.unwrap();
1394        });
1395    }
1396
1397    /// Append a lot of data to make sure we exercise page cache paging boundaries.
1398    #[test_traced]
1399    fn test_fixed_journal_append_a_lot_of_data() {
1400        // Initialize the deterministic context
1401        let executor = deterministic::Runner::default();
1402        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
1403        executor.start(|context| async move {
1404            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1405            let journal = Journal::init(context.child("first"), cfg.clone())
1406                .await
1407                .expect("failed to initialize journal");
1408            // Append 2 blobs worth of items.
1409            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
1410                journal
1411                    .append(&test_digest(i))
1412                    .await
1413                    .expect("failed to append data");
1414            }
1415            // Sync, reopen, then read back.
1416            journal.sync().await.expect("failed to sync journal");
1417            drop(journal);
1418            let journal = Journal::init(context.child("second"), cfg.clone())
1419                .await
1420                .expect("failed to re-initialize journal");
1421            for i in 0u64..10000 {
1422                let item: Digest = journal.read(i).await.expect("failed to read data");
1423                assert_eq!(item, test_digest(i));
1424            }
1425            journal.destroy().await.expect("failed to destroy journal");
1426        });
1427    }
1428
1429    #[test_traced]
1430    fn test_fixed_journal_replay() {
1431        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1432        // Initialize the deterministic context
1433        let executor = deterministic::Runner::default();
1434
1435        // Start the test within the executor
1436        executor.start(|context| async move {
1437            // Initialize the journal, allowing a max of 7 items per blob.
1438            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1439            let journal = Journal::init(context.child("first"), cfg.clone())
1440                .await
1441                .expect("failed to initialize journal");
1442
1443            // Append many items, filling 100 blobs and part of the 101st
1444            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1445                let pos = journal
1446                    .append(&test_digest(i))
1447                    .await
1448                    .expect("failed to append data");
1449                assert_eq!(pos, i);
1450            }
1451
1452            // Read them back the usual way.
1453            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1454                let item: Digest = journal.read(i).await.expect("failed to read data");
1455                assert_eq!(item, test_digest(i), "i={i}");
1456            }
1457
1458            // Replay should return all items
1459            {
1460                let reader = journal.reader().await;
1461                let stream = reader
1462                    .replay(NZUsize!(1024), 0)
1463                    .await
1464                    .expect("failed to replay journal");
1465                let mut items = Vec::new();
1466                pin_mut!(stream);
1467                while let Some(result) = stream.next().await {
1468                    match result {
1469                        Ok((pos, item)) => {
1470                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
1471                            items.push(pos);
1472                        }
1473                        Err(err) => panic!("Failed to read item: {err}"),
1474                    }
1475                }
1476
1477                // Make sure all items were replayed
1478                assert_eq!(
1479                    items.len(),
1480                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
1481                );
1482                items.sort();
1483                for (i, pos) in items.iter().enumerate() {
1484                    assert_eq!(i as u64, *pos);
1485                }
1486            }
1487
1488            journal.sync().await.expect("Failed to sync journal");
1489            drop(journal);
1490
1491            // Corrupt one of the bytes and make sure it's detected.
1492            let (blob, _) = context
1493                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
1494                .await
1495                .expect("Failed to open blob");
1496            // Write junk bytes.
1497            let bad_bytes = 123456789u32;
1498            blob.write_at_sync(1, bad_bytes.to_be_bytes().to_vec())
1499                .await
1500                .expect("Failed to write bad bytes");
1501
1502            // Re-initialize the journal to simulate a restart
1503            let journal = Journal::init(context.child("second"), cfg.clone())
1504                .await
1505                .expect("Failed to re-initialize journal");
1506
1507            // Make sure reading an item that resides in the corrupted page fails.
1508            let err = journal
1509                .read(40 * ITEMS_PER_BLOB.get() + 1)
1510                .await
1511                .unwrap_err();
1512            assert!(matches!(err, Error::Runtime(_)));
1513
1514            // Replay all items.
1515            {
1516                let mut error_found = false;
1517                let reader = journal.reader().await;
1518                let stream = reader
1519                    .replay(NZUsize!(1024), 0)
1520                    .await
1521                    .expect("failed to replay journal");
1522                let mut items = Vec::new();
1523                pin_mut!(stream);
1524                while let Some(result) = stream.next().await {
1525                    match result {
1526                        Ok((pos, item)) => {
1527                            assert_eq!(test_digest(pos), item);
1528                            items.push(pos);
1529                        }
1530                        Err(err) => {
1531                            error_found = true;
1532                            assert!(matches!(err, Error::Runtime(_)));
1533                            break;
1534                        }
1535                    }
1536                }
1537                assert!(error_found); // error should abort replay
1538            }
1539        });
1540    }
1541
1542    #[test_traced]
1543    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
1544        // Initialize the deterministic context
1545        let executor = deterministic::Runner::default();
1546        // Start the test within the executor
1547        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1548        executor.start(|context| async move {
1549            // Initialize the journal, allowing a max of 7 items per blob.
1550            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1551            let journal = Journal::init(context.child("first"), cfg.clone())
1552                .await
1553                .expect("failed to initialize journal");
1554
1555            // Append many items, filling 100 blobs and part of the 101st
1556            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1557                let pos = journal
1558                    .append(&test_digest(i))
1559                    .await
1560                    .expect("failed to append data");
1561                assert_eq!(pos, i);
1562            }
1563            journal.sync().await.expect("Failed to sync journal");
1564            drop(journal);
1565
1566            // Manually truncate a non-tail blob to make sure it's detected during initialization.
1567            // The segmented journal will trim the incomplete blob on init, resulting in the blob
1568            // missing one item. This should be detected during init because all non-tail blobs
1569            // must be full.
1570            let (blob, size) = context
1571                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
1572                .await
1573                .expect("Failed to open blob");
1574            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1575            blob.sync().await.expect("Failed to sync blob");
1576
1577            // The segmented journal will trim the incomplete blob on init, resulting in the blob
1578            // missing one item. This should be detected as corruption during replay.
1579            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1580                .await
1581                .expect("failed to initialize journal");
1582
1583            // Journal size is computed from the tail section, so it's unchanged
1584            // despite the corruption in section 40.
1585            let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
1586            assert_eq!(journal.size().await, expected_size);
1587
1588            // Replay should detect corruption (incomplete section) in section 40
1589            let reader = journal.reader().await;
1590            match reader.replay(NZUsize!(1024), 0).await {
1591                Err(Error::Corruption(msg)) => {
1592                    assert!(
1593                        msg.contains("section 40"),
1594                        "Error should mention section 40, got: {msg}"
1595                    );
1596                }
1597                Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
1598                Ok(_) => panic!("Expected replay to fail with corruption"),
1599            };
1600        });
1601    }
1602
1603    #[test_traced]
1604    fn test_fixed_journal_replay_with_missing_historical_blob() {
1605        let executor = deterministic::Runner::default();
1606        executor.start(|context| async move {
1607            let cfg = test_cfg(&context, NZU64!(2));
1608            let journal = Journal::init(context.child("first"), cfg.clone())
1609                .await
1610                .expect("failed to initialize journal");
1611            for i in 0u64..5 {
1612                journal
1613                    .append(&test_digest(i))
1614                    .await
1615                    .expect("failed to append data");
1616            }
1617            journal.sync().await.expect("failed to sync journal");
1618            drop(journal);
1619
1620            context
1621                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1622                .await
1623                .expect("failed to remove blob");
1624
1625            // Init won't detect the corruption.
1626            let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1627                .await
1628                .expect("init shouldn't fail");
1629
1630            // But replay will.
1631            let reader = result.reader().await;
1632            match reader.replay(NZUsize!(1024), 0).await {
1633                Err(Error::Corruption(_)) => {}
1634                Err(err) => panic!("expected Corruption, got: {err}"),
1635                Ok(_) => panic!("expected Corruption, got ok"),
1636            };
1637
1638            // As will trying to read an item that was in the deleted blob.
1639            match result.read(2).await {
1640                Err(Error::Corruption(_)) => {}
1641                Err(err) => panic!("expected Corruption, got: {err}"),
1642                Ok(_) => panic!("expected Corruption, got ok"),
1643            };
1644        });
1645    }
1646
1647    #[test_traced]
1648    fn test_fixed_journal_test_trim_blob() {
1649        // Initialize the deterministic context
1650        let executor = deterministic::Runner::default();
1651        // Start the test within the executor
1652        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1653        executor.start(|context| async move {
1654            // Initialize the journal, allowing a max of 7 items per blob.
1655            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1656            let journal = Journal::init(context.child("first"), cfg.clone())
1657                .await
1658                .expect("failed to initialize journal");
1659
1660            // Fill one blob and put 3 items in the second.
1661            let item_count = ITEMS_PER_BLOB.get() + 3;
1662            for i in 0u64..item_count {
1663                journal
1664                    .append(&test_digest(i))
1665                    .await
1666                    .expect("failed to append data");
1667            }
1668            assert_eq!(journal.size().await, item_count);
1669            journal.sync().await.expect("Failed to sync journal");
1670            drop(journal);
1671
1672            // Truncate the tail blob by one byte, which should result in the last item being
1673            // discarded during replay (detected via corruption).
1674            let (blob, size) = context
1675                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1676                .await
1677                .expect("Failed to open blob");
1678            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1679            blob.sync().await.expect("Failed to sync blob");
1680
1681            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1682                .await
1683                .unwrap();
1684
1685            // The truncation invalidates the last page (bad checksum), which is removed.
1686            // This loses one item.
1687            assert_eq!(journal.size().await, item_count - 1);
1688
1689            // Cleanup.
1690            journal.destroy().await.expect("Failed to destroy journal");
1691        });
1692    }
1693
1694    #[test_traced]
1695    fn test_fixed_journal_partial_replay() {
1696        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1697        // 53 % 7 = 4, which will trigger a non-trivial seek in the starting blob to reach the
1698        // starting position.
1699        const START_POS: u64 = 53;
1700
1701        // Initialize the deterministic context
1702        let executor = deterministic::Runner::default();
1703        // Start the test within the executor
1704        executor.start(|context| async move {
1705            // Initialize the journal, allowing a max of 7 items per blob.
1706            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1707            let journal = Journal::init(context.child("storage"), cfg.clone())
1708                .await
1709                .expect("failed to initialize journal");
1710
1711            // Append many items, filling 100 blobs and part of the 101st
1712            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1713                let pos = journal
1714                    .append(&test_digest(i))
1715                    .await
1716                    .expect("failed to append data");
1717                assert_eq!(pos, i);
1718            }
1719
1720            // Replay should return all items except the first `START_POS`.
1721            {
1722                let reader = journal.reader().await;
1723                let stream = reader
1724                    .replay(NZUsize!(1024), START_POS)
1725                    .await
1726                    .expect("failed to replay journal");
1727                let mut items = Vec::new();
1728                pin_mut!(stream);
1729                while let Some(result) = stream.next().await {
1730                    match result {
1731                        Ok((pos, item)) => {
1732                            assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
1733                            assert_eq!(
1734                                test_digest(pos),
1735                                item,
1736                                "Item at position {pos} did not match expected digest"
1737                            );
1738                            items.push(pos);
1739                        }
1740                        Err(err) => panic!("Failed to read item: {err}"),
1741                    }
1742                }
1743
1744                // Make sure all items were replayed
1745                assert_eq!(
1746                    items.len(),
1747                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
1748                        - START_POS as usize
1749                );
1750                items.sort();
1751                for (i, pos) in items.iter().enumerate() {
1752                    assert_eq!(i as u64, *pos - START_POS);
1753                }
1754            }
1755
1756            journal.destroy().await.unwrap();
1757        });
1758    }
1759
1760    #[test_traced]
1761    fn test_fixed_journal_recover_from_partial_write() {
1762        // Initialize the deterministic context
1763        let executor = deterministic::Runner::default();
1764
1765        // Start the test within the executor
1766        executor.start(|context| async move {
1767            // Initialize the journal, allowing a max of 3 items per blob.
1768            let cfg = test_cfg(&context, NZU64!(3));
1769            let journal = Journal::init(context.child("first"), cfg.clone())
1770                .await
1771                .expect("failed to initialize journal");
1772            for i in 0..5 {
1773                journal
1774                    .append(&test_digest(i))
1775                    .await
1776                    .expect("failed to append data");
1777            }
1778            assert_eq!(journal.size().await, 5);
1779            journal.sync().await.expect("Failed to sync journal");
1780            drop(journal);
1781
1782            // Manually truncate most recent blob to simulate a partial write.
1783            let (blob, size) = context
1784                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1785                .await
1786                .expect("Failed to open blob");
1787            // truncate the most recent blob by 1 byte which corrupts the most recent item
1788            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1789            blob.sync().await.expect("Failed to sync blob");
1790
1791            // Re-initialize the journal to simulate a restart
1792            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1793                .await
1794                .expect("Failed to re-initialize journal");
1795            // The truncation invalidates the last page, which is removed. This loses one item.
1796            assert_eq!(journal.pruning_boundary().await, 0);
1797            assert_eq!(journal.size().await, 4);
1798            drop(journal);
1799
1800            // Delete the second blob and re-init
1801            context
1802                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1803                .await
1804                .expect("Failed to remove blob");
1805
1806            let journal = Journal::<_, Digest>::init(context.child("third"), cfg.clone())
1807                .await
1808                .expect("Failed to re-initialize journal");
1809            // Only the first blob remains
1810            assert_eq!(journal.size().await, 3);
1811
1812            journal.destroy().await.unwrap();
1813        });
1814    }
1815
1816    #[test_traced]
1817    fn test_fixed_journal_recover_detects_oldest_section_too_short() {
1818        let executor = deterministic::Runner::default();
1819        executor.start(|context| async move {
1820            let cfg = test_cfg(&context, NZU64!(5));
1821            let journal =
1822                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
1823                    .await
1824                    .expect("failed to initialize journal at size");
1825
1826            // Append items so section 1 has exactly the expected minimum (3 items).
1827            for i in 0..8u64 {
1828                journal
1829                    .append(&test_digest(100 + i))
1830                    .await
1831                    .expect("failed to append data");
1832            }
1833            journal.sync().await.expect("failed to sync journal");
1834            assert_eq!(journal.pruning_boundary().await, 7);
1835            assert_eq!(journal.size().await, 15);
1836            drop(journal);
1837
1838            // Corrupt the oldest section by truncating one byte (drops one item on recovery).
1839            let (blob, size) = context
1840                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1841                .await
1842                .expect("failed to open oldest blob");
1843            blob.resize(size - 1).await.expect("failed to corrupt blob");
1844            blob.sync().await.expect("failed to sync blob");
1845
1846            let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await;
1847            assert!(matches!(result, Err(Error::Corruption(_))));
1848        });
1849    }
1850
1851    #[test_traced]
1852    fn test_fixed_journal_recover_to_empty_from_partial_write() {
1853        let executor = deterministic::Runner::default();
1854        executor.start(|context| async move {
1855            // Initialize the journal, allowing a max of 10 items per blob.
1856            let cfg = test_cfg(&context, NZU64!(10));
1857            let journal = Journal::init(context.child("first"), cfg.clone())
1858                .await
1859                .expect("failed to initialize journal");
1860            // Add only a single item
1861            journal
1862                .append(&test_digest(0))
1863                .await
1864                .expect("failed to append data");
1865            assert_eq!(journal.size().await, 1);
1866            journal.sync().await.expect("Failed to sync journal");
1867            drop(journal);
1868
1869            // Manually truncate most recent blob to simulate a partial write.
1870            let (blob, size) = context
1871                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1872                .await
1873                .expect("Failed to open blob");
1874            // Truncate the most recent blob by 1 byte which corrupts the one appended item
1875            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1876            blob.sync().await.expect("Failed to sync blob");
1877
1878            // Re-initialize the journal to simulate a restart
1879            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1880                .await
1881                .expect("Failed to re-initialize journal");
1882
1883            // Since there was only a single item appended which we then corrupted, recovery should
1884            // leave us in the state of an empty journal.
1885            let bounds = journal.bounds().await;
1886            assert_eq!(bounds.end, 0);
1887            assert!(bounds.is_empty());
1888            // Make sure journal still works for appending.
1889            journal
1890                .append(&test_digest(0))
1891                .await
1892                .expect("failed to append data");
1893            assert_eq!(journal.size().await, 1);
1894
1895            journal.destroy().await.unwrap();
1896        });
1897    }
1898
1899    #[test_traced("DEBUG")]
1900    fn test_fixed_journal_recover_from_unwritten_data() {
1901        let executor = deterministic::Runner::default();
1902        executor.start(|context| async move {
1903            // Initialize the journal, allowing a max of 10 items per blob.
1904            let cfg = test_cfg(&context, NZU64!(10));
1905            let journal = Journal::init(context.child("first"), cfg.clone())
1906                .await
1907                .expect("failed to initialize journal");
1908
1909            // Add only a single item
1910            journal
1911                .append(&test_digest(0))
1912                .await
1913                .expect("failed to append data");
1914            assert_eq!(journal.size().await, 1);
1915            journal.sync().await.expect("Failed to sync journal");
1916            drop(journal);
1917
1918            // Manually extend the blob to simulate a failure where the file was extended, but no
1919            // bytes were written due to failure.
1920            let (blob, size) = context
1921                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1922                .await
1923                .expect("Failed to open blob");
1924            blob.write_at_sync(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
1925                .await
1926                .expect("Failed to extend blob");
1927
1928            // Re-initialize the journal to simulate a restart
1929            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1930                .await
1931                .expect("Failed to re-initialize journal");
1932
1933            // The zero-filled pages are detected as invalid (bad checksum) and truncated.
1934            // No items should be lost since we called sync before the corruption.
1935            assert_eq!(journal.size().await, 1);
1936
1937            // Make sure journal still works for appending.
1938            journal
1939                .append(&test_digest(1))
1940                .await
1941                .expect("failed to append data");
1942
1943            journal.destroy().await.unwrap();
1944        });
1945    }
1946
1947    #[test_traced]
1948    fn test_fixed_journal_rewinding() {
1949        let executor = deterministic::Runner::default();
1950        executor.start(|context| async move {
1951            // Initialize the journal, allowing a max of 2 items per blob.
1952            let cfg = test_cfg(&context, NZU64!(2));
1953            let journal = Journal::init(context.child("first"), cfg.clone())
1954                .await
1955                .expect("failed to initialize journal");
1956            assert!(matches!(journal.rewind(0).await, Ok(())));
1957            assert!(matches!(
1958                journal.rewind(1).await,
1959                Err(Error::InvalidRewind(1))
1960            ));
1961
1962            // Append an item to the journal
1963            journal
1964                .append(&test_digest(0))
1965                .await
1966                .expect("failed to append data 0");
1967            assert_eq!(journal.size().await, 1);
1968            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be no-op
1969            assert!(matches!(journal.rewind(0).await, Ok(())));
1970            assert_eq!(journal.size().await, 0);
1971
1972            // append 7 items
1973            for i in 0..7 {
1974                let pos = journal
1975                    .append(&test_digest(i))
1976                    .await
1977                    .expect("failed to append data");
1978                assert_eq!(pos, i);
1979            }
1980            assert_eq!(journal.size().await, 7);
1981
1982            // rewind back to item #4, which should prune 2 blobs
1983            assert!(matches!(journal.rewind(4).await, Ok(())));
1984            assert_eq!(journal.size().await, 4);
1985
1986            // rewind back to empty and ensure all blobs are rewound over
1987            assert!(matches!(journal.rewind(0).await, Ok(())));
1988            assert_eq!(journal.size().await, 0);
1989
1990            // stress test: add 100 items, rewind 49, repeat x10.
1991            for _ in 0..10 {
1992                for i in 0..100 {
1993                    journal
1994                        .append(&test_digest(i))
1995                        .await
1996                        .expect("failed to append data");
1997                }
1998                journal.rewind(journal.size().await - 49).await.unwrap();
1999            }
2000            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
2001            assert_eq!(journal.size().await, ITEMS_REMAINING);
2002
2003            journal.sync().await.expect("Failed to sync journal");
2004            drop(journal);
2005
2006            // Repeat with a different blob size (3 items per blob)
2007            let mut cfg = test_cfg(&context, NZU64!(3));
2008            cfg.partition = "test-partition-2".into();
2009            let journal = Journal::init(context.child("second"), cfg.clone())
2010                .await
2011                .expect("failed to initialize journal");
2012            for _ in 0..10 {
2013                for i in 0..100 {
2014                    journal
2015                        .append(&test_digest(i))
2016                        .await
2017                        .expect("failed to append data");
2018                }
2019                journal.rewind(journal.size().await - 49).await.unwrap();
2020            }
2021            assert_eq!(journal.size().await, ITEMS_REMAINING);
2022
2023            journal.sync().await.expect("Failed to sync journal");
2024            drop(journal);
2025
2026            // Make sure re-opened journal is as expected
2027            let journal: Journal<_, Digest> = Journal::init(context.child("third"), cfg.clone())
2028                .await
2029                .expect("failed to re-initialize journal");
2030            assert_eq!(journal.size().await, 10 * (100 - 49));
2031
2032            // Make sure rewinding works after pruning
2033            journal.prune(300).await.expect("pruning failed");
2034            assert_eq!(journal.size().await, ITEMS_REMAINING);
2035            // Rewinding prior to our prune point should fail.
2036            assert!(matches!(
2037                journal.rewind(299).await,
2038                Err(Error::InvalidRewind(299))
2039            ));
2040            // Rewinding to the prune point should work.
2041            // always remain in the journal.
2042            assert!(matches!(journal.rewind(300).await, Ok(())));
2043            let bounds = journal.bounds().await;
2044            assert_eq!(bounds.end, 300);
2045            assert!(bounds.is_empty());
2046
2047            journal.destroy().await.unwrap();
2048        });
2049    }
2050
2051    /// Test recovery when blob is truncated to a page boundary with item size not dividing page size.
2052    ///
2053    /// This tests the scenario where:
2054    /// 1. Items (32 bytes) don't divide evenly into page size (44 bytes)
2055    /// 2. Data spans multiple pages
2056    /// 3. Blob is truncated to a page boundary (simulating crash before last page was written)
2057    /// 4. Journal should recover correctly on reopen
2058    #[test_traced]
2059    fn test_fixed_journal_recover_from_page_boundary_truncation() {
2060        let executor = deterministic::Runner::default();
2061        executor.start(|context: Context| async move {
2062            // Use a small items_per_blob to keep the test focused on a single blob
2063            let cfg = test_cfg(&context, NZU64!(100));
2064            let journal = Journal::init(context.child("first"), cfg.clone())
2065                .await
2066                .expect("failed to initialize journal");
2067
2068            // Item size is 32 bytes (Digest), page size is 44 bytes.
2069            // 32 doesn't divide 44, so items will cross page boundaries.
2070            // Physical page size = 44 + 12 (CRC) = 56 bytes.
2071            //
2072            // Write enough items to span multiple pages:
2073            // - 10 items = 320 logical bytes
2074            // - This spans ceil(320/44) = 8 logical pages
2075            for i in 0u64..10 {
2076                journal
2077                    .append(&test_digest(i))
2078                    .await
2079                    .expect("failed to append data");
2080            }
2081            assert_eq!(journal.size().await, 10);
2082            journal.sync().await.expect("Failed to sync journal");
2083            drop(journal);
2084
2085            // Open the blob directly and truncate to a page boundary.
2086            // Physical page size = PAGE_SIZE + CHECKSUM_SIZE = 44 + 12 = 56
2087            let physical_page_size = PAGE_SIZE.get() as u64 + 12;
2088            let (blob, size) = context
2089                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
2090                .await
2091                .expect("Failed to open blob");
2092
2093            // Calculate how many full physical pages we have and truncate to lose the last one.
2094            let full_pages = size / physical_page_size;
2095            assert!(full_pages >= 2, "need at least 2 pages for this test");
2096            let truncate_to = (full_pages - 1) * physical_page_size;
2097
2098            blob.resize(truncate_to)
2099                .await
2100                .expect("Failed to truncate blob");
2101            blob.sync().await.expect("Failed to sync blob");
2102
2103            // Re-initialize the journal - it should recover by truncating to valid data
2104            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2105                .await
2106                .expect("Failed to re-initialize journal after page truncation");
2107
2108            // The journal should have fewer items now (those that fit in the remaining pages).
2109            // With logical page size 44 and item size 32:
2110            // - After truncating to (full_pages-1) physical pages, we have (full_pages-1)*44 logical bytes
2111            // - Number of complete items = floor(logical_bytes / 32)
2112            let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
2113            let expected_items = remaining_logical_bytes / 32; // 32 = Digest::SIZE
2114            assert_eq!(
2115                journal.size().await,
2116                expected_items,
2117                "Journal should recover to {} items after truncation",
2118                expected_items
2119            );
2120
2121            // Verify we can still read the remaining items
2122            for i in 0..expected_items {
2123                let item = journal
2124                    .read(i)
2125                    .await
2126                    .expect("failed to read recovered item");
2127                assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
2128            }
2129
2130            journal.destroy().await.expect("Failed to destroy journal");
2131        });
2132    }
2133
2134    /// Test the contiguous fixed journal with items_per_blob: 1.
2135    ///
2136    /// This is an edge case where each item creates its own blob, and the
2137    /// tail blob is always empty after sync (because the item fills the blob
2138    /// and a new empty one is created).
2139    #[test_traced]
2140    fn test_single_item_per_blob() {
2141        let executor = deterministic::Runner::default();
2142        executor.start(|context| async move {
2143            let cfg = Config {
2144                partition: "single-item-per-blob".into(),
2145                items_per_blob: NZU64!(1),
2146                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2147                write_buffer: NZUsize!(2048),
2148            };
2149
2150            // === Test 1: Basic single item operation ===
2151            let journal = Journal::init(context.child("first"), cfg.clone())
2152                .await
2153                .expect("failed to initialize journal");
2154
2155            // Verify empty state
2156            let bounds = journal.bounds().await;
2157            assert_eq!(bounds.end, 0);
2158            assert!(bounds.is_empty());
2159
2160            // Append 1 item
2161            let pos = journal
2162                .append(&test_digest(0))
2163                .await
2164                .expect("failed to append");
2165            assert_eq!(pos, 0);
2166            assert_eq!(journal.size().await, 1);
2167
2168            // Sync
2169            journal.sync().await.expect("failed to sync");
2170
2171            // Read from size() - 1
2172            let value = journal
2173                .read(journal.size().await - 1)
2174                .await
2175                .expect("failed to read");
2176            assert_eq!(value, test_digest(0));
2177
2178            // === Test 2: Multiple items with single item per blob ===
2179            for i in 1..10u64 {
2180                let pos = journal
2181                    .append(&test_digest(i))
2182                    .await
2183                    .expect("failed to append");
2184                assert_eq!(pos, i);
2185                assert_eq!(journal.size().await, i + 1);
2186
2187                // Verify we can read the just-appended item at size() - 1
2188                let value = journal
2189                    .read(journal.size().await - 1)
2190                    .await
2191                    .expect("failed to read");
2192                assert_eq!(value, test_digest(i));
2193            }
2194
2195            // Verify all items can be read
2196            for i in 0..10u64 {
2197                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2198            }
2199
2200            journal.sync().await.expect("failed to sync");
2201
2202            // === Test 3: Pruning with single item per blob ===
2203            // Prune to position 5 (removes positions 0-4)
2204            journal.prune(5).await.expect("failed to prune");
2205
2206            // Size should still be 10
2207            assert_eq!(journal.size().await, 10);
2208
2209            // bounds.start should be 5
2210            assert_eq!(journal.bounds().await.start, 5);
2211
2212            // Reading from size() - 1 (position 9) should still work
2213            let value = journal
2214                .read(journal.size().await - 1)
2215                .await
2216                .expect("failed to read");
2217            assert_eq!(value, test_digest(9));
2218
2219            // Reading from pruned positions should return ItemPruned
2220            for i in 0..5 {
2221                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2222            }
2223
2224            // Reading from retained positions should work
2225            for i in 5..10u64 {
2226                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2227            }
2228
2229            // Append more items after pruning
2230            for i in 10..15u64 {
2231                let pos = journal
2232                    .append(&test_digest(i))
2233                    .await
2234                    .expect("failed to append");
2235                assert_eq!(pos, i);
2236
2237                // Verify we can read from size() - 1
2238                let value = journal
2239                    .read(journal.size().await - 1)
2240                    .await
2241                    .expect("failed to read");
2242                assert_eq!(value, test_digest(i));
2243            }
2244
2245            journal.sync().await.expect("failed to sync");
2246            drop(journal);
2247
2248            // === Test 4: Restart persistence with single item per blob ===
2249            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2250                .await
2251                .expect("failed to re-initialize journal");
2252
2253            // Verify size is preserved
2254            assert_eq!(journal.size().await, 15);
2255
2256            // Verify bounds.start is preserved
2257            assert_eq!(journal.bounds().await.start, 5);
2258
2259            // Reading from size() - 1 should work after restart
2260            let value = journal
2261                .read(journal.size().await - 1)
2262                .await
2263                .expect("failed to read");
2264            assert_eq!(value, test_digest(14));
2265
2266            // Reading all retained positions should work
2267            for i in 5..15u64 {
2268                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2269            }
2270
2271            journal.destroy().await.expect("failed to destroy journal");
2272
2273            // === Test 5: Restart after pruning with non-zero index ===
2274            // Fresh journal for this test
2275            let journal = Journal::init(context.child("third"), cfg.clone())
2276                .await
2277                .expect("failed to initialize journal");
2278
2279            // Append 10 items (positions 0-9)
2280            for i in 0..10u64 {
2281                journal.append(&test_digest(i + 100)).await.unwrap();
2282            }
2283
2284            // Prune to position 5 (removes positions 0-4)
2285            journal.prune(5).await.unwrap();
2286            let bounds = journal.bounds().await;
2287            assert_eq!(bounds.end, 10);
2288            assert_eq!(bounds.start, 5);
2289
2290            // Sync and restart
2291            journal.sync().await.unwrap();
2292            drop(journal);
2293
2294            // Re-open journal
2295            let journal = Journal::<_, Digest>::init(context.child("fourth"), cfg.clone())
2296                .await
2297                .expect("failed to re-initialize journal");
2298
2299            // Verify state after restart
2300            let bounds = journal.bounds().await;
2301            assert_eq!(bounds.end, 10);
2302            assert_eq!(bounds.start, 5);
2303
2304            // Reading from size() - 1 (position 9) should work
2305            let value = journal.read(journal.size().await - 1).await.unwrap();
2306            assert_eq!(value, test_digest(109));
2307
2308            // Verify all retained positions (5-9) work
2309            for i in 5..10u64 {
2310                assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
2311            }
2312
2313            journal.destroy().await.expect("failed to destroy journal");
2314
2315            // === Test 6: Prune all items (edge case) ===
2316            let journal = Journal::init(context.child("storage"), cfg.clone())
2317                .await
2318                .expect("failed to initialize journal");
2319
2320            for i in 0..5u64 {
2321                journal.append(&test_digest(i + 200)).await.unwrap();
2322            }
2323            journal.sync().await.unwrap();
2324
2325            // Prune all items
2326            journal.prune(5).await.unwrap();
2327            let bounds = journal.bounds().await;
2328            assert_eq!(bounds.end, 5); // Size unchanged
2329            assert!(bounds.is_empty()); // All pruned
2330
2331            // size() - 1 = 4, but position 4 is pruned
2332            let result = journal.read(journal.size().await - 1).await;
2333            assert!(matches!(result, Err(Error::ItemPruned(4))));
2334
2335            // After appending, reading works again
2336            journal.append(&test_digest(205)).await.unwrap();
2337            assert_eq!(journal.bounds().await.start, 5);
2338            assert_eq!(
2339                journal.read(journal.size().await - 1).await.unwrap(),
2340                test_digest(205)
2341            );
2342
2343            journal.destroy().await.expect("failed to destroy journal");
2344        });
2345    }
2346
2347    #[test_traced]
2348    fn test_fixed_journal_init_at_size_zero() {
2349        let executor = deterministic::Runner::default();
2350        executor.start(|context| async move {
2351            let cfg = test_cfg(&context, NZU64!(5));
2352            let journal =
2353                Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 0)
2354                    .await
2355                    .unwrap();
2356
2357            let bounds = journal.bounds().await;
2358            assert_eq!(bounds.end, 0);
2359            assert!(bounds.is_empty());
2360
2361            // Next append should get position 0
2362            let pos = journal.append(&test_digest(100)).await.unwrap();
2363            assert_eq!(pos, 0);
2364            assert_eq!(journal.size().await, 1);
2365            assert_eq!(journal.read(0).await.unwrap(), test_digest(100));
2366
2367            journal.destroy().await.unwrap();
2368        });
2369    }
2370
2371    #[test_traced]
2372    fn test_fixed_journal_init_at_size_section_boundary() {
2373        let executor = deterministic::Runner::default();
2374        executor.start(|context| async move {
2375            let cfg = test_cfg(&context, NZU64!(5));
2376
2377            // Initialize at position 10 (exactly at section 2 boundary with items_per_blob=5)
2378            let journal =
2379                Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 10)
2380                    .await
2381                    .unwrap();
2382
2383            let bounds = journal.bounds().await;
2384            assert_eq!(bounds.end, 10);
2385            assert!(bounds.is_empty());
2386
2387            // Next append should get position 10
2388            let pos = journal.append(&test_digest(1000)).await.unwrap();
2389            assert_eq!(pos, 10);
2390            assert_eq!(journal.size().await, 11);
2391            assert_eq!(journal.read(10).await.unwrap(), test_digest(1000));
2392
2393            // Can continue appending
2394            let pos = journal.append(&test_digest(1001)).await.unwrap();
2395            assert_eq!(pos, 11);
2396            assert_eq!(journal.read(11).await.unwrap(), test_digest(1001));
2397
2398            journal.destroy().await.unwrap();
2399        });
2400    }
2401
2402    #[test_traced]
2403    fn test_fixed_journal_init_at_size_mid_section() {
2404        let executor = deterministic::Runner::default();
2405        executor.start(|context| async move {
2406            let cfg = test_cfg(&context, NZU64!(5));
2407
2408            // Initialize at position 7 (middle of section 1 with items_per_blob=5)
2409            let journal =
2410                Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 7)
2411                    .await
2412                    .unwrap();
2413
2414            let bounds = journal.bounds().await;
2415            assert_eq!(bounds.end, 7);
2416            // No data exists yet after init_at_size
2417            assert!(bounds.is_empty());
2418
2419            // Reading before bounds.start should return ItemPruned
2420            assert!(matches!(journal.read(5).await, Err(Error::ItemPruned(5))));
2421            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2422
2423            // Next append should get position 7
2424            let pos = journal.append(&test_digest(700)).await.unwrap();
2425            assert_eq!(pos, 7);
2426            assert_eq!(journal.size().await, 8);
2427            assert_eq!(journal.read(7).await.unwrap(), test_digest(700));
2428            // Now bounds.start should be 7 (first data position)
2429            assert_eq!(journal.bounds().await.start, 7);
2430
2431            journal.destroy().await.unwrap();
2432        });
2433    }
2434
2435    #[test_traced]
2436    fn test_fixed_journal_init_at_size_persistence() {
2437        let executor = deterministic::Runner::default();
2438        executor.start(|context| async move {
2439            let cfg = test_cfg(&context, NZU64!(5));
2440
2441            // Initialize at position 15
2442            let journal =
2443                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 15)
2444                    .await
2445                    .unwrap();
2446
2447            // Append some items
2448            for i in 0..5u64 {
2449                let pos = journal.append(&test_digest(1500 + i)).await.unwrap();
2450                assert_eq!(pos, 15 + i);
2451            }
2452
2453            assert_eq!(journal.size().await, 20);
2454
2455            // Sync and reopen
2456            journal.sync().await.unwrap();
2457            drop(journal);
2458
2459            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2460                .await
2461                .unwrap();
2462
2463            // Size and data should be preserved
2464            let bounds = journal.bounds().await;
2465            assert_eq!(bounds.end, 20);
2466            assert_eq!(bounds.start, 15);
2467
2468            // Verify data
2469            for i in 0..5u64 {
2470                assert_eq!(journal.read(15 + i).await.unwrap(), test_digest(1500 + i));
2471            }
2472
2473            // Can continue appending
2474            let pos = journal.append(&test_digest(9999)).await.unwrap();
2475            assert_eq!(pos, 20);
2476            assert_eq!(journal.read(20).await.unwrap(), test_digest(9999));
2477
2478            journal.destroy().await.unwrap();
2479        });
2480    }
2481
2482    #[test_traced]
2483    fn test_fixed_journal_init_at_size_persistence_without_data() {
2484        let executor = deterministic::Runner::default();
2485        executor.start(|context| async move {
2486            let cfg = test_cfg(&context, NZU64!(5));
2487
2488            // Initialize at position 15
2489            let journal =
2490                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 15)
2491                    .await
2492                    .unwrap();
2493
2494            let bounds = journal.bounds().await;
2495            assert_eq!(bounds.end, 15);
2496            assert!(bounds.is_empty());
2497
2498            // Drop without writing any data
2499            drop(journal);
2500
2501            // Reopen and verify size persisted
2502            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2503                .await
2504                .unwrap();
2505
2506            let bounds = journal.bounds().await;
2507            assert_eq!(bounds.end, 15);
2508            assert!(bounds.is_empty());
2509
2510            // Can append starting at position 15
2511            let pos = journal.append(&test_digest(1500)).await.unwrap();
2512            assert_eq!(pos, 15);
2513            assert_eq!(journal.read(15).await.unwrap(), test_digest(1500));
2514
2515            journal.destroy().await.unwrap();
2516        });
2517    }
2518
2519    #[test_traced]
2520    fn test_fixed_journal_init_at_size_large_offset() {
2521        let executor = deterministic::Runner::default();
2522        executor.start(|context| async move {
2523            let cfg = test_cfg(&context, NZU64!(5));
2524
2525            // Initialize at a large position (position 1000)
2526            let journal =
2527                Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 1000)
2528                    .await
2529                    .unwrap();
2530
2531            let bounds = journal.bounds().await;
2532            assert_eq!(bounds.end, 1000);
2533            assert!(bounds.is_empty());
2534
2535            // Next append should get position 1000
2536            let pos = journal.append(&test_digest(100000)).await.unwrap();
2537            assert_eq!(pos, 1000);
2538            assert_eq!(journal.read(1000).await.unwrap(), test_digest(100000));
2539
2540            journal.destroy().await.unwrap();
2541        });
2542    }
2543
2544    #[test_traced]
2545    fn test_fixed_journal_init_at_size_prune_and_append() {
2546        let executor = deterministic::Runner::default();
2547        executor.start(|context| async move {
2548            let cfg = test_cfg(&context, NZU64!(5));
2549
2550            // Initialize at position 20
2551            let journal =
2552                Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 20)
2553                    .await
2554                    .unwrap();
2555
2556            // Append items 20-29
2557            for i in 0..10u64 {
2558                journal.append(&test_digest(2000 + i)).await.unwrap();
2559            }
2560
2561            assert_eq!(journal.size().await, 30);
2562
2563            // Prune to position 25
2564            journal.prune(25).await.unwrap();
2565
2566            let bounds = journal.bounds().await;
2567            assert_eq!(bounds.end, 30);
2568            assert_eq!(bounds.start, 25);
2569
2570            // Verify remaining items are readable
2571            for i in 25..30u64 {
2572                assert_eq!(journal.read(i).await.unwrap(), test_digest(2000 + (i - 20)));
2573            }
2574
2575            // Continue appending
2576            let pos = journal.append(&test_digest(3000)).await.unwrap();
2577            assert_eq!(pos, 30);
2578
2579            journal.destroy().await.unwrap();
2580        });
2581    }
2582
2583    #[test_traced]
2584    fn test_fixed_journal_clear_to_size() {
2585        let executor = deterministic::Runner::default();
2586        executor.start(|context| async move {
2587            let cfg = test_cfg(&context, NZU64!(10));
2588            let journal = Journal::init(context.child("journal"), cfg.clone())
2589                .await
2590                .expect("failed to initialize journal");
2591
2592            // Append 25 items (positions 0-24, spanning 3 blobs)
2593            for i in 0..25u64 {
2594                journal.append(&test_digest(i)).await.unwrap();
2595            }
2596            assert_eq!(journal.size().await, 25);
2597            journal.sync().await.unwrap();
2598
2599            // Clear to position 100, effectively resetting the journal
2600            journal.clear_to_size(100).await.unwrap();
2601            assert_eq!(journal.size().await, 100);
2602
2603            // Old positions should fail
2604            for i in 0..25 {
2605                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2606            }
2607
2608            // Verify size persists after restart without writing any data
2609            drop(journal);
2610            let journal =
2611                Journal::<_, Digest>::init(context.child("journal_after_clear"), cfg.clone())
2612                    .await
2613                    .expect("failed to re-initialize journal after clear");
2614            assert_eq!(journal.size().await, 100);
2615
2616            // Append new data starting at position 100
2617            for i in 100..105u64 {
2618                let pos = journal.append(&test_digest(i)).await.unwrap();
2619                assert_eq!(pos, i);
2620            }
2621            assert_eq!(journal.size().await, 105);
2622
2623            // New positions should be readable
2624            for i in 100..105u64 {
2625                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2626            }
2627
2628            // Sync and re-init to verify persistence
2629            journal.sync().await.unwrap();
2630            drop(journal);
2631
2632            let journal = Journal::<_, Digest>::init(context.child("journal_reopened"), cfg)
2633                .await
2634                .expect("failed to re-initialize journal");
2635
2636            assert_eq!(journal.size().await, 105);
2637            for i in 100..105u64 {
2638                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2639            }
2640
2641            journal.destroy().await.unwrap();
2642        });
2643    }
2644
2645    #[test_traced]
2646    fn test_fixed_journal_sync_crash_meta_none_boundary_aligned() {
2647        // Old meta = None (aligned), new boundary = aligned.
2648        let executor = deterministic::Runner::default();
2649        executor.start(|context| async move {
2650            let cfg = test_cfg(&context, NZU64!(5));
2651            let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone())
2652                .await
2653                .unwrap();
2654
2655            for i in 0..5u64 {
2656                journal.append(&test_digest(i)).await.unwrap();
2657            }
2658            let inner = journal.inner.read().await;
2659            let tail_section = inner.size / journal.items_per_blob;
2660            inner.journal.sync(tail_section).await.unwrap();
2661            drop(inner);
2662            drop(journal);
2663
2664            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2665                .await
2666                .unwrap();
2667            let bounds = journal.bounds().await;
2668            assert_eq!(bounds.start, 0);
2669            assert_eq!(bounds.end, 5);
2670            journal.destroy().await.unwrap();
2671        });
2672    }
2673
2674    #[test_traced]
2675    fn test_fixed_journal_oldest_section_invalid_len() {
2676        // Old meta = None (aligned), new boundary = mid-section.
2677        let executor = deterministic::Runner::default();
2678        executor.start(|context| async move {
2679            let cfg = test_cfg(&context, NZU64!(5));
2680            let journal =
2681                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2682                    .await
2683                    .unwrap();
2684            for i in 0..3u64 {
2685                journal.append(&test_digest(i)).await.unwrap();
2686            }
2687            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(2));
2688            journal.sync().await.unwrap();
2689
2690            // Simulate metadata deletion (corruption).
2691            let mut inner = journal.inner.write().await;
2692            inner.metadata.clear();
2693            inner.metadata.sync().await.unwrap();
2694            drop(inner);
2695            drop(journal);
2696
2697            // Section 1 has items 7,8,9 but metadata is missing, so falls back to blob-based boundary.
2698            // Section 1 has 3 items, but recovery thinks it should have 5 because metadata deletion
2699            // causes us to forget that section 1 starts at logical position 7.
2700            let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await;
2701            assert!(matches!(result, Err(Error::Corruption(_))));
2702            context.remove(&blob_partition(&cfg), None).await.unwrap();
2703            context
2704                .remove(&format!("{}-metadata", cfg.partition), None)
2705                .await
2706                .unwrap();
2707        });
2708    }
2709
2710    #[test_traced]
2711    fn test_fixed_journal_sync_crash_meta_mid_boundary_unchanged() {
2712        // Old meta = Some(mid), new boundary = mid-section (same value).
2713        let executor = deterministic::Runner::default();
2714        executor.start(|context| async move {
2715            let cfg = test_cfg(&context, NZU64!(5));
2716            let journal =
2717                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2718                    .await
2719                    .unwrap();
2720            for i in 0..3u64 {
2721                journal.append(&test_digest(i)).await.unwrap();
2722            }
2723            let inner = journal.inner.read().await;
2724            let tail_section = inner.size / journal.items_per_blob;
2725            inner.journal.sync(tail_section).await.unwrap();
2726            drop(inner);
2727            drop(journal);
2728
2729            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2730                .await
2731                .unwrap();
2732            let bounds = journal.bounds().await;
2733            assert_eq!(bounds.start, 7);
2734            assert_eq!(bounds.end, 10);
2735            journal.destroy().await.unwrap();
2736        });
2737    }
2738    #[test_traced]
2739    fn test_fixed_journal_sync_crash_meta_mid_to_aligned_becomes_stale() {
2740        // Old meta = Some(mid), new boundary = aligned.
2741        let executor = deterministic::Runner::default();
2742        executor.start(|context| async move {
2743            let cfg = test_cfg(&context, NZU64!(5));
2744            let journal =
2745                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2746                    .await
2747                    .unwrap();
2748            for i in 0..10u64 {
2749                journal.append(&test_digest(i)).await.unwrap();
2750            }
2751            assert_eq!(journal.size().await, 17);
2752            journal.prune(10).await.unwrap();
2753
2754            let inner = journal.inner.read().await;
2755            let tail_section = inner.size / journal.items_per_blob;
2756            inner.journal.sync(tail_section).await.unwrap();
2757            drop(inner);
2758            drop(journal);
2759
2760            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2761                .await
2762                .unwrap();
2763            let bounds = journal.bounds().await;
2764            assert_eq!(bounds.start, 10);
2765            assert_eq!(bounds.end, 17);
2766            journal.destroy().await.unwrap();
2767        });
2768    }
2769
2770    #[test_traced]
2771    fn test_fixed_journal_prune_does_not_move_boundary_backwards() {
2772        // Pruning to a position earlier than pruning_boundary (within the same section)
2773        // should not move the boundary backwards.
2774        let executor = deterministic::Runner::default();
2775        executor.start(|context| async move {
2776            let cfg = test_cfg(&context, NZU64!(5));
2777            // init_at_size(7) sets pruning_boundary = 7 (mid-section in section 1)
2778            let journal =
2779                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2780                    .await
2781                    .unwrap();
2782            // Append 5 items at positions 7-11, filling section 1 and part of section 2
2783            for i in 0..5u64 {
2784                journal.append(&test_digest(i)).await.unwrap();
2785            }
2786            // Prune to position 5 (section 1 start) should NOT move boundary back from 7 to 5
2787            journal.prune(5).await.unwrap();
2788            assert_eq!(journal.bounds().await.start, 7);
2789            journal.destroy().await.unwrap();
2790        });
2791    }
2792
2793    #[test_traced]
2794    fn test_fixed_journal_replay_after_init_at_size_spanning_sections() {
2795        // Test replay when first section begins mid-section: init_at_size creates a journal
2796        // where pruning_boundary is mid-section, then we append across multiple sections.
2797        let executor = deterministic::Runner::default();
2798        executor.start(|context| async move {
2799            let cfg = test_cfg(&context, NZU64!(5));
2800
2801            // Initialize at position 7 (mid-section with items_per_blob=5)
2802            // Section 1 (positions 5-9) begins mid-section: only positions 7, 8, 9 have data
2803            let journal =
2804                Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 7)
2805                    .await
2806                    .unwrap();
2807
2808            // Append 13 items (positions 7-19), spanning sections 1, 2, 3
2809            for i in 0..13u64 {
2810                let pos = journal.append(&test_digest(100 + i)).await.unwrap();
2811                assert_eq!(pos, 7 + i);
2812            }
2813            assert_eq!(journal.size().await, 20);
2814            journal.sync().await.unwrap();
2815
2816            // Replay from pruning_boundary
2817            {
2818                let reader = journal.reader().await;
2819                let stream = reader
2820                    .replay(NZUsize!(1024), 7)
2821                    .await
2822                    .expect("failed to replay");
2823                pin_mut!(stream);
2824                let mut items: Vec<(u64, Digest)> = Vec::new();
2825                while let Some(result) = stream.next().await {
2826                    items.push(result.expect("replay item failed"));
2827                }
2828
2829                // Should get all 13 items with correct logical positions
2830                assert_eq!(items.len(), 13);
2831                for (i, (pos, item)) in items.iter().enumerate() {
2832                    assert_eq!(*pos, 7 + i as u64);
2833                    assert_eq!(*item, test_digest(100 + i as u64));
2834                }
2835            }
2836
2837            // Replay from mid-stream (position 12)
2838            {
2839                let reader = journal.reader().await;
2840                let stream = reader
2841                    .replay(NZUsize!(1024), 12)
2842                    .await
2843                    .expect("failed to replay from mid-stream");
2844                pin_mut!(stream);
2845                let mut items: Vec<(u64, Digest)> = Vec::new();
2846                while let Some(result) = stream.next().await {
2847                    items.push(result.expect("replay item failed"));
2848                }
2849
2850                // Should get items from position 12 onwards
2851                assert_eq!(items.len(), 8);
2852                for (i, (pos, item)) in items.iter().enumerate() {
2853                    assert_eq!(*pos, 12 + i as u64);
2854                    assert_eq!(*item, test_digest(100 + 5 + i as u64));
2855                }
2856            }
2857
2858            journal.destroy().await.unwrap();
2859        });
2860    }
2861
2862    #[test_traced]
2863    fn test_fixed_journal_rewind_error_before_bounds_start() {
2864        // Test that rewind returns error when trying to rewind before bounds.start
2865        let executor = deterministic::Runner::default();
2866        executor.start(|context| async move {
2867            let cfg = test_cfg(&context, NZU64!(5));
2868
2869            let journal =
2870                Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 10)
2871                    .await
2872                    .unwrap();
2873
2874            // Append a few items (positions 10, 11, 12)
2875            for i in 0..3u64 {
2876                journal.append(&test_digest(i)).await.unwrap();
2877            }
2878            assert_eq!(journal.size().await, 13);
2879
2880            // Rewind to position 11 should work
2881            journal.rewind(11).await.unwrap();
2882            assert_eq!(journal.size().await, 11);
2883
2884            // Rewind to position 10 (pruning_boundary) should work
2885            journal.rewind(10).await.unwrap();
2886            assert_eq!(journal.size().await, 10);
2887
2888            // Rewind to before pruning_boundary should fail
2889            let result = journal.rewind(9).await;
2890            assert!(matches!(result, Err(Error::InvalidRewind(9))));
2891
2892            journal.destroy().await.unwrap();
2893        });
2894    }
2895
2896    #[test_traced]
2897    fn test_fixed_journal_init_at_size_crash_scenarios() {
2898        let executor = deterministic::Runner::default();
2899        executor.start(|context| async move {
2900            let cfg = test_cfg(&context, NZU64!(5));
2901
2902            // Setup: Create a journal with some data and mid-section metadata
2903            let journal =
2904                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2905                    .await
2906                    .unwrap();
2907            for i in 0..5u64 {
2908                journal.append(&test_digest(i)).await.unwrap();
2909            }
2910            journal.sync().await.unwrap();
2911            drop(journal);
2912
2913            // Crash Scenario 1: After clear(), before blob creation
2914            // Simulate by manually removing all blobs but leaving metadata
2915            let blob_part = blob_partition(&cfg);
2916            context.remove(&blob_part, None).await.unwrap();
2917
2918            // Recovery should see no blobs and return empty journal, ignoring metadata
2919            let journal = Journal::<_, Digest>::init(
2920                context.child("crash").with_attribute("index", 1),
2921                cfg.clone(),
2922            )
2923            .await
2924            .expect("init failed after clear crash");
2925            let bounds = journal.bounds().await;
2926            assert_eq!(bounds.end, 0);
2927            assert_eq!(bounds.start, 0);
2928            drop(journal);
2929
2930            // Restore metadata for next scenario (it might have been removed by init)
2931            let meta_cfg = MetadataConfig {
2932                partition: format!("{}-metadata", cfg.partition),
2933                codec_config: ((0..).into(), ()),
2934            };
2935            let mut metadata =
2936                Metadata::<_, u64, Vec<u8>>::init(context.child("restore_meta"), meta_cfg.clone())
2937                    .await
2938                    .unwrap();
2939            metadata
2940                .put_sync(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec())
2941                .await
2942                .unwrap();
2943
2944            // Crash Scenario 2: After ensure_section_exists(), before metadata update
2945            // Target: init_at_size(12) -> should be section 2 (starts at 10)
2946            // State: Blob at section 2, Metadata says 7 (section 1)
2947            // Wait, old metadata (7) is BEHIND new blob (12/5 = 2).
2948            // recover_bounds treats "meta < blob" as stale -> uses blob.
2949
2950            // Let's try init_at_size(2) -> section 0.
2951            // Old metadata says 7 (section 1).
2952            // State: Blob at section 0, Metadata says 7 (section 1).
2953            // recover_bounds sees "meta (1) > blob (0)" -> metadata ahead -> uses blob.
2954
2955            // Simulate: Create blob at section 0 (tail for init_at_size(2))
2956            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
2957            blob.sync().await.unwrap(); // Ensure it exists
2958
2959            // Recovery should warn "metadata ahead" and use blob state (0, 0)
2960            let journal = Journal::<_, Digest>::init(
2961                context.child("crash").with_attribute("index", 2),
2962                cfg.clone(),
2963            )
2964            .await
2965            .expect("init failed after create crash");
2966
2967            // Should recover to blob state (section 0 aligned)
2968            let bounds = journal.bounds().await;
2969            assert_eq!(bounds.start, 0);
2970            // Size is 0 because blob is empty
2971            assert_eq!(bounds.end, 0);
2972            journal.destroy().await.unwrap();
2973        });
2974    }
2975
2976    #[test_traced]
2977    fn test_fixed_journal_clear_to_size_crash_scenarios() {
2978        let executor = deterministic::Runner::default();
2979        executor.start(|context| async move {
2980            let cfg = test_cfg(&context, NZU64!(5));
2981
2982            // Setup: Init at 12 (Section 2, offset 2)
2983            // Metadata = 12
2984            let journal =
2985                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 12)
2986                    .await
2987                    .unwrap();
2988            journal.sync().await.unwrap();
2989            drop(journal);
2990
2991            // Crash Scenario: clear_to_size(2) [Section 0]
2992            // We want to simulate crash after blob 0 created, but metadata still 12.
2993
2994            // manually clear blobs
2995            let blob_part = blob_partition(&cfg);
2996            context.remove(&blob_part, None).await.unwrap();
2997
2998            // manually create section 0
2999            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
3000            blob.sync().await.unwrap();
3001
3002            // Metadata is still 12 (from setup)
3003            // Blob is Section 0
3004            // Metadata (12 -> sec 2) > Blob (sec 0) -> Ahead warning
3005
3006            let journal = Journal::<_, Digest>::init(context.child("crash_clear"), cfg.clone())
3007                .await
3008                .expect("init failed after clear_to_size crash");
3009
3010            // Should fallback to blobs
3011            let bounds = journal.bounds().await;
3012            assert_eq!(bounds.start, 0);
3013            assert_eq!(bounds.end, 0);
3014            journal.destroy().await.unwrap();
3015        });
3016    }
3017
3018    #[test_traced]
3019    fn test_read_many_empty() {
3020        let executor = deterministic::Runner::default();
3021        executor.start(|context| async move {
3022            let cfg = test_cfg(&context, NZU64!(10));
3023            let journal = Journal::<_, Digest>::init(context.child("j"), cfg)
3024                .await
3025                .unwrap();
3026
3027            let items = journal.reader().await.read_many(&[]).await.unwrap();
3028            assert!(items.is_empty());
3029
3030            journal.destroy().await.unwrap();
3031        });
3032    }
3033
3034    #[test_traced]
3035    fn test_read_many_single_blob() {
3036        // All positions within one blob.
3037        let executor = deterministic::Runner::default();
3038        executor.start(|context| async move {
3039            let cfg = test_cfg(&context, NZU64!(10));
3040            let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3041
3042            for i in 0..5u64 {
3043                journal.append(&test_digest(i)).await.unwrap();
3044            }
3045            assert_eq!(journal.size().await, 5);
3046
3047            let items = journal.reader().await.read_many(&[0, 2, 4]).await.unwrap();
3048            assert_eq!(items, vec![test_digest(0), test_digest(2), test_digest(4)]);
3049
3050            journal.destroy().await.unwrap();
3051        });
3052    }
3053
3054    #[test_traced]
3055    fn test_read_many_across_blobs() {
3056        // Positions spanning multiple blobs (items_per_blob=3).
3057        let executor = deterministic::Runner::default();
3058        executor.start(|context| async move {
3059            let cfg = test_cfg(&context, NZU64!(3));
3060            let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3061
3062            for i in 0..9u64 {
3063                journal.append(&test_digest(i)).await.unwrap();
3064            }
3065            assert_eq!(journal.size().await, 9);
3066            // Blobs: [0,1,2], [3,4,5], [6,7,8]
3067
3068            let items = journal.reader().await.read_many(&[1, 4, 7]).await.unwrap();
3069            assert_eq!(items, vec![test_digest(1), test_digest(4), test_digest(7)]);
3070
3071            journal.destroy().await.unwrap();
3072        });
3073    }
3074
3075    #[test_traced]
3076    fn test_read_many_after_prune() {
3077        // Read from positions that survive pruning.
3078        let executor = deterministic::Runner::default();
3079        executor.start(|context| async move {
3080            let cfg = test_cfg(&context, NZU64!(3));
3081            let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3082
3083            for i in 0..9u64 {
3084                journal.append(&test_digest(i)).await.unwrap();
3085            }
3086            assert_eq!(journal.size().await, 9);
3087            journal.sync().await.unwrap();
3088
3089            // Prune first blob [0,1,2].
3090            journal.prune(3).await.unwrap();
3091            assert_eq!(journal.bounds().await, 3..9);
3092
3093            let items = journal.reader().await.read_many(&[3, 5, 8]).await.unwrap();
3094            assert_eq!(items, vec![test_digest(3), test_digest(5), test_digest(8)]);
3095
3096            // Pruned position should error.
3097            let err = journal.reader().await.read_many(&[1]).await.unwrap_err();
3098            assert!(matches!(err, Error::ItemPruned(1)));
3099
3100            journal.destroy().await.unwrap();
3101        });
3102    }
3103
3104    #[test_traced]
3105    fn test_read_many_out_of_range() {
3106        let executor = deterministic::Runner::default();
3107        executor.start(|context| async move {
3108            let cfg = test_cfg(&context, NZU64!(10));
3109            let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3110
3111            for i in 0..3u64 {
3112                journal.append(&test_digest(i)).await.unwrap();
3113            }
3114            assert_eq!(journal.size().await, 3);
3115
3116            let err = journal.reader().await.read_many(&[0, 5]).await.unwrap_err();
3117            assert!(matches!(err, Error::ItemOutOfRange(5)));
3118
3119            journal.destroy().await.unwrap();
3120        });
3121    }
3122
3123    #[test_traced]
3124    fn test_read_many_matches_read() {
3125        // Verify batch read matches individual reads across blobs.
3126        let executor = deterministic::Runner::default();
3127        executor.start(|context| async move {
3128            let cfg = test_cfg(&context, NZU64!(4));
3129            let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3130
3131            for i in 0..20u64 {
3132                journal.append(&test_digest(i)).await.unwrap();
3133            }
3134            assert_eq!(journal.size().await, 20);
3135            journal.sync().await.unwrap();
3136
3137            let positions: Vec<u64> = (0..20).collect();
3138            let reader = journal.reader().await;
3139            let batch = reader.read_many(&positions).await.unwrap();
3140
3141            for &pos in &positions {
3142                let single = reader.read(pos).await.unwrap();
3143                assert_eq!(batch[pos as usize], single);
3144            }
3145            drop(reader);
3146
3147            journal.destroy().await.unwrap();
3148        });
3149    }
3150
3151    #[test_traced]
3152    fn test_fixed_journal_metrics() {
3153        let executor = deterministic::Runner::default();
3154        executor.start(|context| async move {
3155            let cfg = test_cfg(&context, NZU64!(2));
3156            let journal = Journal::<_, Digest>::init(context.child("fixed_metrics"), cfg.clone())
3157                .await
3158                .unwrap();
3159
3160            let items: Vec<_> = (0..5).map(test_digest).collect();
3161            journal.append_many(Many::Flat(&items)).await.unwrap();
3162            journal.append(&test_digest(5)).await.unwrap();
3163            journal.sync().await.unwrap();
3164            journal.reader().await.read(0).await.unwrap();
3165            journal.reader().await.try_read_sync(0).unwrap();
3166            journal.reader().await.read_many(&[1, 2, 4]).await.unwrap();
3167            journal.prune(2).await.unwrap();
3168            journal.rewind(4).await.unwrap();
3169
3170            let buffer = context.encode();
3171            for expected in [
3172                "fixed_metrics_size 4",
3173                "fixed_metrics_pruning_boundary 2",
3174                "fixed_metrics_retained 2",
3175                "fixed_metrics_tail_items 2",
3176                "fixed_metrics_append_calls_total 1",
3177                "fixed_metrics_append_many_calls_total 1",
3178                "fixed_metrics_read_calls_total 1",
3179                "fixed_metrics_read_many_calls_total 1",
3180                "fixed_metrics_try_read_sync_hits_total 1",
3181                "fixed_metrics_items_read_total 5",
3182                "fixed_metrics_sync_calls_total 1",
3183                "fixed_metrics_append_duration_count 1",
3184                "fixed_metrics_append_many_duration_count 1",
3185                "fixed_metrics_read_duration_count 1",
3186                "fixed_metrics_read_many_duration_count 1",
3187                "fixed_metrics_sync_duration_count 1",
3188                "fixed_metrics_cache_hits_total",
3189                "fixed_metrics_cache_misses_total",
3190                "fixed_metrics_blobs_tracked",
3191            ] {
3192                assert!(buffer.contains(expected), "{expected}\n{buffer}");
3193            }
3194
3195            journal.destroy().await.unwrap();
3196        });
3197    }
3198}