Skip to main content

commonware_storage/merkle/persisted/
full.rs

1//! A Merkle structure backed by a fixed-item-length journal.
2//!
3//! A [crate::journal] is used to store all unpruned nodes, and a [crate::metadata] store is
4//! used to preserve digests required for root and proof generation that would have otherwise been
5//! pruned.
6//!
7//! This module is generic over [`Family`], so it works for both MMR and MMB.
8
9use crate::{
10    journal::{
11        contiguous::{
12            fixed::{Config as JConfig, Journal},
13            Many, Reader,
14        },
15        Error as JError,
16    },
17    merkle::{
18        batch,
19        hasher::Hasher,
20        mem::{Config as MemConfig, Mem},
21        Error, Family, Location, Position, Proof, Readable,
22    },
23    metadata::{Config as MConfig, Metadata},
24};
25use commonware_codec::DecodeExt;
26use commonware_cryptography::Digest;
27use commonware_parallel::Strategy;
28use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
29use commonware_utils::{
30    range::NonEmptyRange,
31    sequence::prefixed_u64::U64,
32    sync::{AsyncMutex, RwLock},
33};
34use std::{
35    collections::BTreeMap,
36    num::{NonZeroU64, NonZeroUsize},
37    sync::Arc,
38};
39use tracing::{debug, error, warn};
40
/// Append-only wrapper around [`batch::UnmerkleizedBatch`].
///
/// The full Merkle structure's [`Merkle::sync`] only persists *appended* nodes
/// (positions in `[journal_size, state.size())`). Overwrites to existing positions are stored in
/// the in-memory layer but never flushed, so they would be silently lost on crash recovery. This
/// wrapper prevents that by exposing only append and merkleize operations, hiding `update_leaf*`
/// at compile time.
pub struct UnmerkleizedBatch<F: Family, D: Digest, S: Strategy> {
    /// The wrapped batch. It is private so that callers can only reach it through the
    /// append/merkleize methods of this wrapper.
    inner: batch::UnmerkleizedBatch<F, D, S>,
}
51
52impl<F: Family, D: Digest, S: Strategy> UnmerkleizedBatch<F, D, S> {
53    /// Hash `element` and add it as a leaf.
54    pub fn add(self, hasher: &impl Hasher<F, Digest = D>, element: &[u8]) -> Self {
55        Self {
56            inner: self.inner.add(hasher, element),
57        }
58    }
59
60    /// Add a pre-computed leaf digest.
61    pub fn add_leaf_digest(self, digest: D) -> Self {
62        Self {
63            inner: self.inner.add_leaf_digest(digest),
64        }
65    }
66
67    /// The number of leaves visible through this batch.
68    pub fn leaves(&self) -> Location<F> {
69        self.inner.leaves()
70    }
71
72    /// Return a reference to the batch's strategy.
73    pub fn strategy(&self) -> &S {
74        self.inner.strategy()
75    }
76
77    /// Consume this batch and produce an immutable [`batch::MerkleizedBatch`] with computed nodes.
78    /// `base` provides committed node data as fallback during hash computation.
79    pub fn merkleize(
80        self,
81        base: &Mem<F, D>,
82        hasher: &impl Hasher<F, Digest = D>,
83    ) -> Arc<batch::MerkleizedBatch<F, D, S>> {
84        self.inner.merkleize(base, hasher)
85    }
86}
87
/// Fields of [Merkle] that are protected by an [RwLock] for interior mutability.
///
/// Both fields are read and written together under the same lock (see e.g. `Merkle::bounds`,
/// which reads the pruning boundary and the leaf count from one guard).
pub(crate) struct Inner<F: Family, D: Digest> {
    /// A memory resident Merkle structure used to build the structure and cache updates. It caches
    /// all un-synced nodes, and the pinned node set as derived from both its own pruning boundary
    /// and the full structure's pruning boundary.
    pub(crate) mem: Mem<F, D>,

    /// The highest position for which this structure has been pruned, or 0 if it has never been
    /// pruned. Stored as a node position; `Merkle::bounds` converts it to a leaf location.
    pub(crate) pruned_to_pos: Position<F>,
}
99
/// Configuration for a journal-backed Merkle structure.
///
/// `journal_partition`, `items_per_blob`, `write_buffer` and `page_cache` are forwarded to the
/// backing journal's configuration; `metadata_partition` is forwarded to the metadata store.
#[derive(Clone)]
pub struct Config<S: Strategy> {
    /// The name of the `commonware-runtime::Storage` storage partition used for the journal storing
    /// the nodes.
    pub journal_partition: String,

    /// The name of the `commonware-runtime::Storage` storage partition used for the metadata
    /// containing pruned nodes that are still required to calculate the root and generate
    /// proofs.
    pub metadata_partition: String,

    /// The maximum number of items to store in each blob in the backing journal.
    pub items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the backing journal.
    pub write_buffer: NonZeroUsize,

    /// Strategy used to parallelize batch operations.
    pub strategy: S,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,
}
124
/// Configuration for initializing a full Merkle structure for synchronization.
///
/// Determines how to handle existing persistent data based on sync boundaries (sizes are compared
/// as node positions derived from the leaf-aligned `range`):
/// - **Fresh Start**: existing size <= range start (and range start != 0) -> discard and start
///   fresh
/// - **Prune and Reuse**: range start < existing size <= range end -> prune and reuse
/// - **Error**: existing size > range end
pub struct SyncConfig<F: Family, D: Digest, S: Strategy> {
    /// Base configuration (journal, metadata, etc.)
    pub config: Config<S>,

    /// Sync range expressed as leaf-aligned bounds. The range start becomes the pruning boundary.
    pub range: NonEmptyRange<Location<F>>,

    /// The pinned nodes the structure needs at the pruning boundary (range start), in the order
    /// specified by `Family::nodes_to_pin`. If `None`, the pinned nodes are expected to already be
    /// in the structure's metadata/journal. If `Some`, the length must match the number of
    /// positions yielded by `Family::nodes_to_pin` for the range start.
    pub pinned_nodes: Option<Vec<D>>,
}
143
/// A Merkle structure backed by a fixed-item-length journal.
///
/// Un-synced nodes live in the in-memory structure inside `inner`; `sync` appends them to
/// `journal`. Digests that pruning would otherwise discard are kept in `metadata`.
pub struct Merkle<F: Family, E: RStorage + Clock + Metrics, D: Digest, S: Strategy> {
    /// Lock-protected mutable state.
    pub(crate) inner: RwLock<Inner<F, D>>,

    /// Stores all unpruned nodes.
    pub(crate) journal: Journal<E, D>,

    /// Stores the pinned nodes for the current pruning boundary, and the corresponding pruning
    /// boundary used to generate them. The metadata is written when pruning is invoked or when
    /// the structure is initialized through `init_sync`; otherwise its contents change only when
    /// the pruning boundary moves.
    pub(crate) metadata: Metadata<E, U64, Vec<u8>>,

    /// Serializes concurrent sync calls.
    pub(crate) sync_lock: AsyncMutex<()>,

    /// The strategy to use for parallelization.
    pub(crate) strategy: S,
}
163
/// Prefix byte of the metadata keys that store node digests. Keys are built as
/// `U64::new(NODE_PREFIX, position)`.
const NODE_PREFIX: u8 = 0;

