commonware_storage/journal/contiguous/fixed.rs

//! An append-only log for storing fixed-length _items_ on disk.
//!
//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
//! where position is defined as the item's order of insertion starting from 0, unaffected by
//! pruning.
//!
//! _See [super::variable] for a journal that supports variable-length items._
//!
//! # Format
//!
//! Data stored in a `fixed::Journal` is persisted in one of many Blobs. Each `Blob` contains at
//! most `items_per_blob` items, with page-level data integrity provided by a buffer pool.
//!
//! ```text
//! +--------+--------+-----+----------+
//! | item_0 | item_1 | ... | item_n-1 |
//! +--------+--------+-----+----------+
//!
//! n = config.items_per_blob
//! ```
//!
//! The most recent blob may not be full, in which case it will contain fewer than the
//! maximum number of items.
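//!
//! For example, with `items_per_blob = 1000`, a journal of size 2500 spans three blobs: two full
//! blobs of 1000 items each, plus a tail blob containing 500.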
//!
//! Data fetched from disk is always checked for integrity before being returned. If the data is
//! found to be invalid, an error is returned instead.
//!
//! # Open Blobs
//!
//! All `Blobs` in a given `partition` are kept open during the lifetime of `Journal`. You can limit
//! the number of open blobs by configuring a larger `items_per_blob` and/or pruning old items.
//!
//! # Consistency
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are closed.
//!
//! # Pruning
//!
//! The `prune` method allows the `Journal` to prune blobs consisting entirely of items prior to a
//! given point in history.
//!
//! # State Sync
//!
//! `Journal::init_sync` initializes a journal for state sync, handling existing data appropriately:
//! - If no data exists, creates a journal at the sync range start
//! - If data exists within range, prunes to the lower bound
//! - If data exceeds the range, returns an error
//! - If data is stale (before range), destroys and recreates
//!
//! # Replay
//!
//! The `replay` method supports fast reading of all unpruned items into memory.
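//!
//! # Example
//!
//! A minimal usage sketch (not a compiled doctest; assumes a runtime `context` implementing
//! [Storage] and [Metrics], and the `NZU64!`/`NZUsize!` helpers from `commonware_utils`):
//!
//! ```ignore
//! let cfg = Config {
//!     partition: "my_journal".into(),
//!     items_per_blob: NZU64!(1024),
//!     buffer_pool: PoolRef::new(page_size, page_cache_size),
//!     write_buffer: NZUsize!(4096),
//! };
//! let mut journal = Journal::init(context, cfg).await?;
//! let pos = journal.append(item).await?;
//! assert_eq!(journal.read(pos).await?, item);
//! journal.sync().await?;
//! ```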

use crate::{
    journal::{
        contiguous::MutableContiguous,
        segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
        Error,
    },
    mmr::Location,
    Persistable,
};
use commonware_codec::CodecFixedShared;
use commonware_runtime::{buffer::PoolRef, Metrics, Storage};
use futures::{stream::Stream, StreamExt};
use std::{
    num::{NonZeroU64, NonZeroUsize},
    ops::Range,
};
use tracing::debug;

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config {
    /// The `commonware-runtime::Storage` partition to use for storing journal blobs.
    pub partition: String,

    /// The maximum number of journal items to store in each blob.
    ///
    /// Any unpruned historical blobs will contain exactly this number of items.
    /// Only the newest blob may contain fewer items.
    pub items_per_blob: NonZeroU64,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// Implementation of `Journal` storage.
///
/// This is implemented as a wrapper around [SegmentedJournal] that provides position-based access
/// where positions are automatically mapped to (section, position_in_section) pairs.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying blob will be truncated to the last valid item). Repair is performed
/// by the underlying [SegmentedJournal] during init.
pub struct Journal<E: Storage + Metrics, A: CodecFixedShared> {
    inner: SegmentedJournal<E, A>,

    /// The maximum number of items per blob (section).
    items_per_blob: u64,

    /// Total number of items appended (not affected by pruning).
    size: u64,
}

impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Size of each entry in bytes.
    pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;

    /// Size of each entry in bytes (as u64).
    pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. The `replay` method can be
    /// used to iterate over all items in the `Journal`.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();

        let segmented_cfg = SegmentedConfig {
            partition: cfg.partition,
            buffer_pool: cfg.buffer_pool,
            write_buffer: cfg.write_buffer,
        };

        let mut inner = SegmentedJournal::init(context, segmented_cfg).await?;

        // Calculate size and pruning_boundary from the inner journal's state
        let oldest_section = inner.oldest_section();
        let newest_section = inner.newest_section();

        let size = match (oldest_section, newest_section) {
            (Some(_), Some(newest)) => {
                // Compute size from the tail (newest) section
                let tail_len = inner.section_len(newest).await?;
                newest * items_per_blob + tail_len
            }
            _ => 0,
        };

        // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on
        // reopen even after pruning all items. The tail blob is at `size / items_per_blob` (where
        // the next append would go).
        let tail_section = size / items_per_blob;
        inner.ensure_section_exists(tail_section).await?;

        Ok(Self {
            inner,
            items_per_blob,
            size,
        })
    }

    /// Initialize a new `Journal` instance in a pruned state at a given size.
    ///
    /// This is used for state sync to create a journal that appears to have had `size` items
    /// appended and then pruned up to that point.
    ///
    /// # Arguments
    /// * `context` - The storage context
    /// * `cfg` - Configuration for the journal
    /// * `size` - The number of operations that have been "pruned"
    ///
    /// # Behavior
    /// - Creates only the tail blob, at the section where the next append (position `size`) will go
    /// - The items in the tail blob before `size` are filled with zeros (dummy data)
    /// - `oldest_retained_pos()` returns the start of the tail section, matching behavior if the
    ///   journal were reopened normally
    /// - Positions within the tail section but before `size` contain dummy zero data
    ///
    /// # Invariants
    /// - The directory given by `cfg.partition` should be empty
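    ///
    /// For example, with `items_per_blob = 100` and `size = 250`, only the blob for section 2 is
    /// created, pre-filled with 50 zero-valued items covering positions 200..250.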
    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();

        // Calculate the tail blob section and items within it
        let tail_section = size / items_per_blob;
        let tail_items = size % items_per_blob;

        // Initialize the segmented journal (empty)
        let segmented_cfg = SegmentedConfig {
            partition: cfg.partition,
            buffer_pool: cfg.buffer_pool,
            write_buffer: cfg.write_buffer,
        };

        let mut inner = SegmentedJournal::init(context, segmented_cfg).await?;

        // Initialize the tail section with zero-filled items if needed. This uses resize internally
        // which appropriately uses the underlying page-oriented layout with checksums.
        inner.init_section_at_size(tail_section, tail_items).await?;
        inner.sync_all().await?;

        Ok(Self {
            inner,
            items_per_blob,
            size,
        })
    }

    /// Initialize a journal for synchronization, reusing existing data if possible.
    ///
    /// Handles sync scenarios based on existing journal data vs. the given sync range:
    ///
    /// 1. **No existing data**: Creates journal at `range.start` (or empty if `range.start == 0`)
    /// 2. **Data within range**: Prunes to `range.start` and reuses existing data
    /// 3. **Data exceeds range**: Returns error
    /// 4. **Stale data**: Destroys and recreates at `range.start`
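    ///
    /// For example, with `range = 10..20`: an empty journal is recreated at size 10, a journal of
    /// size 15 is pruned to position 10, a journal of size 8 is destroyed and recreated at size
    /// 10, and a journal of size 25 returns [crate::qmdb::Error::UnexpectedData].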
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config,
        range: Range<u64>,
    ) -> Result<Self, crate::qmdb::Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_blob = cfg.items_per_blob.get(),
            "initializing contiguous fixed journal for sync"
        );

        let mut journal = Self::init(context.with_label("journal"), cfg.clone()).await?;
        let size = journal.size();

        // No existing data - initialize at the start of the sync range if needed
        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Ok(Self::init_at_size(context, cfg, range.start).await?);
            }
        }

        // Check if data exceeds the sync range
        if size > range.end {
            return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
                size,
            )));
        }

        // If all existing data is before our sync range, destroy and recreate fresh
        if size <= range.start {
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Ok(Self::init_at_size(context, cfg, range.start).await?);
        }

        // Prune to lower bound if needed
        let oldest = journal.oldest_retained_pos();
        if let Some(oldest_pos) = oldest {
            if oldest_pos < range.start {
                debug!(
                    oldest_pos,
                    range.start, "pruning journal to sync range start"
                );
                journal.prune(range.start).await?;
            }
        }

        Ok(journal)
    }

    /// Convert a global position to (section, position_in_section).
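    ///
    /// For example, with `items_per_blob = 100`, position 250 maps to `(2, 50)`.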
    #[inline]
    const fn position_to_section(&self, position: u64) -> (u64, u64) {
        let section = position / self.items_per_blob;
        let pos_in_section = position % self.items_per_blob;
        (section, pos_in_section)
    }

    /// Sync any pending updates to disk.
    ///
    /// Only the tail section can have pending updates since historical sections are synced
    /// when they become full.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let tail_section = self.size / self.items_per_blob;
        self.inner.sync(tail_section).await
    }

    /// Return the total number of items in the journal, irrespective of pruning. The next value
    /// appended to the journal will be at this position.
    pub const fn size(&self) -> u64 {
        self.size
    }

    /// Append a new item to the journal, returning the item's position in the journal, or an
    /// error if the operation fails.
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        let position = self.size;
        let (section, _pos_in_section) = self.position_to_section(position);

        self.inner.append(section, item).await?;
        self.size += 1;

        // If we just filled up a section, sync it and create the next tail blob. This maintains the
        // invariant that the tail blob always exists.
        if self.size.is_multiple_of(self.items_per_blob) {
            self.inner.sync(section).await?;
            // Create the new tail blob.
            self.inner.ensure_section_exists(section + 1).await?;
        }

        Ok(position)
    }

    /// Rewind the journal to the given `size`. Returns [Error::InvalidRewind] if the rewind point
    /// precedes the pruning boundary or exceeds the current size. The journal is not synced after
    /// rewinding.
    ///
    /// # Warnings
    ///
    /// * This operation is not guaranteed to survive restarts until sync is called.
    /// * This operation is not atomic, but it will always leave the journal in a consistent state
    ///   in the event of failure since blobs are always removed from newest to oldest.
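    ///
    /// For example, after appending items at positions 0..=6, `rewind(4)` discards the items at
    /// positions 4, 5, and 6, and the next `append` will again return position 4.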
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }

        if size < self.pruning_boundary() {
            return Err(Error::InvalidRewind(size));
        }

        let (section, pos_in_section) = self.position_to_section(size);
        let byte_offset = pos_in_section * SegmentedJournal::<E, A>::CHUNK_SIZE as u64;

        self.inner.rewind(section, byte_offset).await?;
        self.size = size;

        Ok(())
    }

    /// Return the position of the oldest item in the journal that remains readable.
    ///
    /// Note that this value could be older than the `min_item_pos` last passed to prune.
    pub fn oldest_retained_pos(&self) -> Option<u64> {
        if self.pruning_boundary() == self.size {
            return None;
        }
        Some(self.pruning_boundary())
    }

    /// Return the position before which all items have been pruned.
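    ///
    /// For example, with `items_per_blob = 2`, if the oldest retained section is 3, the pruning
    /// boundary is position 6.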
    pub fn pruning_boundary(&self) -> u64 {
        self.inner
            .oldest_section()
            .expect("journal should have at least one section")
            * self.items_per_blob
    }

    /// Read the item at position `pos` in the journal.
    ///
    /// # Errors
    ///
    ///  - [Error::ItemPruned] if the item at position `pos` is pruned.
    ///  - [Error::ItemOutOfRange] if the item at position `pos` does not exist.
    pub async fn read(&self, pos: u64) -> Result<A, Error> {
        if pos >= self.size {
            return Err(Error::ItemOutOfRange(pos));
        }
        if pos < self.pruning_boundary() {
            return Err(Error::ItemPruned(pos));
        }

        // Get the relevant blob to read from.
        let (section, pos_in_section) = self.position_to_section(pos);

        self.inner.get(section, pos_in_section).await.map_err(|e| {
            // Since we check bounds above, any failure here is unexpected.
            match e {
                Error::SectionOutOfRange(e)
                | Error::AlreadyPrunedToSection(e)
                | Error::ItemOutOfRange(e) => {
                    Error::Corruption(format!("section/item should be found, but got: {e}"))
                }
                other => other,
            }
        })
    }

    /// Returns an ordered stream of all items in the journal with position >= `start_pos`.
    ///
    /// # Integrity
    ///
    /// If any corrupted data is found, or if any non-tail section has fewer items than
    /// `items_per_blob`, the stream will return an error.
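    ///
    /// A consumption sketch (not a compiled doctest; assumes `futures::pin_mut` and `StreamExt`
    /// are in scope):
    ///
    /// ```ignore
    /// let stream = journal.replay(NZUsize!(1024), 0).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (pos, item) = result?;
    ///     // Process `item` at global position `pos`.
    /// }
    /// ```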
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        if start_pos > self.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        let (start_section, start_pos_in_section) = self.position_to_section(start_pos);
        let items_per_blob = self.items_per_blob;

        // Check all non-tail sections in range are complete before starting the stream.
        let oldest = self.inner.oldest_section().unwrap_or(start_section);
        if let Some(newest) = self.inner.newest_section() {
            for section in start_section.max(oldest)..newest {
                let len = self.inner.section_len(section).await?;
                if len < items_per_blob {
                    return Err(Error::Corruption(format!(
                        "section {section} incomplete: expected {items_per_blob} items, got {len}"
                    )));
                }
            }
        }

        let inner_stream = self
            .inner
            .replay(start_section, start_pos_in_section, buffer)
            .await?;

        // Transform (section, pos_in_section, item) to (global_pos, item).
        let stream = inner_stream.map(move |result| {
            result.map(|(section, pos_in_section, item)| {
                let global_pos = section * items_per_blob + pos_in_section;
                (global_pos, item)
            })
        });

        Ok(stream)
    }

    /// Allow the journal to prune items older than `min_item_pos`. The journal may not prune all
    /// such items in order to preserve blob boundaries, but the number of unpruned older items
    /// will always be less than the configured number of items per blob. Returns true if any items
    /// were pruned.
    ///
    /// Note that this operation may NOT be atomic; however, it's guaranteed not to leave gaps in
    /// the event of failure as items are always pruned in order from oldest to newest.
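    ///
    /// For example, with `items_per_blob = 2` and items at positions 0..=4, `prune(3)` removes
    /// only the first blob (items 0 and 1), since the blob containing item 2 also holds item 3.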
    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
        // Calculate the section that would contain min_item_pos
        let target_section = min_item_pos / self.items_per_blob;

        // Calculate the tail section.
        let tail_section = self.size / self.items_per_blob;

        // Cap to tail section. The tail section is guaranteed to exist by our invariant.
        let min_section = std::cmp::min(target_section, tail_section);

        self.inner.prune(min_section).await
    }

    /// Remove any underlying blobs created by the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        self.inner.destroy().await
    }
}

// Implement Contiguous trait for fixed-length journals
impl<E: Storage + Metrics, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
    type Item = A;

    fn size(&self) -> u64 {
        Self::size(self)
    }

    fn oldest_retained_pos(&self) -> Option<u64> {
        Self::oldest_retained_pos(self)
    }

    fn pruning_boundary(&self) -> u64 {
        Self::pruning_boundary(self)
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, buffer, start_pos).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}

impl<E: Storage + Metrics, A: CodecFixedShared> MutableContiguous for Journal<E, A> {
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}

impl<E: Storage + Metrics, A: CodecFixedShared> Persistable for Journal<E, A> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self, Context},
        Blob, Runner, Storage,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use futures::{pin_mut, StreamExt};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    /// Generate a SHA-256 digest for the given value.
    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test_partition".into(),
            items_per_blob,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append an item to the journal
            let mut pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            // Sync and drop the journal, then re-initialize it to simulate a restart
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), 1);

            // Append two more items to the journal to trigger a new blob creation
            pos = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);

            // Read the items back
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            // Sync the journal
            journal.sync().await.expect("failed to sync journal");

            // Pruning to 1 should be a no-op because there's no blob with only older items.
            journal.prune(1).await.expect("failed to prune journal 1");

            // Pruning to 2 should allow the first blob to be pruned.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos(), Some(2));

            // Reading from the first blob should fail since it's now pruned
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // Third item should still be readable
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Should be able to continue to append items
            for i in 3..10 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Check no-op pruning
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.inner.oldest_section(), Some(1));
            assert_eq!(journal.inner.newest_section(), Some(5));
            assert_eq!(journal.oldest_retained_pos(), Some(2));

            // Prune first 3 blobs (6 items)
            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune first 3 blobs");
            assert_eq!(journal.inner.oldest_section(), Some(3));
            assert_eq!(journal.inner.newest_section(), Some(5));
            assert_eq!(journal.oldest_retained_pos(), Some(6));

            // Try pruning (more than) everything in the journal.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let size = journal.size();
            assert_eq!(size, 10);
            assert_eq!(journal.inner.oldest_section(), Some(5));
            assert_eq!(journal.inner.newest_section(), Some(5));
            // Since the size of the journal is currently a multiple of items_per_blob, the newest
            // blob will be empty, and there will be no retained items.
            assert_eq!(journal.oldest_retained_pos(), None);
            // Pruning boundary should equal size when oldest_retained is None.
            assert_eq!(journal.pruning_boundary(), size);

            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }

    /// Append a lot of data to make sure we exercise buffer pool paging boundaries.
    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // Append almost 2 blobs worth of items (one short of filling the second blob).
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            // Sync, reopen, then read back the first blob's worth of items.
            journal.sync().await.expect("failed to sync journal");
            drop(journal);
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Read them back the usual way.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay should return all items
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Corrupt one of the bytes and make sure it's detected.
            let (blob, _) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Write junk bytes.
            let bad_bytes = 123456789u32;
            blob.write_at(bad_bytes.to_be_bytes().to_vec(), 1)
                .await
                .expect("Failed to write bad bytes");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Make sure reading an item that resides in the corrupted page fails.
            let err = journal
                .read(40 * ITEMS_PER_BLOB.get() + 1)
                .await
                .unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            // Replay all items.
            {
                let mut error_found = false;
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_found = true;
                            assert!(matches!(err, Error::Runtime(_)));
                            break;
                        }
                    }
                }
                assert!(error_found); // error should abort replay
            }
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Manually truncate a non-tail blob.
            let (blob, size) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // The segmented journal will trim the incomplete blob on init, resulting in the blob
            // missing one item. This should be detected as corruption during replay, since all
            // non-tail blobs must be full.
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Journal size is computed from the tail section, so it's unchanged
            // despite the corruption in section 40.
            let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
            assert_eq!(journal.size(), expected_size);

            // Replay should detect corruption (incomplete section) in section 40
            match journal.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(msg)) => {
                    assert!(
                        msg.contains("section 40"),
                        "Error should mention section 40, got: {msg}"
                    );
                }
                Err(e) => panic!("Expected Corruption error for section 40, got: {e:?}"),
                Ok(_) => panic!("Expected replay to fail with corruption"),
            };
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay_with_missing_historical_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            drop(journal);

            context
                .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
                .await
                .expect("failed to remove blob");

            // Init won't detect the corruption.
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("init shouldn't fail");

            // But replay will.
            match result.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };

            // As will trying to read an item that was in the deleted blob.
            match result.read(2).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };
        });
    }

    #[test_traced]
    fn test_fixed_journal_test_trim_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill one blob and put 3 items in the second.
            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), item_count);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Truncate the tail blob by one byte, which invalidates its last page and should
            // result in the last item being discarded during init.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // The truncation invalidates the last page (bad checksum), which is removed.
            // This loses one item.
            assert_eq!(journal.size(), item_count - 1);

            // Cleanup.
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // 53 % 7 = 4, which will trigger a non-trivial seek in the starting blob to reach the
        // starting position.
        const START_POS: u64 = 53;

        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Replay should return all items except the first `START_POS`.
            {
                let stream = journal
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 3 items per blob.
            let cfg = test_cfg(NZU64!(3));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), 5);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Manually truncate the most recent blob to simulate a partial write.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Truncate the most recent blob by 1 byte, which corrupts the most recent item
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            // The truncation invalidates the last page, which is removed. This loses one item.
            assert_eq!(journal.size(), 4);
            drop(journal);

            // Delete the second blob and re-init
            context
                .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            // Only the first blob remains
            assert_eq!(journal.size(), 3);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_to_empty_from_partial_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 10 items per blob.
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // Add only a single item
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Manually truncate the most recent blob to simulate a partial write.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Truncate the most recent blob by 1 byte, which corrupts the one appended item
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Since there was only a single item appended which we then corrupted, recovery should
            // leave us in the state of an empty journal.
            assert_eq!(journal.size(), 0);
            assert_eq!(journal.oldest_retained_pos(), None);
            // Make sure journal still works for appending.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_fixed_journal_recover_from_unwritten_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 10 items per blob.
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Add only a single item
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Manually extend the blob to simulate a crash where the file was extended but no
            // bytes were actually written.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; PAGE_SIZE.get() as usize * 3], size)
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // The zero-filled pages are detected as invalid (bad checksum) and truncated.
            // No items should be lost since we called sync before the corruption.
            assert_eq!(journal.size(), 1);

            // Make sure journal still works for appending.
            journal
                .append(test_digest(1))
                .await
                .expect("failed to append data");

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            // Append an item to the journal
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size(), 1);
            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be a no-op
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size(), 0);

            // Append 7 items
            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            assert_eq!(journal.size(), 7);

            // Rewind back to item #4, which should prune 2 blobs
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size(), 4);

            // Rewind back to empty and ensure all blobs are rewound over
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size(), 0);

            // Stress test: add 100 items, rewind 49, repeat x10.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size() - 49).await.unwrap();
            }
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size(), ITEMS_REMAINING);

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Repeat with a different blob size (3 items per blob)
            let mut cfg = test_cfg(NZU64!(3));
            cfg.partition = "test_partition_2".into();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size() - 49).await.unwrap();
            }
            assert_eq!(journal.size(), ITEMS_REMAINING);

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Make sure the re-opened journal is as expected
            let mut journal: Journal<_, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), ITEMS_REMAINING);

            // Make sure rewinding works after pruning
            journal.prune(300).await.expect("pruning failed");
            assert_eq!(journal.size(), ITEMS_REMAINING);
            // Rewinding prior to our prune point should fail.
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            // Rewinding to the prune point should work, since the tail blob will always remain in
            // the journal.
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert_eq!(journal.size(), 300);
            assert_eq!(journal.oldest_retained_pos(), None);

            journal.destroy().await.unwrap();
        });
    }

1312    /// Test recovery when blob is truncated to a page boundary with item size not dividing page size.
1313    ///
1314    /// This tests the scenario where:
1315    /// 1. Items (32 bytes) don't divide evenly into page size (44 bytes)
1316    /// 2. Data spans multiple pages
1317    /// 3. Blob is truncated to a page boundary (simulating crash before last page was written)
1318    /// 4. Journal should recover correctly on reopen
1319    #[test_traced]
1320    fn test_fixed_journal_recover_from_page_boundary_truncation() {
1321        let executor = deterministic::Runner::default();
1322        executor.start(|context: Context| async move {
1323            // Use a small items_per_blob to keep the test focused on a single blob
1324            let cfg = test_cfg(NZU64!(100));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Item size is 32 bytes (Digest), page size is 44 bytes.
            // 32 doesn't divide 44, so items will cross page boundaries.
            // Physical page size = 44 + 12 (CRC) = 56 bytes.
            //
            // Write enough items to span multiple pages:
            // - 10 items = 320 logical bytes
            // - This spans ceil(320/44) = 8 logical pages
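            // - The final page is only partially filled: 320 - 7 * 44 = 12 logical bytes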
            for i in 0u64..10 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), 10);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Open the blob directly and truncate to a page boundary.
            // Physical page size = PAGE_SIZE + CHECKSUM_SIZE = 44 + 12 = 56
            let physical_page_size = PAGE_SIZE.get() as u64 + 12;
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");

            // Calculate how many full physical pages we have and truncate to lose the last one.
            let full_pages = size / physical_page_size;
            assert!(full_pages >= 2, "need at least 2 pages for this test");
            let truncate_to = (full_pages - 1) * physical_page_size;

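            // Dropping the last full page simulates a crash in which the final page never
            // reached disk.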
            blob.resize(truncate_to)
                .await
                .expect("Failed to truncate blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal - it should recover by truncating to valid data
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal after page truncation");

            // The journal should have fewer items now (those that fit in the remaining pages).
            // With logical page size 44 and item size 32:
            // - After truncating to (full_pages-1) physical pages, we have (full_pages-1)*44 logical bytes
            // - Number of complete items = floor(logical_bytes / 32)
            let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
            let expected_items = remaining_logical_bytes / 32; // 32 = Digest::SIZE
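            // Note the floor: an item left incomplete by the truncation is not counted as
            // recovered.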
            assert_eq!(
                journal.size(),
                expected_items,
                "Journal should recover to {} items after truncation",
                expected_items
            );

            // Verify we can still read the remaining items
            for i in 0..expected_items {
                let item = journal
                    .read(i)
                    .await
                    .expect("failed to read recovered item");
                assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    /// Test the contiguous fixed journal with items_per_blob: 1.
    ///
    /// This is an edge case where each item creates its own blob, and the
    /// tail blob is always empty after sync (because the item fills the blob
    /// and a new empty one is created).
    #[test_traced]
    fn test_single_item_per_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single_item_per_blob".into(),
                items_per_blob: NZU64!(1),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(2048),
            };

            // === Test 1: Basic single item operation ===
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Verify empty state
            assert_eq!(journal.size(), 0);
            assert_eq!(journal.oldest_retained_pos(), None);

            // Append 1 item
            let pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);

            // Sync
            journal.sync().await.expect("failed to sync");

            // Read from size() - 1
            let value = journal
                .read(journal.size() - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(0));

            // === Test 2: Multiple items with single item per blob ===
            for i in 1..10u64 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);
                assert_eq!(journal.size(), i + 1);

                // Verify we can read the just-appended item at size() - 1
                let value = journal
                    .read(journal.size() - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            // Verify all items can be read
            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.sync().await.expect("failed to sync");

            // === Test 3: Pruning with single item per blob ===
            // Prune to position 5 (removes positions 0-4)
            journal.prune(5).await.expect("failed to prune");
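            // With one item per blob, prune(5) removes exactly blobs 0 through 4.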

            // Size should still be 10
            assert_eq!(journal.size(), 10);

            // oldest_retained_pos should be 5
            assert_eq!(journal.oldest_retained_pos(), Some(5));

            // Reading from size() - 1 (position 9) should still work
            let value = journal
                .read(journal.size() - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(9));

            // Reading from pruned positions should return ItemPruned
            for i in 0..5 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            // Reading from retained positions should work
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            // Append more items after pruning
            for i in 10..15u64 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);

                // Verify we can read from size() - 1
                let value = journal
                    .read(journal.size() - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            journal.sync().await.expect("failed to sync");
            drop(journal);

            // === Test 4: Restart persistence with single item per blob ===
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Verify size is preserved
            assert_eq!(journal.size(), 15);

            // Verify oldest_retained_pos is preserved
            assert_eq!(journal.oldest_retained_pos(), Some(5));

            // Reading from size() - 1 should work after restart
            let value = journal
                .read(journal.size() - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(14));

            // Reading all retained positions should work
            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy journal");

            // === Test 5: Restart after pruning with non-zero index ===
            // Fresh journal for this test
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append 10 items (positions 0-9)
            for i in 0..10u64 {
                journal.append(test_digest(i + 100)).await.unwrap();
            }

            // Prune to position 5 (removes positions 0-4)
            journal.prune(5).await.unwrap();
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(5));

            // Sync and restart
            journal.sync().await.unwrap();
            drop(journal);

            // Re-open journal
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Verify state after restart
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(5));

            // Reading from size() - 1 (position 9) should work
            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, test_digest(109));

            // Verify all retained positions (5-9) work
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
            }

            journal.destroy().await.expect("failed to destroy journal");

            // === Test 6: Prune all items (edge case) ===
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..5u64 {
                journal.append(test_digest(i + 200)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Prune all items
            journal.prune(5).await.unwrap();
            assert_eq!(journal.size(), 5); // Size unchanged
            assert_eq!(journal.oldest_retained_pos(), None); // All pruned
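            // Only the (empty) tail blob remains on disk; see the doc comment on this test.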

            // size() - 1 = 4, but position 4 is pruned
            let result = journal.read(journal.size() - 1).await;
            assert!(matches!(result, Err(Error::ItemPruned(4))));

            // After appending, reading works again
            journal.append(test_digest(205)).await.unwrap();
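            // The new item lands at position 5, the first position past the pruned range.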
            assert_eq!(journal.oldest_retained_pos(), Some(5));
            assert_eq!(
                journal.read(journal.size() - 1).await.unwrap(),
                test_digest(205)
            );

            journal.destroy().await.expect("failed to destroy journal");
        });
    }
}