commonware_storage/merkle/journaled.rs

//! A Merkle structure backed by a fixed-item-length journal.
//!
//! A [crate::journal] is used to store all unpruned nodes, and a [crate::metadata] store is
//! used to preserve digests required for root and proof generation that would have otherwise been
//! pruned.
//!
//! This module is generic over [`Family`], so it works for both MMR and MMB.
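//!
//! # Example
//!
//! A minimal sketch of typical usage (the runtime setup, configuration values, and type
//! parameters are illustrative, not prescriptive):
//!
//! ```ignore
//! // Open (or recover) the structure from its backing journal and metadata.
//! let mut merkle = Journaled::<F, _, D>::init(context, &hasher, config).await?;
//!
//! // Stage new leaves in a speculative batch, merkleize it, then apply it.
//! let batch = merkle.new_batch().add(&hasher, b"element");
//! let batch = merkle.with_mem(|mem| batch.merkleize(mem, &hasher));
//! merkle.apply_batch(&batch)?;
//!
//! // Persist the appended nodes and read the current root.
//! merkle.sync().await?;
//! let root = merkle.root();
//! ```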

use crate::{
    journal::{
        contiguous::{
            fixed::{Config as JConfig, Journal},
            Many, Reader,
        },
        Error as JError,
    },
    merkle::{
        batch,
        hasher::Hasher,
        mem::{Config as MemConfig, Mem},
        Error, Family, Location, Position, Proof, Readable,
    },
    metadata::{Config as MConfig, Metadata},
};
use commonware_codec::DecodeExt;
use commonware_cryptography::Digest;
use commonware_parallel::ThreadPool;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::{
    sequence::prefixed_u64::U64,
    sync::{AsyncMutex, RwLock},
};
use std::{
    collections::BTreeMap,
    num::{NonZeroU64, NonZeroUsize},
    sync::Arc,
};
use tracing::{debug, error, warn};

/// Append-only wrapper around [`batch::UnmerkleizedBatch`].
///
/// The journaled Merkle structure's [`Journaled::sync`] only persists *appended* nodes
/// (positions in `[journal_size, state.size())`). Overwrites to existing positions are stored in
/// the in-memory layer but never flushed, so they would be silently lost on crash recovery. This
/// wrapper prevents that by exposing only append and merkleize operations, hiding `update_leaf*`
/// at compile time.
pub struct UnmerkleizedBatch<F: Family, D: Digest> {
    inner: batch::UnmerkleizedBatch<F, D>,
}

impl<F: Family, D: Digest> UnmerkleizedBatch<F, D> {
    /// Hash `element` and add it as a leaf.
    pub fn add(self, hasher: &impl Hasher<F, Digest = D>, element: &[u8]) -> Self {
        Self {
            inner: self.inner.add(hasher, element),
        }
    }

    /// Add a pre-computed leaf digest.
    pub fn add_leaf_digest(self, digest: D) -> Self {
        Self {
            inner: self.inner.add_leaf_digest(digest),
        }
    }

    /// The number of leaves visible through this batch.
    pub fn leaves(&self) -> Location<F> {
        self.inner.leaves()
    }

    /// Set a thread pool for parallel merkleization.
    #[cfg(feature = "std")]
    pub fn with_pool(self, pool: Option<ThreadPool>) -> Self {
        Self {
            inner: self.inner.with_pool(pool),
        }
    }

    /// Consume this batch and produce an immutable [`batch::MerkleizedBatch`] with computed root.
    /// `base` provides committed node data as a fallback during hash computation.
    pub fn merkleize(
        self,
        base: &Mem<F, D>,
        hasher: &impl Hasher<F, Digest = D>,
    ) -> Arc<batch::MerkleizedBatch<F, D>> {
        self.inner.merkleize(base, hasher)
    }
}

/// Fields of [Journaled] that are protected by an [RwLock] for interior mutability.
pub(crate) struct Inner<F: Family, D: Digest> {
    /// A memory-resident Merkle structure used to build the structure and cache updates. It caches
    /// all un-synced nodes, and the pinned node set as derived from both its own pruning boundary
    /// and the journaled structure's pruning boundary.
    pub(crate) mem: Mem<F, D>,

    /// The highest position up to which this structure has been pruned, or 0 if it has never been
    /// pruned.
    pub(crate) pruned_to_pos: Position<F>,
}

/// Configuration for a journal-backed Merkle structure.
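///
/// An illustrative construction (partition names and sizing values are placeholders, not
/// recommendations):
///
/// ```ignore
/// let config = Config {
///     journal_partition: "merkle-journal".into(),
///     metadata_partition: "merkle-metadata".into(),
///     items_per_blob: NZU64!(4096),
///     write_buffer: NZUsize!(1024),
///     thread_pool: None,
///     page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
/// };
/// ```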
#[derive(Clone)]
pub struct Config {
    /// The name of the `commonware-runtime::Storage` partition used for the journal storing
    /// the nodes.
    pub journal_partition: String,

    /// The name of the `commonware-runtime::Storage` partition used for the metadata
    /// containing pruned nodes that are still required to calculate the root and generate
    /// proofs.
    pub metadata_partition: String,

    /// The maximum number of items to store in each blob in the backing journal.
    pub items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the backing journal.
    pub write_buffer: NonZeroUsize,

    /// Optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,
}

/// Configuration for initializing a journaled Merkle structure for synchronization.
///
/// Determines how to handle existing persistent data based on sync boundaries:
/// - **Fresh Start**: existing data < range start -> discard and start fresh
/// - **Prune and Reuse**: range contains existing data -> prune and reuse
/// - **Error**: existing data > range end
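///
/// An illustrative setup (the range bounds are placeholders, and `pinned_digests` stands in for
/// digests obtained from the sync protocol):
///
/// ```ignore
/// let sync_config = SyncConfig {
///     config,
///     range: Location::new(100)..Location::new(1_000),
///     pinned_nodes: Some(pinned_digests),
/// };
/// let merkle = Journaled::<F, _, D>::init_sync(context, sync_config, &hasher).await?;
/// ```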
pub struct SyncConfig<F: Family, D: Digest> {
    /// Base configuration (journal, metadata, etc.)
    pub config: Config,

    /// Sync range expressed as leaf-aligned bounds.
    pub range: std::ops::Range<Location<F>>,

    /// The pinned nodes the structure needs at the pruning boundary (range start), in the order
    /// specified by `Family::nodes_to_pin`. If `None`, the pinned nodes are expected to already be
    /// in the structure's metadata/journal.
    pub pinned_nodes: Option<Vec<D>>,
}

/// A Merkle structure backed by a fixed-item-length journal.
pub struct Journaled<F: Family, E: RStorage + Clock + Metrics, D: Digest> {
    /// Lock-protected mutable state.
    pub(crate) inner: RwLock<Inner<F, D>>,

    /// Stores all unpruned nodes.
    pub(crate) journal: Journal<E, D>,

    /// Stores all "pinned nodes" (pruned nodes required for proving & root generation), and the
    /// corresponding pruning boundary used to generate them. The metadata remains empty until
    /// pruning is invoked, and its contents change only when the pruning boundary moves.
    pub(crate) metadata: Metadata<E, U64, Vec<u8>>,

    /// Serializes concurrent sync calls.
    pub(crate) sync_lock: AsyncMutex<()>,

    /// The thread pool to use for parallelization.
    pub(crate) pool: Option<ThreadPool>,
}

/// Prefix byte used for node keys in the metadata's prefixed-u64 keyspace.
const NODE_PREFIX: u8 = 0;

/// Prefix byte used for the key storing the pruning boundary (as a leaf index) in the metadata.
pub(crate) const PRUNED_TO_PREFIX: u8 = 1;

impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Journaled<F, E, D> {
    /// Return the total number of nodes in the structure, irrespective of any pruning. The next
    /// added element's position will have this value.
    pub fn size(&self) -> Position<F> {
        self.inner.read().mem.size()
    }

    /// Return the total number of leaves in the structure.
    pub fn leaves(&self) -> Location<F> {
        self.inner.read().mem.leaves()
    }

    /// Attempt to get a node from the metadata, with fallback to journal lookup if it fails.
    /// Assumes the node should exist in at least one of these sources and returns a `MissingNode`
    /// error otherwise.
    async fn get_from_metadata_or_journal(
        metadata: &Metadata<E, U64, Vec<u8>>,
        journal: &Journal<E, D>,
        pos: Position<F>,
    ) -> Result<D, Error<F>> {
        if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
            debug!(?pos, "read node from metadata");
            // Match on the decode result directly (rather than `let ... else`) so the error
            // value remains available for logging.
            let digest = match D::decode(bytes.as_ref()) {
                Ok(digest) => digest,
                Err(err) => {
                    error!(
                        ?pos,
                        %err,
                        "could not convert node from metadata bytes to digest"
                    );
                    return Err(Error::DataCorrupted(
                        "could not read digest at requested pos",
                    ));
                }
            };
            return Ok(digest);
        }

        // If a node isn't found in the metadata, it might still be in the journal.
        debug!(?pos, "reading node from journal");
        let node = journal.reader().await.read(*pos).await;
        match node {
            Ok(node) => Ok(node),
            Err(JError::ItemPruned(_)) => {
                error!(?pos, "node is missing from metadata and journal");
                Err(Error::MissingNode(pos))
            }
            Err(e) => Err(Error::Journal(e)),
        }
    }

    /// Returns [start, end) where `start` is the oldest retained leaf and `end` is the total leaf
    /// count.
    pub fn bounds(&self) -> std::ops::Range<Location<F>> {
        let inner = self.inner.read();
        Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos")..inner.mem.leaves()
    }

    /// Adds the pinned nodes based on `prune_pos` to `mem`.
    async fn add_extra_pinned_nodes(
        mem: &mut Mem<F, D>,
        metadata: &Metadata<E, U64, Vec<u8>>,
        journal: &Journal<E, D>,
        prune_pos: Position<F>,
    ) -> Result<(), Error<F>> {
        let prune_loc = Location::try_from(prune_pos).expect("valid prune_pos");
        let mut pinned_nodes = BTreeMap::new();
        for pos in F::nodes_to_pin(prune_loc) {
            let digest = Self::get_from_metadata_or_journal(metadata, journal, pos).await?;
            pinned_nodes.insert(pos, digest);
        }
        mem.add_pinned_nodes(pinned_nodes);

        Ok(())
    }

    /// Initialize a new `Journaled` instance.
    pub async fn init(
        context: E,
        hasher: &impl Hasher<F, Digest = D>,
        cfg: Config,
    ) -> Result<Self, Error<F>> {
        let journal_cfg = JConfig {
            partition: cfg.journal_partition,
            items_per_blob: cfg.items_per_blob,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };
        let journal =
            Journal::<E, D>::init(context.with_label("merkle_journal"), journal_cfg).await?;
        let mut journal_size = Position::<F>::new(journal.size().await);

        let metadata_cfg = MConfig {
            partition: cfg.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let metadata =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("merkle_metadata"), metadata_cfg)
                .await?;

        if journal_size == 0 {
            let mem = Mem::init(
                MemConfig {
                    nodes: vec![],
                    pruning_boundary: Location::new(0),
                    pinned_nodes: vec![],
                },
                hasher,
            )?;
            return Ok(Self {
                inner: RwLock::new(Inner {
                    mem,
                    pruned_to_pos: Position::new(0),
                }),
                journal,
                metadata,
                sync_lock: AsyncMutex::new(()),
                pool: cfg.thread_pool,
            });
        }

        // Make sure the journal's oldest retained node is as expected based on the last pruning
        // boundary stored in metadata. If they don't match, prune the journal to the appropriate
        // location.
        let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
        let metadata_pruned_to = Location::<F>::new(metadata.get(&key).map_or(0, |bytes| {
            u64::from_be_bytes(
                bytes
                    .as_slice()
                    .try_into()
                    .expect("metadata pruned_to is not 8 bytes"),
            )
        }));
        let metadata_prune_pos = Position::try_from(metadata_pruned_to)?;
        let journal_bounds_start = journal.reader().await.bounds().start;
        if *metadata_prune_pos > journal_bounds_start {
            // Metadata is ahead of journal (crashed before completing journal prune).
            // Prune the journal to match metadata.
            journal.prune(*metadata_prune_pos).await?;
            if journal.reader().await.bounds().start != journal_bounds_start {
                // This should only happen in the event of some failure during the last attempt to
                // prune the journal.
                warn!(
                    journal_bounds_start,
                    ?metadata_prune_pos,
                    "journal pruned to match metadata"
                );
            }
        } else if *metadata_prune_pos < journal_bounds_start {
            // Metadata is stale (e.g., missing/corrupted while journal has valid state).
            // Use the journal's state as authoritative.
            warn!(
                ?metadata_prune_pos,
                journal_bounds_start, "metadata stale, using journal pruning boundary"
            );
        }

        // Use the more restrictive (higher) pruning boundary between metadata and journal.
        // This handles both cases: metadata ahead (crash during prune) and metadata stale.
        //
        // The journal boundary may not be leaf-aligned (it's blob-aligned), so round up to the
        // position of the first leaf after the boundary.
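        // For instance, with the MMR family (illustrative numbers, assuming valid sizes run
        // 0, 1, 3, 4, 7, ...): a blob boundary at position 6 floors to the valid size 4, which
        // differs from 6, so it rounds up to position 7, the first leaf position past the
        // boundary.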
        let journal_boundary_pos = Position::<F>::new(journal_bounds_start);
        let journal_boundary_floor = F::to_nearest_size(journal_boundary_pos);
        let journal_boundary_leaf_aligned_pos = if journal_boundary_floor == journal_boundary_pos {
            // `to_nearest_size` rounds down, so equality means the boundary is already
            // leaf-aligned.
            journal_boundary_floor
        } else {
            // If flooring backed up over the boundary, round up to the next leaf position, which
            // is guaranteed to be above it.
            Position::try_from(Location::try_from(journal_boundary_floor)? + 1)?
        };
        let effective_prune_pos =
            std::cmp::max(metadata_prune_pos, journal_boundary_leaf_aligned_pos);

        let last_valid_size = F::to_nearest_size(journal_size);
        let mut orphaned_leaf: Option<D> = None;
        if last_valid_size != journal_size {
            warn!(
                ?last_valid_size,
                "encountered invalid structure, recovering from last valid size"
            );
            // Check if there is an intact leaf following the last valid size, from which we can
            // recover its missing parents.
            let recovered_item = journal.reader().await.read(*last_valid_size).await;
            if let Ok(item) = recovered_item {
                orphaned_leaf = Some(item);
            }
            journal.rewind(*last_valid_size).await?;
            journal.sync().await?;
            journal_size = last_valid_size;
        }

        // Initialize the mem in the "prune_all" state.
        let journal_leaves = Location::try_from(journal_size)?;
        let mut pinned_nodes = Vec::new();
        for pos in F::nodes_to_pin(journal_leaves) {
            let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            pinned_nodes.push(digest);
        }
        let mut mem = Mem::init(
            MemConfig {
                nodes: vec![],
                pruning_boundary: journal_leaves,
                pinned_nodes,
            },
            hasher,
        )?;
        Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, effective_prune_pos).await?;

        if let Some(leaf) = orphaned_leaf {
            // Recover the orphaned leaf and any missing parents.
            let pos = mem.size();
            warn!(?pos, "recovering orphaned leaf");
            let batch = mem
                .new_batch()
                .add_leaf_digest(leaf)
                .merkleize(&mem, hasher);
            mem.apply_batch(&batch)?;
            assert_eq!(pos, journal_size);

            // Inline sync: flush recovered nodes to journal.
            for p in journal.size().await..*mem.size() {
                let p = Position::new(p);
                let node = *mem.get_node_unchecked(p);
                journal.append(&node).await?;
            }
            journal.sync().await?;
            assert_eq!(mem.size(), journal.size().await);

            // Prune mem and reinstate pinned nodes.
            let effective_prune_loc =
                Location::try_from(effective_prune_pos).expect("valid effective_prune_pos");
            let mut pn = BTreeMap::new();
            for p in F::nodes_to_pin(effective_prune_loc) {
                let d = mem.get_node_unchecked(p);
                pn.insert(p, *d);
            }
            mem.prune_all();
            mem.add_pinned_nodes(pn);
        }

        Ok(Self {
            inner: RwLock::new(Inner {
                mem,
                pruned_to_pos: effective_prune_pos,
            }),
            journal,
            metadata,
            sync_lock: AsyncMutex::new(()),
            pool: cfg.thread_pool,
        })
    }

    /// Initialize a structure for synchronization, reusing existing data if possible.
    ///
    /// Handles sync scenarios based on existing journal data vs. the given sync range:
    ///
    /// 1. **Fresh Start**: existing_size <= range.start
    ///    - Deletes existing data (if any)
    ///    - Creates new [Journal] with pruning boundary and size at `range.start`
    ///
    /// 2. **Reuse**: range.start < existing_size <= range.end
    ///    - Keeps existing journal data
    ///    - Prunes the journal toward `range.start` (section-aligned)
    ///
    /// 3. **Error**: existing_size > range.end
    ///    - Returns [crate::journal::Error::ItemOutOfRange]
    pub async fn init_sync(
        context: E,
        cfg: SyncConfig<F, D>,
        hasher: &impl Hasher<F, Digest = D>,
    ) -> Result<Self, Error<F>> {
        let prune_pos = Position::try_from(cfg.range.start)?;
        let end_pos = Position::try_from(cfg.range.end)?;
        let journal_cfg = JConfig {
            partition: cfg.config.journal_partition.clone(),
            items_per_blob: cfg.config.items_per_blob,
            write_buffer: cfg.config.write_buffer,
            page_cache: cfg.config.page_cache.clone(),
        };

        // Open the journal, performing a rewind if necessary for crash recovery.
        let journal: Journal<E, D> =
            Journal::init(context.with_label("merkle_journal"), journal_cfg).await?;
        let mut journal_size = Position::<F>::new(journal.size().await);

        // If a crash left the journal at an invalid size (e.g., a leaf was written
        // but its parent nodes were not), rewind to the last valid size.
        let last_valid_size = F::to_nearest_size(journal_size);
        if last_valid_size != journal_size {
            warn!(
                ?last_valid_size,
                "init_sync: encountered invalid structure, recovering from last valid size"
            );
            journal.rewind(*last_valid_size).await?;
            journal.sync().await?;
            journal_size = last_valid_size;
        }

        // Handle existing data vs sync range.
        assert!(!cfg.range.is_empty(), "range must not be empty");
        if journal_size > *end_pos {
            return Err(crate::journal::Error::ItemOutOfRange(*journal_size).into());
        }
        if journal_size <= *prune_pos && *prune_pos != 0 {
            journal.clear_to_size(*prune_pos).await?;
            journal_size = Position::new(journal.size().await);
        }

        // Open the metadata.
        let metadata_cfg = MConfig {
            partition: cfg.config.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::init(context.with_label("merkle_metadata"), metadata_cfg).await?;

        // Write the pruning boundary.
        let pruning_boundary_key = U64::new(PRUNED_TO_PREFIX, 0);
        metadata.put(
            pruning_boundary_key,
            cfg.range.start.as_u64().to_be_bytes().into(),
        );

        // Write the required pinned nodes to metadata.
        // The set of pinned nodes depends only on the prune boundary, not on the total
        // structure size, so we validate against `nodes_to_pin(prune_loc)` alone.
        let prune_loc = Location::try_from(prune_pos)?;
        let journal_leaves = Location::try_from(journal_size)?;
        if let Some(pinned_nodes) = cfg.pinned_nodes {
            // Use caller-provided pinned nodes.
            let nodes_to_pin_persisted: Vec<_> = F::nodes_to_pin(prune_loc).collect();
            if pinned_nodes.len() != nodes_to_pin_persisted.len() {
                return Err(Error::<F>::InvalidPinnedNodes);
            }
            for (pos, digest) in nodes_to_pin_persisted.into_iter().zip(pinned_nodes.iter()) {
                metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
            }
        }

        // Create the in-memory structure with the pinned nodes required for its size. This must be
        // performed *before* pruning the journal to range.start to ensure all pinned nodes are
        // present.
        let nodes_to_pin_mem = F::nodes_to_pin(journal_leaves);
        let mut mem_pinned_nodes = Vec::new();
        for pos in nodes_to_pin_mem {
            let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            mem_pinned_nodes.push(digest);
        }
        let mut mem = Mem::init(
            MemConfig {
                nodes: vec![],
                pruning_boundary: journal_leaves,
                pinned_nodes: mem_pinned_nodes,
            },
            hasher,
        )?;

        // Add the additional pinned nodes required for the pruning boundary, if applicable.
        // This must also be done before pruning.
        if prune_pos < journal_size {
            Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, prune_pos).await?;
        }

        // Sync metadata before pruning so pinned nodes are persisted for crash recovery.
        metadata.sync().await?;

        // Prune the journal to range.start.
        journal.prune(*prune_pos).await?;

        Ok(Self {
            inner: RwLock::new(Inner {
                mem,
                pruned_to_pos: prune_pos,
            }),
            journal,
            metadata,
            sync_lock: AsyncMutex::new(()),
            pool: cfg.config.thread_pool,
        })
    }

    /// Compute and add required nodes for the given pruning point to the metadata, and write it to
    /// disk. Return the computed set of required nodes.
    async fn update_metadata(
        &mut self,
        prune_to_pos: Position<F>,
    ) -> Result<BTreeMap<Position<F>, D>, Error<F>> {
        assert!(prune_to_pos >= self.inner.get_mut().pruned_to_pos);

        let prune_loc = Location::try_from(prune_to_pos).expect("valid prune_to_pos");
        let mut pinned_nodes = BTreeMap::new();
        for pos in F::nodes_to_pin(prune_loc) {
            let digest = self.get_node(pos).await?.expect(
                "pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
            );
            self.metadata
                .put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
            pinned_nodes.insert(pos, digest);
        }

        let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
        self.metadata.put(
            key,
            Location::try_from(prune_to_pos)?
                .as_u64()
                .to_be_bytes()
                .into(),
        );

        self.metadata.sync().await.map_err(Error::Metadata)?;

        Ok(pinned_nodes)
    }

    /// Return the node at `position`, checking the in-memory structure first and falling back to
    /// the journal. Returns `Ok(None)` if the node has been pruned from both.
    pub async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
        {
            let inner = self.inner.read();
            if let Some(node) = inner.mem.get_node(position) {
                return Ok(Some(node));
            }
        }

        match self.journal.reader().await.read(*position).await {
            Ok(item) => Ok(Some(item)),
            Err(JError::ItemPruned(_)) => Ok(None),
            Err(e) => Err(Error::Journal(e)),
        }
    }

    /// Sync the structure to disk.
    pub async fn sync(&self) -> Result<(), Error<F>> {
        let _sync_guard = self.sync_lock.lock().await;

        let journal_size = Position::<F>::new(self.journal.size().await);

        // Snapshot nodes in the mem that are missing from the journal, along with the pinned
        // node set for the current pruning boundary.
        let (sync_target_leaves, missing_nodes, pinned_nodes) = {
            let inner = self.inner.read();
            let size = inner.mem.size();
            let sync_target_leaves = inner.mem.leaves();

            assert!(
                journal_size <= size,
                "journal size should never exceed in-memory structure size"
            );
            if journal_size == size {
                return Ok(());
            }

            let mut missing_nodes = Vec::with_capacity((*size - *journal_size) as usize);
            for pos in *journal_size..*size {
                let node = *inner.mem.get_node_unchecked(Position::new(pos));
                missing_nodes.push(node);
            }

            // Recompute pinned nodes since we'll need to repopulate the cache after it is cleared
            // by pruning the mem.
            let prune_loc = Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos");
            let mut pinned_nodes = BTreeMap::new();
            for pos in F::nodes_to_pin(prune_loc) {
                let digest = inner.mem.get_node_unchecked(pos);
                pinned_nodes.insert(pos, *digest);
            }

            (sync_target_leaves, missing_nodes, pinned_nodes)
        };

        // Append missing nodes to the journal without holding the mem read lock.
        self.journal.append_many(Many::Flat(&missing_nodes)).await?;

        // Sync the journal while still holding the sync_lock to ensure durability before returning.
        self.journal.sync().await?;

        // Now that the missing nodes are in the journal, it's safe to prune them from the
        // mem. We prune to the previously captured leaf count to avoid a race with concurrent
        // appends between the read lock above and this write lock.
        {
            let mut inner = self.inner.write();
            inner
                .mem
                .prune(sync_target_leaves)
                .expect("captured leaves is in bounds");
            inner.mem.add_pinned_nodes(pinned_nodes);
        }

        Ok(())
    }

    /// Prune all nodes up to but not including the given leaf location and update the pinned
    /// nodes.
    ///
    /// No failure can leave the structure in an unrecoverable state: the structure is synced
    /// first to write any unsynced updates, and the metadata is updated before the journal is
    /// pruned.
    ///
    /// Returns [Error::LocationOverflow] if `loc` exceeds [Family::MAX_LEAVES].
    /// Returns [Error::LeafOutOfBounds] if `loc` exceeds the current leaf count.
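    ///
    /// A usage sketch (the location is illustrative):
    /// ```ignore
    /// merkle.prune(Location::new(100)).await?;
    /// assert_eq!(merkle.bounds().start, Location::new(100));
    /// ```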
    pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
        let pos = Position::try_from(loc)?;
        {
            let inner = self.inner.get_mut();
            if loc > inner.mem.leaves() {
                return Err(Error::LeafOutOfBounds(loc));
            }
            if pos <= inner.pruned_to_pos {
                return Ok(());
            }
        }

        // Flush items cached in the mem to disk to ensure the current state is recoverable.
        self.sync().await?;

        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
        // event of a pruning failure.
        let pinned_nodes = self.update_metadata(pos).await?;

        self.journal.prune(*pos).await?;
        let inner = self.inner.get_mut();
        inner.mem.add_pinned_nodes(pinned_nodes);
        inner.pruned_to_pos = pos;

        Ok(())
    }

    /// Return the root of the structure.
    pub fn root(&self) -> D {
        *self.inner.read().mem.root()
    }

    /// Prune as many nodes as possible, leaving behind at most `items_per_blob` nodes in the
    /// current blob.
    pub async fn prune_all(&mut self) -> Result<(), Error<F>> {
        let leaves = self.inner.get_mut().mem.leaves();
        if leaves != 0 {
            self.prune(leaves).await?;
        }
        Ok(())
    }

    /// Close and permanently remove any disk resources.
    pub async fn destroy(self) -> Result<(), Error<F>> {
        self.journal.destroy().await?;
        self.metadata.destroy().await?;

        Ok(())
    }

    #[cfg(any(test, feature = "fuzzing"))]
    /// Sync elements to disk until `write_limit` elements have been written, then abort to simulate
    /// a partial write for testing failure scenarios.
    pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error<F>> {
        if write_limit == 0 {
            return Ok(());
        }

        let inner = self.inner.get_mut();
        let journal_size = Position::<F>::new(self.journal.size().await);

        // Write the nodes cached in the memory-resident structure to the journal, aborting after
        // `write_limit` nodes have been written.
        let mut written_count = 0usize;
        for i in *journal_size..*inner.mem.size() {
            let node = *inner.mem.get_node_unchecked(Position::new(i));
            self.journal.append(&node).await?;
            written_count += 1;
            if written_count >= write_limit {
                break;
            }
        }
        self.journal.sync().await?;

        Ok(())
    }

    #[cfg(test)]
    pub fn get_pinned_nodes(&self) -> BTreeMap<Position<F>, D> {
        self.inner.read().mem.pinned_nodes()
    }

    #[cfg(test)]
    pub async fn simulate_pruning_failure(mut self, prune_to: Location<F>) -> Result<(), Error<F>> {
        let prune_to_pos = Position::try_from(prune_to)?;
        assert!(prune_to_pos <= self.inner.get_mut().mem.size());

        // Flush items cached in the mem to disk to ensure the current state is recoverable.
        self.sync().await?;

        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
        // event of a pruning failure.
        self.update_metadata(prune_to_pos).await?;

        // Don't actually prune the journal, to simulate failure.
        Ok(())
    }

    /// Apply a merkleized batch to the structure.
    ///
    /// A batch is valid if the structure has not been modified since the batch
    /// chain was created, or if only ancestors of this batch have been applied.
    /// Already-committed ancestors are skipped automatically.
    /// Applying a batch from a different fork returns [`Error::StaleBatch`].
    pub fn apply_batch(&mut self, batch: &batch::MerkleizedBatch<F, D>) -> Result<(), Error<F>> {
        self.inner.get_mut().mem.apply_batch(batch)?;
        Ok(())
    }

    /// Create an owned [`batch::MerkleizedBatch`] representing the current committed state.
    ///
    /// The batch has no data (the committed items are on disk, not in memory).
    /// This is the starting point for building owned batch chains.
    pub(crate) fn to_batch(&self) -> Arc<batch::MerkleizedBatch<F, D>> {
        let inner = self.inner.read();
        let mut batch = batch::MerkleizedBatch::from_mem(&inner.mem);
        #[cfg(feature = "std")]
        if let Some(pool) = &self.pool {
            Arc::get_mut(&mut batch).expect("just created").pool = Some(pool.clone());
        }
        batch
    }

    /// Borrow the committed Mem through the read lock. Holds the lock for
    /// the duration of the closure.
    pub fn with_mem<R>(&self, f: impl FnOnce(&Mem<F, D>) -> R) -> R {
        let inner = self.inner.read();
        f(&inner.mem)
    }

    /// Create a new speculative batch with this structure as its parent.
    pub fn new_batch(&self) -> UnmerkleizedBatch<F, D> {
        let inner = self.inner.read();
        let root = batch::MerkleizedBatch::from_mem(&inner.mem);
        drop(inner);
        UnmerkleizedBatch {
            inner: root.new_batch(),
        }
        .with_pool(self.pool())
    }

    /// Return the thread pool, if any.
    pub fn pool(&self) -> Option<ThreadPool> {
        self.pool.clone()
    }

    /// Rewind the structure by the given number of leaves.
    ///
    /// Adds go through the batch API ([`Self::new_batch`] / [`Self::apply_batch`]), but removing
    /// leaves requires `rewind`. After `init` or `sync`, the in-memory structure is pruned to
    /// O(log n) pinned peaks. A batch pop would expose new peaks that are not in memory, and
    /// `merkleize` cannot load them because [`Readable::get_node`] is synchronous. `rewind`
    /// performs async journal I/O to rebuild state at the target position.
    pub(crate) async fn rewind(
        &mut self,
        leaves_to_remove: usize,
        hasher: &impl Hasher<F, Digest = D>,
    ) -> Result<(), Error<F>> {
        if leaves_to_remove == 0 {
            return Ok(());
        }

        let current_leaves = *self.leaves();
        let destination_leaf = match current_leaves.checked_sub(leaves_to_remove as u64) {
            Some(dest) => dest,
            None => {
                let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
                return Err(if pruned_to_pos == 0 {
                    Error::Empty
                } else {
                    Error::ElementPruned(pruned_to_pos - 1)
                });
            }
        };

        let destination_loc = Location::new(destination_leaf);
        let new_size = Position::try_from(destination_loc).expect("valid leaf");

        let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
        if new_size < pruned_to_pos {
            return Err(Error::ElementPruned(new_size));
        }

        // Rewind the journal if needed.
        let journal_size = Position::<F>::new(self.journal.size().await);
        if new_size < journal_size {
            self.journal.rewind(*new_size).await?;
            self.journal.sync().await?;
        }

        // Truncate the in-memory structure to the target size and recompute the root.
        // If the in-memory structure has been pruned past the target (e.g. after sync),
        // rebuild from the journal/metadata instead.
        let inner = self.inner.get_mut();
        if new_size >= Position::try_from(inner.mem.bounds().start).expect("valid mem bounds start")
        {
            inner.mem.truncate(new_size, hasher);
        } else {
            let mut pinned_nodes = Vec::new();
            for pos in F::nodes_to_pin(destination_loc) {
                pinned_nodes.push(
                    Self::get_from_metadata_or_journal(&self.metadata, &self.journal, pos).await?,
                );
            }
            inner.mem = Mem::from_components(hasher, vec![], destination_loc, pinned_nodes)?;
            Self::add_extra_pinned_nodes(
                &mut inner.mem,
                &self.metadata,
                &self.journal,
                inner.pruned_to_pos,
            )
            .await?;
        }

        Ok(())
    }
}

/// The [`Readable`] implementation for the journaled structure operates only on the in-memory
/// portion. After [`Journaled::sync`], nodes that have been flushed to the journal are no longer
/// accessible through this interface. In particular, [`Readable::get_node`] returns `None` for
/// flushed positions, and [`Readable::pruning_boundary`] reflects the in-memory boundary (which may
/// be tighter than the journal's prune boundary reported by [`Journaled::bounds`]). This means
/// batch operations like `update_leaf` will correctly reject leaves that have been synced out of
/// memory with [`Error::ElementPruned`].
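///
/// A sketch of the distinction (`merkle` is a synced instance and `pos` a flushed position,
/// both illustrative):
///
/// ```ignore
/// merkle.sync().await?;
/// // The synchronous trait lookup sees only the in-memory layer:
/// assert_eq!(Readable::get_node(&merkle, pos), None);
/// // The async inherent method also consults the backing journal:
/// assert!(merkle.get_node(pos).await?.is_some());
/// ```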
impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Readable for Journaled<F, E, D> {
    type Family = F;
    type Digest = D;
    type Error = Error<F>;

    fn size(&self) -> Position<F> {
        self.size()
    }

    fn get_node(&self, pos: Position<F>) -> Option<D> {
        self.inner.read().mem.get_node(pos)
    }

    fn root(&self) -> D {
        *self.inner.read().mem.root()
    }

    fn pruning_boundary(&self) -> Location<F> {
        self.inner.read().mem.pruning_boundary()
    }

    fn proof(
        &self,
        hasher: &impl Hasher<F, Digest = D>,
        loc: Location<F>,
    ) -> Result<Proof<F, D>, Error<F>> {
        if !loc.is_valid_index() {
            return Err(Error::LocationOverflow(loc));
        }
        crate::merkle::proof::build_range_proof(
            hasher,
            self.leaves(),
            loc..loc + 1,
            |pos| <Self as Readable>::get_node(self, pos),
            Error::ElementPruned,
        )
        .map_err(|e| match e {
            Error::RangeOutOfBounds(_) => Error::LeafOutOfBounds(loc),
            _ => e,
        })
    }

    fn range_proof(
        &self,
        hasher: &impl Hasher<F, Digest = D>,
        range: core::ops::Range<Location<F>>,
    ) -> Result<Proof<F, D>, Error<F>> {
        crate::merkle::proof::build_range_proof(
            hasher,
            self.leaves(),
            range,
            |pos| <Self as Readable>::get_node(self, pos),
            Error::ElementPruned,
        )
    }
}

impl<F: Family, E: RStorage + Clock + Metrics + Sync, D: Digest> crate::merkle::storage::Storage<F>
    for Journaled<F, E, D>
{
    type Digest = D;

    async fn size(&self) -> Position<F> {
        self.size()
    }

    async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
        Self::get_node(self, position).await
    }
}

impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Journaled<F, E, D> {
    /// Return an inclusion proof for the element at the location `loc` against a historical
    /// state with `leaves` leaves.
    ///
    /// # Errors
    ///
    /// - Returns [Error::RangeOutOfBounds] if `leaves` is greater than `self.leaves()` or if `loc`
    ///   is not provable at that historical size.
    /// - Returns [Error::LocationOverflow] if `loc` exceeds [Family::MAX_LEAVES].
    /// - Returns [Error::ElementPruned] if some element needed to generate the proof has been
    ///   pruned.
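    ///
    /// # Example
    ///
    /// A sketch (the locations are illustrative):
    /// ```ignore
    /// // Prove the element at location 5 against the root the structure had at 10 leaves.
    /// let proof = merkle
    ///     .historical_proof(&hasher, Location::new(10), Location::new(5))
    ///     .await?;
    /// ```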
    pub async fn historical_proof(
        &self,
        hasher: &impl Hasher<F, Digest = D>,
        leaves: Location<F>,
        loc: Location<F>,
    ) -> Result<Proof<F, D>, Error<F>> {
        if !loc.is_valid_index() {
            return Err(Error::LocationOverflow(loc));
        }
        // loc is valid so it won't overflow from + 1
        self.historical_range_proof(hasher, leaves, loc..loc + 1)
            .await
    }

    /// Return an inclusion proof for the elements in `range` against a historical state with
    /// `leaves` leaves.
    ///
    /// # Errors
    ///
    /// - Returns [Error::RangeOutOfBounds] if `leaves` is greater than `self.leaves()` or if
    ///   `range` is not provable at that historical size.
    /// - Returns [Error::LocationOverflow] if any location in `range` exceeds [Family::MAX_LEAVES].
    /// - Returns [Error::ElementPruned] if some element needed to generate the proof has been
    ///   pruned.
    /// - Returns [Error::Empty] if the range is empty.
    pub async fn historical_range_proof(
        &self,
        hasher: &impl Hasher<F, Digest = D>,
        leaves: Location<F>,
        range: core::ops::Range<Location<F>>,
    ) -> Result<Proof<F, D>, Error<F>> {
        if leaves > self.leaves() {
            return Err(Error::RangeOutOfBounds(leaves));
        }
        crate::merkle::verification::historical_range_proof(hasher, self, leaves, range).await
    }

    /// Return an inclusion proof for the element at the location `loc` that can be verified
    /// against the current root.
    ///
    /// This async inherent method shadows [`Readable::proof`] and can read from the backing
    /// journal for nodes that have been synced out of memory.
    ///
    /// # Errors
    ///
    /// - Returns [Error::LocationOverflow] if `loc` exceeds [Family::MAX_LEAVES].
    /// - Returns [Error::ElementPruned] if some element needed to generate the proof has been
    ///   pruned.
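    ///
    /// # Example
    ///
    /// A sketch (the element and its location are illustrative):
    /// ```ignore
    /// let proof = merkle.proof(&hasher, loc).await?;
    /// let root = merkle.root();
    /// assert!(proof.verify_element_inclusion(&hasher, &element, loc, &root));
    /// ```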
    pub async fn proof(
        &self,
        hasher: &impl Hasher<F, Digest = D>,
        loc: Location<F>,
    ) -> Result<Proof<F, D>, Error<F>> {
        if !loc.is_valid_index() {
            return Err(Error::LocationOverflow(loc));
        }
        // loc is valid so it won't overflow from + 1
        self.range_proof(hasher, loc..loc + 1).await
    }

    /// Return an inclusion proof for the elements within the specified location range.
    ///
    /// This async inherent method shadows [`Readable::range_proof`] and can read from the backing
    /// journal for nodes that have been synced out of memory.
    ///
    /// # Errors
    ///
    /// - Returns [Error::LocationOverflow] if any location in `range` exceeds [Family::MAX_LEAVES].
    /// - Returns [Error::ElementPruned] if some element needed to generate the proof has been
    ///   pruned.
    /// - Returns [Error::Empty] if the range is empty.
    pub async fn range_proof(
        &self,
        hasher: &impl Hasher<F, Digest = D>,
        range: core::ops::Range<Location<F>>,
    ) -> Result<Proof<F, D>, Error<F>> {
        self.historical_range_proof(hasher, self.leaves(), range)
            .await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        journal::contiguous::fixed::{Config as JConfig, Journal},
        merkle::{hasher::Standard, mmb, mmr, Location, LocationRangeExt as _, Position, Proof},
        metadata::{Config as MConfig, Metadata},
    };
    use commonware_cryptography::{
        sha256::{self, Digest},
        Hasher as _, Sha256,
    };
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::paged::CacheRef, deterministic, BufferPooler, Runner};
    use commonware_utils::{sequence::prefixed_u64::U64, NZUsize, NZU16, NZU64};
    use std::{
        collections::BTreeMap,
        num::{NonZeroU16, NonZeroUsize},
    };

    fn test_digest(v: usize) -> Digest {
        Sha256::hash(&v.to_be_bytes())
    }

    const PAGE_SIZE: NonZeroU16 = NZU16!(111);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(5);

    fn test_config(pooler: &impl BufferPooler) -> Config {
        Config {
            journal_partition: "journal-partition".into(),
            metadata_partition: "metadata-partition".into(),
            items_per_blob: NZU64!(7),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    async fn journaled_empty_inner<F: Family>(context: deterministic::Context) {
        let hasher: Standard<Sha256> = Standard::new();
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("first"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();
        assert_eq!(mmr.size(), 0);
        assert!(mmr.get_node(Position::<F>::new(0)).await.is_err());
        let bounds = mmr.bounds();
        assert!(bounds.is_empty());
        assert!(mmr.prune_all().await.is_ok());
        assert_eq!(bounds.start, 0);
        assert!(mmr.prune(Location::<F>::new(0)).await.is_ok());
        assert!(mmr.sync().await.is_ok());
        assert!(matches!(mmr.rewind(1, &hasher).await, Err(Error::Empty)));

        let batch = mmr.new_batch().add(&hasher, &test_digest(0));
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        assert_eq!(mmr.size(), 1);
        mmr.sync().await.unwrap();
        assert!(mmr.get_node(Position::<F>::new(0)).await.is_ok());
        assert!(mmr.rewind(1, &hasher).await.is_ok());
        assert_eq!(mmr.size(), 0);
        mmr.sync().await.unwrap();

        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("second"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();
        assert_eq!(mmr.size(), 0);

        let empty_proof = Proof::<F, Digest>::default();
        let hasher: Standard<Sha256> = Standard::new();
        let root = mmr.root();
        assert!(empty_proof.verify_range_inclusion(
            &hasher,
            &[] as &[Digest],
            Location::<F>::new(0),
            &root
        ));
        assert!(empty_proof.verify_multi_inclusion(
            &hasher,
            &[] as &[(Digest, Location<F>)],
            &root
        ));

        // Confirm empty proof no longer verifies after adding an element.
        let batch = mmr.new_batch().add(&hasher, &test_digest(0));
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        let root = mmr.root();
        assert!(!empty_proof.verify_range_inclusion(
            &hasher,
            &[] as &[Digest],
            Location::<F>::new(0),
            &root
        ));
        assert!(!empty_proof.verify_multi_inclusion(
            &hasher,
            &[] as &[(Digest, Location<F>)],
            &root
        ));

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_journaled_empty_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_empty_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_journaled_empty_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_empty_inner::<mmb::Family>);
    }

    async fn journaled_prune_out_of_bounds_returns_error_inner<F: Family>(
        context: deterministic::Context,
    ) {
        let hasher = Standard::<Sha256>::new();
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("oob_prune"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();

        let batch = mmr.new_batch().add(&hasher, &test_digest(0));
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();

        assert!(matches!(
            mmr.prune(Location::<F>::new(2)).await,
            Err(Error::LeafOutOfBounds(loc)) if loc == Location::<F>::new(2)
        ));

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_journaled_prune_out_of_bounds_returns_error_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_prune_out_of_bounds_returns_error_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_journaled_prune_out_of_bounds_returns_error_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_prune_out_of_bounds_returns_error_inner::<mmb::Family>);
    }

    async fn journaled_rewind_error_leaves_valid_state_inner<F: Family>(
        context: deterministic::Context,
    ) {
        let hasher: Standard<Sha256> = Standard::new();

        // Case 1: rewind partially succeeds, then returns ElementPruned.
        let element_pruned_context = context.with_label("element_pruned_case");
        let mut mmr = Journaled::<F, _, Digest>::init(
            element_pruned_context.clone(),
            &hasher,
            test_config(&element_pruned_context),
        )
        .await
        .unwrap();
        let mut batch = mmr.new_batch();
        for i in 0u64..32 {
            batch = batch.add(&hasher, &i.to_be_bytes());
        }
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        mmr.prune(Location::<F>::new(8)).await.unwrap();
        let leaves_before = mmr.leaves();
        assert!(matches!(
            mmr.rewind(128, &hasher).await,
            Err(Error::ElementPruned(_))
        ));
        // After error, leaves should reflect any partial rewinds that occurred.
        assert!(mmr.leaves() <= leaves_before);
        mmr.destroy().await.unwrap();

        // Case 2: rewind partially succeeds, then returns Empty.
        let empty_context = context.with_label("empty_case");
        let cfg = test_config(&empty_context);
        let mut mmr = Journaled::<F, _, Digest>::init(empty_context, &hasher, cfg)
            .await
            .unwrap();
        let mut batch = mmr.new_batch();
        for i in 0u64..8 {
            batch = batch.add(&hasher, &i.to_be_bytes());
        }
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        let leaves_before = mmr.leaves();
        assert!(matches!(mmr.rewind(9, &hasher).await, Err(Error::Empty)));
        // Rewind returns error without partial modification.
        assert_eq!(mmr.leaves(), leaves_before);
        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_journaled_rewind_error_leaves_valid_state_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_rewind_error_leaves_valid_state_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_journaled_rewind_error_leaves_valid_state_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_rewind_error_leaves_valid_state_inner::<mmb::Family>);
    }

    async fn journaled_basic_inner<F: Family>(context: deterministic::Context) {
        let hasher: Standard<Sha256> = Standard::new();
        let cfg = test_config(&context);
        let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
            .await
            .unwrap();
        // Build a test structure with 255 leaves
        const LEAF_COUNT: usize = 255;
        let mut leaves = Vec::with_capacity(LEAF_COUNT);
        for i in 0..LEAF_COUNT {
            leaves.push(test_digest(i));
        }
        let mut batch = mmr.new_batch();
        for leaf in &leaves {
            batch = batch.add(&hasher, leaf);
        }
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
        assert_eq!(mmr.size(), expected_size);

        // Generate & verify proof from element that is not yet flushed to the journal.
        const TEST_ELEMENT: usize = 133;
        let test_element_loc: Location<F> = Location::new(TEST_ELEMENT as u64);

        let proof = mmr.proof(&hasher, test_element_loc).await.unwrap();
        let root = mmr.root();
        assert!(proof.verify_element_inclusion(
            &hasher,
            &leaves[TEST_ELEMENT],
            test_element_loc,
            &root,
        ));

        // Sync the structure, make sure it flushes the in-mem structure as expected.
        mmr.sync().await.unwrap();

        // Now that the element is flushed from the in-mem structure, confirm its proof is still
        // generated correctly.
        let proof2 = mmr.proof(&hasher, test_element_loc).await.unwrap();
        assert_eq!(proof, proof2);

        // Generate & verify a proof that spans flushed elements and the last element.
        let range = Location::<F>::new(TEST_ELEMENT as u64)..Location::<F>::new(LEAF_COUNT as u64);
        let proof = mmr.range_proof(&hasher, range.clone()).await.unwrap();
        assert!(proof.verify_range_inclusion(
            &hasher,
            &leaves[range.to_usize_range()],
            test_element_loc,
            &root
        ));

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_journaled_basic_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_basic_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_journaled_basic_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_basic_inner::<mmb::Family>);
    }
1344
1345    /// Generates a stateful structure, simulates a crash that wrote a leaf but not its parent
1346    /// nodes, and confirms we appropriately recover to a valid state.
1347    async fn journaled_recovery_inner<F: Family>(context: deterministic::Context) {
1348        use crate::journal::contiguous::fixed::{Config as JConfig, Journal};
1349
1350        let hasher: Standard<Sha256> = Standard::new();
1351        let mut mmr = Journaled::<F, _, Digest>::init(
1352            context.with_label("first"),
1353            &hasher,
1354            test_config(&context),
1355        )
1356        .await
1357        .unwrap();
1358        assert_eq!(mmr.size(), 0);
1359
1360        // Build a test structure with 252 leaves
1361        const LEAF_COUNT: usize = 252;
1362        let mut leaves = Vec::with_capacity(LEAF_COUNT);
1363        for i in 0..LEAF_COUNT {
1364            leaves.push(test_digest(i));
1365        }
1366        let mut batch = mmr.new_batch();
1367        for leaf in &leaves {
1368            batch = batch.add(&hasher, leaf);
1369        }
1370        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1371        mmr.apply_batch(&batch).unwrap();
1372        let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1373        assert_eq!(mmr.size(), expected_size);
1374        mmr.sync().await.unwrap();
1375        drop(mmr);
1376
1377        // Simulate a crash that wrote a leaf but not its parent nodes by appending one
1378        // extra digest to the journal. This creates an invalid structure size.
1379        {
1380            let journal: Journal<_, Digest> = Journal::init(
1381                context.with_label("corrupt"),
1382                JConfig {
1383                    partition: "journal-partition".into(),
1384                    items_per_blob: NZU64!(7),
1385                    write_buffer: NZUsize!(1024),
1386                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1387                },
1388            )
1389            .await
1390            .unwrap();
1391            assert_eq!(journal.size().await, expected_size);
1392            journal.append(&Sha256::hash(b"orphan")).await.unwrap();
1393            journal.sync().await.unwrap();
1394            assert_eq!(journal.size().await, expected_size + 1);
1395        }
1396
1397        let mmr = Journaled::<F, _, Digest>::init(
1398            context.with_label("second"),
1399            &hasher,
1400            test_config(&context),
1401        )
1402        .await
1403        .unwrap();
1404        // Since the orphaned leaf is replayed, the structure recovers to the previous valid state
1405        // plus the new leaf.
1406        let recovered_size =
1407            Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64 + 1)).unwrap();
1408        assert_eq!(mmr.size(), recovered_size);
1409
1410        // Make sure dropping it and re-opening it persists the recovered state.
1411        drop(mmr);
1412        let mmr = Journaled::<F, _, Digest>::init(
1413            context.with_label("third"),
1414            &hasher,
1415            test_config(&context),
1416        )
1417        .await
1418        .unwrap();
1419        assert_eq!(mmr.size(), recovered_size);
1420
1421        mmr.destroy().await.unwrap();
1422    }
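
    // A small sketch of the recovery invariant exercised above (an assumption distilled
    // from the assertions, not a documented guarantee): after init() replays or discards
    // partial writes, the recovered size corresponds to a whole number of leaves, i.e.
    // converting the Position back to a Location succeeds.
    #[allow(dead_code)]
    fn assert_recovered_to_leaf_boundary<F: Family>(
        mmr: &Journaled<F, deterministic::Context, Digest>,
    ) {
        assert!(Location::<F>::try_from(mmr.size()).is_ok());
    }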
1423
1424    #[test_traced]
1425    fn test_journaled_recovery_mmr() {
1426        let executor = deterministic::Runner::default();
1427        executor.start(journaled_recovery_inner::<mmr::Family>);
1428    }
1429
1430    #[test_traced]
1431    fn test_journaled_recovery_mmb() {
1432        let executor = deterministic::Runner::default();
1433        executor.start(journaled_recovery_inner::<mmb::Family>);
1434    }
1435
1436    async fn journaled_pruning_inner<F: Family>(context: deterministic::Context) {
1437        let hasher: Standard<Sha256> = Standard::new();
1438        // Make sure pruning doesn't break root computation, adding new nodes, etc.
1439        const LEAF_COUNT: usize = 2000;
1440        let cfg_pruned = test_config(&context);
1441        let mut pruned_mmr = Journaled::<F, _, Digest>::init(
1442            context.with_label("pruned"),
1443            &hasher,
1444            cfg_pruned.clone(),
1445        )
1446        .await
1447        .unwrap();
1448        let cfg_unpruned = Config {
1449            journal_partition: "unpruned-journal-partition".into(),
1450            metadata_partition: "unpruned-metadata-partition".into(),
1451            items_per_blob: NZU64!(7),
1452            write_buffer: NZUsize!(1024),
1453            thread_pool: None,
1454            page_cache: cfg_pruned.page_cache.clone(),
1455        };
1456        let mut mmr =
1457            Journaled::<F, _, Digest>::init(context.with_label("unpruned"), &hasher, cfg_unpruned)
1458                .await
1459                .unwrap();
1460        let mut leaves = Vec::with_capacity(LEAF_COUNT);
1461        for i in 0..LEAF_COUNT {
1462            leaves.push(test_digest(i));
1463        }
1464        let mut batch = mmr.new_batch();
1465        for leaf in &leaves {
1466            batch = batch.add(&hasher, leaf);
1467        }
1468        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1469        mmr.apply_batch(&batch).unwrap();
1470        let mut batch = pruned_mmr.new_batch();
1471        for leaf in &leaves {
1472            batch = batch.add(&hasher, leaf);
1473        }
1474        let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1475        pruned_mmr.apply_batch(&batch).unwrap();
1476        let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1477        assert_eq!(mmr.size(), expected_size);
1478        assert_eq!(pruned_mmr.size(), expected_size);
1479
1480        // Prune the structure in increments of 10, making sure it can still compute
1481        // roots and accept new elements.
1482        for i in 0usize..300 {
1483            let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 10, *pruned_mmr.leaves()));
1484            pruned_mmr.prune(prune_loc).await.unwrap();
1485            assert_eq!(prune_loc, pruned_mmr.bounds().start);
1486
1487            let digest = test_digest(LEAF_COUNT + i);
1488            leaves.push(digest);
1489            let last_leaf = leaves.last().unwrap();
1490            let batch = pruned_mmr.new_batch().add(&hasher, last_leaf);
1491            let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1492            pruned_mmr.apply_batch(&batch).unwrap();
1493            let batch = mmr.new_batch().add(&hasher, last_leaf);
1494            let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1495            mmr.apply_batch(&batch).unwrap();
1496            assert_eq!(pruned_mmr.root(), mmr.root());
1497        }
1498
1499        // Sync the pruned structure.
1500        pruned_mmr.sync().await.unwrap();
1501        assert_eq!(pruned_mmr.root(), mmr.root());
1502
1503        // Sync the structure & reopen.
1504        pruned_mmr.sync().await.unwrap();
1505        drop(pruned_mmr);
1506        let mut pruned_mmr = Journaled::<F, _, Digest>::init(
1507            context.with_label("pruned_reopen"),
1508            &hasher,
1509            cfg_pruned.clone(),
1510        )
1511        .await
1512        .unwrap();
1513        assert_eq!(pruned_mmr.root(), mmr.root());
1514
1515        // Prune everything.
1516        let size = pruned_mmr.size();
1517        pruned_mmr.prune_all().await.unwrap();
1518        assert_eq!(pruned_mmr.root(), mmr.root());
1519        let bounds = pruned_mmr.bounds();
1520        assert!(bounds.is_empty());
1521        assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
1522
1523        // Add a new node, then sync & reopen the structure, making sure state is as
1524        // expected on reopening.
1525        let batch = mmr.new_batch().add(&hasher, &test_digest(LEAF_COUNT));
1526        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1527        mmr.apply_batch(&batch).unwrap();
1528        let batch = pruned_mmr
1529            .new_batch()
1530            .add(&hasher, &test_digest(LEAF_COUNT));
1531        let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1532        pruned_mmr.apply_batch(&batch).unwrap();
1533        assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
1534        pruned_mmr.sync().await.unwrap();
1535        drop(pruned_mmr);
1536        let mut pruned_mmr = Journaled::<F, _, Digest>::init(
1537            context.with_label("pruned_reopen2"),
1538            &hasher,
1539            cfg_pruned.clone(),
1540        )
1541        .await
1542        .unwrap();
1543        assert_eq!(pruned_mmr.root(), mmr.root());
1544        let bounds = pruned_mmr.bounds();
1545        assert!(!bounds.is_empty());
1546        assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
1547
1548        // Make sure pruning to an older location is a no-op.
1549        assert!(pruned_mmr
1550            .prune(Location::<F>::try_from(size).unwrap() - 1)
1551            .await
1552            .is_ok());
1553        assert_eq!(
1554            pruned_mmr.bounds().start,
1555            Location::<F>::try_from(size).unwrap()
1556        );
1557
1558        // Add nodes until we are on a blob boundary, and confirm prune_all still removes all
1559        // retained nodes.
1560        while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
1561            let batch = pruned_mmr
1562                .new_batch()
1563                .add(&hasher, &test_digest(LEAF_COUNT));
1564            let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1565            pruned_mmr.apply_batch(&batch).unwrap();
1566        }
1567        pruned_mmr.prune_all().await.unwrap();
1568        assert!(pruned_mmr.bounds().is_empty());
1569
1570        pruned_mmr.destroy().await.unwrap();
1571        mmr.destroy().await.unwrap();
1572    }
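
    // A minimal sketch of the pruning contract the test above relies on, assuming `loc`
    // is at or beyond the current boundary (pruning to an older location is a no-op, as
    // tested above): the retained bounds start at `loc` and the root is unchanged.
    #[allow(dead_code)]
    async fn example_prune_contract<F: Family>(
        mmr: &mut Journaled<F, deterministic::Context, Digest>,
        loc: Location<F>,
    ) {
        let root_before = mmr.root();
        mmr.prune(loc).await.unwrap();
        assert_eq!(mmr.bounds().start, loc);
        // Pruning only discards nodes below the boundary, so the root is unaffected.
        assert_eq!(mmr.root(), root_before);
    }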
1573
1574    #[test_traced]
1575    fn test_journaled_pruning_mmr() {
1576        let executor = deterministic::Runner::default();
1577        executor.start(journaled_pruning_inner::<mmr::Family>);
1578    }
1579
1580    #[test_traced]
1581    fn test_journaled_pruning_mmb() {
1582        let executor = deterministic::Runner::default();
1583        executor.start(journaled_pruning_inner::<mmb::Family>);
1584    }
1585
1586    /// Simulate partial writes after pruning, making sure we recover to a valid state.
1587    async fn journaled_recovery_with_pruning_inner<F: Family>(context: deterministic::Context) {
1588        // Build structure with 2000 leaves.
1589        let hasher: Standard<Sha256> = Standard::new();
1590        const LEAF_COUNT: usize = 2000;
1591        let mut leaves = Vec::with_capacity(LEAF_COUNT);
1592        let mut mmr = Journaled::<F, _, Digest>::init(
1593            context.with_label("init"),
1594            &hasher,
1595            test_config(&context),
1596        )
1597        .await
1598        .unwrap();
1599        for i in 0..LEAF_COUNT {
1600            leaves.push(test_digest(i));
1601        }
1602        let mut batch = mmr.new_batch();
1603        for leaf in &leaves {
1604            batch = batch.add(&hasher, leaf);
1605        }
1606        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1607        mmr.apply_batch(&batch).unwrap();
1608        let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1609        assert_eq!(mmr.size(), expected_size);
1610        mmr.sync().await.unwrap();
1611        drop(mmr);
1612
1613        // Prune the structure in increments of 50, simulating a partial write after each prune.
1614        for i in 0usize..200 {
1615            let label = format!("iter_{i}");
1616            let mut mmr = Journaled::<F, _, Digest>::init(
1617                context.with_label(&label),
1618                &hasher,
1619                test_config(&context),
1620            )
1621            .await
1622            .unwrap();
1623            let start_size = mmr.size();
1624            let start_leaves = *mmr.leaves();
1625            let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 50, start_leaves));
1626            if i % 5 == 0 {
1627                mmr.simulate_pruning_failure(prune_loc).await.unwrap();
1628                continue;
1629            }
1630            mmr.prune(prune_loc).await.unwrap();
1631
1632            // Add new elements, then simulate a partial write of the un-synced nodes.
1633            for j in 0..10 {
1634                let digest = test_digest(100 * (i + 1) + j);
1635                leaves.push(digest);
1636                let batch = mmr
1637                    .new_batch()
1638                    .add(&hasher, leaves.last().unwrap())
1639                    .add(&hasher, leaves.last().unwrap());
1640                let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1641                mmr.apply_batch(&batch).unwrap();
1642                let digest = test_digest(LEAF_COUNT + i);
1643                leaves.push(digest);
1644                let batch = mmr
1645                    .new_batch()
1646                    .add(&hasher, leaves.last().unwrap())
1647                    .add(&hasher, leaves.last().unwrap());
1648                let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1649                mmr.apply_batch(&batch).unwrap();
1650            }
1651            let end_size = mmr.size();
1652            let total_to_write = (*end_size - *start_size) as usize;
1653            let partial_write_limit = i % total_to_write;
1654            mmr.simulate_partial_sync(partial_write_limit)
1655                .await
1656                .unwrap();
1657        }
1658
1659        let mmr = Journaled::<F, _, Digest>::init(
1660            context.with_label("final"),
1661            &hasher,
1662            test_config(&context),
1663        )
1664        .await
1665        .unwrap();
1666        mmr.destroy().await.unwrap();
1667    }
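
    // The two failure-injection hooks used above are test-only methods on Journaled;
    // judging from how the loop uses them (an assumption, not a documented contract):
    // `simulate_pruning_failure(loc)` stops partway through a prune, and
    // `simulate_partial_sync(limit)` flushes at most `limit` of the un-synced nodes
    // before "crashing". Either way, the next init() must recover to a valid state.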
1668
1669    #[test_traced("WARN")]
1670    fn test_journaled_recovery_with_pruning_mmr() {
1671        let executor = deterministic::Runner::default();
1672        executor.start(journaled_recovery_with_pruning_inner::<mmr::Family>);
1673    }
1674
1675    #[test_traced("WARN")]
1676    fn test_journaled_recovery_with_pruning_mmb() {
1677        let executor = deterministic::Runner::default();
1678        executor.start(journaled_recovery_with_pruning_inner::<mmb::Family>);
1679    }
1680
1681    async fn journaled_historical_proof_basic_inner<F: Family>(context: deterministic::Context) {
1682        // Create structure with 10 elements
1683        let hasher = Standard::<Sha256>::new();
1684        let cfg = test_config(&context);
1685        let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
1686            .await
1687            .unwrap();
1688        let mut elements = Vec::new();
1689        for i in 0..10 {
1690            elements.push(test_digest(i));
1691        }
1692        let mut batch = mmr.new_batch();
1693        for elt in &elements {
1694            batch = batch.add(&hasher, elt);
1695        }
1696        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1697        mmr.apply_batch(&batch).unwrap();
1698        let original_leaves = mmr.leaves();
1699
1700        // Historical proof should match the "regular" proof when historical size == current structure size
1701        let historical_proof = mmr
1702            .historical_range_proof(
1703                &hasher,
1704                original_leaves,
1705                Location::<F>::new(2)..Location::<F>::new(6),
1706            )
1707            .await
1708            .unwrap();
1709        assert_eq!(historical_proof.leaves, original_leaves);
1710        let root = mmr.root();
1711        assert!(historical_proof.verify_range_inclusion(
1712            &hasher,
1713            &elements[2..6],
1714            Location::<F>::new(2),
1715            &root
1716        ));
1717        let regular_proof = mmr
1718            .range_proof(&hasher, Location::<F>::new(2)..Location::<F>::new(6))
1719            .await
1720            .unwrap();
1721        assert_eq!(regular_proof.leaves, historical_proof.leaves);
1722        assert_eq!(regular_proof.digests, historical_proof.digests);
1723
1724        // Add more elements to the structure
1725        for i in 10..20 {
1726            elements.push(test_digest(i));
1727        }
1728        let mut batch = mmr.new_batch();
1729        for elt in &elements[10..20] {
1730            batch = batch.add(&hasher, elt);
1731        }
1732        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1733        mmr.apply_batch(&batch).unwrap();
1734        let new_historical_proof = mmr
1735            .historical_range_proof(
1736                &hasher,
1737                original_leaves,
1738                Location::<F>::new(2)..Location::<F>::new(6),
1739            )
1740            .await
1741            .unwrap();
1742        assert_eq!(new_historical_proof.leaves, historical_proof.leaves);
1743        assert_eq!(new_historical_proof.digests, historical_proof.digests);
1744
1745        mmr.destroy().await.unwrap();
1746    }
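
    // A sketch of the client-side flow these historical-proof tests model: a server
    // whose structure has grown past `historical_leaves` proves a range against the
    // root the client recorded at that earlier size. `historical_root` and `elements`
    // are assumed to come from the client's own records.
    #[allow(dead_code)]
    async fn example_verify_historical<F: Family>(
        server: &Journaled<F, deterministic::Context, Digest>,
        hasher: &Standard<Sha256>,
        historical_leaves: Location<F>,
        historical_root: &Digest,
        elements: &[Digest],
        start: Location<F>,
    ) -> bool {
        let range = start..start + elements.len() as u64;
        let proof = server
            .historical_range_proof(hasher, historical_leaves, range)
            .await
            .unwrap();
        proof.verify_range_inclusion(hasher, elements, start, historical_root)
    }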
1747
1748    #[test_traced]
1749    fn test_journaled_historical_proof_basic_mmr() {
1750        let executor = deterministic::Runner::default();
1751        executor.start(journaled_historical_proof_basic_inner::<mmr::Family>);
1752    }
1753
1754    #[test_traced]
1755    fn test_journaled_historical_proof_basic_mmb() {
1756        let executor = deterministic::Runner::default();
1757        executor.start(journaled_historical_proof_basic_inner::<mmb::Family>);
1758    }
1759
1760    async fn journaled_historical_proof_with_pruning_inner<F: Family>(
1761        context: deterministic::Context,
1762    ) {
1763        let hasher = Standard::<Sha256>::new();
1764        let mut mmr = Journaled::<F, _, Digest>::init(
1765            context.with_label("main"),
1766            &hasher,
1767            test_config(&context),
1768        )
1769        .await
1770        .unwrap();
1771
1772        // Add many elements
1773        let mut elements = Vec::new();
1774        for i in 0..50 {
1775            elements.push(test_digest(i));
1776        }
1777        let mut batch = mmr.new_batch();
1778        for elt in &elements {
1779            batch = batch.add(&hasher, elt);
1780        }
1781        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1782        mmr.apply_batch(&batch).unwrap();
1783
1784        // Prune to leaf 16
1785        let prune_loc = Location::<F>::new(16);
1786        mmr.prune(prune_loc).await.unwrap();
1787
1788        // Create a reference structure to get the correct historical size and root for verification
1789        let mut ref_mmr = Journaled::<F, _, Digest>::init(
1790            context.with_label("ref"),
1791            &hasher,
1792            Config {
1793                journal_partition: "ref-journal-pruned".into(),
1794                metadata_partition: "ref-metadata-pruned".into(),
1795                items_per_blob: NZU64!(7),
1796                write_buffer: NZUsize!(1024),
1797                thread_pool: None,
1798                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1799            },
1800        )
1801        .await
1802        .unwrap();
1803
1804        let mut batch = ref_mmr.new_batch();
1805        for elt in elements.iter().take(41) {
1806            batch = batch.add(&hasher, elt);
1807        }
1808        let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1809        ref_mmr.apply_batch(&batch).unwrap();
1810        let historical_leaves = ref_mmr.leaves();
1811        let historical_root = ref_mmr.root();
1812
1813        // Test proof at historical position after pruning
1814        let historical_proof = mmr
1815            .historical_range_proof(
1816                &hasher,
1817                historical_leaves,
1818                Location::<F>::new(35)..Location::<F>::new(39),
1819            )
1820            .await
1821            .unwrap();
1822
1823        assert_eq!(historical_proof.leaves, historical_leaves);
1824
1825        // Verify proof works despite pruning
1826        assert!(historical_proof.verify_range_inclusion(
1827            &hasher,
1828            &elements[35..39],
1829            Location::<F>::new(35),
1830            &historical_root
1831        ));
1832
1833        ref_mmr.destroy().await.unwrap();
1834        mmr.destroy().await.unwrap();
1835    }
1836
1837    #[test_traced]
1838    fn test_journaled_historical_proof_with_pruning_mmr() {
1839        let executor = deterministic::Runner::default();
1840        executor.start(journaled_historical_proof_with_pruning_inner::<mmr::Family>);
1841    }
1842
1843    #[test_traced]
1844    fn test_journaled_historical_proof_with_pruning_mmb() {
1845        let executor = deterministic::Runner::default();
1846        executor.start(journaled_historical_proof_with_pruning_inner::<mmb::Family>);
1847    }
1848
1849    async fn journaled_historical_proof_large_inner<F: Family>(context: deterministic::Context) {
1850        let hasher = Standard::<Sha256>::new();
1851
1852        let mut mmr = Journaled::<F, _, Digest>::init(
1853            context.with_label("server"),
1854            &hasher,
1855            Config {
1856                journal_partition: "server-journal".into(),
1857                metadata_partition: "server-metadata".into(),
1858                items_per_blob: NZU64!(7),
1859                write_buffer: NZUsize!(1024),
1860                thread_pool: None,
1861                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1862            },
1863        )
1864        .await
1865        .unwrap();
1866
1867        let mut elements = Vec::new();
1868        for i in 0..100 {
1869            elements.push(test_digest(i));
1870        }
1871        let mut batch = mmr.new_batch();
1872        for elt in &elements {
1873            batch = batch.add(&hasher, elt);
1874        }
1875        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1876        mmr.apply_batch(&batch).unwrap();
1877
1878        let range = Location::<F>::new(30)..Location::<F>::new(61);
1879
1880        // Only apply elements up to range.end to the reference structure.
1881        let mut ref_mmr = Journaled::<F, _, Digest>::init(
1882            context.with_label("client"),
1883            &hasher,
1884            Config {
1885                journal_partition: "client-journal".into(),
1886                metadata_partition: "client-metadata".into(),
1887                items_per_blob: NZU64!(7),
1888                write_buffer: NZUsize!(1024),
1889                thread_pool: None,
1890                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1891            },
1892        )
1893        .await
1894        .unwrap();
1895
1896        // Add elements up to the end of the range to verify historical root
1897        let mut batch = ref_mmr.new_batch();
1898        for elt in elements.iter().take(*range.end as usize) {
1899            batch = batch.add(&hasher, elt);
1900        }
1901        let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1902        ref_mmr.apply_batch(&batch).unwrap();
1903        let historical_leaves = ref_mmr.leaves();
1904        let expected_root = ref_mmr.root();
1905
1906        // Generate proof from full structure
1907        let proof = mmr
1908            .historical_range_proof(&hasher, historical_leaves, range.clone())
1909            .await
1910            .unwrap();
1911
1912        assert!(proof.verify_range_inclusion(
1913            &hasher,
1914            &elements[range.to_usize_range()],
1915            range.start,
1916            &expected_root // Compare to historical (reference) root
1917        ));
1918
1919        ref_mmr.destroy().await.unwrap();
1920        mmr.destroy().await.unwrap();
1921    }
1922
1923    #[test_traced]
1924    fn test_journaled_historical_proof_large_mmr() {
1925        let executor = deterministic::Runner::default();
1926        executor.start(journaled_historical_proof_large_inner::<mmr::Family>);
1927    }
1928
1929    #[test_traced]
1930    fn test_journaled_historical_proof_large_mmb() {
1931        let executor = deterministic::Runner::default();
1932        executor.start(journaled_historical_proof_large_inner::<mmb::Family>);
1933    }
1934
1935    async fn journaled_historical_proof_singleton_inner<F: Family>(
1936        context: deterministic::Context,
1937    ) {
1938        let hasher = Standard::<Sha256>::new();
1939        let cfg = test_config(&context);
1940        let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
1941            .await
1942            .unwrap();
1943
1944        let element = test_digest(0);
1945        let batch = mmr.new_batch().add(&hasher, &element);
1946        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1947        mmr.apply_batch(&batch).unwrap();
1948
1949        // Test a single-element proof at a historical position
1950        let single_proof = mmr
1951            .historical_range_proof(
1952                &hasher,
1953                Location::<F>::new(1),
1954                Location::<F>::new(0)..Location::<F>::new(1),
1955            )
1956            .await
1957            .unwrap();
1958
1959        let root = mmr.root();
1960        assert!(single_proof.verify_range_inclusion(
1961            &hasher,
1962            &[element],
1963            Location::<F>::new(0),
1964            &root
1965        ));
1966
1967        mmr.destroy().await.unwrap();
1968    }
1969
1970    #[test_traced]
1971    fn test_journaled_historical_proof_singleton_mmr() {
1972        let executor = deterministic::Runner::default();
1973        executor.start(journaled_historical_proof_singleton_inner::<mmr::Family>);
1974    }
1975
1976    #[test_traced]
1977    fn test_journaled_historical_proof_singleton_mmb() {
1978        let executor = deterministic::Runner::default();
1979        executor.start(journaled_historical_proof_singleton_inner::<mmb::Family>);
1980    }
1981
1982    // Test `init_sync` when there is no persisted data.
1983    async fn journaled_init_sync_empty_inner<F: Family>(context: deterministic::Context) {
1984        let hasher = Standard::<Sha256>::new();
1985
1986        // Test the fresh-start scenario with a completely new structure (no existing data)
1987        let sync_cfg = SyncConfig::<F, sha256::Digest> {
1988            config: test_config(&context),
1989            range: Location::<F>::new(0)..Location::<F>::new(52),
1990            pinned_nodes: None,
1991        };
1992
1993        let mut sync_mmr = Journaled::<F, _, Digest>::init_sync(context.clone(), sync_cfg, &hasher)
1994            .await
1995            .unwrap();
1996
1997        // Should be a fresh structure starting empty
1998        assert_eq!(sync_mmr.size(), 0);
1999        let bounds = sync_mmr.bounds();
2000        assert_eq!(bounds.start, 0);
2001        assert!(bounds.is_empty());
2002
2003        // Should be able to add new elements
2004        let new_element = test_digest(999);
2005        let batch = sync_mmr.new_batch().add(&hasher, &new_element);
2006        let batch = sync_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2007        sync_mmr.apply_batch(&batch).unwrap();
2008
2009        // Root should be computable
2010        let _root = sync_mmr.root();
2011
2012        sync_mmr.destroy().await.unwrap();
2013    }
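
    // A sketch of the `SyncConfig` shape used by the init_sync tests in this module.
    // The range here is a placeholder: it is the location span the sync is expected to
    // cover, and `pinned_nodes: None` lets init_sync derive pinned digests from local
    // state when available.
    #[allow(dead_code)]
    fn example_sync_config<F: Family>(
        context: &deterministic::Context,
    ) -> SyncConfig<F, sha256::Digest> {
        SyncConfig {
            config: test_config(context),
            range: Location::<F>::new(0)..Location::<F>::new(100),
            pinned_nodes: None,
        }
    }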
2014
2015    #[test_traced]
2016    fn test_journaled_init_sync_empty_mmr() {
2017        let executor = deterministic::Runner::default();
2018        executor.start(journaled_init_sync_empty_inner::<mmr::Family>);
2019    }
2020
2021    #[test_traced]
2022    fn test_journaled_init_sync_empty_mmb() {
2023        let executor = deterministic::Runner::default();
2024        executor.start(journaled_init_sync_empty_inner::<mmb::Family>);
2025    }
2026
2027    // Test `init_sync` where the persisted structure's nodes exactly match the sync boundaries.
2028    async fn journaled_init_sync_nonempty_exact_match_inner<F: Family>(
2029        context: deterministic::Context,
2030    ) {
2031        let hasher = Standard::<Sha256>::new();
2032
2033        // Create initial structure with elements.
2034        let mut mmr = Journaled::<F, _, Digest>::init(
2035            context.with_label("init"),
2036            &hasher,
2037            test_config(&context),
2038        )
2039        .await
2040        .unwrap();
2041        let mut batch = mmr.new_batch();
2042        for i in 0..50 {
2043            batch = batch.add(&hasher, &test_digest(i));
2044        }
2045        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2046        mmr.apply_batch(&batch).unwrap();
2047        mmr.sync().await.unwrap();
2048        let original_size = mmr.size();
2049        let original_leaves = mmr.leaves();
2050        let original_root = mmr.root();
2051
2052        // Syncing with range.start <= existing leaves <= range.end should reuse the existing data
2053        let lower_bound_loc = mmr.bounds().start;
2054        let upper_bound_loc = mmr.leaves();
2055        let lower_bound_pos = Position::<F>::try_from(lower_bound_loc).unwrap();
2056        let upper_bound_pos = mmr.size();
2057        let mut expected_nodes = BTreeMap::new();
2058        for i in *lower_bound_pos..*upper_bound_pos {
2059            expected_nodes.insert(
2060                Position::<F>::new(i),
2061                mmr.get_node(Position::<F>::new(i)).await.unwrap().unwrap(),
2062            );
2063        }
2064        let sync_cfg = SyncConfig::<F, sha256::Digest> {
2065            config: test_config(&context),
2066            range: lower_bound_loc..upper_bound_loc,
2067            pinned_nodes: None,
2068        };
2069
2070        mmr.sync().await.unwrap();
2071        drop(mmr);
2072
2073        let sync_mmr =
2074            Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
2075                .await
2076                .unwrap();
2077
2078        // Should have existing data in the sync range.
2079        assert_eq!(sync_mmr.size(), original_size);
2080        assert_eq!(sync_mmr.leaves(), original_leaves);
2081        let bounds = sync_mmr.bounds();
2082        assert_eq!(bounds.start, lower_bound_loc);
2083        assert!(!bounds.is_empty());
2084        assert_eq!(sync_mmr.root(), original_root);
2085        for pos in *lower_bound_pos..*upper_bound_pos {
2086            let pos = Position::<F>::new(pos);
2087            assert_eq!(
2088                sync_mmr.get_node(pos).await.unwrap(),
2089                expected_nodes.get(&pos).cloned()
2090            );
2091        }
2092
2093        sync_mmr.destroy().await.unwrap();
2094    }
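
    // The node-snapshot pattern used by the sync tests above and below, factored into
    // a sketch helper: capture every retained node so a post-sync structure can be
    // compared against the original, node by node.
    #[allow(dead_code)]
    async fn snapshot_nodes<F: Family>(
        mmr: &Journaled<F, deterministic::Context, Digest>,
        start: Position<F>,
        end: Position<F>,
    ) -> BTreeMap<Position<F>, Digest> {
        let mut nodes = BTreeMap::new();
        for i in *start..*end {
            let pos = Position::<F>::new(i);
            nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
        }
        nodes
    }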
2095
2096    #[test_traced]
2097    fn test_journaled_init_sync_nonempty_exact_match_mmr() {
2098        let executor = deterministic::Runner::default();
2099        executor.start(journaled_init_sync_nonempty_exact_match_inner::<mmr::Family>);
2100    }
2101
2102    #[test_traced]
2103    fn test_journaled_init_sync_nonempty_exact_match_mmb() {
2104        let executor = deterministic::Runner::default();
2105        executor.start(journaled_init_sync_nonempty_exact_match_inner::<mmb::Family>);
2106    }
2107
2108    // Test `init_sync` where the persisted structure's data partially overlaps with the sync
2109    // boundaries.
2110    async fn journaled_init_sync_partial_overlap_inner<F: Family>(context: deterministic::Context) {
2111        let hasher = Standard::<Sha256>::new();
2112
2113        // Create initial structure with elements.
2114        let mut mmr = Journaled::<F, _, Digest>::init(
2115            context.with_label("init"),
2116            &hasher,
2117            test_config(&context),
2118        )
2119        .await
2120        .unwrap();
2121        let mut batch = mmr.new_batch();
2122        for i in 0..30 {
2123            batch = batch.add(&hasher, &test_digest(i));
2124        }
2125        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2126        mmr.apply_batch(&batch).unwrap();
2127        mmr.sync().await.unwrap();
2128        mmr.prune(Location::<F>::new(6)).await.unwrap();
2129
2130        let original_size = mmr.size();
2131        let original_leaves = mmr.leaves();
2132        let original_root = mmr.root();
2133        let original_pruning_boundary = mmr.bounds().start;
2134        let original_pruning_pos = Position::<F>::try_from(original_pruning_boundary).unwrap();
2135
2136        // Sync with boundaries that extend beyond existing data (partial overlap).
2137        let lower_bound_loc = original_pruning_boundary;
2138        let upper_bound_loc = original_leaves + 6; // Extend beyond existing data
2139
2140        let mut expected_nodes = BTreeMap::new();
2141        for i in *original_pruning_pos..*original_size {
2142            let pos = Position::<F>::new(i);
2143            expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
2144        }
2145
2146        let sync_cfg = SyncConfig::<F, sha256::Digest> {
2147            config: test_config(&context),
2148            range: lower_bound_loc..upper_bound_loc,
2149            pinned_nodes: None,
2150        };
2151
2152        mmr.sync().await.unwrap();
2153        drop(mmr);
2154
2155        let sync_mmr =
2156            Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
2157                .await
2158                .unwrap();
2159
2160        // Should have existing data in the overlapping range.
2161        assert_eq!(sync_mmr.size(), original_size);
2162        let bounds = sync_mmr.bounds();
2163        assert_eq!(bounds.start, lower_bound_loc);
2164        assert!(!bounds.is_empty());
2165        assert_eq!(sync_mmr.root(), original_root);
2166
2167        // Check that existing nodes are preserved in the overlapping range.
2168        for i in *original_pruning_pos..*original_size {
2169            let pos = Position::<F>::new(i);
2170            assert_eq!(
2171                sync_mmr.get_node(pos).await.unwrap(),
2172                expected_nodes.get(&pos).cloned()
2173            );
2174        }
2175
2176        sync_mmr.destroy().await.unwrap();
2177    }
2178
2179    #[test_traced]
2180    fn test_journaled_init_sync_partial_overlap_mmr() {
2181        let executor = deterministic::Runner::default();
2182        executor.start(journaled_init_sync_partial_overlap_inner::<mmr::Family>);
2183    }
2184
2185    #[test_traced]
2186    fn test_journaled_init_sync_partial_overlap_mmb() {
2187        let executor = deterministic::Runner::default();
2188        executor.start(journaled_init_sync_partial_overlap_inner::<mmb::Family>);
2189    }
2190
2191    async fn journaled_init_sync_rejects_extra_pinned_nodes_inner<F: Family>(
2192        context: deterministic::Context,
2193    ) {
2194        let hasher = Standard::<Sha256>::new();
2195
2196        let sync_cfg = SyncConfig::<F, sha256::Digest> {
2197            config: test_config(&context),
2198            range: Location::<F>::new(6)..Location::<F>::new(20),
2199            pinned_nodes: Some(vec![test_digest(1), test_digest(2), test_digest(3)]),
2200        };
2201
2202        let result =
2203            Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
2204                .await;
2205        assert!(matches!(result, Err(Error::InvalidPinnedNodes)));
2206    }
2207
2208    #[test_traced]
2209    fn test_journaled_init_sync_rejects_extra_pinned_nodes_mmr() {
2210        let executor = deterministic::Runner::default();
2211        executor.start(journaled_init_sync_rejects_extra_pinned_nodes_inner::<mmr::Family>);
2212    }
2213
2214    #[test_traced]
2215    fn test_journaled_init_sync_rejects_extra_pinned_nodes_mmb() {
2216        let executor = deterministic::Runner::default();
2217        executor.start(journaled_init_sync_rejects_extra_pinned_nodes_inner::<mmb::Family>);
2218    }
2219
2220    // Regression test that init() handles stale metadata (lower pruning boundary than journal).
2221    // Before the fix, this would panic with an assertion failure. After the fix, it returns a
2222    // MissingNode error (which is expected when metadata is corrupted and pinned nodes are lost).
2223    async fn journaled_init_stale_metadata_returns_error_inner<F: Family>(
2224        context: deterministic::Context,
2225    ) {
2226        let hasher = Standard::<Sha256>::new();
2227
2228        // Create a structure with some data and prune it
2229        let mut mmr = Journaled::<F, _, Digest>::init(
2230            context.with_label("init"),
2231            &hasher,
2232            test_config(&context),
2233        )
2234        .await
2235        .unwrap();
2236
2237        // Add 50 elements
2238        let mut batch = mmr.new_batch();
2239        for i in 0..50 {
2240            batch = batch.add(&hasher, &test_digest(i));
2241        }
2242        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2243        mmr.apply_batch(&batch).unwrap();
2244        mmr.sync().await.unwrap();
2245
2246        // Prune enough that the journal boundary's pinned nodes span pruned blobs.
2247        let prune_loc = Location::<F>::new(25);
2248        mmr.prune(prune_loc).await.unwrap();
2249        drop(mmr);
2250
2251        // Simulate a crash after the journal was pruned but before the metadata was updated:
2252        // clear all metadata and write only a stale pruning boundary of 0 (no pinned nodes).
2253        let meta_cfg = MConfig {
2254            partition: test_config(&context).metadata_partition,
2255            codec_config: ((0..).into(), ()),
2256        };
2257        let mut metadata =
2258            Metadata::<_, U64, Vec<u8>>::init(context.with_label("meta_tamper"), meta_cfg)
2259                .await
2260                .unwrap();
2261        metadata.clear();
2262        let key = U64::new(PRUNED_TO_PREFIX, 0);
2263        metadata.put(key, 0u64.to_be_bytes().to_vec());
2264        metadata.sync().await.unwrap();
2265        drop(metadata);
2266
2267        // Reopen the structure. Before the fix, this would panic with an assertion failure.
2268        // After the fix, it returns a MissingNode error (pinned nodes for the lower
2269        // boundary don't exist since they were pruned from the journal and were never
2270        // stored in metadata at the lower position).
2271        let result = Journaled::<F, _, Digest>::init(
2272            context.with_label("reopened"),
2273            &hasher,
2274            test_config(&context),
2275        )
2276        .await;
2277
2278        match result {
2279            Err(Error::MissingNode(_)) => {} // expected
2280            Ok(_) => panic!("expected MissingNode error, got Ok"),
2281            Err(e) => panic!("expected MissingNode error, got {:?}", e),
2282        }
2283    }
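
    // A sketch of the metadata entry tampered with above (key layout inferred from the
    // code, an assumption rather than a documented schema): the pruning boundary lives
    // under a `U64` key prefixed with `PRUNED_TO_PREFIX`, with the boundary position
    // encoded big-endian in the value.
    #[allow(dead_code)]
    fn example_pruning_boundary_entry(boundary: u64) -> (U64, Vec<u8>) {
        (U64::new(PRUNED_TO_PREFIX, 0), boundary.to_be_bytes().to_vec())
    }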
2284
2285    #[test_traced("WARN")]
2286    fn test_journaled_init_stale_metadata_returns_error_mmr() {
2287        let executor = deterministic::Runner::default();
2288        executor.start(journaled_init_stale_metadata_returns_error_inner::<mmr::Family>);
2289    }
2290
2291    #[test_traced("WARN")]
2292    fn test_journaled_init_stale_metadata_returns_error_mmb() {
2293        let executor = deterministic::Runner::default();
2294        executor.start(journaled_init_stale_metadata_returns_error_inner::<mmb::Family>);
2295    }
2296
2297    // Test that init() handles the case where the metadata pruning boundary is ahead
2298    // of the journal (a crash occurred before the journal prune completed). Init should
2299    // successfully prune the journal to match the metadata.
2300    async fn journaled_init_metadata_ahead_inner<F: Family>(context: deterministic::Context) {
2301        let hasher = Standard::<Sha256>::new();
2302
2303        // Create a structure with some data
2304        let mut mmr = Journaled::<F, _, Digest>::init(
2305            context.with_label("init"),
2306            &hasher,
2307            test_config(&context),
2308        )
2309        .await
2310        .unwrap();
2311
2312        // Add 50 elements
2313        let mut batch = mmr.new_batch();
2314        for i in 0..50 {
2315            batch = batch.add(&hasher, &test_digest(i));
2316        }
2317        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2318        mmr.apply_batch(&batch).unwrap();
2319        mmr.sync().await.unwrap();
2320
2321        // Prune to leaf 16 (this stores pinned nodes and updates metadata)
2322        let prune_loc = Location::<F>::new(16);
2323        mmr.prune(prune_loc).await.unwrap();
2324        let expected_root = mmr.root();
2325        let expected_size = mmr.size();
2326        drop(mmr);
2327
2328        // Reopen the structure - it should recover correctly with the metadata boundary
2329        // ahead of the section-aligned journal boundary
2330        let mmr = Journaled::<F, _, Digest>::init(
2331            context.with_label("reopened"),
2332            &hasher,
2333            test_config(&context),
2334        )
2335        .await
2336        .unwrap();
2337
2338        assert_eq!(mmr.bounds().start, prune_loc);
2339        assert_eq!(mmr.size(), expected_size);
2340        assert_eq!(mmr.root(), expected_root);
2341
2342        mmr.destroy().await.unwrap();
2343    }
2344
2345    #[test_traced("WARN")]
2346    fn test_journaled_init_metadata_ahead_mmr() {
2347        let executor = deterministic::Runner::default();
2348        executor.start(journaled_init_metadata_ahead_inner::<mmr::Family>);
2349    }
2350
2351    #[test_traced("WARN")]
2352    fn test_journaled_init_metadata_ahead_mmb() {
2353        let executor = deterministic::Runner::default();
2354        executor.start(journaled_init_metadata_ahead_inner::<mmb::Family>);
2355    }
2356
2357    // Regression test: init_sync must compute pinned nodes BEFORE pruning the journal. Previously,
2358    // init_sync would prune the journal first, then try to read pinned nodes from the pruned
2359    // positions, causing MissingNode errors.
2360    //
2361    // Key setup: We create a structure with data but DON'T prune it, so the metadata has no pinned
2362    // nodes. Then init_sync must read pinned nodes from the journal before pruning it.
2363    async fn journaled_init_sync_computes_pinned_nodes_before_pruning_inner<F: Family>(
2364        context: deterministic::Context,
2365    ) {
2366        let hasher = Standard::<Sha256>::new();
2367
2368        // Use small items_per_blob to create many sections and trigger pruning.
2369        let cfg = Config {
2370            journal_partition: "mmr-journal".into(),
2371            metadata_partition: "mmr-metadata".into(),
2372            items_per_blob: NZU64!(7),
2373            write_buffer: NZUsize!(64),
2374            thread_pool: None,
2375            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2376        };
2377
2378        // Create structure with enough elements to span multiple sections.
2379        let mut mmr =
2380            Journaled::<F, _, Digest>::init(context.with_label("init"), &hasher, cfg.clone())
2381                .await
2382                .unwrap();
2383        let mut batch = mmr.new_batch();
2384        for i in 0..100 {
2385            batch = batch.add(&hasher, &test_digest(i));
2386        }
2387        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2388        mmr.apply_batch(&batch).unwrap();
2389        mmr.sync().await.unwrap();
2390
2391        // Don't prune - this ensures metadata has no pinned nodes. init_sync will need to
2392        // read pinned nodes from the journal.
2393        let original_size = mmr.size();
2394        let original_root = mmr.root();
2395        drop(mmr);
2396
2397        // Reopen via init_sync with range.start > 0. This will prune the journal, so
2398        // init_sync must read pinned nodes BEFORE pruning or they'll be lost.
2399        let prune_loc = Location::<F>::new(32);
2400        let sync_cfg = SyncConfig::<F, sha256::Digest> {
2401            config: cfg,
2402            range: prune_loc..Location::<F>::new(128),
2403            pinned_nodes: None, // Force init_sync to compute pinned nodes from journal
2404        };
2405
2406        let sync_mmr =
2407            Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
2408                .await
2409                .unwrap();
2410
2411        // Verify the structure state is correct.
2412        assert_eq!(sync_mmr.size(), original_size);
2413        assert_eq!(sync_mmr.root(), original_root);
2414        assert_eq!(sync_mmr.bounds().start, prune_loc);
2415
2416        sync_mmr.destroy().await.unwrap();
2417    }
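
    // The ordering constraint pinned down above, sketched with hypothetical step names
    // (`read_pinned_nodes` and `prune_journal` stand in for init_sync internals):
    //
    //   correct: read_pinned_nodes(&journal, boundary)?; prune_journal(boundary)?;
    //   broken:  prune_journal(boundary)?; read_pinned_nodes(&journal, boundary)?; // MissingNode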
2418
2419    #[test_traced]
2420    fn test_journaled_init_sync_computes_pinned_nodes_before_pruning_mmr() {
2421        let executor = deterministic::Runner::default();
2422        executor
2423            .start(journaled_init_sync_computes_pinned_nodes_before_pruning_inner::<mmr::Family>);
2424    }
2425
2426    #[test_traced]
2427    fn test_journaled_init_sync_computes_pinned_nodes_before_pruning_mmb() {
2428        let executor = deterministic::Runner::default();
2429        executor
2430            .start(journaled_init_sync_computes_pinned_nodes_before_pruning_inner::<mmb::Family>);
2431    }
2432
2433    async fn journaled_historical_proof_pruned_elements_inner<F: Family>(
2434        context: deterministic::Context,
2435    ) {
2436        let hasher = Standard::<Sha256>::new();
2437
2438        let mut mmr = Journaled::<F, _, Digest>::init(
2439            context.with_label("init"),
2440            &hasher,
2441            test_config(&context),
2442        )
2443        .await
2444        .unwrap();
2445
2446        let mut batch = mmr.new_batch();
2447        for i in 0..64 {
2448            batch = batch.add(&hasher, &test_digest(i));
2449        }
2450        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2451        mmr.apply_batch(&batch).unwrap();
2452
2453        let prune_loc = Location::<F>::new(16);
2454        mmr.prune(prune_loc).await.unwrap();
2455
2456        let historical_leaves = mmr.leaves();
2457        let mut pruned_loc = None;
2458        for loc_u64 in 0..*historical_leaves {
2459            let loc = Location::<F>::new(loc_u64);
2460            let result = mmr
2461                .historical_range_proof(&hasher, historical_leaves, loc..loc + 1)
2462                .await;
2463            if matches!(result, Err(Error::ElementPruned(_))) {
2464                pruned_loc = Some(loc);
2465                break;
2466            }
2467        }
2468        let pruned_loc = pruned_loc.expect("expected at least one pruned location");
2469
2470        // Add more elements and verify pruned elements still return ElementPruned.
2471        let mut batch = mmr.new_batch();
2472        for i in 0..8 {
2473            batch = batch.add(&hasher, &test_digest(10_000 + i));
2474        }
2475        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2476        mmr.apply_batch(&batch).unwrap();
2477
2478        let requested = mmr.leaves();
2479        let result = mmr
2480            .historical_range_proof(&hasher, requested, pruned_loc..pruned_loc + 1)
2481            .await;
2482        assert!(matches!(result, Err(Error::ElementPruned(_))));
2483
2484        mmr.destroy().await.unwrap();
2485    }
2486
2487    #[test_traced]
2488    fn test_journaled_historical_proof_pruned_elements_mmr() {
2489        let executor = deterministic::Runner::default();
2490        executor.start(journaled_historical_proof_pruned_elements_inner::<mmr::Family>);
2491    }
2492
2493    #[test_traced]
2494    fn test_journaled_historical_proof_pruned_elements_mmb() {
2495        let executor = deterministic::Runner::default();
2496        executor.start(journaled_historical_proof_pruned_elements_inner::<mmb::Family>);
2497    }
2498
2499    async fn journaled_append_while_historical_proof_is_available_inner<F: Family>(
2500        context: deterministic::Context,
2501    ) {
2502        let hasher = Standard::<Sha256>::new();
2503        let mut mmr = Journaled::<F, _, Digest>::init(
2504            context.with_label("init"),
2505            &hasher,
2506            test_config(&context),
2507        )
2508        .await
2509        .unwrap();
2510
2511        let mut batch = mmr.new_batch();
2512        for i in 0..20 {
2513            batch = batch.add(&hasher, &test_digest(i));
2514        }
2515        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2516        mmr.apply_batch(&batch).unwrap();
2517
2518        let historical_leaves = Location::<F>::new(10);
2519        let range = Location::<F>::new(2)..Location::<F>::new(8);
2520
2521        // Appends should remain allowed while historical proofs are available.
2522        let batch = mmr
2523            .new_batch()
2524            .add(&hasher, &test_digest(100))
2525            .add(&hasher, &test_digest(101));
2526        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2527        mmr.apply_batch(&batch).unwrap();
2528
2529        let proof = mmr
2530            .historical_range_proof(&hasher, historical_leaves, range.clone())
2531            .await
2532            .unwrap();
2533
2534        let expected = mmr
2535            .historical_range_proof(&hasher, historical_leaves, range)
2536            .await
2537            .unwrap();
2538        assert_eq!(proof, expected);
2539
2540        mmr.destroy().await.unwrap();
2541    }
2542
2543    #[test_traced]
2544    fn test_journaled_append_while_historical_proof_is_available_mmr() {
2545        let executor = deterministic::Runner::default();
2546        executor.start(journaled_append_while_historical_proof_is_available_inner::<mmr::Family>);
2547    }
2548
2549    #[test_traced]
2550    fn test_journaled_append_while_historical_proof_is_available_mmb() {
2551        let executor = deterministic::Runner::default();
2552        executor.start(journaled_append_while_historical_proof_is_available_inner::<mmb::Family>);
2553    }
2554
2555    async fn journaled_historical_proof_after_sync_reads_from_journal_inner<F: Family>(
2556        context: deterministic::Context,
2557    ) {
2558        let hasher = Standard::<Sha256>::new();
2559        let mut mmr = Journaled::<F, _, Digest>::init(
2560            context.with_label("init"),
2561            &hasher,
2562            test_config(&context),
2563        )
2564        .await
2565        .unwrap();
2566
2567        let mut batch = mmr.new_batch();
2568        for i in 0..64 {
2569            batch = batch.add(&hasher, &test_digest(i));
2570        }
2571        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2572        mmr.apply_batch(&batch).unwrap();
2573        mmr.sync().await.unwrap();
2574
2575        let historical_leaves = Location::<F>::new(20);
2576        let range = Location::<F>::new(5)..Location::<F>::new(15);
2577        let expected = mmr
2578            .historical_range_proof(&hasher, historical_leaves, range.clone())
2579            .await
2580            .unwrap();
2581
2582        let actual = mmr
2583            .historical_range_proof(&hasher, historical_leaves, range)
2584            .await
2585            .unwrap();
2586        assert_eq!(actual, expected);
2587
2588        mmr.destroy().await.unwrap();
2589    }
2590
2591    #[test_traced]
2592    fn test_journaled_historical_proof_after_sync_reads_from_journal_mmr() {
2593        let executor = deterministic::Runner::default();
2594        executor
2595            .start(journaled_historical_proof_after_sync_reads_from_journal_inner::<mmr::Family>);
2596    }
2597
2598    #[test_traced]
2599    fn test_journaled_historical_proof_after_sync_reads_from_journal_mmb() {
2600        let executor = deterministic::Runner::default();
2601        executor
2602            .start(journaled_historical_proof_after_sync_reads_from_journal_inner::<mmb::Family>);
2603    }
2604
2605    async fn journaled_historical_proof_after_pruning_inner<F: Family>(
2606        context: deterministic::Context,
2607    ) {
2608        let hasher = Standard::<Sha256>::new();
2609        let mut mmr = Journaled::<F, _, Digest>::init(
2610            context.with_label("init"),
2611            &hasher,
2612            test_config(&context),
2613        )
2614        .await
2615        .unwrap();
2616
2617        let mut batch = mmr.new_batch();
2618        for i in 0..30 {
2619            batch = batch.add(&hasher, &test_digest(i));
2620        }
2621        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2622        mmr.apply_batch(&batch).unwrap();
2623
2624        let prune_loc = Location::<F>::new(10);
2625        mmr.prune(prune_loc).await.unwrap();
2626
2627        let requested = Location::<F>::new(20);
2628        let range = prune_loc..requested;
2629        let proof = mmr
2630            .historical_range_proof(&hasher, requested, range)
2631            .await
2632            .unwrap();
2633        assert!(proof.leaves > Location::<F>::new(0));
2634
2635        mmr.destroy().await.unwrap();
2636    }
2637
2638    #[test_traced]
2639    fn test_journaled_historical_proof_after_pruning_mmr() {
2640        let executor = deterministic::Runner::default();
2641        executor.start(journaled_historical_proof_after_pruning_inner::<mmr::Family>);
2642    }
2643
2644    #[test_traced]
2645    fn test_journaled_historical_proof_after_pruning_mmb() {
2646        let executor = deterministic::Runner::default();
2647        executor.start(journaled_historical_proof_after_pruning_inner::<mmb::Family>);
2648    }
2649
2650    async fn journaled_historical_proof_edge_cases_inner<F: Family>(
2651        context: deterministic::Context,
2652    ) {
2653        let hasher = Standard::<Sha256>::new();
2654
2655        // Case 1: Empty structure.
2656        let mmr = Journaled::<F, _, Digest>::init(
2657            context.with_label("empty"),
2658            &hasher,
2659            test_config(&context),
2660        )
2661        .await
2662        .unwrap();
2663        let empty_end = Location::<F>::new(0);
2664        let empty_result = mmr
2665            .historical_range_proof(&hasher, empty_end, empty_end..empty_end)
2666            .await;
2667        assert!(matches!(empty_result, Err(Error::Empty)));
2668        let oob_result = mmr
2669            .historical_range_proof(&hasher, empty_end + 1, empty_end..empty_end + 1)
2670            .await;
2671        assert!(matches!(
2672            oob_result,
2673            Err(Error::RangeOutOfBounds(loc)) if loc == empty_end + 1
2674        ));
2675        mmr.destroy().await.unwrap();
2676
2677        // Case 2: Structure has nodes but is fully pruned.
2678        let mut mmr = Journaled::<F, _, Digest>::init(
2679            context.with_label("fully_pruned"),
2680            &hasher,
2681            test_config(&context),
2682        )
2683        .await
2684        .unwrap();
2685        let mut batch = mmr.new_batch();
2686        for i in 0..20 {
2687            batch = batch.add(&hasher, &test_digest(i));
2688        }
2689        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2690        mmr.apply_batch(&batch).unwrap();
2691        let end = mmr.leaves();
2692        mmr.prune_all().await.unwrap();
2693        assert!(mmr.bounds().is_empty());
2694        let pruned_result = mmr.historical_range_proof(&hasher, end, end - 1..end).await;
2695        assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
2696        let oob_result = mmr
2697            .historical_range_proof(&hasher, end + 1, end - 1..end)
2698            .await;
2699        assert!(matches!(
2700            oob_result,
2701            Err(Error::RangeOutOfBounds(loc)) if loc == end + 1
2702        ));
2703        mmr.destroy().await.unwrap();
2704
2705        // Case 3: All leaves but the last are pruned.
2706        let mut mmr = Journaled::<F, _, Digest>::init(
2707            context.with_label("single_leaf"),
2708            &hasher,
2709            test_config(&context),
2710        )
2711        .await
2712        .unwrap();
2713        let mut batch = mmr.new_batch();
2714        for i in 0..11 {
2715            batch = batch.add(&hasher, &test_digest(i));
2716        }
2717        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2718        mmr.apply_batch(&batch).unwrap();
2719        let end = mmr.leaves();
2720        let keep_loc = end - 1;
2721        mmr.prune(keep_loc).await.unwrap();
2722        let ok_result = mmr
2723            .historical_range_proof(&hasher, end, keep_loc..end)
2724            .await;
2725        assert!(ok_result.is_ok());
2726        let pruned_end = keep_loc - 1;
2727        // Make sure this range falls in the pruned region, accounting for blob boundaries.
        let start_loc = Location::<F>::new(1);
        let pruned_result = mmr
            .historical_range_proof(&hasher, end, start_loc..pruned_end + 1)
            .await;
        assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
        let oob_result = mmr
            .historical_range_proof(&hasher, end + 1, keep_loc..end)
            .await;
        assert!(matches!(oob_result, Err(Error::RangeOutOfBounds(_))));
        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_journaled_historical_proof_edge_cases_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_historical_proof_edge_cases_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_journaled_historical_proof_edge_cases_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_historical_proof_edge_cases_inner::<mmb::Family>);
    }

    async fn journaled_historical_proof_out_of_bounds_inner<F: Family>(
        context: deterministic::Context,
    ) {
        let hasher = Standard::<Sha256>::new();
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("oob"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();

        let mut batch = mmr.new_batch();
        for i in 0..8 {
            batch = batch.add(&hasher, &test_digest(i));
        }
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        let requested = mmr.leaves() + 1;

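        // Request a historical size one past the current leaf count; the size
        // bounds check must reject it with RangeOutOfBounds.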
        let result = mmr
            .historical_range_proof(&hasher, requested, Location::<F>::new(0)..requested)
            .await;
        assert!(matches!(
            result,
            Err(Error::RangeOutOfBounds(loc)) if loc == requested
        ));

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_journaled_historical_proof_out_of_bounds_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_historical_proof_out_of_bounds_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_journaled_historical_proof_out_of_bounds_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_historical_proof_out_of_bounds_inner::<mmb::Family>);
    }

    async fn journaled_historical_proof_range_validation_inner<F: Family>(
        context: deterministic::Context,
    ) {
        let hasher = Standard::<Sha256>::new();
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("range_validation"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();

        let mut batch = mmr.new_batch();
        for i in 0..32 {
            batch = batch.add(&hasher, &test_digest(i));
        }
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();

        let valid_range = Location::<F>::new(0)..Location::<F>::new(1);

        // Empty range should report Empty.
        let requested = Location::<F>::new(5);
        let empty_range = requested..requested;
        let empty_result = mmr
            .historical_range_proof(&hasher, requested, empty_range)
            .await;
        assert!(matches!(empty_result, Err(Error::Empty)));

        // Requested historical size is out of bounds.
        let leaves_oob = mmr.leaves() + 1;
        let result = mmr
            .historical_range_proof(&hasher, leaves_oob, valid_range.clone())
            .await;
        assert!(matches!(
            result,
            Err(Error::RangeOutOfBounds(loc)) if loc == leaves_oob
        ));

        // Requested range end is out of bounds for the current structure.
        let end_oob = mmr.leaves() + 1;
        let range_oob = Location::<F>::new(0)..end_oob;
        let result = mmr
            .historical_range_proof(&hasher, requested, range_oob)
            .await;
        assert!(matches!(
            result,
            Err(Error::RangeOutOfBounds(loc)) if loc == end_oob
        ));

        // Requested range end is out of bounds for the requested historical size but within the structure.
        let range_end_gt_requested = requested + 1;
        let range_oob_at_requested = Location::<F>::new(0)..range_end_gt_requested;
        assert!(range_end_gt_requested <= mmr.leaves());
        let result = mmr
            .historical_range_proof(&hasher, requested, range_oob_at_requested)
            .await;
        assert!(matches!(
            result,
            Err(Error::RangeOutOfBounds(loc)) if loc == range_end_gt_requested
        ));

        // Range location overflow is caught as out-of-bounds (the bounds check
        // fires before the position conversion that would detect overflow).
        let overflow_loc = Location::<F>::new(u64::MAX);
        let overflow_range = Location::<F>::new(0)..overflow_loc;
        let result = mmr
            .historical_range_proof(&hasher, requested, overflow_range)
            .await;
        assert!(matches!(
            result,
            Err(Error::RangeOutOfBounds(loc)) if loc == overflow_loc
        ));

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_journaled_historical_proof_range_validation_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_historical_proof_range_validation_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_journaled_historical_proof_range_validation_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_historical_proof_range_validation_inner::<mmb::Family>);
    }

    async fn journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner<F: Family>(
        context: deterministic::Context,
    ) {
        let hasher = Standard::<Sha256>::new();
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("non_size_prune"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();

        let mut batch = mmr.new_batch();
        for i in 0..16 {
            batch = batch.add(&hasher, &test_digest(i));
        }
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();

        let end = mmr.leaves();
        let mut failures = Vec::new();
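        // Walk every prune boundary against every proof location. Successful
        // proofs are always acceptable; ElementPruned is acceptable only when
        // the requested location actually precedes the prune boundary. Any
        // other outcome is recorded as a failure.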
        for prune_leaf in 1..*end {
            let prune_loc = Location::<F>::new(prune_leaf);
            mmr.prune(prune_loc).await.unwrap();
            for loc_u64 in 0..*end {
                let loc = Location::<F>::new(loc_u64);
                let range_includes_pruned_leaf = loc < prune_loc;
                match mmr.historical_proof(&hasher, end, loc).await {
                    Ok(_) => {}
                    Err(Error::ElementPruned(_)) if range_includes_pruned_leaf => {}
                    Err(Error::ElementPruned(_)) => failures.push(format!(
                        "prune_loc={prune_loc} loc={loc} returned ElementPruned without a pruned range element"
                    )),
                    Err(err) => failures
                        .push(format!("prune_loc={prune_loc} loc={loc} err={err}")),
                }
            }
        }

        assert!(
            failures.is_empty(),
            "historical proof generation returned unexpected errors: {failures:?}"
        );

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_journaled_historical_proof_non_size_prune_excludes_pruned_leaves_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(
            journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmr::Family>,
        );
    }

    #[test_traced]
    fn test_journaled_historical_proof_non_size_prune_excludes_pruned_leaves_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(
            journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmb::Family>,
        );
    }

    /// Regression: init_sync must recover from a journal left at an invalid size
    /// (e.g., a crash wrote a leaf but not its parent nodes).
    async fn journaled_init_sync_recovers_from_invalid_journal_size_inner<F: Family>(
        context: deterministic::Context,
    ) {
        let hasher = Standard::<Sha256>::new();

        // Build a structure with 3 leaves, sync, and drop.
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("init"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();
        let mut batch = mmr.new_batch();
        for i in 0..3 {
            batch = batch.add(&hasher, &test_digest(i));
        }
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
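        // Record the size and root of the valid state so recovery can be
        // checked against them after the simulated crash.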
        let valid_size = mmr.size();
        let valid_root = mmr.root();
        mmr.sync().await.unwrap();
        drop(mmr);

        // Append one extra digest to the journal, simulating a crash that wrote a
        // leaf (for the 4th element) but not its parent nodes. This makes the
        // journal size invalid.
        {
            let journal: Journal<_, Digest> = Journal::init(
                context.with_label("corrupt"),
                JConfig {
                    partition: "journal-partition".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                },
            )
            .await
            .unwrap();
            assert_eq!(journal.size().await, valid_size);
            journal.append(&Sha256::hash(b"orphan")).await.unwrap();
            journal.sync().await.unwrap();
            assert_eq!(journal.size().await, valid_size + 1);
        }

        // init_sync should recover by rewinding to the last valid size.
        let sync_cfg = SyncConfig::<F, Digest> {
            config: test_config(&context),
            range: Location::<F>::new(0)..Location::<F>::new(100),
            pinned_nodes: None,
        };
        let sync_mmr =
            Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
                .await
                .unwrap();

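        // Recovery must have rewound the orphaned leaf: the recovered size and
        // root match the last valid state.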
        assert_eq!(sync_mmr.size(), valid_size);
        assert_eq!(sync_mmr.root(), valid_root);

        sync_mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_init_sync_recovers_from_invalid_journal_size_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_init_sync_recovers_from_invalid_journal_size_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_init_sync_recovers_from_invalid_journal_size_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_init_sync_recovers_from_invalid_journal_size_inner::<mmb::Family>);
    }

    async fn journaled_stale_batch_inner<F: Family>(context: deterministic::Context) {
        let hasher: Standard<Sha256> = Standard::new();
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.clone(),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();

        // Create two batches from the same base.
        let batch_a = mmr.new_batch().add(&hasher, b"leaf-a");
        let batch_a = mmr.with_mem(|mem| batch_a.merkleize(mem, &hasher));
        let batch_b = mmr.new_batch().add(&hasher, b"leaf-b");
        let batch_b = mmr.with_mem(|mem| batch_b.merkleize(mem, &hasher));

        // Apply A -- should succeed.
        mmr.apply_batch(&batch_a).unwrap();

        // Apply B -- should fail (stale).
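        // Committing A advanced the structure past the shared base both
        // batches were merkleized against, so B's base no longer matches.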
        let result = mmr.apply_batch(&batch_b);
        assert!(
            matches!(result, Err(Error::StaleBatch { .. })),
            "expected StaleBatch, got {result:?}"
        );

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_stale_batch_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_stale_batch_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_stale_batch_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_stale_batch_inner::<mmb::Family>);
    }

    /// Regression: `new_batch` must return the append-only journaled wrapper.
    async fn journaled_new_batch_returns_append_only_wrapper_inner<F: Family>(
        context: deterministic::Context,
    ) {
        let hasher = Standard::<Sha256>::new();
        let mmr = Journaled::<F, _, Digest>::init(context.clone(), &hasher, test_config(&context))
            .await
            .unwrap();

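        // The explicit type annotation is the assertion: if new_batch ever
        // returned the raw batch type again, this would fail to compile.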
        let _batch: UnmerkleizedBatch<F, Digest> = mmr.new_batch();

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_new_batch_returns_append_only_wrapper_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_new_batch_returns_append_only_wrapper_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_new_batch_returns_append_only_wrapper_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_new_batch_returns_append_only_wrapper_inner::<mmb::Family>);
    }

    /// Regression: update_leaf on a synced-out leaf must return ElementPruned, not panic.
    /// Before the fix, `Readable::pruning_boundary` returned the journal's prune boundary
    /// (which could be 0), so the batch accepted the update. During merkleize, get_node
    /// returned None for the synced-out sibling and hit an expect panic.
    async fn journaled_update_leaf_after_sync_returns_pruned_inner<F: Family>(
        context: deterministic::Context,
    ) {
        let hasher = Standard::<Sha256>::new();
        let mut mmr =
            Journaled::<F, _, Digest>::init(context.clone(), &hasher, test_config(&context))
                .await
                .unwrap();

        // Add 50 elements and sync (flushes all nodes to journal, prunes mem).
        let mut batch = mmr.new_batch();
        for i in 0..50 {
            batch = batch.add(&hasher, &test_digest(i));
        }
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        mmr.sync().await.unwrap();

        // Attempt to update leaf 0, which has been synced out of memory.
        // Use the inner batch type directly since the journaled wrapper
        // intentionally hides update_leaf.
        let batch = mmr.to_batch().new_batch();
        let result = batch.update_leaf(&hasher, Location::<F>::new(0), b"updated");
        assert!(matches!(result, Err(Error::ElementPruned(_))));

        mmr.destroy().await.unwrap();
    }

    #[test_traced]
    fn test_update_leaf_after_sync_returns_pruned_mmr() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_update_leaf_after_sync_returns_pruned_inner::<mmr::Family>);
    }

    #[test_traced]
    fn test_update_leaf_after_sync_returns_pruned_mmb() {
        let executor = deterministic::Runner::default();
        executor.start(journaled_update_leaf_after_sync_returns_pruned_inner::<mmb::Family>);
    }
}