/// Prefix byte of the metadata key storing the pruning boundary (as a leaf index, encoded as 8
/// big-endian bytes). The key is always `U64::new(PRUNED_TO_PREFIX, 0)`.
pub(crate) const PRUNED_TO_PREFIX: u8 = 1;
169
170impl<F: Family, E: RStorage + Clock + Metrics, D: Digest, S: Strategy> Merkle<F, E, D, S> {
171    /// Return the total number of nodes in the structure, irrespective of any pruning. The next
172    /// added element's position will have this value.
173    pub fn size(&self) -> Position<F> {
174        self.inner.read().mem.size()
175    }
176
177    /// Return the total number of leaves in the structure.
178    pub fn leaves(&self) -> Location<F> {
179        self.inner.read().mem.leaves()
180    }
181
182    /// Attempt to get a node from the metadata, with fallback to journal lookup if it fails.
183    /// Assumes the node should exist in at least one of these sources and returns a `MissingNode`
184    /// error otherwise.
185    async fn get_from_metadata_or_journal(
186        metadata: &Metadata<E, U64, Vec<u8>>,
187        journal: &Journal<E, D>,
188        pos: Position<F>,
189    ) -> Result<D, Error<F>> {
190        if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
191            debug!(?pos, "read node from metadata");
192            let digest = D::decode(bytes.as_ref());
193            let Ok(digest) = digest else {
194                error!(
195                    ?pos,
196                    err = %digest.expect_err("digest is Err in else branch"),
197                    "could not convert node from metadata bytes to digest"
198                );
199                return Err(Error::DataCorrupted(
200                    "could not read digest at requested pos",
201                ));
202            };
203            return Ok(digest);
204        }
205
206        // If a node isn't found in the metadata, it might still be in the journal.
207        debug!(?pos, "reading node from journal");
208        let node = journal.reader().await.read(*pos).await;
209        match node {
210            Ok(node) => Ok(node),
211            Err(JError::ItemPruned(_)) => {
212                error!(?pos, "node is missing from metadata and journal");
213                Err(Error::MissingNode(pos))
214            }
215            Err(e) => Err(Error::Journal(e)),
216        }
217    }
218
219    /// Returns [start, end) where `start` is the oldest retained leaf and `end` is the total leaf
220    /// count.
221    pub fn bounds(&self) -> std::ops::Range<Location<F>> {
222        let inner = self.inner.read();
223        Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos")..inner.mem.leaves()
224    }
225
226    /// Adds the pinned nodes based on `prune_pos` to `mem`.
227    async fn add_extra_pinned_nodes(
228        mem: &mut Mem<F, D>,
229        metadata: &Metadata<E, U64, Vec<u8>>,
230        journal: &Journal<E, D>,
231        prune_pos: Position<F>,
232    ) -> Result<(), Error<F>> {
233        let prune_loc = Location::try_from(prune_pos).expect("valid prune_pos");
234        let mut pinned_nodes = BTreeMap::new();
235        for pos in F::nodes_to_pin(prune_loc) {
236            let digest = Self::get_from_metadata_or_journal(metadata, journal, pos).await?;
237            pinned_nodes.insert(pos, digest);
238        }
239        mem.add_pinned_nodes(pinned_nodes);
240
241        Ok(())
242    }
243
244    /// Initialize a new `Merkle` instance.
245    pub async fn init(
246        context: E,
247        hasher: &impl Hasher<F, Digest = D>,
248        cfg: Config<S>,
249    ) -> Result<Self, Error<F>> {
250        let journal_cfg = JConfig {
251            partition: cfg.journal_partition,
252            items_per_blob: cfg.items_per_blob,
253            page_cache: cfg.page_cache,
254            write_buffer: cfg.write_buffer,
255        };
256        let journal = Journal::<E, D>::init(context.child("merkle_journal"), journal_cfg).await?;
257        let mut journal_size = Position::<F>::new(journal.size().await);
258
259        let metadata_cfg = MConfig {
260            partition: cfg.metadata_partition,
261            codec_config: ((0..).into(), ()),
262        };
263        let metadata =
264            Metadata::<_, U64, Vec<u8>>::init(context.child("merkle_metadata"), metadata_cfg)
265                .await?;
266
267        if journal_size == 0 {
268            let mem = Mem::init(MemConfig {
269                nodes: vec![],
270                pruning_boundary: Location::new(0),
271                pinned_nodes: vec![],
272            })?;
273            return Ok(Self {
274                inner: RwLock::new(Inner {
275                    mem,
276                    pruned_to_pos: Position::new(0),
277                }),
278                journal,
279                metadata,
280                sync_lock: AsyncMutex::new(()),
281                strategy: cfg.strategy,
282            });
283        }
284
285        // Make sure the journal's oldest retained node is as expected based on the last pruning
286        // boundary stored in metadata. If they don't match, prune the journal to the appropriate
287        // location.
288        let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
289        let metadata_pruned_to = Location::<F>::new(metadata.get(&key).map_or(0, |bytes| {
290            u64::from_be_bytes(
291                bytes
292                    .as_slice()
293                    .try_into()
294                    .expect("metadata pruned_to is not 8 bytes"),
295            )
296        }));
297        let metadata_prune_pos = Position::try_from(metadata_pruned_to)?;
298        let journal_bounds_start = journal.reader().await.bounds().start;
299        if *metadata_prune_pos > journal_bounds_start {
300            // Metadata is ahead of journal (crashed before completing journal prune).
301            // Prune the journal to match metadata.
302            journal.prune(*metadata_prune_pos).await?;
303            if journal.reader().await.bounds().start != journal_bounds_start {
304                // This should only happen in the event of some failure during the last attempt to
305                // prune the journal.
306                warn!(
307                    journal_bounds_start,
308                    ?metadata_prune_pos,
309                    "journal pruned to match metadata"
310                );
311            }
312        } else if *metadata_prune_pos < journal_bounds_start {
313            // Metadata is stale (e.g., missing/corrupted while journal has valid state).
314            // Use the journal's state as authoritative.
315            warn!(
316                ?metadata_prune_pos,
317                journal_bounds_start, "metadata stale, using journal pruning boundary"
318            );
319        }
320
321        // Use the more restrictive (higher) pruning boundary between metadata and journal.
322        // This handles both cases: metadata ahead (crash during prune) and metadata stale.
323        //
324        // The journal boundary may not be leaf-aligned (it's blob-aligned), so round up to the
325        // position of the first leaf after the boundary.
326        let journal_boundary_pos = Position::<F>::new(journal_bounds_start);
327        let journal_boundary_floor = F::to_nearest_size(journal_boundary_pos);
328        let journal_boundary_leaf_aligned_pos = if journal_boundary_floor == journal_boundary_pos {
329            // `to_nearest_size` rounds down, so equality means the boundary is already
330            // leaf-aligned.
331            journal_boundary_floor
332        } else {
333            // If flooring backed up over the boundary, round up to the next leaf position, which
334            // is guaranteed to be above it.
335            Position::try_from(Location::try_from(journal_boundary_floor)? + 1)?
336        };
337        let effective_prune_pos =
338            std::cmp::max(metadata_prune_pos, journal_boundary_leaf_aligned_pos);
339
340        let last_valid_size = F::to_nearest_size(journal_size);
341        let mut orphaned_leaf: Option<D> = None;
342        if last_valid_size != journal_size {
343            warn!(
344                ?last_valid_size,
345                "encountered invalid structure, recovering from last valid size"
346            );
347            // Check if there is an intact leaf following the last valid size, from which we can
348            // recover its missing parents.
349            let recovered_item = journal.reader().await.read(*last_valid_size).await;
350            if let Ok(item) = recovered_item {
351                orphaned_leaf = Some(item);
352            }
353            journal.rewind(*last_valid_size).await?;
354            journal.sync().await?;
355            journal_size = last_valid_size
356        }
357
358        // Initialize the mem in the "prune_all" state.
359        let journal_leaves = Location::try_from(journal_size)?;
360        let mut pinned_nodes = Vec::new();
361        for pos in F::nodes_to_pin(journal_leaves) {
362            let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
363            pinned_nodes.push(digest);
364        }
365        let mut mem = Mem::init(MemConfig {
366            nodes: vec![],
367            pruning_boundary: journal_leaves,
368            pinned_nodes,
369        })?;
370        Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, effective_prune_pos).await?;
371
372        if let Some(leaf) = orphaned_leaf {
373            // Recover the orphaned leaf and any missing parents.
374            let pos = mem.size();
375            warn!(?pos, "recovering orphaned leaf");
376            let batch = mem
377                .new_batch()
378                .add_leaf_digest(leaf)
379                .merkleize(&mem, hasher);
380            mem.apply_batch(&batch)?;
381            assert_eq!(pos, journal_size);
382
383            // Inline sync: flush recovered nodes to journal.
384            for p in journal.size().await..*mem.size() {
385                let p = Position::new(p);
386                let node = *mem.get_node_unchecked(p);
387                journal.append(&node).await?;
388            }
389            journal.sync().await?;
390            assert_eq!(mem.size(), journal.size().await);
391
392            // Prune mem and reinstate pinned nodes.
393            let effective_prune_loc =
394                Location::try_from(effective_prune_pos).expect("valid effective_prune_pos");
395            let mut pn = BTreeMap::new();
396            for p in F::nodes_to_pin(effective_prune_loc) {
397                let d = mem.get_node_unchecked(p);
398                pn.insert(p, *d);
399            }
400            mem.prune_all();
401            mem.add_pinned_nodes(pn);
402        }
403
404        Ok(Self {
405            inner: RwLock::new(Inner {
406                mem,
407                pruned_to_pos: effective_prune_pos,
408            }),
409            journal,
410            metadata,
411            sync_lock: AsyncMutex::new(()),
412            strategy: cfg.strategy,
413        })
414    }
415
416    /// Initialize a structure for synchronization, reusing existing data if possible.
417    ///
418    /// Handles sync scenarios based on existing journal data vs. the given sync range:
419    ///
420    /// 1. **Fresh Start**: existing_size <= range.start
421    ///    - Deletes existing data (if any)
422    ///    - Creates new [Journal] with pruning boundary and size at `range.start`
423    ///
424    /// 2. **Reuse**: range.start < existing_size <= range.end
425    ///    - Keeps existing journal data
426    ///    - Prunes the journal toward `range.start` (section-aligned)
427    ///
428    /// 3. **Error**: existing_size > range.end
429    ///    - Returns [crate::journal::Error::ItemOutOfRange]
430    pub async fn init_sync(context: E, cfg: SyncConfig<F, D, S>) -> Result<Self, Error<F>> {
431        let prune_pos = Position::try_from(cfg.range.start())?;
432        let end_pos = Position::try_from(cfg.range.end())?;
433        let journal_cfg = JConfig {
434            partition: cfg.config.journal_partition.clone(),
435            items_per_blob: cfg.config.items_per_blob,
436            write_buffer: cfg.config.write_buffer,
437            page_cache: cfg.config.page_cache.clone(),
438        };
439
440        // Open the journal, performing a rewind if necessary for crash recovery.
441        let journal: Journal<E, D> =
442            Journal::init(context.child("merkle_journal"), journal_cfg).await?;
443        let mut journal_size = Position::<F>::new(journal.size().await);
444
445        // If a crash left the journal at an invalid size (e.g., a leaf was written
446        // but its parent nodes were not), rewind to the last valid size.
447        let last_valid_size = F::to_nearest_size(journal_size);
448        if last_valid_size != journal_size {
449            warn!(
450                ?last_valid_size,
451                "init_sync: encountered invalid structure, recovering from last valid size"
452            );
453            journal.rewind(*last_valid_size).await?;
454            journal.sync().await?;
455            journal_size = last_valid_size;
456        }
457
458        // Handle existing data vs sync range.
459        if journal_size > *end_pos {
460            return Err(crate::journal::Error::ItemOutOfRange(*journal_size).into());
461        }
462        if journal_size <= *prune_pos && *prune_pos != 0 {
463            journal.clear_to_size(*prune_pos).await?;
464            journal_size = Position::new(journal.size().await);
465        }
466
467        // Open the metadata.
468        let metadata_cfg = MConfig {
469            partition: cfg.config.metadata_partition,
470            codec_config: ((0..).into(), ()),
471        };
472        let mut metadata = Metadata::init(context.child("merkle_metadata"), metadata_cfg).await?;
473
474        // Write the pruning boundary.
475        let pruning_boundary_key = U64::new(PRUNED_TO_PREFIX, 0);
476        metadata.put(
477            pruning_boundary_key,
478            cfg.range.start().as_u64().to_be_bytes().into(),
479        );
480
481        // Write the required pinned nodes to metadata.
482        // The set of pinned nodes depends only on the prune boundary, not on the total
483        // structure size, so we validate against `nodes_to_pin(prune_loc)` alone.
484        let prune_loc = Location::try_from(prune_pos)?;
485        let journal_leaves = Location::try_from(journal_size)?;
486        if let Some(pinned_nodes) = cfg.pinned_nodes {
487            // Use caller-provided pinned nodes.
488            let nodes_to_pin_persisted: Vec<_> = F::nodes_to_pin(prune_loc).collect();
489            if pinned_nodes.len() != nodes_to_pin_persisted.len() {
490                return Err(Error::<F>::InvalidPinnedNodes);
491            }
492            for (pos, digest) in nodes_to_pin_persisted.into_iter().zip(pinned_nodes.iter()) {
493                metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
494            }
495        }
496
497        // Create the in-memory structure with the pinned nodes required for its size. This must be
498        // performed *before* pruning the journal to range.start to ensure all pinned nodes are
499        // present.
500        let nodes_to_pin_mem = F::nodes_to_pin(journal_leaves);
501        let mut mem_pinned_nodes = Vec::new();
502        for pos in nodes_to_pin_mem {
503            let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
504            mem_pinned_nodes.push(digest);
505        }
506        let mut mem = Mem::init(MemConfig {
507            nodes: vec![],
508            pruning_boundary: Location::try_from(journal_size)?,
509            pinned_nodes: mem_pinned_nodes,
510        })?;
511
512        // Add the additional pinned nodes required for the pruning boundary, if applicable.
513        // This must also be done before pruning.
514        if prune_pos < journal_size {
515            Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, prune_pos).await?;
516        }
517
518        // Sync metadata before pruning so pinned nodes are persisted for crash recovery.
519        metadata.sync().await?;
520
521        // Prune the journal to range.start.
522        journal.prune(*prune_pos).await?;
523
524        Ok(Self {
525            inner: RwLock::new(Inner {
526                mem,
527                pruned_to_pos: prune_pos,
528            }),
529            journal,
530            metadata,
531            sync_lock: AsyncMutex::new(()),
532            strategy: cfg.config.strategy,
533        })
534    }
535
536    /// Compute and add required nodes for the given pruning point to the metadata, and write it to
537    /// disk. Return the computed set of required nodes.
538    async fn update_metadata(
539        &mut self,
540        prune_to_pos: Position<F>,
541    ) -> Result<BTreeMap<Position<F>, D>, Error<F>> {
542        assert!(prune_to_pos >= self.inner.get_mut().pruned_to_pos);
543
544        let prune_loc = Location::try_from(prune_to_pos).expect("valid prune_to_pos");
545        let mut pinned_nodes = BTreeMap::new();
546        for pos in F::nodes_to_pin(prune_loc) {
547            let digest = self.get_node(pos).await?.expect(
548                "pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
549            );
550            self.metadata
551                .put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
552            pinned_nodes.insert(pos, digest);
553        }
554
555        let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
556        self.metadata
557            .put_sync(
558                key,
559                Location::try_from(prune_to_pos)?
560                    .as_u64()
561                    .to_be_bytes()
562                    .into(),
563            )
564            .await
565            .map_err(Error::Metadata)?;
566
567        Ok(pinned_nodes)
568    }
569
570    pub async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
571        {
572            let inner = self.inner.read();
573            if let Some(node) = inner.mem.get_node(position) {
574                return Ok(Some(node));
575            }
576        }
577
578        match self.journal.reader().await.read(*position).await {
579            Ok(item) => Ok(Some(item)),
580            Err(JError::ItemPruned(_)) => Ok(None),
581            Err(e) => Err(Error::Journal(e)),
582        }
583    }
584
585    /// Return the pinned nodes needed to authenticate a lower leaf boundary at `loc`.
586    pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<D>, Error<F>> {
587        if !loc.is_valid() {
588            return Err(Error::LocationOverflow(loc));
589        }
590        let futs = F::nodes_to_pin(loc)
591            .map(|p| async move { self.get_node(p).await?.ok_or(Error::ElementPruned(p)) })
592            .collect::<Vec<_>>();
593        futures::future::try_join_all(futs).await
594    }
595
596    /// Sync the structure to disk.
597    pub async fn sync(&self) -> Result<(), Error<F>> {
598        let _sync_guard = self.sync_lock.lock().await;
599
600        let journal_size = Position::<F>::new(self.journal.size().await);
601
602        // Snapshot nodes in the mem that are missing from the journal, along with the pinned
603        // node set for the current pruning boundary.
604        let (sync_target_leaves, missing_nodes, pinned_nodes) = {
605            let inner = self.inner.read();
606            let size = inner.mem.size();
607            let sync_target_leaves = inner.mem.leaves();
608
609            assert!(
610                journal_size <= size,
611                "journal size should never exceed in-memory structure size"
612            );
613            if journal_size == size {
614                return Ok(());
615            }
616
617            let mut missing_nodes = Vec::with_capacity((*size - *journal_size) as usize);
618            for pos in *journal_size..*size {
619                let node = *inner.mem.get_node_unchecked(Position::new(pos));
620                missing_nodes.push(node);
621            }
622
623            // Recompute pinned nodes since we'll need to repopulate the cache after it is cleared
624            // by pruning the mem.
625            let prune_loc = Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos");
626            let mut pinned_nodes = BTreeMap::new();
627            for pos in F::nodes_to_pin(prune_loc) {
628                let digest = inner.mem.get_node_unchecked(pos);
629                pinned_nodes.insert(pos, *digest);
630            }
631
632            (sync_target_leaves, missing_nodes, pinned_nodes)
633        };
634
635        // Append missing nodes to the journal without holding the mem read lock.
636        self.journal.append_many(Many::Flat(&missing_nodes)).await?;
637
638        // Sync the journal while still holding the sync_lock to ensure durability before returning.
639        self.journal.sync().await?;
640
641        // Now that the missing nodes are in the journal, it's safe to prune them from the
642        // mem. We prune to the previously captured leaf count to avoid a race with concurrent
643        // appends between the read lock above and this write lock.
644        {
645            let mut inner = self.inner.write();
646            inner
647                .mem
648                .prune(sync_target_leaves)
649                .expect("captured leaves is in bounds");
650            inner.mem.add_pinned_nodes(pinned_nodes);
651        }
652
653        Ok(())
654    }
655
656    /// Prune all nodes up to but not including the given leaf location and update the pinned nodes.
657    ///
658    /// This implementation ensures that no failure can leave the structure in an unrecoverable
659    /// state, requiring it sync the structure to write any potential unsynced updates.
660    ///
661    /// Returns [Error::LocationOverflow] if `loc` exceeds [Family::MAX_LEAVES].
662    /// Returns [Error::LeafOutOfBounds] if `loc` exceeds the current leaf count.
663    pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
664        let pos = Position::try_from(loc)?;
665        {
666            let inner = self.inner.get_mut();
667            if loc > inner.mem.leaves() {
668                return Err(Error::LeafOutOfBounds(loc));
669            }
670            if pos <= inner.pruned_to_pos {
671                return Ok(());
672            }
673        }
674
675        // Flush items cached in the mem to disk to ensure the current state is recoverable.
676        self.sync().await?;
677
678        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
679        // event of a pruning failure.
680        let pinned_nodes = self.update_metadata(pos).await?;
681
682        self.journal.prune(*pos).await?;
683        let inner = self.inner.get_mut();
684        inner.mem.add_pinned_nodes(pinned_nodes);
685        inner.pruned_to_pos = pos;
686
687        Ok(())
688    }
689
690    /// Compute the root of the structure using `inactive_peaks` and the bagging carried by `hasher`.
691    pub fn root(
692        &self,
693        hasher: &impl Hasher<F, Digest = D>,
694        inactive_peaks: usize,
695    ) -> Result<D, Error<F>> {
696        self.inner.read().mem.root(hasher, inactive_peaks)
697    }
698
699    /// Prune as many nodes as possible, leaving behind at most items_per_blob nodes in the current
700    /// blob.
701    pub async fn prune_all(&mut self) -> Result<(), Error<F>> {
702        let leaves = self.inner.get_mut().mem.leaves();
703        if leaves != 0 {
704            self.prune(leaves).await?;
705        }
706        Ok(())
707    }
708
709    /// Close and permanently remove any disk resources.
710    pub async fn destroy(self) -> Result<(), Error<F>> {
711        self.journal.destroy().await?;
712        self.metadata.destroy().await?;
713
714        Ok(())
715    }
716
717    #[cfg(any(test, feature = "fuzzing"))]
718    /// Sync elements to disk until `write_limit` elements have been written, then abort to simulate
719    /// a partial write for testing failure scenarios.
720    pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error<F>> {
721        if write_limit == 0 {
722            return Ok(());
723        }
724
725        let inner = self.inner.get_mut();
726        let journal_size = Position::<F>::new(self.journal.size().await);
727
728        // Write the nodes cached in the memory-resident structure to the journal, aborting after
729        // write_count nodes have been written.
730        let mut written_count = 0usize;
731        for i in *journal_size..*inner.mem.size() {
732            let node = *inner.mem.get_node_unchecked(Position::new(i));
733            self.journal.append(&node).await?;
734            written_count += 1;
735            if written_count >= write_limit {
736                break;
737            }
738        }
739        self.journal.sync().await?;
740
741        Ok(())
742    }
743
744    #[cfg(test)]
745    /// Return a copy of the currently pinned nodes for recovery tests.
746    pub fn get_pinned_nodes(&self) -> BTreeMap<Position<F>, D> {
747        self.inner.read().mem.pinned_nodes()
748    }
749
750    #[cfg(test)]
751    /// Simulate a crash after pruning metadata is written but before the journal is pruned.
752    pub async fn simulate_pruning_failure(mut self, prune_to: Location<F>) -> Result<(), Error<F>> {
753        let prune_to_pos = Position::try_from(prune_to)?;
754        assert!(prune_to_pos <= self.inner.get_mut().mem.size());
755
756        // Flush items cached in the mem to disk to ensure the current state is recoverable.
757        self.sync().await?;
758
759        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
760        // event of a pruning failure.
761        self.update_metadata(prune_to_pos).await?;
762
763        // Don't actually prune the journal to simulate failure
764        Ok(())
765    }
766
767    /// Apply a merkleized batch to the structure.
768    ///
769    /// A batch is valid if the structure has not been modified since the batch
770    /// chain was created, or if only ancestors of this batch have been applied.
771    /// Already-committed ancestors are skipped automatically.
772    /// Applying a batch from a different fork returns [`Error::StaleBatch`].
773    pub fn apply_batch(&self, batch: &batch::MerkleizedBatch<F, D, S>) -> Result<(), Error<F>> {
774        self.inner.write().mem.apply_batch(batch)?;
775        Ok(())
776    }
777
778    /// Create an owned [`batch::MerkleizedBatch`] representing the current committed state.
779    ///
780    /// The batch has no data (the committed items are on disk, not in memory).
781    /// This is the starting point for building owned batch chains.
782    pub(crate) fn to_batch(&self) -> Arc<batch::MerkleizedBatch<F, D, S>> {
783        let inner = self.inner.read();
784        batch::MerkleizedBatch::from_mem_with_strategy(&inner.mem, self.strategy.clone())
785    }
786
787    /// Borrow the committed Mem through the read lock. Holds the lock for
788    /// the duration of the closure.
789    pub fn with_mem<R>(&self, f: impl FnOnce(&Mem<F, D>) -> R) -> R {
790        let inner = self.inner.read();
791        f(&inner.mem)
792    }
793
794    /// Create a new speculative batch with this structure as its parent.
795    pub fn new_batch(&self) -> UnmerkleizedBatch<F, D, S> {
796        let inner = self.inner.read();
797        UnmerkleizedBatch {
798            inner: inner.mem.new_batch_with_strategy(self.strategy.clone()),
799        }
800    }
801
802    /// Return a reference to the merkleization strategy.
803    pub const fn strategy(&self) -> &S {
804        &self.strategy
805    }
806
807    /// Rewind the structure by the given number of leaves.
808    ///
809    /// Adds go through the batch API ([`Self::new_batch`] / [`Self::apply_batch`]), but removing
810    /// leaves requires `rewind`. After `init` or `sync`, the in-memory structure is pruned to O(log
811    /// n) pinned nodes. A batch pop would expose new peaks that are not in memory, and `merkleize`
812    /// cannot load them because [`Readable::get_node`] is synchronous. `rewind` performs async
813    /// journal I/O to rebuild state at the target position.
814    pub(crate) async fn rewind(&mut self, leaves_to_remove: usize) -> Result<(), Error<F>> {
815        if leaves_to_remove == 0 {
816            return Ok(());
817        }
818
819        let current_leaves = *self.leaves();
820        let destination_leaf = match current_leaves.checked_sub(leaves_to_remove as u64) {
821            Some(dest) => dest,
822            None => {
823                let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
824                return Err(if pruned_to_pos == 0 {
825                    Error::Empty
826                } else {
827                    Error::ElementPruned(pruned_to_pos - 1)
828                });
829            }
830        };
831
832        let destination_loc = Location::new(destination_leaf);
833        let new_size = Position::try_from(destination_loc).expect("valid leaf");
834
835        let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
836        if new_size < pruned_to_pos {
837            return Err(Error::ElementPruned(new_size));
838        }
839
840        // Rewind the journal if needed.
841        let journal_size = Position::<F>::new(self.journal.size().await);
842        if new_size < journal_size {
843            self.journal.rewind(*new_size).await?;
844            self.journal.sync().await?;
845        }
846
847        // Truncate the in-memory structure to the target size.
848        // If the in-memory structure has been pruned past the target (e.g. after sync),
849        // rebuild from the journal/metadata instead.
850        let inner = self.inner.get_mut();
851        if new_size >= Position::try_from(inner.mem.bounds().start).expect("valid mem bounds start")
852        {
853            inner.mem.truncate(new_size);
854        } else {
855            let mut pinned_nodes = Vec::new();
856            for pos in F::nodes_to_pin(destination_loc) {
857                pinned_nodes.push(
858                    Self::get_from_metadata_or_journal(&self.metadata, &self.journal, pos).await?,
859                );
860            }
861            inner.mem = Mem::init(MemConfig {
862                nodes: vec![],
863                pruning_boundary: destination_loc,
864                pinned_nodes,
865            })?;
866            Self::add_extra_pinned_nodes(
867                &mut inner.mem,
868                &self.metadata,
869                &self.journal,
870                inner.pruned_to_pos,
871            )
872            .await?;
873        }
874
875        Ok(())
876    }
877}
878
879/// The [`Readable`] implementation for the full structure operates only on the in-memory
880/// portion. After [`Merkle::sync`], nodes that have been flushed to the journal are no longer
881/// accessible through this interface. In particular, [`Readable::get_node`] returns `None` for
882/// flushed positions, and [`Readable::pruning_boundary`] reflects the in-memory boundary (which may
883/// be tighter than the journal's prune boundary reported by [`Merkle::bounds`]). This means
884/// batch operations like `update_leaf` will correctly reject leaves that have been synced out of
885/// memory with [`Error::ElementPruned`].
886impl<F: Family, E: RStorage + Clock + Metrics, D: Digest, S: Strategy> Readable
887    for Merkle<F, E, D, S>
888{
889    type Family = F;
890    type Digest = D;
891    type Error = Error<F>;
892
893    fn size(&self) -> Position<F> {
894        self.size()
895    }
896
897    fn get_node(&self, pos: Position<F>) -> Option<D> {
898        self.inner.read().mem.get_node(pos)
899    }
900
901    fn pruning_boundary(&self) -> Location<F> {
902        self.inner.read().mem.pruning_boundary()
903    }
904}
905
906impl<F: Family, E: RStorage + Clock + Metrics + Sync, D: Digest, S: Strategy>
907    crate::merkle::storage::Storage<F> for Merkle<F, E, D, S>
908{
909    type Digest = D;
910
911    async fn size(&self) -> Position<F> {
912        self.size()
913    }
914
915    async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
916        Self::get_node(self, position).await
917    }
918}
919
920impl<F: Family, E: RStorage + Clock + Metrics, D: Digest, S: Strategy> Merkle<F, E, D, S> {
921    /// Return an inclusion proof for the element at the location `loc` against a historical
922    /// state with `leaves` leaves.
923    ///
924    /// The proof commits to `inactive_peaks`; peak bagging is selected by `hasher`.
925    ///
926    /// # Errors
927    ///
928    /// - Returns [Error::RangeOutOfBounds] if `leaves` is greater than `self.leaves()` or if `loc`
929    ///   is not provable at that historical size.
930    /// - Returns [Error::LocationOverflow] if `loc` exceeds [Family::MAX_LEAVES].
931    /// - Returns [Error::ElementPruned] if some element needed to generate the proof has been
932    ///   pruned.
933    pub async fn historical_proof(
934        &self,
935        hasher: &impl Hasher<F, Digest = D>,
936        leaves: Location<F>,
937        loc: Location<F>,
938        inactive_peaks: usize,
939    ) -> Result<Proof<F, D>, Error<F>> {
940        if !loc.is_valid_index() {
941            return Err(Error::LocationOverflow(loc));
942        }
943        // loc is valid so it won't overflow from + 1
944        self.historical_range_proof(hasher, leaves, loc..loc + 1, inactive_peaks)
945            .await
946    }
947
948    /// Return an inclusion proof for the elements in `range` against a historical state with
949    /// `leaves` leaves.
950    ///
951    /// The proof commits to `inactive_peaks`; peak bagging is selected by `hasher`.
952    ///
953    /// # Errors
954    ///
955    /// - Returns [Error::RangeOutOfBounds] if `leaves` is greater than `self.leaves()` or if
956    ///   `range` is not provable at that historical size.
957    /// - Returns [Error::LocationOverflow] if any location in `range` exceeds [Family::MAX_LEAVES].
958    /// - Returns [Error::ElementPruned] if some element needed to generate the proof has been
959    ///   pruned.
960    /// - Returns [Error::Empty] if the range is empty.
961    pub async fn historical_range_proof(
962        &self,
963        hasher: &impl Hasher<F, Digest = D>,
964        leaves: Location<F>,
965        range: core::ops::Range<Location<F>>,
966        inactive_peaks: usize,
967    ) -> Result<Proof<F, D>, Error<F>> {
968        if leaves > self.leaves() {
969            return Err(Error::RangeOutOfBounds(leaves));
970        }
971        crate::merkle::verification::historical_range_proof(
972            hasher,
973            self,
974            leaves,
975            range,
976            inactive_peaks,
977        )
978        .await
979    }
980
981    /// Return an inclusion proof for the element at the location `loc` that can be verified against
982    /// the current root.
983    ///
984    /// The proof commits to `inactive_peaks`; peak bagging is selected by `hasher`.
985    ///
986    /// Unlike the in-memory `Mem::proof`, this async method can read from the backing journal for
987    /// nodes that have been synced out of memory.
988    ///
989    /// # Errors
990    ///
991    /// - Returns [Error::LocationOverflow] if `loc` exceeds [Family::MAX_LEAVES].
992    /// - Returns [Error::ElementPruned] if some element needed to generate the proof has been
993    ///   pruned.
994    /// - Returns [Error::Empty] if the range is empty.
995    pub async fn proof(
996        &self,
997        hasher: &impl Hasher<F, Digest = D>,
998        loc: Location<F>,
999        inactive_peaks: usize,
1000    ) -> Result<Proof<F, D>, Error<F>> {
1001        if !loc.is_valid_index() {
1002            return Err(Error::LocationOverflow(loc));
1003        }
1004        // loc is valid so it won't overflow from + 1
1005        self.range_proof(hasher, loc..loc + 1, inactive_peaks).await
1006    }
1007
1008    /// Return an inclusion proof for the elements within the specified location range.
1009    ///
1010    /// The proof commits to `inactive_peaks`; peak bagging is selected by `hasher`.
1011    ///
1012    /// Unlike the in-memory `Mem::range_proof`, this async method can read from the backing
1013    /// journal for nodes that have been synced out of memory.
1014    ///
1015    /// # Errors
1016    ///
1017    /// - Returns [Error::LocationOverflow] if any location in `range` exceeds [Family::MAX_LEAVES].
1018    /// - Returns [Error::ElementPruned] if some element needed to generate the proof has been
1019    ///   pruned.
1020    /// - Returns [Error::Empty] if the range is empty.
1021    pub async fn range_proof(
1022        &self,
1023        hasher: &impl Hasher<F, Digest = D>,
1024        range: core::ops::Range<Location<F>>,
1025        inactive_peaks: usize,
1026    ) -> Result<Proof<F, D>, Error<F>> {
1027        self.historical_range_proof(hasher, self.leaves(), range, inactive_peaks)
1028            .await
1029    }
1030}
1031
1032#[cfg(test)]
1033mod tests {
1034    use super::*;
1035    use crate::{
1036        journal::contiguous::fixed::{Config as JConfig, Journal},
1037        merkle::{
1038            hasher::Standard, mmb, mmr, Bagging::ForwardFold, Location, LocationRangeExt as _,
1039            Position, Proof,
1040        },
1041        metadata::{Config as MConfig, Metadata},
1042    };
1043    use commonware_cryptography::{
1044        sha256::{self, Digest},
1045        Hasher as _, Sha256,
1046    };
1047    use commonware_macros::test_traced;
1048    use commonware_parallel::Sequential;
1049    use commonware_runtime::{
1050        buffer::paged::CacheRef, deterministic, BufferPooler, Runner, Supervisor as _,
1051    };
1052    use commonware_utils::{non_empty_range, sequence::prefixed_u64::U64, NZUsize, NZU16, NZU64};
1053    use std::{
1054        collections::BTreeMap,
1055        num::{NonZeroU16, NonZeroUsize},
1056    };
1057
1058    fn test_digest(v: usize) -> Digest {
1059        Sha256::hash(&v.to_be_bytes())
1060    }
1061
1062    const PAGE_SIZE: NonZeroU16 = NZU16!(111);
1063    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(5);
1064
1065    fn test_config(pooler: &impl BufferPooler) -> Config<Sequential> {
1066        Config {
1067            journal_partition: "journal-partition".into(),
1068            metadata_partition: "metadata-partition".into(),
1069            items_per_blob: NZU64!(7),
1070            write_buffer: NZUsize!(1024),
1071            strategy: Sequential,
1072            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
1073        }
1074    }
1075
1076    async fn full_empty_inner<F: Family>(context: deterministic::Context) {
1077        let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1078        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1079            context.child("first"),
1080            &hasher,
1081            test_config(&context),
1082        )
1083        .await
1084        .unwrap();
1085        assert_eq!(mmr.size(), 0);
1086        assert!(mmr.get_node(Position::<F>::new(0)).await.is_err());
1087        let bounds = mmr.bounds();
1088        assert!(bounds.is_empty());
1089        assert!(mmr.prune_all().await.is_ok());
1090        assert_eq!(bounds.start, 0);
1091        assert!(mmr.prune(Location::<F>::new(0)).await.is_ok());
1092        assert!(mmr.sync().await.is_ok());
1093        assert!(matches!(mmr.rewind(1).await, Err(Error::Empty)));
1094
1095        let batch = mmr.new_batch().add(&hasher, &test_digest(0));
1096        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1097        mmr.apply_batch(&batch).unwrap();
1098        assert_eq!(mmr.size(), 1);
1099        mmr.sync().await.unwrap();
1100        assert!(mmr.get_node(Position::<F>::new(0)).await.is_ok());
1101        assert!(mmr.rewind(1).await.is_ok());
1102        assert_eq!(mmr.size(), 0);
1103        mmr.sync().await.unwrap();
1104
1105        let mmr = Merkle::<F, _, Digest, Sequential>::init(
1106            context.child("second"),
1107            &hasher,
1108            test_config(&context),
1109        )
1110        .await
1111        .unwrap();
1112        assert_eq!(mmr.size(), 0);
1113
1114        let empty_proof = Proof::<F, Digest>::default();
1115        let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1116        let root = mmr.root(&hasher, 0).unwrap();
1117        assert!(empty_proof.verify_range_inclusion(
1118            &hasher,
1119            &[] as &[Digest],
1120            Location::<F>::new(0),
1121            &root
1122        ));
1123        assert!(empty_proof.verify_multi_inclusion(
1124            &hasher,
1125            &[] as &[(Digest, Location<F>)],
1126            &root
1127        ));
1128
1129        // Confirm empty proof no longer verifies after adding an element.
1130        let batch = mmr.new_batch().add(&hasher, &test_digest(0));
1131        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1132        mmr.apply_batch(&batch).unwrap();
1133        let root = mmr.root(&hasher, 0).unwrap();
1134        assert!(!empty_proof.verify_range_inclusion(
1135            &hasher,
1136            &[] as &[Digest],
1137            Location::<F>::new(0),
1138            &root
1139        ));
1140        assert!(!empty_proof.verify_multi_inclusion(
1141            &hasher,
1142            &[] as &[(Digest, Location<F>)],
1143            &root
1144        ));
1145
1146        mmr.destroy().await.unwrap();
1147    }
1148
1149    #[test_traced]
1150    fn test_full_empty_mmr() {
1151        let executor = deterministic::Runner::default();
1152        executor.start(full_empty_inner::<mmr::Family>);
1153    }
1154
1155    #[test_traced]
1156    fn test_full_empty_mmb() {
1157        let executor = deterministic::Runner::default();
1158        executor.start(full_empty_inner::<mmb::Family>);
1159    }
1160
1161    async fn full_prune_out_of_bounds_returns_error_inner<F: Family>(
1162        context: deterministic::Context,
1163    ) {
1164        let hasher = Standard::<Sha256>::new(ForwardFold);
1165        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1166            context.child("oob_prune"),
1167            &hasher,
1168            test_config(&context),
1169        )
1170        .await
1171        .unwrap();
1172
1173        let batch = mmr.new_batch().add(&hasher, &test_digest(0));
1174        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1175        mmr.apply_batch(&batch).unwrap();
1176
1177        assert!(matches!(
1178            mmr.prune(Location::<F>::new(2)).await,
1179            Err(Error::LeafOutOfBounds(loc)) if loc == Location::<F>::new(2)
1180        ));
1181
1182        mmr.destroy().await.unwrap();
1183    }
1184
1185    #[test_traced]
1186    fn test_full_prune_out_of_bounds_returns_error_mmr() {
1187        let executor = deterministic::Runner::default();
1188        executor.start(full_prune_out_of_bounds_returns_error_inner::<mmr::Family>);
1189    }
1190
1191    #[test_traced]
1192    fn test_full_prune_out_of_bounds_returns_error_mmb() {
1193        let executor = deterministic::Runner::default();
1194        executor.start(full_prune_out_of_bounds_returns_error_inner::<mmb::Family>);
1195    }
1196
1197    async fn full_rewind_error_leaves_valid_state_inner<F: Family>(
1198        context: deterministic::Context,
1199    ) {
1200        let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1201
1202        // Case 1: rewind partially succeeds, then returns ElementPruned.
1203        let element_pruned_context = context.child("element_pruned_case");
1204        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1205            element_pruned_context.child("element_pruned"),
1206            &hasher,
1207            test_config(&element_pruned_context),
1208        )
1209        .await
1210        .unwrap();
1211        let mut batch = mmr.new_batch();
1212        for i in 0u64..32 {
1213            batch = batch.add(&hasher, &i.to_be_bytes());
1214        }
1215        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1216        mmr.apply_batch(&batch).unwrap();
1217        mmr.prune(Location::<F>::new(8)).await.unwrap();
1218        let leaves_before = mmr.leaves();
1219        assert!(matches!(
1220            mmr.rewind(128).await,
1221            Err(Error::ElementPruned(_))
1222        ));
1223        // After error, leaves should reflect any partial rewinds that occurred.
1224        assert!(mmr.leaves() <= leaves_before);
1225        mmr.destroy().await.unwrap();
1226
1227        // Case 2: rewind partially succeeds, then returns Empty.
1228        let empty_context = context.child("empty_case");
1229        let cfg = test_config(&empty_context);
1230        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(empty_context, &hasher, cfg)
1231            .await
1232            .unwrap();
1233        let mut batch = mmr.new_batch();
1234        for i in 0u64..8 {
1235            batch = batch.add(&hasher, &i.to_be_bytes());
1236        }
1237        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1238        mmr.apply_batch(&batch).unwrap();
1239        let leaves_before = mmr.leaves();
1240        assert!(matches!(mmr.rewind(9).await, Err(Error::Empty)));
1241        // Rewind returns error without partial modification.
1242        assert_eq!(mmr.leaves(), leaves_before);
1243        mmr.destroy().await.unwrap();
1244    }
1245
1246    #[test_traced]
1247    fn test_full_rewind_error_leaves_valid_state_mmr() {
1248        let executor = deterministic::Runner::default();
1249        executor.start(full_rewind_error_leaves_valid_state_inner::<mmr::Family>);
1250    }
1251
1252    #[test_traced]
1253    fn test_full_rewind_error_leaves_valid_state_mmb() {
1254        let executor = deterministic::Runner::default();
1255        executor.start(full_rewind_error_leaves_valid_state_inner::<mmb::Family>);
1256    }
1257
1258    async fn full_basic_inner<F: Family>(context: deterministic::Context) {
1259        let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1260        let cfg = test_config(&context);
1261        let mmr = Merkle::<F, _, Digest, Sequential>::init(context, &hasher, cfg)
1262            .await
1263            .unwrap();
1264        // Build a test structure with 255 leaves
1265        const LEAF_COUNT: usize = 255;
1266        let mut leaves = Vec::with_capacity(LEAF_COUNT);
1267        for i in 0..LEAF_COUNT {
1268            leaves.push(test_digest(i));
1269        }
1270        let mut batch = mmr.new_batch();
1271        for leaf in &leaves {
1272            batch = batch.add(&hasher, leaf);
1273        }
1274        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1275        mmr.apply_batch(&batch).unwrap();
1276        let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1277        assert_eq!(mmr.size(), expected_size);
1278
1279        // Generate & verify proof from element that is not yet flushed to the journal.
1280        const TEST_ELEMENT: usize = 133;
1281        let test_element_loc: Location<F> = Location::new(TEST_ELEMENT as u64);
1282
1283        let proof = mmr.proof(&hasher, test_element_loc, 0).await.unwrap();
1284        let root = mmr.root(&hasher, 0).unwrap();
1285        assert!(proof.verify_element_inclusion(
1286            &hasher,
1287            &leaves[TEST_ELEMENT],
1288            test_element_loc,
1289            &root
1290        ));
1291
1292        // Sync the structure, make sure it flushes the in-mem structure as expected.
1293        mmr.sync().await.unwrap();
1294
1295        // Now that the element is flushed from the in-mem structure, confirm its proof is still
1296        // generated correctly.
1297        let proof2 = mmr.proof(&hasher, test_element_loc, 0).await.unwrap();
1298        assert_eq!(proof, proof2);
1299
1300        // Generate & verify a proof that spans flushed elements and the last element.
1301        let range = Location::<F>::new(TEST_ELEMENT as u64)..Location::<F>::new(LEAF_COUNT as u64);
1302        let proof = mmr.range_proof(&hasher, range.clone(), 0).await.unwrap();
1303        assert!(proof.verify_range_inclusion(
1304            &hasher,
1305            &leaves[range.to_usize_range()],
1306            test_element_loc,
1307            &root
1308        ));
1309
1310        mmr.destroy().await.unwrap();
1311    }
1312
1313    #[test_traced]
1314    fn test_full_basic_mmr() {
1315        let executor = deterministic::Runner::default();
1316        executor.start(full_basic_inner::<mmr::Family>);
1317    }
1318
1319    #[test_traced]
1320    fn test_full_basic_mmb() {
1321        let executor = deterministic::Runner::default();
1322        executor.start(full_basic_inner::<mmb::Family>);
1323    }
1324
1325    /// Generates a stateful structure, simulates a crash that wrote a leaf but not its parent
1326    /// nodes, and confirms we appropriately recover to a valid state.
1327    async fn full_recovery_inner<F: Family>(context: deterministic::Context) {
1328        use crate::journal::contiguous::fixed::{Config as JConfig, Journal};
1329
1330        let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1331        let mmr = Merkle::<F, _, Digest, Sequential>::init(
1332            context.child("first"),
1333            &hasher,
1334            test_config(&context),
1335        )
1336        .await
1337        .unwrap();
1338        assert_eq!(mmr.size(), 0);
1339
1340        // Build a test structure with 252 leaves
1341        const LEAF_COUNT: usize = 252;
1342        let mut leaves = Vec::with_capacity(LEAF_COUNT);
1343        for i in 0..LEAF_COUNT {
1344            leaves.push(test_digest(i));
1345        }
1346        let mut batch = mmr.new_batch();
1347        for leaf in &leaves {
1348            batch = batch.add(&hasher, leaf);
1349        }
1350        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1351        mmr.apply_batch(&batch).unwrap();
1352        let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1353        assert_eq!(mmr.size(), expected_size);
1354        mmr.sync().await.unwrap();
1355        drop(mmr);
1356
1357        // Simulate a crash that wrote a leaf but not its parent nodes by appending one
1358        // extra digest to the journal. This creates an invalid structure size.
1359        {
1360            let journal: Journal<_, Digest> = Journal::init(
1361                context.child("corrupt"),
1362                JConfig {
1363                    partition: "journal-partition".into(),
1364                    items_per_blob: NZU64!(7),
1365                    write_buffer: NZUsize!(1024),
1366                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1367                },
1368            )
1369            .await
1370            .unwrap();
1371            assert_eq!(journal.size().await, expected_size);
1372            journal.append(&Sha256::hash(b"orphan")).await.unwrap();
1373            journal.sync().await.unwrap();
1374            assert_eq!(journal.size().await, expected_size + 1);
1375        }
1376
1377        let mmr = Merkle::<F, _, Digest, Sequential>::init(
1378            context.child("second"),
1379            &hasher,
1380            test_config(&context),
1381        )
1382        .await
1383        .unwrap();
1384        // Since the orphaned leaf is replayed, the structure recovers to the previous valid state
1385        // plus the new leaf.
1386        let recovered_size =
1387            Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64 + 1)).unwrap();
1388        assert_eq!(mmr.size(), recovered_size);
1389
1390        // Make sure dropping it and re-opening it persists the recovered state.
1391        drop(mmr);
1392        let mmr = Merkle::<F, _, Digest, Sequential>::init(
1393            context.child("third"),
1394            &hasher,
1395            test_config(&context),
1396        )
1397        .await
1398        .unwrap();
1399        assert_eq!(mmr.size(), recovered_size);
1400
1401        mmr.destroy().await.unwrap();
1402    }
1403
1404    #[test_traced]
1405    fn test_full_recovery_mmr() {
1406        let executor = deterministic::Runner::default();
1407        executor.start(full_recovery_inner::<mmr::Family>);
1408    }
1409
1410    #[test_traced]
1411    fn test_full_recovery_mmb() {
1412        let executor = deterministic::Runner::default();
1413        executor.start(full_recovery_inner::<mmb::Family>);
1414    }
1415
1416    async fn full_pruning_inner<F: Family>(context: deterministic::Context) {
1417        let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1418        // make sure pruning doesn't break root computation, adding of new nodes, etc.
1419        const LEAF_COUNT: usize = 2000;
1420        let cfg_pruned = test_config(&context);
1421        let mut pruned_mmr = Merkle::<F, _, Digest, Sequential>::init(
1422            context.child("pruned"),
1423            &hasher,
1424            cfg_pruned.clone(),
1425        )
1426        .await
1427        .unwrap();
1428        let cfg_unpruned = Config {
1429            journal_partition: "unpruned-journal-partition".into(),
1430            metadata_partition: "unpruned-metadata-partition".into(),
1431            items_per_blob: NZU64!(7),
1432            write_buffer: NZUsize!(1024),
1433            strategy: Sequential,
1434            page_cache: cfg_pruned.page_cache.clone(),
1435        };
1436        let mmr = Merkle::<F, _, Digest, Sequential>::init(
1437            context.child("unpruned"),
1438            &hasher,
1439            cfg_unpruned,
1440        )
1441        .await
1442        .unwrap();
1443        let mut leaves = Vec::with_capacity(LEAF_COUNT);
1444        for i in 0..LEAF_COUNT {
1445            leaves.push(test_digest(i));
1446        }
1447        let mut batch = mmr.new_batch();
1448        for leaf in &leaves {
1449            batch = batch.add(&hasher, leaf);
1450        }
1451        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1452        mmr.apply_batch(&batch).unwrap();
1453        let mut batch = pruned_mmr.new_batch();
1454        for leaf in &leaves {
1455            batch = batch.add(&hasher, leaf);
1456        }
1457        let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1458        pruned_mmr.apply_batch(&batch).unwrap();
1459        let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1460        assert_eq!(mmr.size(), expected_size);
1461        assert_eq!(pruned_mmr.size(), expected_size);
1462
1463        // Prune the structure in increments of 10 making sure the journal is still able to compute
1464        // roots and accept new elements.
1465        for i in 0usize..300 {
1466            let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 10, *pruned_mmr.leaves()));
1467            pruned_mmr.prune(prune_loc).await.unwrap();
1468            assert_eq!(prune_loc, pruned_mmr.bounds().start);
1469
1470            let digest = test_digest(LEAF_COUNT + i);
1471            leaves.push(digest);
1472            let last_leaf = leaves.last().unwrap();
1473            let batch = pruned_mmr.new_batch().add(&hasher, last_leaf);
1474            let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1475            pruned_mmr.apply_batch(&batch).unwrap();
1476            let batch = mmr.new_batch().add(&hasher, last_leaf);
1477            let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1478            mmr.apply_batch(&batch).unwrap();
1479            assert_eq!(
1480                pruned_mmr.root(&hasher, 0).unwrap(),
1481                mmr.root(&hasher, 0).unwrap()
1482            );
1483        }
1484
1485        // Sync the structures.
1486        pruned_mmr.sync().await.unwrap();
1487        assert_eq!(
1488            pruned_mmr.root(&hasher, 0).unwrap(),
1489            mmr.root(&hasher, 0).unwrap()
1490        );
1491
1492        // Sync the structure & reopen.
1493        pruned_mmr.sync().await.unwrap();
1494        drop(pruned_mmr);
1495        let mut pruned_mmr = Merkle::<F, _, Digest, Sequential>::init(
1496            context.child("pruned_reopen"),
1497            &hasher,
1498            cfg_pruned.clone(),
1499        )
1500        .await
1501        .unwrap();
1502        assert_eq!(
1503            pruned_mmr.root(&hasher, 0).unwrap(),
1504            mmr.root(&hasher, 0).unwrap()
1505        );
1506
1507        // Prune everything.
1508        let size = pruned_mmr.size();
1509        pruned_mmr.prune_all().await.unwrap();
1510        assert_eq!(
1511            pruned_mmr.root(&hasher, 0).unwrap(),
1512            mmr.root(&hasher, 0).unwrap()
1513        );
1514        let bounds = pruned_mmr.bounds();
1515        assert!(bounds.is_empty());
1516        assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
1517
1518        // Close structure after adding a new node without syncing and make sure state is as
1519        // expected on reopening.
1520        let batch = mmr.new_batch().add(&hasher, &test_digest(LEAF_COUNT));
1521        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1522        mmr.apply_batch(&batch).unwrap();
1523        let batch = pruned_mmr
1524            .new_batch()
1525            .add(&hasher, &test_digest(LEAF_COUNT));
1526        let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1527        pruned_mmr.apply_batch(&batch).unwrap();
1528        assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
1529        pruned_mmr.sync().await.unwrap();
1530        drop(pruned_mmr);
1531        let mut pruned_mmr = Merkle::<F, _, Digest, Sequential>::init(
1532            context.child("pruned_reopen").with_attribute("index", 2),
1533            &hasher,
1534            cfg_pruned.clone(),
1535        )
1536        .await
1537        .unwrap();
1538        assert_eq!(
1539            pruned_mmr.root(&hasher, 0).unwrap(),
1540            mmr.root(&hasher, 0).unwrap()
1541        );
1542        let bounds = pruned_mmr.bounds();
1543        assert!(!bounds.is_empty());
1544        assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
1545
1546        // Make sure pruning to older location is a no-op.
1547        assert!(pruned_mmr
1548            .prune(Location::<F>::try_from(size).unwrap() - 1)
1549            .await
1550            .is_ok());
1551        assert_eq!(
1552            pruned_mmr.bounds().start,
1553            Location::<F>::try_from(size).unwrap()
1554        );
1555
1556        // Add nodes until we are on a blob boundary, and confirm prune_all still removes all
1557        // retained nodes.
1558        while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
1559            let batch = pruned_mmr
1560                .new_batch()
1561                .add(&hasher, &test_digest(LEAF_COUNT));
1562            let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1563            pruned_mmr.apply_batch(&batch).unwrap();
1564        }
1565        pruned_mmr.prune_all().await.unwrap();
1566        assert!(pruned_mmr.bounds().is_empty());
1567
1568        pruned_mmr.destroy().await.unwrap();
1569        mmr.destroy().await.unwrap();
1570    }
1571
1572    #[test_traced]
1573    fn test_full_pruning_mmr() {
1574        let executor = deterministic::Runner::default();
1575        executor.start(full_pruning_inner::<mmr::Family>);
1576    }
1577
1578    #[test_traced]
1579    fn test_full_pruning_mmb() {
1580        let executor = deterministic::Runner::default();
1581        executor.start(full_pruning_inner::<mmb::Family>);
1582    }
1583
1584    /// Simulate partial writes after pruning, making sure we recover to a valid state.
1585    async fn full_recovery_with_pruning_inner<F: Family>(context: deterministic::Context) {
1586        // Build structure with 2000 leaves.
1587        let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1588        const LEAF_COUNT: usize = 2000;
1589        let mut leaves = Vec::with_capacity(LEAF_COUNT);
1590        let mmr = Merkle::<F, _, Digest, Sequential>::init(
1591            context.child("init"),
1592            &hasher,
1593            test_config(&context),
1594        )
1595        .await
1596        .unwrap();
1597        for i in 0..LEAF_COUNT {
1598            leaves.push(test_digest(i));
1599        }
1600        let mut batch = mmr.new_batch();
1601        for leaf in &leaves {
1602            batch = batch.add(&hasher, leaf);
1603        }
1604        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1605        mmr.apply_batch(&batch).unwrap();
1606        let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1607        assert_eq!(mmr.size(), expected_size);
1608        mmr.sync().await.unwrap();
1609        drop(mmr);
1610
1611        // Prune the structure in increments of 50, simulating a partial write after each prune.
1612        for i in 0usize..200 {
1613            let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1614                context.child("iter").with_attribute("index", i),
1615                &hasher,
1616                test_config(&context),
1617            )
1618            .await
1619            .unwrap();
1620            let start_size = mmr.size();
1621            let start_leaves = *mmr.leaves();
1622            let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 50, start_leaves));
1623            if i % 5 == 0 {
1624                mmr.simulate_pruning_failure(prune_loc).await.unwrap();
1625                continue;
1626            }
1627            mmr.prune(prune_loc).await.unwrap();
1628
1629            // add new elements, simulating a partial write after each.
1630            for j in 0..10 {
1631                let digest = test_digest(100 * (i + 1) + j);
1632                leaves.push(digest);
1633                let batch = mmr
1634                    .new_batch()
1635                    .add(&hasher, leaves.last().unwrap())
1636                    .add(&hasher, leaves.last().unwrap());
1637                let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1638                mmr.apply_batch(&batch).unwrap();
1639                let digest = test_digest(LEAF_COUNT + i);
1640                leaves.push(digest);
1641                let batch = mmr
1642                    .new_batch()
1643                    .add(&hasher, leaves.last().unwrap())
1644                    .add(&hasher, leaves.last().unwrap());
1645                let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1646                mmr.apply_batch(&batch).unwrap();
1647            }
1648            let end_size = mmr.size();
1649            let total_to_write = (*end_size - *start_size) as usize;
1650            let partial_write_limit = i % total_to_write;
1651            mmr.simulate_partial_sync(partial_write_limit)
1652                .await
1653                .unwrap();
1654        }
1655
1656        let mmr = Merkle::<F, _, Digest, Sequential>::init(
1657            context.child("final"),
1658            &hasher,
1659            test_config(&context),
1660        )
1661        .await
1662        .unwrap();
1663        mmr.destroy().await.unwrap();
1664    }
1665
1666    #[test_traced("WARN")]
1667    fn test_full_recovery_with_pruning_mmr() {
1668        let executor = deterministic::Runner::default();
1669        executor.start(full_recovery_with_pruning_inner::<mmr::Family>);
1670    }
1671
1672    #[test_traced("WARN")]
1673    fn test_full_recovery_with_pruning_mmb() {
1674        let executor = deterministic::Runner::default();
1675        executor.start(full_recovery_with_pruning_inner::<mmb::Family>);
1676    }
1677
1678    async fn full_historical_proof_basic_inner<F: Family>(context: deterministic::Context) {
1679        // Create structure with 10 elements
1680        let hasher = Standard::<Sha256>::new(ForwardFold);
1681        let cfg = test_config(&context);
1682        let mmr = Merkle::<F, _, Digest, Sequential>::init(context, &hasher, cfg)
1683            .await
1684            .unwrap();
1685        let mut elements = Vec::new();
1686        for i in 0..10 {
1687            elements.push(test_digest(i));
1688        }
1689        let mut batch = mmr.new_batch();
1690        for elt in &elements {
1691            batch = batch.add(&hasher, elt);
1692        }
1693        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1694        mmr.apply_batch(&batch).unwrap();
1695        let original_leaves = mmr.leaves();
1696
1697        // Historical proof should match "regular" proof when historical size == current database size
1698        let historical_proof = mmr
1699            .historical_range_proof(
1700                &hasher,
1701                original_leaves,
1702                Location::<F>::new(2)..Location::<F>::new(6),
1703                0,
1704            )
1705            .await
1706            .unwrap();
1707        assert_eq!(historical_proof.leaves, original_leaves);
1708        let root = mmr.root(&hasher, 0).unwrap();
1709        assert!(historical_proof.verify_range_inclusion(
1710            &hasher,
1711            &elements[2..6],
1712            Location::<F>::new(2),
1713            &root
1714        ));
1715        let regular_proof = mmr
1716            .range_proof(&hasher, Location::<F>::new(2)..Location::<F>::new(6), 0)
1717            .await
1718            .unwrap();
1719        assert_eq!(regular_proof.leaves, historical_proof.leaves);
1720        assert_eq!(regular_proof.digests, historical_proof.digests);
1721
1722        // Add more elements to the structure
1723        for i in 10..20 {
1724            elements.push(test_digest(i));
1725        }
1726        let mut batch = mmr.new_batch();
1727        for elt in &elements[10..20] {
1728            batch = batch.add(&hasher, elt);
1729        }
1730        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1731        mmr.apply_batch(&batch).unwrap();
1732        let new_historical_proof = mmr
1733            .historical_range_proof(
1734                &hasher,
1735                original_leaves,
1736                Location::<F>::new(2)..Location::<F>::new(6),
1737                0,
1738            )
1739            .await
1740            .unwrap();
1741        assert_eq!(new_historical_proof.leaves, historical_proof.leaves);
1742        assert_eq!(new_historical_proof.digests, historical_proof.digests);
1743
1744        mmr.destroy().await.unwrap();
1745    }
1746
1747    #[test_traced]
1748    fn test_full_historical_proof_basic_mmr() {
1749        let executor = deterministic::Runner::default();
1750        executor.start(full_historical_proof_basic_inner::<mmr::Family>);
1751    }
1752
1753    #[test_traced]
1754    fn test_full_historical_proof_basic_mmb() {
1755        let executor = deterministic::Runner::default();
1756        executor.start(full_historical_proof_basic_inner::<mmb::Family>);
1757    }
1758
1759    async fn full_historical_proof_with_pruning_inner<F: Family>(context: deterministic::Context) {
1760        let hasher = Standard::<Sha256>::new(ForwardFold);
1761        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1762            context.child("main"),
1763            &hasher,
1764            test_config(&context),
1765        )
1766        .await
1767        .unwrap();
1768
1769        // Add many elements
1770        let mut elements = Vec::new();
1771        for i in 0..50 {
1772            elements.push(test_digest(i));
1773        }
1774        let mut batch = mmr.new_batch();
1775        for elt in &elements {
1776            batch = batch.add(&hasher, elt);
1777        }
1778        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1779        mmr.apply_batch(&batch).unwrap();
1780
1781        // Prune to leaf 16 (position 30)
1782        let prune_loc = Location::<F>::new(16);
1783        mmr.prune(prune_loc).await.unwrap();
1784
1785        // Create reference structure for verification to get correct size
1786        let ref_mmr = Merkle::<F, _, Digest, Sequential>::init(
1787            context.child("ref"),
1788            &hasher,
1789            Config {
1790                journal_partition: "ref-journal-pruned".into(),
1791                metadata_partition: "ref-metadata-pruned".into(),
1792                items_per_blob: NZU64!(7),
1793                write_buffer: NZUsize!(1024),
1794                strategy: Sequential,
1795                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1796            },
1797        )
1798        .await
1799        .unwrap();
1800
1801        let mut batch = ref_mmr.new_batch();
1802        for elt in elements.iter().take(41) {
1803            batch = batch.add(&hasher, elt);
1804        }
1805        let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1806        ref_mmr.apply_batch(&batch).unwrap();
1807        let historical_leaves = ref_mmr.leaves();
1808        let historical_root = ref_mmr.root(&hasher, 0).unwrap();
1809
1810        // Test proof at historical position after pruning
1811        let historical_proof = mmr
1812            .historical_range_proof(
1813                &hasher,
1814                historical_leaves,
1815                Location::<F>::new(35)..Location::<F>::new(39),
1816                0,
1817            )
1818            .await
1819            .unwrap();
1820
1821        assert_eq!(historical_proof.leaves, historical_leaves);
1822
1823        // Verify proof works despite pruning
1824        assert!(historical_proof.verify_range_inclusion(
1825            &hasher,
1826            &elements[35..39],
1827            Location::<F>::new(35),
1828            &historical_root
1829        ));
1830
1831        ref_mmr.destroy().await.unwrap();
1832        mmr.destroy().await.unwrap();
1833    }
1834
1835    #[test_traced]
1836    fn test_full_historical_proof_with_pruning_mmr() {
1837        let executor = deterministic::Runner::default();
1838        executor.start(full_historical_proof_with_pruning_inner::<mmr::Family>);
1839    }
1840
1841    #[test_traced]
1842    fn test_full_historical_proof_with_pruning_mmb() {
1843        let executor = deterministic::Runner::default();
1844        executor.start(full_historical_proof_with_pruning_inner::<mmb::Family>);
1845    }
1846
1847    async fn full_historical_proof_large_inner<F: Family>(context: deterministic::Context) {
1848        let hasher = Standard::<Sha256>::new(ForwardFold);
1849
1850        let mmr = Merkle::<F, _, Digest, Sequential>::init(
1851            context.child("server"),
1852            &hasher,
1853            Config {
1854                journal_partition: "server-journal".into(),
1855                metadata_partition: "server-metadata".into(),
1856                items_per_blob: NZU64!(7),
1857                write_buffer: NZUsize!(1024),
1858                strategy: Sequential,
1859                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1860            },
1861        )
1862        .await
1863        .unwrap();
1864
1865        let mut elements = Vec::new();
1866        for i in 0..100 {
1867            elements.push(test_digest(i));
1868        }
1869        let mut batch = mmr.new_batch();
1870        for elt in &elements {
1871            batch = batch.add(&hasher, elt);
1872        }
1873        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1874        mmr.apply_batch(&batch).unwrap();
1875
1876        let range = Location::<F>::new(30)..Location::<F>::new(61);
1877
1878        // Only apply elements up to end_loc to the reference structure.
1879        let ref_mmr = Merkle::<F, _, Digest, Sequential>::init(
1880            context.child("client"),
1881            &hasher,
1882            Config {
1883                journal_partition: "client-journal".into(),
1884                metadata_partition: "client-metadata".into(),
1885                items_per_blob: NZU64!(7),
1886                write_buffer: NZUsize!(1024),
1887                strategy: Sequential,
1888                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1889            },
1890        )
1891        .await
1892        .unwrap();
1893
1894        // Add elements up to the end of the range to verify historical root
1895        let mut batch = ref_mmr.new_batch();
1896        for elt in elements.iter().take(*range.end as usize) {
1897            batch = batch.add(&hasher, elt);
1898        }
1899        let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1900        ref_mmr.apply_batch(&batch).unwrap();
1901        let historical_leaves = ref_mmr.leaves();
1902        let expected_root = ref_mmr.root(&hasher, 0).unwrap();
1903
1904        // Generate proof from full structure
1905        let proof = mmr
1906            .historical_range_proof(&hasher, historical_leaves, range.clone(), 0)
1907            .await
1908            .unwrap();
1909
1910        assert!(proof.verify_range_inclusion(
1911            &hasher,
1912            &elements[range.to_usize_range()],
1913            range.start,
1914            &expected_root, // Compare to historical (reference) root
1915        ));
1916
1917        ref_mmr.destroy().await.unwrap();
1918        mmr.destroy().await.unwrap();
1919    }
1920
1921    #[test_traced]
1922    fn test_full_historical_proof_large_mmr() {
1923        let executor = deterministic::Runner::default();
1924        executor.start(full_historical_proof_large_inner::<mmr::Family>);
1925    }
1926
1927    #[test_traced]
1928    fn test_full_historical_proof_large_mmb() {
1929        let executor = deterministic::Runner::default();
1930        executor.start(full_historical_proof_large_inner::<mmb::Family>);
1931    }
1932
1933    async fn full_historical_proof_singleton_inner<F: Family>(context: deterministic::Context) {
1934        let hasher = Standard::<Sha256>::new(ForwardFold);
1935        let cfg = test_config(&context);
1936        let mmr = Merkle::<F, _, Digest, Sequential>::init(context, &hasher, cfg)
1937            .await
1938            .unwrap();
1939
1940        let element = test_digest(0);
1941        let batch = mmr.new_batch().add(&hasher, &element);
1942        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1943        mmr.apply_batch(&batch).unwrap();
1944
1945        // Test single element proof at historical position
1946        let single_proof = mmr
1947            .historical_range_proof(
1948                &hasher,
1949                Location::<F>::new(1),
1950                Location::<F>::new(0)..Location::<F>::new(1),
1951                0,
1952            )
1953            .await
1954            .unwrap();
1955
1956        let root = mmr.root(&hasher, 0).unwrap();
1957        assert!(single_proof.verify_range_inclusion(
1958            &hasher,
1959            &[element],
1960            Location::<F>::new(0),
1961            &root
1962        ));
1963
1964        mmr.destroy().await.unwrap();
1965    }
1966
1967    #[test_traced]
1968    fn test_full_historical_proof_singleton_mmr() {
1969        let executor = deterministic::Runner::default();
1970        executor.start(full_historical_proof_singleton_inner::<mmr::Family>);
1971    }
1972
1973    #[test_traced]
1974    fn test_full_historical_proof_singleton_mmb() {
1975        let executor = deterministic::Runner::default();
1976        executor.start(full_historical_proof_singleton_inner::<mmb::Family>);
1977    }
1978
1979    // Test `init_sync` when there is no persisted data.
1980    async fn full_init_sync_empty_inner<F: Family>(context: deterministic::Context) {
1981        let hasher = Standard::<Sha256>::new(ForwardFold);
1982
1983        // Test fresh start scenario with completely new structure (no existing data)
1984        let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
1985            config: test_config(&context),
1986            range: non_empty_range!(Location::<F>::new(0), Location::<F>::new(52)),
1987            pinned_nodes: None,
1988        };
1989
1990        let sync_mmr =
1991            Merkle::<F, _, Digest, Sequential>::init_sync(context.child("storage"), sync_cfg)
1992                .await
1993                .unwrap();
1994
1995        // Should be fresh structure starting empty
1996        assert_eq!(sync_mmr.size(), 0);
1997        let bounds = sync_mmr.bounds();
1998        assert_eq!(bounds.start, 0);
1999        assert!(bounds.is_empty());
2000
2001        // Should be able to add new elements
2002        let new_element = test_digest(999);
2003        let batch = sync_mmr.new_batch().add(&hasher, &new_element);
2004        let batch = sync_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2005        sync_mmr.apply_batch(&batch).unwrap();
2006
2007        // Root should be computable
2008        let _root = sync_mmr.root(&hasher, 0).unwrap();
2009
2010        sync_mmr.destroy().await.unwrap();
2011    }
2012
2013    #[test_traced]
2014    fn test_full_init_sync_empty_mmr() {
2015        let executor = deterministic::Runner::default();
2016        executor.start(full_init_sync_empty_inner::<mmr::Family>);
2017    }
2018
2019    #[test_traced]
2020    fn test_full_init_sync_empty_mmb() {
2021        let executor = deterministic::Runner::default();
2022        executor.start(full_init_sync_empty_inner::<mmb::Family>);
2023    }
2024
2025    // Test `init_sync` where the persisted structure's persisted nodes match the sync boundaries.
2026    async fn full_init_sync_nonempty_exact_match_inner<F: Family>(context: deterministic::Context) {
2027        let hasher = Standard::<Sha256>::new(ForwardFold);
2028
2029        // Create initial structure with elements.
2030        let mmr = Merkle::<F, _, Digest, Sequential>::init(
2031            context.child("init"),
2032            &hasher,
2033            test_config(&context),
2034        )
2035        .await
2036        .unwrap();
2037        let mut batch = mmr.new_batch();
2038        for i in 0..50 {
2039            batch = batch.add(&hasher, &test_digest(i));
2040        }
2041        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2042        mmr.apply_batch(&batch).unwrap();
2043        mmr.sync().await.unwrap();
2044        let original_size = mmr.size();
2045        let original_leaves = mmr.leaves();
2046        let original_root = mmr.root(&hasher, 0).unwrap();
2047
2048        // Sync with range.start <= existing_size <= range.end should reuse data
2049        let lower_bound_loc = mmr.bounds().start;
2050        let upper_bound_loc = mmr.leaves();
2051        let lower_bound_pos = Position::<F>::try_from(lower_bound_loc).unwrap();
2052        let upper_bound_pos = mmr.size();
2053        let mut expected_nodes = BTreeMap::new();
2054        for i in *lower_bound_pos..*upper_bound_pos {
2055            expected_nodes.insert(
2056                Position::<F>::new(i),
2057                mmr.get_node(Position::<F>::new(i)).await.unwrap().unwrap(),
2058            );
2059        }
2060        let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
2061            config: test_config(&context),
2062            range: non_empty_range!(lower_bound_loc, upper_bound_loc),
2063            pinned_nodes: None,
2064        };
2065
2066        mmr.sync().await.unwrap();
2067        drop(mmr);
2068
2069        let sync_mmr =
2070            Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg)
2071                .await
2072                .unwrap();
2073
2074        // Should have existing data in the sync range.
2075        assert_eq!(sync_mmr.size(), original_size);
2076        assert_eq!(sync_mmr.leaves(), original_leaves);
2077        let bounds = sync_mmr.bounds();
2078        assert_eq!(bounds.start, lower_bound_loc);
2079        assert!(!bounds.is_empty());
2080        assert_eq!(sync_mmr.root(&hasher, 0).unwrap(), original_root);
2081        for pos in *lower_bound_pos..*upper_bound_pos {
2082            let pos = Position::<F>::new(pos);
2083            assert_eq!(
2084                sync_mmr.get_node(pos).await.unwrap(),
2085                expected_nodes.get(&pos).cloned()
2086            );
2087        }
2088
2089        sync_mmr.destroy().await.unwrap();
2090    }
2091
2092    #[test_traced]
2093    fn test_full_init_sync_nonempty_exact_match_mmr() {
2094        let executor = deterministic::Runner::default();
2095        executor.start(full_init_sync_nonempty_exact_match_inner::<mmr::Family>);
2096    }
2097
2098    #[test_traced]
2099    fn test_full_init_sync_nonempty_exact_match_mmb() {
2100        let executor = deterministic::Runner::default();
2101        executor.start(full_init_sync_nonempty_exact_match_inner::<mmb::Family>);
2102    }
2103
2104    // Test `init_sync` where the persisted structure's data partially overlaps with the sync
2105    // boundaries.
2106    async fn full_init_sync_partial_overlap_inner<F: Family>(context: deterministic::Context) {
2107        let hasher = Standard::<Sha256>::new(ForwardFold);
2108
2109        // Create initial structure with elements.
2110        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2111            context.child("init"),
2112            &hasher,
2113            test_config(&context),
2114        )
2115        .await
2116        .unwrap();
2117        let mut batch = mmr.new_batch();
2118        for i in 0..30 {
2119            batch = batch.add(&hasher, &test_digest(i));
2120        }
2121        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2122        mmr.apply_batch(&batch).unwrap();
2123        mmr.sync().await.unwrap();
2124        mmr.prune(Location::<F>::new(6)).await.unwrap();
2125
2126        let original_size = mmr.size();
2127        let original_leaves = mmr.leaves();
2128        let original_root = mmr.root(&hasher, 0).unwrap();
2129        let original_pruning_boundary = mmr.bounds().start;
2130        let original_pruning_pos = Position::<F>::try_from(original_pruning_boundary).unwrap();
2131
2132        // Sync with boundaries that extend beyond existing data (partial overlap).
2133        let lower_bound_loc = original_pruning_boundary;
2134        let upper_bound_loc = original_leaves + 6; // Extend beyond existing data
2135
2136        let mut expected_nodes = BTreeMap::new();
2137        for i in *original_pruning_pos..*original_size {
2138            let pos = Position::<F>::new(i);
2139            expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
2140        }
2141
2142        let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
2143            config: test_config(&context),
2144            range: non_empty_range!(lower_bound_loc, upper_bound_loc),
2145            pinned_nodes: None,
2146        };
2147
2148        mmr.sync().await.unwrap();
2149        drop(mmr);
2150
2151        let sync_mmr =
2152            Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg)
2153                .await
2154                .unwrap();
2155
2156        // Should have existing data in the overlapping range.
2157        assert_eq!(sync_mmr.size(), original_size);
2158        let bounds = sync_mmr.bounds();
2159        assert_eq!(bounds.start, lower_bound_loc);
2160        assert!(!bounds.is_empty());
2161        assert_eq!(sync_mmr.root(&hasher, 0).unwrap(), original_root);
2162
2163        // Check that existing nodes are preserved in the overlapping range.
2164        for i in *original_pruning_pos..*original_size {
2165            let pos = Position::<F>::new(i);
2166            assert_eq!(
2167                sync_mmr.get_node(pos).await.unwrap(),
2168                expected_nodes.get(&pos).cloned()
2169            );
2170        }
2171
2172        sync_mmr.destroy().await.unwrap();
2173    }
2174
2175    #[test_traced]
2176    fn test_full_init_sync_partial_overlap_mmr() {
2177        let executor = deterministic::Runner::default();
2178        executor.start(full_init_sync_partial_overlap_inner::<mmr::Family>);
2179    }
2180
2181    #[test_traced]
2182    fn test_full_init_sync_partial_overlap_mmb() {
2183        let executor = deterministic::Runner::default();
2184        executor.start(full_init_sync_partial_overlap_inner::<mmb::Family>);
2185    }
2186
2187    async fn full_init_sync_rejects_extra_pinned_nodes_inner<F: Family>(
2188        context: deterministic::Context,
2189    ) {
2190        let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
2191            config: test_config(&context),
2192            range: non_empty_range!(Location::<F>::new(6), Location::<F>::new(20)),
2193            pinned_nodes: Some(vec![test_digest(1), test_digest(2), test_digest(3)]),
2194        };
2195
2196        let result =
2197            Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg).await;
2198        assert!(matches!(result, Err(Error::InvalidPinnedNodes)));
2199    }
2200
2201    #[test_traced]
2202    fn test_full_init_sync_rejects_extra_pinned_nodes_mmr() {
2203        let executor = deterministic::Runner::default();
2204        executor.start(full_init_sync_rejects_extra_pinned_nodes_inner::<mmr::Family>);
2205    }
2206
2207    #[test_traced]
2208    fn test_full_init_sync_rejects_extra_pinned_nodes_mmb() {
2209        let executor = deterministic::Runner::default();
2210        executor.start(full_init_sync_rejects_extra_pinned_nodes_inner::<mmb::Family>);
2211    }
2212
2213    // Regression test that init() handles stale metadata (lower pruning boundary than journal).
2214    // Before the fix, this would panic with an assertion failure. After the fix, it returns a
2215    // MissingNode error (which is expected when metadata is corrupted and pinned nodes are lost).
2216    async fn full_init_stale_metadata_returns_error_inner<F: Family>(
2217        context: deterministic::Context,
2218    ) {
2219        let hasher = Standard::<Sha256>::new(ForwardFold);
2220
2221        // Create a structure with some data and prune it
2222        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2223            context.child("init"),
2224            &hasher,
2225            test_config(&context),
2226        )
2227        .await
2228        .unwrap();
2229
2230        // Add 50 elements
2231        let mut batch = mmr.new_batch();
2232        for i in 0..50 {
2233            batch = batch.add(&hasher, &test_digest(i));
2234        }
2235        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2236        mmr.apply_batch(&batch).unwrap();
2237        mmr.sync().await.unwrap();
2238
2239        // Prune enough that the journal boundary's pinned nodes span pruned blobs.
2240        let prune_loc = Location::<F>::new(25);
2241        mmr.prune(prune_loc).await.unwrap();
2242        drop(mmr);
2243
2244        // Simulate a crash after journal prune but before metadata was updated:
2245        // clear all metadata and write only a stale pruning boundary of 0 (no pinned nodes).
2246        let meta_cfg = MConfig {
2247            partition: test_config(&context).metadata_partition,
2248            codec_config: ((0..).into(), ()),
2249        };
2250        let mut metadata =
2251            Metadata::<_, U64, Vec<u8>>::init(context.child("meta_tamper"), meta_cfg)
2252                .await
2253                .unwrap();
2254        metadata.clear();
2255        let key = U64::new(PRUNED_TO_PREFIX, 0);
2256        metadata
2257            .put_sync(key, 0u64.to_be_bytes().to_vec())
2258            .await
2259            .unwrap();
2260        drop(metadata);
2261
2262        // Reopen the structure - before the fix, this would panic with assertion failure
2263        // After the fix, it returns MissingNode error (pinned nodes for the lower
2264        // boundary don't exist since they were pruned from journal and weren't
2265        // stored in metadata at the lower position)
2266        let result = Merkle::<F, _, Digest, Sequential>::init(
2267            context.child("reopened"),
2268            &hasher,
2269            test_config(&context),
2270        )
2271        .await;
2272
2273        match result {
2274            Err(Error::MissingNode(_)) => {} // expected
2275            Ok(_) => panic!("expected MissingNode error, got Ok"),
2276            Err(e) => panic!("expected MissingNode error, got {:?}", e),
2277        }
2278    }
2279
2280    #[test_traced("WARN")]
2281    fn test_full_init_stale_metadata_returns_error_mmr() {
2282        let executor = deterministic::Runner::default();
2283        executor.start(full_init_stale_metadata_returns_error_inner::<mmr::Family>);
2284    }
2285
2286    #[test_traced("WARN")]
2287    fn test_full_init_stale_metadata_returns_error_mmb() {
2288        let executor = deterministic::Runner::default();
2289        executor.start(full_init_stale_metadata_returns_error_inner::<mmb::Family>);
2290    }
2291
2292    // Test that init() handles the case where metadata pruning boundary is ahead
2293    // of journal (crashed before journal prune completed). This should successfully
2294    // prune the journal to match metadata.
2295    async fn full_init_metadata_ahead_inner<F: Family>(context: deterministic::Context) {
2296        let hasher = Standard::<Sha256>::new(ForwardFold);
2297
2298        // Create a structure with some data
2299        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2300            context.child("init"),
2301            &hasher,
2302            test_config(&context),
2303        )
2304        .await
2305        .unwrap();
2306
2307        // Add 50 elements
2308        let mut batch = mmr.new_batch();
2309        for i in 0..50 {
2310            batch = batch.add(&hasher, &test_digest(i));
2311        }
2312        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2313        mmr.apply_batch(&batch).unwrap();
2314        mmr.sync().await.unwrap();
2315
2316        // Prune to position 30 (this stores pinned nodes and updates metadata)
2317        let prune_loc = Location::<F>::new(16);
2318        mmr.prune(prune_loc).await.unwrap();
2319        let expected_root = mmr.root(&hasher, 0).unwrap();
2320        let expected_size = mmr.size();
2321        drop(mmr);
2322
2323        // Reopen the structure - should recover correctly with metadata ahead of
2324        // journal boundary (metadata says 30, journal is section-aligned to 28)
2325        let mmr = Merkle::<F, _, Digest, Sequential>::init(
2326            context.child("reopened"),
2327            &hasher,
2328            test_config(&context),
2329        )
2330        .await
2331        .unwrap();
2332
2333        assert_eq!(mmr.bounds().start, prune_loc);
2334        assert_eq!(mmr.size(), expected_size);
2335        assert_eq!(mmr.root(&hasher, 0).unwrap(), expected_root);
2336
2337        mmr.destroy().await.unwrap();
2338    }
2339
2340    #[test_traced("WARN")]
2341    fn test_full_init_metadata_ahead_mmr() {
2342        let executor = deterministic::Runner::default();
2343        executor.start(full_init_metadata_ahead_inner::<mmr::Family>);
2344    }
2345
2346    #[test_traced("WARN")]
2347    fn test_full_init_metadata_ahead_mmb() {
2348        let executor = deterministic::Runner::default();
2349        executor.start(full_init_metadata_ahead_inner::<mmb::Family>);
2350    }
2351
2352    // Regression test: init_sync must compute pinned nodes BEFORE pruning the journal. Previously,
2353    // init_sync would prune the journal first, then try to read pinned nodes from the pruned
2354    // positions, causing MissingNode errors.
2355    //
2356    // Key setup: We create a structure with data but DON'T prune it, so the metadata has no pinned
2357    // nodes. Then init_sync must read pinned nodes from the journal before pruning it.
2358    async fn full_init_sync_computes_pinned_nodes_before_pruning_inner<F: Family>(
2359        context: deterministic::Context,
2360    ) {
2361        let hasher = Standard::<Sha256>::new(ForwardFold);
2362
2363        // Use small items_per_blob to create many sections and trigger pruning.
2364        let cfg = Config {
2365            journal_partition: "mmr-journal".into(),
2366            metadata_partition: "mmr-metadata".into(),
2367            items_per_blob: NZU64!(7),
2368            write_buffer: NZUsize!(64),
2369            strategy: Sequential,
2370            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2371        };
2372
2373        // Create structure with enough elements to span multiple sections.
2374        let mmr =
2375            Merkle::<F, _, Digest, Sequential>::init(context.child("init"), &hasher, cfg.clone())
2376                .await
2377                .unwrap();
2378        let mut batch = mmr.new_batch();
2379        for i in 0..100 {
2380            batch = batch.add(&hasher, &test_digest(i));
2381        }
2382        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2383        mmr.apply_batch(&batch).unwrap();
2384        mmr.sync().await.unwrap();
2385
2386        // Don't prune - this ensures metadata has no pinned nodes. init_sync will need to
2387        // read pinned nodes from the journal.
2388        let original_size = mmr.size();
2389        let original_root = mmr.root(&hasher, 0).unwrap();
2390        drop(mmr);
2391
2392        // Reopen via init_sync with range.start > 0. This will prune the journal, so
2393        // init_sync must read pinned nodes BEFORE pruning or they'll be lost.
2394        let prune_loc = Location::<F>::new(32);
2395        let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
2396            config: cfg,
2397            range: non_empty_range!(prune_loc, Location::<F>::new(128)),
2398            pinned_nodes: None, // Force init_sync to compute pinned nodes from journal
2399        };
2400
2401        let sync_mmr =
2402            Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg)
2403                .await
2404                .unwrap();
2405
2406        // Verify the structure state is correct.
2407        assert_eq!(sync_mmr.size(), original_size);
2408        assert_eq!(sync_mmr.root(&hasher, 0).unwrap(), original_root);
2409        assert_eq!(sync_mmr.bounds().start, prune_loc);
2410
2411        sync_mmr.destroy().await.unwrap();
2412    }
2413
2414    #[test_traced]
2415    fn test_full_init_sync_computes_pinned_nodes_before_pruning_mmr() {
2416        let executor = deterministic::Runner::default();
2417        executor.start(full_init_sync_computes_pinned_nodes_before_pruning_inner::<mmr::Family>);
2418    }
2419
2420    #[test_traced]
2421    fn test_full_init_sync_computes_pinned_nodes_before_pruning_mmb() {
2422        let executor = deterministic::Runner::default();
2423        executor.start(full_init_sync_computes_pinned_nodes_before_pruning_inner::<mmb::Family>);
2424    }
2425
2426    async fn full_historical_proof_pruned_elements_inner<F: Family>(
2427        context: deterministic::Context,
2428    ) {
2429        let hasher = Standard::<Sha256>::new(ForwardFold);
2430
2431        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2432            context.child("init"),
2433            &hasher,
2434            test_config(&context),
2435        )
2436        .await
2437        .unwrap();
2438
2439        let mut batch = mmr.new_batch();
2440        for i in 0..64 {
2441            batch = batch.add(&hasher, &test_digest(i));
2442        }
2443        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2444        mmr.apply_batch(&batch).unwrap();
2445
2446        let prune_loc = Location::<F>::new(16);
2447        mmr.prune(prune_loc).await.unwrap();
2448
2449        let historical_leaves = mmr.leaves();
2450        let mut pruned_loc = None;
2451        for loc_u64 in 0..*historical_leaves {
2452            let loc = Location::<F>::new(loc_u64);
2453            let result = mmr
2454                .historical_range_proof(&hasher, historical_leaves, loc..loc + 1, 0)
2455                .await;
2456            if matches!(result, Err(Error::ElementPruned(_))) {
2457                pruned_loc = Some(loc);
2458                break;
2459            }
2460        }
2461        let pruned_loc = pruned_loc.expect("expected at least one pruned location");
2462
2463        // Add more elements and verify pruned elements still return ElementPruned.
2464        let mut batch = mmr.new_batch();
2465        for i in 0..8 {
2466            batch = batch.add(&hasher, &test_digest(10_000 + i));
2467        }
2468        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2469        mmr.apply_batch(&batch).unwrap();
2470
2471        let requested = mmr.leaves();
2472        let result = mmr
2473            .historical_range_proof(&hasher, requested, pruned_loc..pruned_loc + 1, 0)
2474            .await;
2475        assert!(matches!(result, Err(Error::ElementPruned(_))));
2476
2477        mmr.destroy().await.unwrap();
2478    }
2479
2480    #[test_traced]
2481    fn test_full_historical_proof_pruned_elements_mmr() {
2482        let executor = deterministic::Runner::default();
2483        executor.start(full_historical_proof_pruned_elements_inner::<mmr::Family>);
2484    }
2485
2486    #[test_traced]
2487    fn test_full_historical_proof_pruned_elements_mmb() {
2488        let executor = deterministic::Runner::default();
2489        executor.start(full_historical_proof_pruned_elements_inner::<mmb::Family>);
2490    }
2491
2492    async fn full_append_while_historical_proof_is_available_inner<F: Family>(
2493        context: deterministic::Context,
2494    ) {
2495        let hasher = Standard::<Sha256>::new(ForwardFold);
2496        let mmr = Merkle::<F, _, Digest, Sequential>::init(
2497            context.child("init"),
2498            &hasher,
2499            test_config(&context),
2500        )
2501        .await
2502        .unwrap();
2503
2504        let mut batch = mmr.new_batch();
2505        for i in 0..20 {
2506            batch = batch.add(&hasher, &test_digest(i));
2507        }
2508        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2509        mmr.apply_batch(&batch).unwrap();
2510
2511        let historical_leaves = Location::<F>::new(10);
2512        let range = Location::<F>::new(2)..Location::<F>::new(8);
2513
2514        // Appends should remain allowed while historical proofs are available.
2515        let batch = mmr
2516            .new_batch()
2517            .add(&hasher, &test_digest(100))
2518            .add(&hasher, &test_digest(101));
2519        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2520        mmr.apply_batch(&batch).unwrap();
2521
2522        let proof = mmr
2523            .historical_range_proof(&hasher, historical_leaves, range.clone(), 0)
2524            .await
2525            .unwrap();
2526
2527        let expected = mmr
2528            .historical_range_proof(&hasher, historical_leaves, range, 0)
2529            .await
2530            .unwrap();
2531        assert_eq!(proof, expected);
2532
2533        mmr.destroy().await.unwrap();
2534    }
2535
2536    #[test_traced]
2537    fn test_full_append_while_historical_proof_is_available_mmr() {
2538        let executor = deterministic::Runner::default();
2539        executor.start(full_append_while_historical_proof_is_available_inner::<mmr::Family>);
2540    }
2541
2542    #[test_traced]
2543    fn test_full_append_while_historical_proof_is_available_mmb() {
2544        let executor = deterministic::Runner::default();
2545        executor.start(full_append_while_historical_proof_is_available_inner::<mmb::Family>);
2546    }
2547
2548    async fn full_historical_proof_after_sync_reads_from_journal_inner<F: Family>(
2549        context: deterministic::Context,
2550    ) {
2551        let hasher = Standard::<Sha256>::new(ForwardFold);
2552        let mmr = Merkle::<F, _, Digest, Sequential>::init(
2553            context.child("init"),
2554            &hasher,
2555            test_config(&context),
2556        )
2557        .await
2558        .unwrap();
2559
2560        let mut batch = mmr.new_batch();
2561        for i in 0..64 {
2562            batch = batch.add(&hasher, &test_digest(i));
2563        }
2564        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2565        mmr.apply_batch(&batch).unwrap();
2566        mmr.sync().await.unwrap();
2567
2568        let historical_leaves = Location::<F>::new(20);
2569        let range = Location::<F>::new(5)..Location::<F>::new(15);
2570        let expected = mmr
2571            .historical_range_proof(&hasher, historical_leaves, range.clone(), 0)
2572            .await
2573            .unwrap();
2574
2575        let actual = mmr
2576            .historical_range_proof(&hasher, historical_leaves, range, 0)
2577            .await
2578            .unwrap();
2579        assert_eq!(actual, expected);
2580
2581        mmr.destroy().await.unwrap();
2582    }
2583
2584    #[test_traced]
2585    fn test_full_historical_proof_after_sync_reads_from_journal_mmr() {
2586        let executor = deterministic::Runner::default();
2587        executor.start(full_historical_proof_after_sync_reads_from_journal_inner::<mmr::Family>);
2588    }
2589
2590    #[test_traced]
2591    fn test_full_historical_proof_after_sync_reads_from_journal_mmb() {
2592        let executor = deterministic::Runner::default();
2593        executor.start(full_historical_proof_after_sync_reads_from_journal_inner::<mmb::Family>);
2594    }
2595
2596    async fn full_historical_proof_after_pruning_inner<F: Family>(context: deterministic::Context) {
2597        let hasher = Standard::<Sha256>::new(ForwardFold);
2598        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2599            context.child("init"),
2600            &hasher,
2601            test_config(&context),
2602        )
2603        .await
2604        .unwrap();
2605
2606        let mut batch = mmr.new_batch();
2607        for i in 0..30 {
2608            batch = batch.add(&hasher, &test_digest(i));
2609        }
2610        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2611        mmr.apply_batch(&batch).unwrap();
2612
2613        let prune_loc = Location::<F>::new(10);
2614        mmr.prune(prune_loc).await.unwrap();
2615
2616        let requested = Location::<F>::new(20);
2617        let range = prune_loc..requested;
2618        let proof = mmr
2619            .historical_range_proof(&hasher, requested, range, 0)
2620            .await
2621            .unwrap();
2622        assert!(proof.leaves > Location::<F>::new(0));
2623
2624        mmr.destroy().await.unwrap();
2625    }
2626
2627    #[test_traced]
2628    fn test_full_historical_proof_after_pruning_mmr() {
2629        let executor = deterministic::Runner::default();
2630        executor.start(full_historical_proof_after_pruning_inner::<mmr::Family>);
2631    }
2632
2633    #[test_traced]
2634    fn test_full_historical_proof_after_pruning_mmb() {
2635        let executor = deterministic::Runner::default();
2636        executor.start(full_historical_proof_after_pruning_inner::<mmb::Family>);
2637    }
2638
2639    async fn full_historical_proof_edge_cases_inner<F: Family>(context: deterministic::Context) {
2640        let hasher = Standard::<Sha256>::new(ForwardFold);
2641
2642        // Case 1: Empty structure.
2643        let mmr = Merkle::<F, _, Digest, Sequential>::init(
2644            context.child("empty"),
2645            &hasher,
2646            test_config(&context),
2647        )
2648        .await
2649        .unwrap();
2650        let empty_end = Location::<F>::new(0);
2651        let empty_result = mmr
2652            .historical_range_proof(&hasher, empty_end, empty_end..empty_end, 0)
2653            .await;
2654        assert!(matches!(empty_result, Err(Error::Empty)));
2655        let oob_result = mmr
2656            .historical_range_proof(&hasher, empty_end + 1, empty_end..empty_end + 1, 0)
2657            .await;
2658        assert!(matches!(
2659            oob_result,
2660            Err(Error::RangeOutOfBounds(loc)) if loc == empty_end + 1
2661        ));
2662        mmr.destroy().await.unwrap();
2663
2664        // Case 2: Structure has nodes but is fully pruned.
2665        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2666            context.child("fully_pruned"),
2667            &hasher,
2668            test_config(&context),
2669        )
2670        .await
2671        .unwrap();
2672        let mut batch = mmr.new_batch();
2673        for i in 0..20 {
2674            batch = batch.add(&hasher, &test_digest(i));
2675        }
2676        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2677        mmr.apply_batch(&batch).unwrap();
2678        let end = mmr.leaves();
2679        mmr.prune_all().await.unwrap();
2680        assert!(mmr.bounds().is_empty());
2681        let pruned_result = mmr
2682            .historical_range_proof(&hasher, end, end - 1..end, 0)
2683            .await;
2684        assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
2685        let oob_result = mmr
2686            .historical_range_proof(&hasher, end + 1, end - 1..end, 0)
2687            .await;
2688        assert!(matches!(
2689            oob_result,
2690            Err(Error::RangeOutOfBounds(loc)) if loc == end + 1
2691        ));
2692        mmr.destroy().await.unwrap();
2693
2694        // Case 3: All nodes but one (single leaf) are pruned.
2695        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2696            context.child("single_leaf"),
2697            &hasher,
2698            test_config(&context),
2699        )
2700        .await
2701        .unwrap();
2702        let mut batch = mmr.new_batch();
2703        for i in 0..11 {
2704            batch = batch.add(&hasher, &test_digest(i));
2705        }
2706        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2707        mmr.apply_batch(&batch).unwrap();
2708        let end = mmr.leaves();
2709        let keep_loc = end - 1;
2710        mmr.prune(keep_loc).await.unwrap();
2711        let ok_result = mmr
2712            .historical_range_proof(&hasher, end, keep_loc..end, 0)
2713            .await;
2714        assert!(ok_result.is_ok());
2715        let pruned_end = keep_loc - 1;
2716        // make sure this is in a pruned range, considering blob boundaries.
2717        let start_loc = Location::<F>::new(1);
2718        let pruned_result = mmr
2719            .historical_range_proof(&hasher, end, start_loc..pruned_end + 1, 0)
2720            .await;
2721        assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
2722        let oob_result = mmr
2723            .historical_range_proof(&hasher, end + 1, keep_loc..end, 0)
2724            .await;
2725        assert!(matches!(oob_result, Err(Error::RangeOutOfBounds(_))));
2726        mmr.destroy().await.unwrap();
2727    }
2728
2729    #[test_traced]
2730    fn test_full_historical_proof_edge_cases_mmr() {
2731        let executor = deterministic::Runner::default();
2732        executor.start(full_historical_proof_edge_cases_inner::<mmr::Family>);
2733    }
2734
2735    #[test_traced]
2736    fn test_full_historical_proof_edge_cases_mmb() {
2737        let executor = deterministic::Runner::default();
2738        executor.start(full_historical_proof_edge_cases_inner::<mmb::Family>);
2739    }
2740
2741    async fn full_historical_proof_out_of_bounds_inner<F: Family>(context: deterministic::Context) {
2742        let hasher = Standard::<Sha256>::new(ForwardFold);
2743        let mmr = Merkle::<F, _, Digest, Sequential>::init(
2744            context.child("oob"),
2745            &hasher,
2746            test_config(&context),
2747        )
2748        .await
2749        .unwrap();
2750
2751        let mut batch = mmr.new_batch();
2752        for i in 0..8 {
2753            batch = batch.add(&hasher, &test_digest(i));
2754        }
2755        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2756        mmr.apply_batch(&batch).unwrap();
2757        let requested = mmr.leaves() + 1;
2758
2759        let result = mmr
2760            .historical_range_proof(&hasher, requested, Location::<F>::new(0)..requested, 0)
2761            .await;
2762        assert!(matches!(
2763            result,
2764            Err(Error::RangeOutOfBounds(loc)) if loc == requested
2765        ));
2766
2767        mmr.destroy().await.unwrap();
2768    }
2769
2770    #[test_traced]
2771    fn test_full_historical_proof_out_of_bounds_mmr() {
2772        let executor = deterministic::Runner::default();
2773        executor.start(full_historical_proof_out_of_bounds_inner::<mmr::Family>);
2774    }
2775
2776    #[test_traced]
2777    fn test_full_historical_proof_out_of_bounds_mmb() {
2778        let executor = deterministic::Runner::default();
2779        executor.start(full_historical_proof_out_of_bounds_inner::<mmb::Family>);
2780    }
2781
2782    async fn full_historical_proof_range_validation_inner<F: Family>(
2783        context: deterministic::Context,
2784    ) {
2785        let hasher = Standard::<Sha256>::new(ForwardFold);
2786        let mmr = Merkle::<F, _, Digest, Sequential>::init(
2787            context.child("range_validation"),
2788            &hasher,
2789            test_config(&context),
2790        )
2791        .await
2792        .unwrap();
2793
2794        let mut batch = mmr.new_batch();
2795        for i in 0..32 {
2796            batch = batch.add(&hasher, &test_digest(i));
2797        }
2798        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2799        mmr.apply_batch(&batch).unwrap();
2800
2801        let valid_range = Location::<F>::new(0)..Location::<F>::new(1);
2802
2803        // Empty range should report Empty.
2804        let requested = Location::<F>::new(5);
2805        let empty_range = requested..requested;
2806        let empty_result = mmr
2807            .historical_range_proof(&hasher, requested, empty_range, 0)
2808            .await;
2809        assert!(matches!(empty_result, Err(Error::Empty)));
2810
2811        // Requested historical size is out of bounds.
2812        let leaves_oob = mmr.leaves() + 1;
2813        let result = mmr
2814            .historical_range_proof(&hasher, leaves_oob, valid_range.clone(), 0)
2815            .await;
2816        assert!(matches!(
2817            result,
2818            Err(Error::RangeOutOfBounds(loc)) if loc == leaves_oob
2819        ));
2820
2821        // Requested range end is out of bounds for the current structure.
2822        let end_oob = mmr.leaves() + 1;
2823        let range_oob = Location::<F>::new(0)..end_oob;
2824        let result = mmr
2825            .historical_range_proof(&hasher, requested, range_oob, 0)
2826            .await;
2827        assert!(matches!(
2828            result,
2829            Err(Error::RangeOutOfBounds(loc)) if loc == end_oob
2830        ));
2831
2832        // Requested range end out of bounds for the requested historical size but within structure.
2833        let range_end_gt_requested = requested + 1;
2834        let range_oob_at_requested = Location::<F>::new(0)..range_end_gt_requested;
2835        assert!(range_end_gt_requested <= mmr.leaves());
2836        let result = mmr
2837            .historical_range_proof(&hasher, requested, range_oob_at_requested, 0)
2838            .await;
2839        assert!(matches!(
2840            result,
2841            Err(Error::RangeOutOfBounds(loc)) if loc == range_end_gt_requested
2842        ));
2843
2844        // Range location overflow is caught as out-of-bounds (the bounds check
2845        // fires before the position conversion that would detect overflow).
2846        let overflow_loc = Location::<F>::new(u64::MAX);
2847        let overflow_range = Location::<F>::new(0)..overflow_loc;
2848        let result = mmr
2849            .historical_range_proof(&hasher, requested, overflow_range, 0)
2850            .await;
2851        assert!(matches!(
2852            result,
2853            Err(Error::RangeOutOfBounds(loc)) if loc == overflow_loc
2854        ));
2855
2856        mmr.destroy().await.unwrap();
2857    }
2858
2859    #[test_traced]
2860    fn test_full_historical_proof_range_validation_mmr() {
2861        let executor = deterministic::Runner::default();
2862        executor.start(full_historical_proof_range_validation_inner::<mmr::Family>);
2863    }
2864
2865    #[test_traced]
2866    fn test_full_historical_proof_range_validation_mmb() {
2867        let executor = deterministic::Runner::default();
2868        executor.start(full_historical_proof_range_validation_inner::<mmb::Family>);
2869    }
2870
2871    async fn full_historical_proof_non_size_prune_excludes_pruned_leaves_inner<F: Family>(
2872        context: deterministic::Context,
2873    ) {
2874        let hasher = Standard::<Sha256>::new(ForwardFold);
2875        let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2876            context.child("non_size_prune"),
2877            &hasher,
2878            test_config(&context),
2879        )
2880        .await
2881        .unwrap();
2882
2883        let mut batch = mmr.new_batch();
2884        for i in 0..16 {
2885            batch = batch.add(&hasher, &test_digest(i));
2886        }
2887        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2888        mmr.apply_batch(&batch).unwrap();
2889
2890        let end = mmr.leaves();
2891        let mut failures = Vec::new();
2892        for prune_leaf in 1..*end {
2893            let prune_loc = Location::<F>::new(prune_leaf);
2894            mmr.prune(prune_loc).await.unwrap();
2895            for loc_u64 in 0..*end {
2896                let loc = Location::<F>::new(loc_u64);
2897                let range_includes_pruned_leaf = loc < prune_loc;
2898                match mmr.historical_proof(&hasher, end, loc, 0).await {
2899                    Ok(_) => {}
2900                    Err(Error::ElementPruned(_)) if range_includes_pruned_leaf => {}
2901                    Err(Error::ElementPruned(_)) => failures.push(format!(
2902                        "prune_loc={prune_loc} loc={loc} returned ElementPruned without a pruned range element"
2903                    )),
2904                    Err(err) => failures
2905                        .push(format!("prune_loc={prune_loc} loc={loc} err={err}")),
2906                }
2907            }
2908        }
2909
2910        assert!(
2911            failures.is_empty(),
2912            "historical proof generation returned unexpected errors: {failures:?}"
2913        );
2914
2915        mmr.destroy().await.unwrap();
2916    }
2917
2918    #[test_traced]
2919    fn test_full_historical_proof_non_size_prune_excludes_pruned_leaves_mmr() {
2920        let executor = deterministic::Runner::default();
2921        executor.start(
2922            full_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmr::Family>,
2923        );
2924    }
2925
2926    #[test_traced]
2927    fn test_full_historical_proof_non_size_prune_excludes_pruned_leaves_mmb() {
2928        let executor = deterministic::Runner::default();
2929        executor.start(
2930            full_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmb::Family>,
2931        );
2932    }
2933
2934    /// Regression: init_sync must recover from a journal left at an invalid size
2935    /// (e.g., a crash wrote a leaf but not its parent nodes).
2936    async fn full_init_sync_recovers_from_invalid_journal_size_inner<F: Family>(
2937        context: deterministic::Context,
2938    ) {
2939        let hasher = Standard::<Sha256>::new(ForwardFold);
2940
2941        // Build a structure with 3 leaves, sync, and drop.
2942        let mmr = Merkle::<F, _, Digest, Sequential>::init(
2943            context.child("init"),
2944            &hasher,
2945            test_config(&context),
2946        )
2947        .await
2948        .unwrap();
2949        let mut batch = mmr.new_batch();
2950        for i in 0..3 {
2951            batch = batch.add(&hasher, &test_digest(i));
2952        }
2953        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2954        mmr.apply_batch(&batch).unwrap();
2955        let valid_size = mmr.size();
2956        let valid_root = mmr.root(&hasher, 0).unwrap();
2957        mmr.sync().await.unwrap();
2958        drop(mmr);
2959
2960        // Append one extra digest to the journal, simulating a crash that wrote a
2961        // leaf (for the 4th element) but not its parent nodes. This makes the
2962        // journal size invalid.
2963        {
2964            let journal: Journal<_, Digest> = Journal::init(
2965                context.child("corrupt"),
2966                JConfig {
2967                    partition: "journal-partition".into(),
2968                    items_per_blob: NZU64!(7),
2969                    write_buffer: NZUsize!(1024),
2970                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2971                },
2972            )
2973            .await
2974            .unwrap();
2975            assert_eq!(journal.size().await, valid_size);
2976            journal.append(&Sha256::hash(b"orphan")).await.unwrap();
2977            journal.sync().await.unwrap();
2978            assert_eq!(journal.size().await, valid_size + 1);
2979        }
2980
2981        // init_sync should recover by rewinding to the last valid size.
2982        let sync_cfg = SyncConfig::<F, Digest, Sequential> {
2983            config: test_config(&context),
2984            range: non_empty_range!(Location::<F>::new(0), Location::<F>::new(100)),
2985            pinned_nodes: None,
2986        };
2987        let sync_mmr =
2988            Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg)
2989                .await
2990                .unwrap();
2991
2992        assert_eq!(sync_mmr.size(), valid_size);
2993        assert_eq!(sync_mmr.root(&hasher, 0).unwrap(), valid_root);
2994
2995        sync_mmr.destroy().await.unwrap();
2996    }
2997
2998    #[test_traced]
2999    fn test_init_sync_recovers_from_invalid_journal_size_mmr() {
3000        let executor = deterministic::Runner::default();
3001        executor.start(full_init_sync_recovers_from_invalid_journal_size_inner::<mmr::Family>);
3002    }
3003
3004    #[test_traced]
3005    fn test_init_sync_recovers_from_invalid_journal_size_mmb() {
3006        let executor = deterministic::Runner::default();
3007        executor.start(full_init_sync_recovers_from_invalid_journal_size_inner::<mmb::Family>);
3008    }
3009
3010    async fn full_stale_batch_inner<F: Family>(context: deterministic::Context) {
3011        let hasher: Standard<Sha256> = Standard::new(ForwardFold);
3012        let mmr = Merkle::<F, _, Digest, Sequential>::init(
3013            context.child("storage"),
3014            &Standard::<Sha256>::new(ForwardFold),
3015            test_config(&context),
3016        )
3017        .await
3018        .unwrap();
3019
3020        // Create two batches from the same base.
3021        let batch_a = mmr.new_batch().add(&hasher, b"leaf-a");
3022        let batch_a = mmr.with_mem(|mem| batch_a.merkleize(mem, &hasher));
3023        let batch_b = mmr.new_batch().add(&hasher, b"leaf-b");
3024        let batch_b = mmr.with_mem(|mem| batch_b.merkleize(mem, &hasher));
3025
3026        // Apply A -- should succeed.
3027        mmr.apply_batch(&batch_a).unwrap();
3028
3029        // Apply B -- should fail (stale).
3030        let result = mmr.apply_batch(&batch_b);
3031        assert!(
3032            matches!(result, Err(Error::StaleBatch { .. })),
3033            "expected StaleBatch, got {result:?}"
3034        );
3035
3036        mmr.destroy().await.unwrap();
3037    }
3038
3039    #[test]
3040    fn test_stale_batch_mmr() {
3041        let executor = deterministic::Runner::default();
3042        executor.start(full_stale_batch_inner::<mmr::Family>);
3043    }
3044
3045    #[test]
3046    fn test_stale_batch_mmb() {
3047        let executor = deterministic::Runner::default();
3048        executor.start(full_stale_batch_inner::<mmb::Family>);
3049    }
3050
3051    /// Regression: `new_batch` must return the append-only full wrapper.
3052    async fn full_new_batch_returns_append_only_wrapper_inner<F: Family>(
3053        context: deterministic::Context,
3054    ) {
3055        let hasher = Standard::<Sha256>::new(ForwardFold);
3056        let mmr = Merkle::<F, _, Digest, Sequential>::init(
3057            context.child("storage"),
3058            &hasher,
3059            test_config(&context),
3060        )
3061        .await
3062        .unwrap();
3063
3064        let _batch: UnmerkleizedBatch<F, Digest, Sequential> = mmr.new_batch();
3065
3066        mmr.destroy().await.unwrap();
3067    }
3068
3069    #[test_traced]
3070    fn test_new_batch_returns_append_only_wrapper_mmr() {
3071        let executor = deterministic::Runner::default();
3072        executor.start(full_new_batch_returns_append_only_wrapper_inner::<mmr::Family>);
3073    }
3074
3075    #[test_traced]
3076    fn test_new_batch_returns_append_only_wrapper_mmb() {
3077        let executor = deterministic::Runner::default();
3078        executor.start(full_new_batch_returns_append_only_wrapper_inner::<mmb::Family>);
3079    }
3080
3081    /// Regression: update_leaf on a synced-out leaf must return ElementPruned, not panic.
3082    /// Before the fix, `Readable::pruning_boundary` returned the journal's prune boundary
3083    /// (which could be 0), so the batch accepted the update. During merkleize, get_node
3084    /// returned None for the synced-out sibling and hit an expect panic.
3085    async fn full_update_leaf_after_sync_returns_pruned_inner<F: Family>(
3086        context: deterministic::Context,
3087    ) {
3088        let hasher = Standard::<Sha256>::new(ForwardFold);
3089        let mmr = Merkle::<F, _, Digest, Sequential>::init(
3090            context.child("storage"),
3091            &hasher,
3092            test_config(&context),
3093        )
3094        .await
3095        .unwrap();
3096
3097        // Add 50 elements and sync (flushes all nodes to journal, prunes mem).
3098        let mut batch = mmr.new_batch();
3099        for i in 0..50 {
3100            batch = batch.add(&hasher, &test_digest(i));
3101        }
3102        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
3103        mmr.apply_batch(&batch).unwrap();
3104        mmr.sync().await.unwrap();
3105
3106        // Attempt to update leaf 0 which has been synced out of memory.
3107        // Use the inner batch type directly since the full wrapper
3108        // intentionally hides update_leaf.
3109        let batch = mmr.to_batch().new_batch();
3110        let result = batch.update_leaf(&hasher, Location::<F>::new(0), b"updated");
3111        assert!(matches!(result, Err(Error::ElementPruned(_))));
3112
3113        mmr.destroy().await.unwrap();
3114    }
3115
3116    #[test_traced]
3117    fn test_update_leaf_after_sync_returns_pruned_mmr() {
3118        let executor = deterministic::Runner::default();
3119        executor.start(full_update_leaf_after_sync_returns_pruned_inner::<mmr::Family>);
3120    }
3121
3122    #[test_traced]
3123    fn test_update_leaf_after_sync_returns_pruned_mmb() {
3124        let executor = deterministic::Runner::default();
3125        executor.start(full_update_leaf_after_sync_returns_pruned_inner::<mmb::Family>);
3126    }
3127}