
commonware_storage/mmr/journaled.rs

1//! An MMR backed by a fixed-item-length journal.
2//!
3//! A [crate::journal] is used to store all unpruned MMR nodes, and a [crate::metadata] store is
4//! used to preserve digests required for root and proof generation that would have otherwise been
5//! pruned.
6
7use crate::{
8    journal::{
9        contiguous::fixed::{Config as JConfig, Journal},
10        Error as JError,
11    },
12    metadata::{Config as MConfig, Metadata},
13    mmr::{
14        hasher::Hasher,
15        iterator::{nodes_to_pin, PeakIterator},
16        location::Location,
17        mem::{Clean, Config as MemConfig, Dirty, DirtyMmr as DirtyMemMmr, Mmr as MemMmr, State},
18        position::Position,
19        storage::Storage,
20        verification,
21        Error::{self, *},
22        Proof,
23    },
24};
25use commonware_codec::DecodeExt;
26use commonware_cryptography::Digest;
27use commonware_parallel::ThreadPool;
28use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
29use commonware_utils::sequence::prefixed_u64::U64;
30use core::ops::Range;
31use std::{
32    collections::BTreeMap,
33    num::{NonZeroU64, NonZeroUsize},
34};
35use tracing::{debug, error, warn};
36
37pub type DirtyMmr<E, D> = Mmr<E, D, Dirty>;
38pub type CleanMmr<E, D> = Mmr<E, D, Clean<D>>;
39
40/// Configuration for a journal-backed MMR.
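///
/// A minimal sketch of building a `Config` (the partition names and sizing values below are
/// illustrative placeholders, not recommendations):
///
/// ```ignore
/// use commonware_runtime::buffer::paged::CacheRef;
/// use commonware_utils::{NZUsize, NZU16, NZU64};
///
/// let cfg = Config {
///     journal_partition: "mmr_journal".into(),
///     metadata_partition: "mmr_metadata".into(),
///     items_per_blob: NZU64!(4096),
///     write_buffer: NZUsize!(1024),
///     thread_pool: None,
///     page_cache: CacheRef::new(NZU16!(1024), NZUsize!(16)),
/// };
/// ```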
41#[derive(Clone)]
42pub struct Config {
43    /// The name of the `commonware-runtime::Storage` storage partition used for the journal storing
44    /// the MMR nodes.
45    pub journal_partition: String,
46
47    /// The name of the `commonware-runtime::Storage` storage partition used for the metadata
48    /// containing pruned MMR nodes that are still required to calculate the root and generate
49    /// proofs.
50    pub metadata_partition: String,
51
52    /// The maximum number of items to store in each blob in the backing journal.
53    pub items_per_blob: NonZeroU64,
54
55    /// The size of the write buffer to use for each blob in the backing journal.
56    pub write_buffer: NonZeroUsize,
57
58    /// Optional thread pool to use for parallelizing batch operations.
59    pub thread_pool: Option<ThreadPool>,
60
61    /// The page cache to use for caching data.
62    pub page_cache: CacheRef,
63}
64
65/// Configuration for initializing a journaled MMR for synchronization.
66///
67/// Determines how to handle existing persistent data based on sync boundaries:
68/// - **Fresh Start**: Existing data < range start → discard and start fresh
69/// - **Prune and Reuse**: range contains existing data → prune and reuse
70/// - **Prune and Rewind**: existing data > range end → prune and rewind to range end
71pub struct SyncConfig<D: Digest> {
72    /// Base MMR configuration (journal, metadata, etc.)
73    pub config: Config,
74
75    /// Sync range - nodes outside this range are pruned/rewound.
76    pub range: std::ops::Range<Position>,
77
78    /// The pinned nodes the MMR needs at the pruning boundary (range start), in the order
79    /// specified by `nodes_to_pin`. If `None`, the pinned nodes are expected to already be in the
80    /// MMR's metadata/journal.
81    pub pinned_nodes: Option<Vec<D>>,
82}
83
84/// An MMR backed by a fixed-item-length journal.
85pub struct Mmr<E: RStorage + Clock + Metrics, D: Digest, S: State<D> + Send + Sync = Dirty> {
86    /// A memory resident MMR used to build the MMR structure and cache updates. It caches all
87    /// un-synced nodes, and the pinned node set as derived from both its own pruning boundary and
88    /// the journaled MMR's pruning boundary.
89    mem_mmr: MemMmr<D, S>,
90
91    /// Stores all unpruned MMR nodes.
92    journal: Journal<E, D>,
93
94    /// The size of the journal irrespective of any pruned nodes or any un-synced nodes currently
95    /// cached in the memory resident MMR.
96    journal_size: Position,
97
98    /// Stores all "pinned nodes" (pruned nodes required for proving & root generation) for the MMR,
99    /// and the corresponding pruning boundary used to generate them. The metadata remains empty
100    /// until pruning is invoked, and its contents change only when the pruning boundary moves.
101    metadata: Metadata<E, U64, Vec<u8>>,
102
103    /// The position up to which this MMR has been pruned (all nodes before this position have
104    /// been pruned), or 0 if this MMR has never been pruned.
105    pruned_to_pos: Position,
106
107    /// The thread pool to use for parallelization.
108    pool: Option<ThreadPool>,
109}
110
111impl<E: RStorage + Clock + Metrics, D: Digest> From<CleanMmr<E, D>> for DirtyMmr<E, D> {
112    fn from(clean: Mmr<E, D, Clean<D>>) -> Self {
113        Self {
114            mem_mmr: clean.mem_mmr.into(),
115            journal: clean.journal,
116            journal_size: clean.journal_size,
117            metadata: clean.metadata,
118            pruned_to_pos: clean.pruned_to_pos,
119            pool: clean.pool,
120        }
121    }
122}
123
124/// Prefix byte used for node keys in the metadata's prefixed U64 keys.
125const NODE_PREFIX: u8 = 0;
126
127/// Prefix used for the key storing the prune_to_pos position in the metadata.
128const PRUNE_TO_POS_PREFIX: u8 = 1;
129
130impl<E: RStorage + Clock + Metrics, D: Digest, S: State<D> + Send + Sync> Mmr<E, D, S> {
131    /// Return the total number of nodes in the MMR, irrespective of any pruning. The next added
132    /// element's position will have this value.
133    pub fn size(&self) -> Position {
134        self.mem_mmr.size()
135    }
136
137    /// Return the total number of leaves in the MMR.
138    pub fn leaves(&self) -> Location {
139        self.mem_mmr.leaves()
140    }
141
142    /// Return the position of the last leaf in this MMR, or None if the MMR is empty.
143    pub fn last_leaf_pos(&self) -> Option<Position> {
144        self.mem_mmr.last_leaf_pos()
145    }
146
147    /// Attempt to get a node from the metadata, with fallback to journal lookup if it fails.
148    /// Assumes the node should exist in at least one of these sources and returns a `MissingNode`
149    /// error otherwise.
150    async fn get_from_metadata_or_journal(
151        metadata: &Metadata<E, U64, Vec<u8>>,
152        journal: &Journal<E, D>,
153        pos: Position,
154    ) -> Result<D, Error> {
155        if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
156            debug!(?pos, "read node from metadata");
157            let digest = D::decode(bytes.as_ref());
158            let Ok(digest) = digest else {
159                error!(
160                    ?pos,
161                    err = %digest.expect_err("digest is Err in else branch"),
162                    "could not convert node from metadata bytes to digest"
163                );
164                return Err(Error::MissingNode(pos));
165            };
166            return Ok(digest);
167        }
168
169        // If a node isn't found in the metadata, it might still be in the journal.
170        debug!(?pos, "reading node from journal");
171        let node = journal.read(*pos).await;
172        match node {
173            Ok(node) => Ok(node),
174            Err(JError::ItemPruned(_)) => {
175                error!(?pos, "node is missing from metadata and journal");
176                Err(Error::MissingNode(pos))
177            }
178            Err(e) => Err(Error::JournalError(e)),
179        }
180    }
181
182    /// Returns [start, end) where `start` and `end - 1` are the positions of the oldest and newest
183    /// retained nodes respectively.
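    ///
    /// A minimal sketch of inspecting the retained window, assuming `mmr` is an initialized MMR:
    ///
    /// ```ignore
    /// let bounds = mmr.bounds();
    /// if bounds.is_empty() {
    ///     // Nothing is currently retained (the MMR is empty or fully pruned).
    /// } else {
    ///     // Oldest retained node is at `bounds.start`; newest is at `bounds.end - 1`.
    /// }
    /// ```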
184    pub fn bounds(&self) -> std::ops::Range<Position> {
185        self.pruned_to_pos..self.size()
186    }
187
188    /// Adds the pinned nodes based on `prune_pos` to `mem_mmr`.
189    async fn add_extra_pinned_nodes(
190        mem_mmr: &mut MemMmr<D, S>,
191        metadata: &Metadata<E, U64, Vec<u8>>,
192        journal: &Journal<E, D>,
193        prune_pos: Position,
194    ) -> Result<(), Error> {
195        let mut pinned_nodes = BTreeMap::new();
196        for pos in nodes_to_pin(prune_pos) {
197            let digest =
198                Mmr::<E, D, Clean<D>>::get_from_metadata_or_journal(metadata, journal, pos).await?;
199            pinned_nodes.insert(pos, digest);
200        }
201        mem_mmr.add_pinned_nodes(pinned_nodes);
202
203        Ok(())
204    }
205}
206
207impl<E: RStorage + Clock + Metrics, D: Digest> CleanMmr<E, D> {
208    /// Initialize a new `Mmr` instance.
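    ///
    /// A minimal sketch of opening (or recovering) an MMR from storage, assuming a runtime
    /// `context`, a `StandardHasher` over Sha256, and a populated `Config` in `cfg`:
    ///
    /// ```ignore
    /// let mut hasher: StandardHasher<Sha256> = StandardHasher::new();
    /// let mmr = CleanMmr::init(context.with_label("mmr"), &mut hasher, cfg).await?;
    /// assert_eq!(mmr.bounds().end, mmr.size());
    /// ```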
209    pub async fn init(
210        context: E,
211        hasher: &mut impl Hasher<Digest = D>,
212        cfg: Config,
213    ) -> Result<Self, Error> {
214        let journal_cfg = JConfig {
215            partition: cfg.journal_partition,
216            items_per_blob: cfg.items_per_blob,
217            page_cache: cfg.page_cache,
218            write_buffer: cfg.write_buffer,
219        };
220        let mut journal =
221            Journal::<E, D>::init(context.with_label("mmr_journal"), journal_cfg).await?;
222        let mut journal_size = Position::new(journal.bounds().end);
223
224        let metadata_cfg = MConfig {
225            partition: cfg.metadata_partition,
226            codec_config: ((0..).into(), ()),
227        };
228        let metadata =
229            Metadata::<_, U64, Vec<u8>>::init(context.with_label("mmr_metadata"), metadata_cfg)
230                .await?;
231
232        if journal_size == 0 {
233            let mem_mmr = MemMmr::init(
234                MemConfig {
235                    nodes: vec![],
236                    pruned_to_pos: Position::new(0),
237                    pinned_nodes: vec![],
238                },
239                hasher,
240            )?;
241            return Ok(Self {
242                mem_mmr,
243                journal,
244                journal_size,
245                metadata,
246                pruned_to_pos: Position::new(0),
247                pool: cfg.thread_pool,
248            });
249        }
250
251        // Make sure the journal's oldest retained node is as expected based on the last pruning
252        // boundary stored in metadata. If they don't match, prune the journal to the appropriate
253        // location.
254        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
255        let metadata_prune_pos = metadata.get(&key).map_or(0, |bytes| {
256            u64::from_be_bytes(
257                bytes
258                    .as_slice()
259                    .try_into()
260                    .expect("metadata prune position is not 8 bytes"),
261            )
262        });
263        let journal_bounds_start = journal.bounds().start;
264        if metadata_prune_pos > journal_bounds_start {
265            // Metadata is ahead of journal (crashed before completing journal prune).
266            // Prune the journal to match metadata.
267            journal.prune(metadata_prune_pos).await?;
268            if journal.bounds().start != journal_bounds_start {
269                // This should only happen in the event of some failure during the last attempt to
270                // prune the journal.
271                warn!(
272                    journal_bounds_start,
273                    metadata_prune_pos, "journal pruned to match metadata"
274                );
275            }
276        } else if metadata_prune_pos < journal_bounds_start {
277            // Metadata is stale (e.g., missing/corrupted while journal has valid state).
278            // Use the journal's state as authoritative.
279            warn!(
280                metadata_prune_pos,
281                journal_bounds_start, "metadata stale, using journal pruning boundary"
282            );
283        }
284
285        // Use the more restrictive (higher) pruning boundary between metadata and journal.
286        // This handles both cases: metadata ahead (crash during prune) and metadata stale.
287        let effective_prune_pos = std::cmp::max(metadata_prune_pos, journal_bounds_start);
288
289        let last_valid_size = PeakIterator::to_nearest_size(journal_size);
290        let mut orphaned_leaf: Option<D> = None;
291        if last_valid_size != journal_size {
292            warn!(
293                ?last_valid_size,
294                "encountered invalid MMR structure, recovering from last valid size"
295            );
296            // Check if there is an intact leaf following the last valid size, from which we can
297            // recover its missing parents.
298            let recovered_item = journal.read(*last_valid_size).await;
299            if let Ok(item) = recovered_item {
300                orphaned_leaf = Some(item);
301            }
302            journal.rewind(*last_valid_size).await?;
303            journal.sync().await?;
304            journal_size = last_valid_size;
305        }
306
307        // Initialize the mem_mmr in the "prune_all" state.
308        let mut pinned_nodes = Vec::new();
309        for pos in nodes_to_pin(journal_size) {
310            let digest =
311                Mmr::<E, D>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
312            pinned_nodes.push(digest);
313        }
314        let mut mem_mmr = MemMmr::init(
315            MemConfig {
316                nodes: vec![],
317                pruned_to_pos: journal_size,
318                pinned_nodes,
319            },
320            hasher,
321        )?;
322        let prune_pos = Position::new(effective_prune_pos);
323        Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, prune_pos).await?;
324
325        let mut s = Self {
326            mem_mmr,
327            journal,
328            journal_size,
329            metadata,
330            pruned_to_pos: prune_pos,
331            pool: cfg.thread_pool,
332        };
333
334        if let Some(leaf) = orphaned_leaf {
335            // Recover the orphaned leaf and any missing parents.
336            let pos = s.mem_mmr.size();
337            warn!(?pos, "recovering orphaned leaf");
338            let mut dirty_mmr = s.mem_mmr.into_dirty();
339            dirty_mmr.add_leaf_digest(leaf);
340            s.mem_mmr = dirty_mmr.merkleize(hasher, None);
341            assert_eq!(pos, journal_size);
342            s.sync().await?;
343            assert_eq!(s.size(), s.journal.bounds().end);
344        }
345
346        Ok(s)
347    }
348
349    /// Initialize an MMR for synchronization, reusing existing data if possible.
350    ///
351    /// Handles sync scenarios based on existing journal data vs. the given sync range:
352    ///
353    /// 1. **Fresh Start**: existing_size <= range.start
354    ///    - Deletes existing data (if any)
355    ///    - Creates new [Journal] with pruning boundary and size at `range.start`
356    ///
357    /// 2. **Reuse**: range.start < existing_size <= range.end
358    ///    - Keeps existing journal data
359    ///    - Prunes the journal toward `range.start` (section-aligned)
360    ///
361    /// 3. **Error**: existing_size > range.end
362    ///    - Returns [crate::journal::Error::ItemOutOfRange]
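    ///
    /// A minimal sketch of initializing for sync, assuming a runtime `context`, a base `Config`
    /// in `cfg`, sync boundaries `lower..upper` (as `Position`s) provided by the sync protocol,
    /// and the corresponding pinned digests in `pinned`:
    ///
    /// ```ignore
    /// let mut hasher: StandardHasher<Sha256> = StandardHasher::new();
    /// let sync_cfg = SyncConfig {
    ///     config: cfg,
    ///     range: lower..upper,
    ///     pinned_nodes: Some(pinned),
    /// };
    /// let mmr = CleanMmr::init_sync(context.with_label("mmr"), sync_cfg, &mut hasher).await?;
    /// assert_eq!(mmr.bounds().start, lower);
    /// ```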
363    pub async fn init_sync(
364        context: E,
365        cfg: SyncConfig<D>,
366        hasher: &mut impl Hasher<Digest = D>,
367    ) -> Result<Self, crate::qmdb::Error> {
368        let journal_cfg = JConfig {
369            partition: cfg.config.journal_partition.clone(),
370            items_per_blob: cfg.config.items_per_blob,
371            write_buffer: cfg.config.write_buffer,
372            page_cache: cfg.config.page_cache.clone(),
373        };
374
375        // Open the journal, handling existing data vs sync range.
376        assert!(!cfg.range.is_empty(), "range must not be empty");
377        let mut journal: Journal<E, D> =
378            Journal::init(context.with_label("mmr_journal"), journal_cfg).await?;
379        let size = journal.size();
380
381        if size > *cfg.range.end {
382            return Err(crate::journal::Error::ItemOutOfRange(size).into());
383        }
384        if size <= *cfg.range.start && *cfg.range.start != 0 {
385            journal.clear_to_size(*cfg.range.start).await?;
386        }
387
388        let journal_size = Position::new(journal.size());
389
390        // Open the metadata.
391        let metadata_cfg = MConfig {
392            partition: cfg.config.metadata_partition,
393            codec_config: ((0..).into(), ()),
394        };
395        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;
396
397        // Write the pruning boundary.
398        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
399        metadata.put(
400            pruning_boundary_key,
401            (*cfg.range.start).to_be_bytes().into(),
402        );
403
404        // Write the required pinned nodes to metadata.
405        if let Some(pinned_nodes) = cfg.pinned_nodes {
406            // Use caller-provided pinned nodes.
407            let nodes_to_pin_persisted = nodes_to_pin(cfg.range.start);
408            for (pos, digest) in nodes_to_pin_persisted.zip(pinned_nodes.iter()) {
409                metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
410            }
411        }
412
413        // Create the in-memory MMR with the pinned nodes required for its size. This must be
414        // performed *before* pruning the journal to range.start to ensure all pinned nodes are
415        // present.
416        let nodes_to_pin_mem = nodes_to_pin(journal_size);
417        let mut mem_pinned_nodes = Vec::new();
418        for pos in nodes_to_pin_mem {
419            let digest =
420                Mmr::<E, D>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
421            mem_pinned_nodes.push(digest);
422        }
423        let mut mem_mmr = MemMmr::init(
424            MemConfig {
425                nodes: vec![],
426                pruned_to_pos: journal_size,
427                pinned_nodes: mem_pinned_nodes,
428            },
429            hasher,
430        )?;
431
432        // Add the additional pinned nodes required for the pruning boundary, if applicable.
433        // This must also be done before pruning.
434        if cfg.range.start < journal_size {
435            Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, cfg.range.start)
436                .await?;
437        }
438
439        // Sync metadata before pruning so pinned nodes are persisted for crash recovery.
440        metadata.sync().await?;
441
442        // Prune the journal to range.start.
443        journal.prune(*cfg.range.start).await?;
444
445        Ok(Self {
446            mem_mmr,
447            journal,
448            journal_size,
449            metadata,
450            pruned_to_pos: cfg.range.start,
451            pool: cfg.config.thread_pool,
452        })
453    }
454
455    /// Compute the pinned nodes required for the given pruning point, add them to the metadata, and
456    /// sync the metadata to disk. Return the computed set of pinned nodes.
457    async fn update_metadata(
458        &mut self,
459        prune_to_pos: Position,
460    ) -> Result<BTreeMap<Position, D>, Error> {
461        assert!(prune_to_pos >= self.pruned_to_pos);
462
463        let mut pinned_nodes = BTreeMap::new();
464        for pos in nodes_to_pin(prune_to_pos) {
465            let digest = self.get_node(pos).await?.expect(
466                "pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
467            );
468            self.metadata
469                .put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
470            pinned_nodes.insert(pos, digest);
471        }
472
473        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
474        self.metadata.put(key, (*prune_to_pos).to_be_bytes().into());
475
476        self.metadata.sync().await.map_err(Error::MetadataError)?;
477
478        Ok(pinned_nodes)
479    }
480
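    /// Return the node at `position` if it is still retained: the in-memory MMR (which caches
    /// un-synced and pinned nodes) is checked first, falling back to the journal. Returns
    /// `Ok(None)` if the node has been pruned.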
481    pub async fn get_node(&self, position: Position) -> Result<Option<D>, Error> {
482        if let Some(node) = self.mem_mmr.get_node(position) {
483            return Ok(Some(node));
484        }
485
486        match self.journal.read(*position).await {
487            Ok(item) => Ok(Some(item)),
488            Err(JError::ItemPruned(_)) => Ok(None),
489            Err(e) => Err(Error::JournalError(e)),
490        }
491    }
492
493    /// Sync the MMR to disk.
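    ///
    /// Once `sync` returns, all nodes below `size()` are durable in the journal. A sketch of the
    /// typical update flow, assuming `mmr` is a `CleanMmr`, `hasher` is a `StandardHasher`, and
    /// `element` is a byte slice:
    ///
    /// ```ignore
    /// let mut dirty = mmr.into_dirty();
    /// dirty.add(&mut hasher, element).await?;
    /// let mut mmr = dirty.merkleize(&mut hasher);
    /// mmr.sync().await?; // cached nodes are now persisted to the journal
    /// ```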
494    pub async fn sync(&mut self) -> Result<(), Error> {
495        // Write the nodes cached in the memory-resident MMR to the journal.
496        for pos in *self.journal_size..*self.size() {
497            let pos = Position::new(pos);
498            let node = *self.mem_mmr.get_node_unchecked(pos);
499            self.journal.append(node).await?;
500        }
501        self.journal_size = self.size();
502        self.journal.sync().await?;
503        assert_eq!(self.journal_size, self.journal.bounds().end);
504
505        // Recompute pinned nodes since we'll need to repopulate the cache after it is cleared by
506        // pruning the mem_mmr.
507        let mut pinned_nodes = BTreeMap::new();
508        for pos in nodes_to_pin(self.pruned_to_pos) {
509            let digest = self.mem_mmr.get_node_unchecked(pos);
510            pinned_nodes.insert(pos, *digest);
511        }
512
513        // Now that the pinned node set has been recomputed, it's safe to prune the mem_mmr and
514        // reinstate them.
515        self.mem_mmr.prune_all();
516        self.mem_mmr.add_pinned_nodes(pinned_nodes);
517
518        Ok(())
519    }
520
521    /// Prune all nodes up to but not including the given position and update the pinned nodes.
522    ///
523    /// This implementation ensures that no failure can leave the MMR in an unrecoverable state,
524    /// which requires it to sync the MMR first to persist any un-synced updates.
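    ///
    /// A sketch of pruning while keeping an element provable, assuming `mmr` is a `CleanMmr` and
    /// `loc` is the oldest `Location` that must remain provable:
    ///
    /// ```ignore
    /// let keep_from = Position::try_from(loc)?;
    /// mmr.prune_to_pos(keep_from).await?;
    /// assert!(mmr.bounds().start >= keep_from);
    /// // Proofs for elements at or after `loc` can still be generated.
    /// let _proof = mmr.proof(loc).await?;
    /// ```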
525    pub async fn prune_to_pos(&mut self, pos: Position) -> Result<(), Error> {
526        assert!(pos <= self.size());
527        if pos <= self.pruned_to_pos {
528            return Ok(());
529        }
530
531        // Flush items cached in the mem_mmr to disk to ensure the current state is recoverable.
532        self.sync().await?;
533
534        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
535        // event of a pruning failure.
536        let pinned_nodes = self.update_metadata(pos).await?;
537
538        self.journal.prune(*pos).await?;
539        self.mem_mmr.add_pinned_nodes(pinned_nodes);
540        self.pruned_to_pos = pos;
541
542        Ok(())
543    }
544
545    /// Return the root of the MMR.
546    pub const fn root(&self) -> D {
547        *self.mem_mmr.root()
548    }
549
550    /// Return an inclusion proof for the element at the location `loc`.
551    ///
552    /// # Errors
553    ///
554    /// Returns [Error::LocationOverflow] if `loc` exceeds [crate::mmr::MAX_LOCATION].
555    /// Returns [Error::ElementPruned] if some element needed to generate the proof has been pruned.
556    /// Returns [Error::Empty] if the range is empty.
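    ///
    /// A sketch of proving and verifying a single element, assuming `mmr` is a `CleanMmr`,
    /// `hasher` is a `StandardHasher`, `element` is the original element bytes, and `loc` is its
    /// `Location`:
    ///
    /// ```ignore
    /// let proof = mmr.proof(loc).await?;
    /// let root = mmr.root();
    /// assert!(proof.verify_element_inclusion(&mut hasher, &element, loc, &root));
    /// ```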
557    pub async fn proof(&self, loc: Location) -> Result<Proof<D>, Error> {
558        if !loc.is_valid() {
559            return Err(Error::LocationOverflow(loc));
560        }
561        // loc is valid so it won't overflow from + 1
562        self.range_proof(loc..loc + 1).await
563    }
564
565    /// Return an inclusion proof for the elements within the specified location range.
566    ///
567    /// Locations are validated by [verification::range_proof].
568    ///
569    /// # Errors
570    ///
571    /// Returns [Error::LocationOverflow] if any location in `range` exceeds [crate::mmr::MAX_LOCATION].
572    /// Returns [Error::ElementPruned] if some element needed to generate the proof has been pruned.
573    /// Returns [Error::Empty] if the range is empty.
574    pub async fn range_proof(&self, range: Range<Location>) -> Result<Proof<D>, Error> {
575        verification::range_proof(self, range).await
576    }
577
578    /// Analogous to range_proof but for a previous MMR state; specifically, the state when the
579    /// MMR had `leaves` leaves.
580    ///
581    /// Locations are validated by [verification::historical_range_proof].
582    ///
583    /// # Errors
584    ///
585    /// Returns [Error::LocationOverflow] if any location in `range` exceeds [crate::mmr::MAX_LOCATION].
586    /// Returns [Error::ElementPruned] if some element needed to generate the proof has been pruned.
587    /// Returns [Error::Empty] if the range is empty.
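    ///
    /// A sketch of proving a range against an earlier state, assuming the MMR currently has at
    /// least `old_leaves` leaves, `old_root` is the root it had at that size, `hasher` is a
    /// `StandardHasher`, and `elements` holds the elements in `start_loc..end_loc`:
    ///
    /// ```ignore
    /// let proof = mmr
    ///     .historical_range_proof(old_leaves, start_loc..end_loc)
    ///     .await?;
    /// assert!(proof.verify_range_inclusion(&mut hasher, &elements, start_loc, &old_root));
    /// ```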
588    pub async fn historical_range_proof(
589        &self,
590        leaves: Location,
591        range: Range<Location>,
592    ) -> Result<Proof<D>, Error> {
593        verification::historical_range_proof(self, leaves, range).await
594    }
595
596    /// Prune as many nodes as possible, leaving behind at most items_per_blob nodes in the current
597    /// blob.
598    pub async fn prune_all(&mut self) -> Result<(), Error> {
599        if self.size() != 0 {
600            self.prune_to_pos(self.size()).await?;
601            return Ok(());
602        }
603        Ok(())
604    }
605
606    /// Close and permanently remove any disk resources.
607    pub async fn destroy(self) -> Result<(), Error> {
608        self.journal.destroy().await?;
609        self.metadata.destroy().await?;
610
611        Ok(())
612    }
613
614    /// Convert this MMR into its dirty counterpart for batched updates.
615    pub fn into_dirty(self) -> DirtyMmr<E, D> {
616        self.into()
617    }
618
619    #[cfg(any(test, feature = "fuzzing"))]
620    /// Sync elements to disk until `write_limit` elements have been written, then abort to simulate
621    /// a partial write for testing failure scenarios.
622    pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error> {
623        if write_limit == 0 {
624            return Ok(());
625        }
626
627        // Write the nodes cached in the memory-resident MMR to the journal, aborting after
628        // write_limit nodes have been written.
629        let mut written_count = 0usize;
630        for i in *self.journal_size..*self.size() {
631            let node = *self.mem_mmr.get_node_unchecked(Position::new(i));
632            self.journal.append(node).await?;
633            written_count += 1;
634            if written_count >= write_limit {
635                break;
636            }
637        }
638        self.journal.sync().await?;
639
640        Ok(())
641    }
642
643    #[cfg(test)]
644    pub fn get_pinned_nodes(&self) -> BTreeMap<Position, D> {
645        self.mem_mmr.pinned_nodes()
646    }
647
648    #[cfg(test)]
649    pub async fn simulate_pruning_failure(mut self, prune_to_pos: Position) -> Result<(), Error> {
650        assert!(prune_to_pos <= self.size());
651
652        // Flush items cached in the mem_mmr to disk to ensure the current state is recoverable.
653        self.sync().await?;
654
655        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
656        // event of a pruning failure.
657        self.update_metadata(prune_to_pos).await?;
658
659        // Don't actually prune the journal to simulate failure
660        Ok(())
661    }
662}
663
664impl<E: RStorage + Clock + Metrics, D: Digest> DirtyMmr<E, D> {
665    /// Merkleize the MMR and compute the root digest.
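    ///
    /// A sketch of the batched-update workflow, assuming `mmr` is a `CleanMmr`, `hasher` is a
    /// `StandardHasher`, and `elements` yields byte slices:
    ///
    /// ```ignore
    /// let mut dirty = mmr.into_dirty();
    /// for element in elements {
    ///     dirty.add(&mut hasher, element).await?;
    /// }
    /// // Digests are recomputed once for the whole batch.
    /// let mmr = dirty.merkleize(&mut hasher);
    /// let root = mmr.root();
    /// ```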
666    pub fn merkleize(self, h: &mut impl Hasher<Digest = D>) -> CleanMmr<E, D> {
667        CleanMmr {
668            mem_mmr: self.mem_mmr.merkleize(h, self.pool.clone()),
669            journal: self.journal,
670            journal_size: self.journal_size,
671            metadata: self.metadata,
672            pruned_to_pos: self.pruned_to_pos,
673            pool: self.pool,
674        }
675    }
676
677    /// Add an element to the MMR and return its position in the MMR. Elements added to the MMR
678    /// aren't persisted to disk until `sync` is called.
679    pub async fn add(
680        &mut self,
681        h: &mut impl Hasher<Digest = D>,
682        element: &[u8],
683    ) -> Result<Position, Error> {
684        Ok(self.mem_mmr.add(h, element))
685    }
686
687    /// Pop `leaves_to_pop` leaves while staying in the Dirty state. No root recomputation occurs until `merkleize`.
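    ///
    /// A sketch of rolling back the two most recently added leaves, assuming `mmr` is a mutable
    /// `DirtyMmr` with at least two unpruned leaves and `hasher` is a `StandardHasher`:
    ///
    /// ```ignore
    /// mmr.pop(2).await?;
    /// // The root reflecting the rolled-back state is available after merkleization.
    /// let mmr = mmr.merkleize(&mut hasher);
    /// let root = mmr.root();
    /// ```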
688    pub async fn pop(&mut self, mut leaves_to_pop: usize) -> Result<(), Error> {
689        while leaves_to_pop > 0 {
690            match self.mem_mmr.pop() {
691                Ok(_) => leaves_to_pop -= 1,
692                Err(ElementPruned(_)) => break,
693                Err(Empty) => return Err(Error::Empty),
694                Err(e) => return Err(e),
695            }
696        }
697        if leaves_to_pop == 0 {
698            return Ok(());
699        }
700
701        let mut new_size = self.size();
702        while leaves_to_pop > 0 {
703            if new_size == 0 {
704                return Err(Error::Empty);
705            }
706            new_size -= 1;
707            if new_size < self.pruned_to_pos {
708                return Err(Error::ElementPruned(new_size));
709            }
710            if new_size.is_mmr_size() {
711                leaves_to_pop -= 1;
712            }
713        }
714
715        self.journal.rewind(*new_size).await?;
716        self.journal.sync().await?;
717        self.journal_size = new_size;
718
719        let mut pinned_nodes = Vec::new();
720        for pos in nodes_to_pin(new_size) {
721            let digest = Mmr::<E, D, Clean<D>>::get_from_metadata_or_journal(
722                &self.metadata,
723                &self.journal,
724                pos,
725            )
726            .await?;
727            pinned_nodes.push(digest);
728        }
729
730        self.mem_mmr = DirtyMemMmr::from_components(vec![], new_size, pinned_nodes);
731        Self::add_extra_pinned_nodes(
732            &mut self.mem_mmr,
733            &self.metadata,
734            &self.journal,
735            self.pruned_to_pos,
736        )
737        .await?;
738
739        Ok(())
740    }
741
742    #[cfg(any(test, feature = "fuzzing"))]
743    /// Sync elements to disk until `write_limit` elements have been written, then abort to simulate
744    /// a partial write for testing failure scenarios.
745    pub async fn simulate_partial_sync(
746        self,
747        hasher: &mut impl Hasher<Digest = D>,
748        write_limit: usize,
749    ) -> Result<(), Error> {
750        if write_limit == 0 {
751            return Ok(());
752        }
753
754        let mut clean_mmr = self.merkleize(hasher);
755
756        // Write the nodes cached in the memory-resident MMR to the journal, aborting after
757        // write_limit nodes have been written.
758        let mut written_count = 0usize;
759        for i in *clean_mmr.journal_size..*clean_mmr.size() {
760            let node = *clean_mmr.mem_mmr.get_node_unchecked(Position::new(i));
761            clean_mmr.journal.append(node).await?;
762            written_count += 1;
763            if written_count >= write_limit {
764                break;
765            }
766        }
767        clean_mmr.journal.sync().await?;
768
769        Ok(())
770    }
771}
772
773impl<E: RStorage + Clock + Metrics + Sync, D: Digest> Storage<D> for CleanMmr<E, D> {
774    fn size(&self) -> Position {
775        self.size()
776    }
777
778    async fn get_node(&self, position: Position) -> Result<Option<D>, Error> {
779        self.get_node(position).await
780    }
781}
782
783#[cfg(test)]
784mod tests {
785    use super::*;
786    use crate::mmr::{
787        conformance::build_test_mmr, hasher::Hasher as _, location::LocationRangeExt as _, mem,
788        Location, StandardHasher as Standard,
789    };
790    use commonware_cryptography::{
791        sha256::{self, Digest},
792        Hasher, Sha256,
793    };
794    use commonware_macros::test_traced;
795    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Blob as _, Runner};
796    use commonware_utils::{NZUsize, NZU16, NZU64};
797    use std::num::NonZeroU16;
798
799    fn test_digest(v: usize) -> Digest {
800        Sha256::hash(&v.to_be_bytes())
801    }
802
803    const PAGE_SIZE: NonZeroU16 = NZU16!(111);
804    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(5);
805
806    fn test_config() -> Config {
807        Config {
808            journal_partition: "journal_partition".into(),
809            metadata_partition: "metadata_partition".into(),
810            items_per_blob: NZU64!(7),
811            write_buffer: NZUsize!(1024),
812            thread_pool: None,
813            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
814        }
815    }
816
817    /// Test that the journaled MMR produces the same root as the in-memory reference.
818    #[test]
819    fn test_journaled_mmr_batched_root() {
820        let executor = deterministic::Runner::default();
821        executor.start(|context| async move {
822            const NUM_ELEMENTS: u64 = 199;
823            let mut hasher: Standard<Sha256> = Standard::new();
824            let test_mmr = mem::CleanMmr::new(&mut hasher);
825            let test_mmr = build_test_mmr(&mut hasher, test_mmr, NUM_ELEMENTS);
826            let expected_root = test_mmr.root();
827
828            let mut journaled_mmr = Mmr::init(
829                context.clone(),
830                &mut Standard::<Sha256>::new(),
831                test_config(),
832            )
833            .await
834            .unwrap()
835            .into_dirty();
836
837            for i in 0u64..NUM_ELEMENTS {
838                hasher.inner().update(&i.to_be_bytes());
839                let element = hasher.inner().finalize();
840                journaled_mmr.add(&mut hasher, &element).await.unwrap();
841            }
842
843            let journaled_mmr = journaled_mmr.merkleize(&mut hasher);
844            assert_eq!(journaled_mmr.root(), *expected_root);
845
846            journaled_mmr.destroy().await.unwrap();
847        });
848    }
849
850    #[test_traced]
851    fn test_journaled_mmr_empty() {
852        let executor = deterministic::Runner::default();
853        executor.start(|context| async move {
854            let mut hasher: Standard<Sha256> = Standard::new();
855            let mut mmr = Mmr::init(context.with_label("first"), &mut hasher, test_config())
856                .await
857                .unwrap();
858            assert_eq!(mmr.size(), 0);
859            assert!(mmr.get_node(Position::new(0)).await.is_err());
860            let bounds = mmr.bounds();
861            assert!(bounds.is_empty());
862            assert!(mmr.prune_all().await.is_ok());
863            assert_eq!(bounds.start, 0);
864            assert!(mmr.prune_to_pos(Position::new(0)).await.is_ok());
865            assert!(mmr.sync().await.is_ok());
866            let mut mmr = mmr.into_dirty();
867            assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));
868
869            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
870            assert_eq!(mmr.size(), 1);
871            let mut mmr = mmr.merkleize(&mut hasher);
872            mmr.sync().await.unwrap();
873            assert!(mmr.get_node(Position::new(0)).await.is_ok());
874            let mut mmr = mmr.into_dirty();
875            assert!(mmr.pop(1).await.is_ok());
876            assert_eq!(mmr.size(), 0);
877            let mut mmr = mmr.merkleize(&mut hasher);
878            mmr.sync().await.unwrap();
879
880            let mmr = Mmr::init(context.with_label("second"), &mut hasher, test_config())
881                .await
882                .unwrap();
883            assert_eq!(mmr.size(), 0);
884
885            let empty_proof = Proof::default();
886            let mut hasher: Standard<Sha256> = Standard::new();
887            let root = mmr.root();
888            assert!(empty_proof.verify_range_inclusion(
889                &mut hasher,
890                &[] as &[Digest],
891                Location::new_unchecked(0),
892                &root
893            ));
894            assert!(empty_proof.verify_multi_inclusion(
895                &mut hasher,
896                &[] as &[(Digest, Location)],
897                &root
898            ));
899
900            // Confirm empty proof no longer verifies after adding an element.
901            let mut mmr = mmr.into_dirty();
902            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
903            let mmr = mmr.merkleize(&mut hasher);
904            let root = mmr.root();
905            assert!(!empty_proof.verify_range_inclusion(
906                &mut hasher,
907                &[] as &[Digest],
908                Location::new_unchecked(0),
909                &root
910            ));
911            assert!(!empty_proof.verify_multi_inclusion(
912                &mut hasher,
913                &[] as &[(Digest, Location)],
914                &root
915            ));
916
917            mmr.destroy().await.unwrap();
918        });
919    }
920
921    #[test_traced]
922    fn test_journaled_mmr_pop() {
923        let executor = deterministic::Runner::default();
924        executor.start(|context| async move {
925            const NUM_ELEMENTS: u64 = 200;
926
927            let mut hasher: Standard<Sha256> = Standard::new();
928            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
929                .await
930                .unwrap()
931                .into_dirty();
932
933            let mut c_hasher = Sha256::new();
934            for i in 0u64..NUM_ELEMENTS {
935                c_hasher.update(&i.to_be_bytes());
936                let element = c_hasher.finalize();
937                mmr.add(&mut hasher, &element).await.unwrap();
938            }
939
940            // Pop off one leaf at a time without syncing until empty, confirming the root matches.
941            for i in (0..NUM_ELEMENTS).rev() {
942                assert!(mmr.pop(1).await.is_ok());
943                let clean_mmr = mmr.merkleize(&mut hasher);
944                let root = clean_mmr.root();
945                let mut reference_mmr = mem::DirtyMmr::new();
946                for j in 0..i {
947                    c_hasher.update(&j.to_be_bytes());
948                    let element = c_hasher.finalize();
949                    reference_mmr.add(&mut hasher, &element);
950                }
951                let reference_mmr = reference_mmr.merkleize(&mut hasher, None);
952                assert_eq!(
953                    root,
954                    *reference_mmr.root(),
955                    "root mismatch after pop at {i}"
956                );
957                mmr = clean_mmr.into_dirty();
958            }
959            assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));
960            assert!(mmr.pop(0).await.is_ok());
961
962            // Repeat the test, though this time sync part of the way to tip to test crossing the
963            // boundary from cached to uncached leaves, and pop 2 at a time instead of just 1.
964            for i in 0u64..NUM_ELEMENTS {
965                c_hasher.update(&i.to_be_bytes());
966                let element = c_hasher.finalize();
967                mmr.add(&mut hasher, &element).await.unwrap();
968                if i == 101 {
969                    let mut clean_mmr = mmr.merkleize(&mut hasher);
970                    clean_mmr.sync().await.unwrap();
971                    mmr = clean_mmr.into_dirty();
972                }
973            }
974
975            for i in (0..NUM_ELEMENTS - 1).rev().step_by(2) {
976                assert!(mmr.pop(2).await.is_ok(), "at position {i:?}");
977                let clean_mmr = mmr.merkleize(&mut hasher);
978                let root = clean_mmr.root();
979                let reference_mmr = mem::CleanMmr::new(&mut hasher);
980                let reference_mmr = build_test_mmr(&mut hasher, reference_mmr, i);
981                assert_eq!(
982                    root,
983                    *reference_mmr.root(),
984                    "root mismatch at position {i:?}"
985                );
986                mmr = clean_mmr.into_dirty();
987            }
988            assert!(matches!(mmr.pop(99).await, Err(Error::Empty)));
989
990            // Repeat one more time only after pruning the MMR first.
991            for i in 0u64..NUM_ELEMENTS {
992                c_hasher.update(&i.to_be_bytes());
993                let element = c_hasher.finalize();
994                mmr.add(&mut hasher, &element).await.unwrap();
995                if i == 101 {
996                    let mut clean_mmr = mmr.merkleize(&mut hasher);
997                    clean_mmr.sync().await.unwrap();
998                    mmr = clean_mmr.into_dirty();
999                }
1000            }
1001            let mut mmr = mmr.merkleize(&mut hasher);
1002            let leaf_pos = Position::try_from(Location::new_unchecked(50)).unwrap();
1003            mmr.prune_to_pos(leaf_pos).await.unwrap();
1004            // Pop enough nodes to cause the mem-mmr to be completely emptied, and then some.
1005            let mut mmr = mmr.into_dirty();
1006            mmr.pop(80).await.unwrap();
1007            let mmr = mmr.merkleize(&mut hasher);
1008            // Make sure the pinned node boundary is valid by generating a proof for the oldest item.
1009            mmr.proof(Location::try_from(leaf_pos).unwrap())
1010                .await
1011                .unwrap();
1012            // Pop off all remaining leaves 1 at a time.
1013            let mut mmr = mmr.into_dirty();
1014            while mmr.size() > leaf_pos {
1015                assert!(mmr.pop(1).await.is_ok());
1016            }
1017            assert!(matches!(mmr.pop(1).await, Err(Error::ElementPruned(_))));
1018
1019            // Make sure pruning to an older location is a no-op.
1020            let mut mmr = mmr.merkleize(&mut hasher);
1021            assert!(mmr.prune_to_pos(leaf_pos - 1).await.is_ok());
1022            assert_eq!(mmr.bounds().start, leaf_pos);
1023
1024            mmr.destroy().await.unwrap();
1025        });
1026    }
1027
1028    #[test_traced]
1029    fn test_journaled_mmr_basic() {
1030        let executor = deterministic::Runner::default();
1031        executor.start(|context| async move {
1032            let mut hasher: Standard<Sha256> = Standard::new();
1033            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1034                .await
1035                .unwrap();
1036            // Build a test MMR with 255 leaves
1037            const LEAF_COUNT: usize = 255;
1038            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1039            let mut positions = Vec::with_capacity(LEAF_COUNT);
1040            let mut mmr = mmr.into_dirty();
1041            for i in 0..LEAF_COUNT {
1042                let digest = test_digest(i);
1043                leaves.push(digest);
1044                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
1045                positions.push(pos);
1046            }
1047            let mut mmr = mmr.merkleize(&mut hasher);
1048            assert_eq!(mmr.size(), Position::new(502));
1049            assert_eq!(mmr.journal_size, Position::new(0));
1050
1051            // Generate & verify proof from element that is not yet flushed to the journal.
1052            const TEST_ELEMENT: usize = 133;
1053            const TEST_ELEMENT_LOC: Location = Location::new_unchecked(TEST_ELEMENT as u64);
1054
1055            let proof = mmr.proof(TEST_ELEMENT_LOC).await.unwrap();
1056            let root = mmr.root();
1057            assert!(proof.verify_element_inclusion(
1058                &mut hasher,
1059                &leaves[TEST_ELEMENT],
1060                TEST_ELEMENT_LOC,
1061                &root,
1062            ));
1063
1064            // Sync the MMR, make sure it flushes the in-mem MMR as expected.
1065            mmr.sync().await.unwrap();
1066            assert_eq!(mmr.journal_size, Position::new(502));
1067            assert!(mmr.mem_mmr.bounds().is_empty());
1068
1069            // Now that the element is flushed from the in-mem MMR, confirm its proof is still
1070            // generated correctly.
1071            let proof2 = mmr.proof(TEST_ELEMENT_LOC).await.unwrap();
1072            assert_eq!(proof, proof2);
1073
1074            // Generate & verify a proof that spans flushed elements and the last element.
1075            let range = Location::new_unchecked(TEST_ELEMENT as u64)
1076                ..Location::new_unchecked(LEAF_COUNT as u64);
1077            let proof = mmr.range_proof(range.clone()).await.unwrap();
1078            assert!(proof.verify_range_inclusion(
1079                &mut hasher,
1080                &leaves[range.to_usize_range()],
1081                TEST_ELEMENT_LOC,
1082                &root
1083            ));
1084
1085            mmr.destroy().await.unwrap();
1086        });
1087    }
1088
1089    #[test_traced]
1090    /// Generates a stateful MMR, simulates various partial-write scenarios, and confirms we
1091    /// appropriately recover to a valid state.
1092    fn test_journaled_mmr_recovery() {
1093        let executor = deterministic::Runner::default();
1094        executor.start(|context| async move {
1095            let mut hasher: Standard<Sha256> = Standard::new();
1096            let mut mmr = Mmr::init(context.with_label("first"), &mut hasher, test_config())
1097                .await
1098                .unwrap()
1099                .into_dirty();
1100            assert_eq!(mmr.size(), 0);
1101
1102            // Build a test MMR with 252 leaves
1103            const LEAF_COUNT: usize = 252;
1104            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1105            let mut positions = Vec::with_capacity(LEAF_COUNT);
1106            for i in 0..LEAF_COUNT {
1107                let digest = test_digest(i);
1108                leaves.push(digest);
1109                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
1110                positions.push(pos);
1111            }
1112            let mut mmr = mmr.merkleize(&mut hasher);
1113            assert_eq!(mmr.size(), 498);
1114            let root = mmr.root();
1115            mmr.sync().await.unwrap();
1116            drop(mmr);
1117
1118            // The very last element we added (pos=495) resulted in new parents at positions 496 &
1119            // 497. Simulate a partial write by corrupting the last page's checksum by truncating
1120            // the last blob by a single byte.
1121            let partition: String = "journal_partition-blobs".into();
1122            let (blob, len) = context
1123                .open(&partition, &71u64.to_be_bytes())
1124                .await
1125                .expect("Failed to open blob");
1126            // A full page w/ CRC should have been written on sync.
1127            assert_eq!(len, PAGE_SIZE.get() as u64 + 12);
1128
1129            // truncate the blob by one byte to corrupt the page CRC.
1130            blob.resize(len - 1).await.expect("Failed to corrupt blob");
1131            blob.sync().await.expect("Failed to sync blob");
1132
1133            let mmr = Mmr::init(context.with_label("second"), &mut hasher, test_config())
1134                .await
1135                .unwrap();
1136            // Since we didn't corrupt the leaf, the MMR is able to replay the leaf and recover to
1137            // the previous state.
1138            assert_eq!(mmr.size(), 498);
1139            assert_eq!(mmr.root(), root);
1140
1141            // Make sure dropping it and re-opening it persists the recovered state.
1142            drop(mmr);
1143            let mmr = Mmr::init(context.with_label("third"), &mut hasher, test_config())
1144                .await
1145                .unwrap();
1146            assert_eq!(mmr.size(), 498);
1147
1148            mmr.destroy().await.unwrap();
1149        });
1150    }
1151
1152    #[test_traced]
1153    fn test_journaled_mmr_pruning() {
1154        let executor = deterministic::Runner::default();
1155        executor.start(|context| async move {
1156            let mut hasher: Standard<Sha256> = Standard::new();
1157            // make sure pruning doesn't break root computation, adding of new nodes, etc.
1158            const LEAF_COUNT: usize = 2000;
1159            let cfg_pruned = test_config();
1160            let pruned_mmr = Mmr::init(
1161                context.with_label("pruned"),
1162                &mut hasher,
1163                cfg_pruned.clone(),
1164            )
1165            .await
1166            .unwrap();
1167            let cfg_unpruned = Config {
1168                journal_partition: "unpruned_journal_partition".into(),
1169                metadata_partition: "unpruned_metadata_partition".into(),
1170                items_per_blob: NZU64!(7),
1171                write_buffer: NZUsize!(1024),
1172                thread_pool: None,
1173                page_cache: cfg_pruned.page_cache.clone(),
1174            };
1175            let mut mmr = Mmr::init(context.with_label("unpruned"), &mut hasher, cfg_unpruned)
1176                .await
1177                .unwrap()
1178                .into_dirty();
1179            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1180            let mut positions = Vec::with_capacity(LEAF_COUNT);
1181            let mut pruned_mmr = pruned_mmr.into_dirty();
1182            for i in 0..LEAF_COUNT {
1183                let digest = test_digest(i);
1184                leaves.push(digest);
1185                let last_leaf = leaves.last().unwrap();
1186                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1187                positions.push(pos);
1188                pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
1189            }
1190            let mut mmr = mmr.merkleize(&mut hasher);
1191            let mut pruned_mmr = pruned_mmr.merkleize(&mut hasher);
1192            assert_eq!(mmr.size(), 3994);
1193            assert_eq!(pruned_mmr.size(), 3994);
1194
1195            // Prune the MMR in increments of 10 making sure the journal is still able to compute
1196            // roots and accept new elements.
1197            for i in 0usize..300 {
1198                let prune_pos = i as u64 * 10;
1199                pruned_mmr
1200                    .prune_to_pos(Position::new(prune_pos))
1201                    .await
1202                    .unwrap();
1203                assert_eq!(prune_pos, pruned_mmr.bounds().start);
1204
1205                let digest = test_digest(LEAF_COUNT + i);
1206                leaves.push(digest);
1207                let last_leaf = leaves.last().unwrap();
1208                let mut dirty_pruned_mmr = pruned_mmr.into_dirty();
1209                let pos = dirty_pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
1210                pruned_mmr = dirty_pruned_mmr.merkleize(&mut hasher);
1211                positions.push(pos);
1212                let mut dirty_mmr = mmr.into_dirty();
1213                dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
1214                mmr = dirty_mmr.merkleize(&mut hasher);
1215                assert_eq!(pruned_mmr.root(), mmr.root());
1216            }
1217
1218            // Sync the MMRs.
1219            pruned_mmr.sync().await.unwrap();
1220            assert_eq!(pruned_mmr.root(), mmr.root());
1221
1222            // Sync the MMR & reopen.
1223            pruned_mmr.sync().await.unwrap();
1224            drop(pruned_mmr);
1225            let mut pruned_mmr = Mmr::init(
1226                context.with_label("pruned_reopen"),
1227                &mut hasher,
1228                cfg_pruned.clone(),
1229            )
1230            .await
1231            .unwrap();
1232            assert_eq!(pruned_mmr.root(), mmr.root());
1233
1234            // Prune everything.
1235            let size = pruned_mmr.size();
1236            pruned_mmr.prune_all().await.unwrap();
1237            assert_eq!(pruned_mmr.root(), mmr.root());
1238            let bounds = pruned_mmr.bounds();
1239            assert!(bounds.is_empty());
1240            assert_eq!(bounds.start, size);
1241
1242            // Close MMR after adding a new node without syncing and make sure state is as expected
1243            // on reopening.
1244            let mut mmr = mmr.into_dirty();
1245            mmr.add(&mut hasher, &test_digest(LEAF_COUNT))
1246                .await
1247                .unwrap();
1248            let mmr = mmr.merkleize(&mut hasher);
1249            let mut dirty_pruned = pruned_mmr.into_dirty();
1250            dirty_pruned
1251                .add(&mut hasher, &test_digest(LEAF_COUNT))
1252                .await
1253                .unwrap();
1254            let mut pruned_mmr = dirty_pruned.merkleize(&mut hasher);
1255            assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
1256            pruned_mmr.sync().await.unwrap();
1257            drop(pruned_mmr);
1258            let mut pruned_mmr = Mmr::init(
1259                context.with_label("pruned_reopen2"),
1260                &mut hasher,
1261                cfg_pruned.clone(),
1262            )
1263            .await
1264            .unwrap();
1265            assert_eq!(pruned_mmr.root(), mmr.root());
1266            let bounds = pruned_mmr.bounds();
1267            assert!(!bounds.is_empty());
1268            assert_eq!(bounds.start, size);
1269
1270            // Make sure pruning to an older location is a no-op.
1271            assert!(pruned_mmr.prune_to_pos(size - 1).await.is_ok());
1272            assert_eq!(pruned_mmr.bounds().start, size);
1273
1274            // Add nodes until we are on a blob boundary, and confirm prune_all still removes all
1275            // retained nodes.
1276            while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
1277                let mut dirty_pruned_mmr = pruned_mmr.into_dirty();
1278                dirty_pruned_mmr
1279                    .add(&mut hasher, &test_digest(LEAF_COUNT))
1280                    .await
1281                    .unwrap();
1282                pruned_mmr = dirty_pruned_mmr.merkleize(&mut hasher);
1283            }
1284            pruned_mmr.prune_all().await.unwrap();
1285            assert!(pruned_mmr.bounds().is_empty());
1286
1287            pruned_mmr.destroy().await.unwrap();
1288            mmr.destroy().await.unwrap();
1289        });
1290    }
1291
1292    #[test_traced("WARN")]
1293    /// Simulate partial writes after pruning, making sure we recover to a valid state.
1294    fn test_journaled_mmr_recovery_with_pruning() {
1295        let executor = deterministic::Runner::default();
1296        executor.start(|context| async move {
1297            // Build MMR with 2000 leaves.
1298            let mut hasher: Standard<Sha256> = Standard::new();
1299            const LEAF_COUNT: usize = 2000;
1300            let mut mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
1301                .await
1302                .unwrap()
1303                .into_dirty();
1304            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1305            let mut positions = Vec::with_capacity(LEAF_COUNT);
1306            for i in 0..LEAF_COUNT {
1307                let digest = test_digest(i);
1308                leaves.push(digest);
1309                let last_leaf = leaves.last().unwrap();
1310                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1311                positions.push(pos);
1312            }
1313            let mut mmr = mmr.merkleize(&mut hasher);
1314            assert_eq!(mmr.size(), 3994);
1315            mmr.sync().await.unwrap();
1316            drop(mmr);
1317
1318            // Prune the MMR in increments of 50, simulating a partial write after each prune.
1319            for i in 0usize..200 {
1320                let label = format!("iter_{i}");
1321                let mut mmr = Mmr::init(context.with_label(&label), &mut hasher, test_config())
1322                    .await
1323                    .unwrap();
1324                let start_size = mmr.size();
1325                let prune_pos = std::cmp::min(i as u64 * 50, *start_size);
1326                let prune_pos = Position::new(prune_pos);
1327                if i % 5 == 0 {
1328                    mmr.simulate_pruning_failure(prune_pos).await.unwrap();
1329                    continue;
1330                }
1331                mmr.prune_to_pos(prune_pos).await.unwrap();
1332
1333                // Add new elements before simulating a partial write.
1334                for j in 0..10 {
1335                    let digest = test_digest(100 * (i + 1) + j);
1336                    leaves.push(digest);
1337                    let last_leaf = leaves.last().unwrap();
1338                    let mut dirty_mmr = mmr.into_dirty();
1339                    let pos = dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
1340                    positions.push(pos);
1341                    dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
1342                    mmr = dirty_mmr.merkleize(&mut hasher);
1343                    let _ = mmr.root(); // the root must remain computable after each merkleize
1344                    let digest = test_digest(LEAF_COUNT + i);
1345                    leaves.push(digest);
1346                    let last_leaf = leaves.last().unwrap();
1347                    let mut dirty_mmr = mmr.into_dirty();
1348                    let pos = dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
1349                    positions.push(pos);
1350                    dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
1351                    mmr = dirty_mmr.merkleize(&mut hasher);
1352                }
1353                let end_size = mmr.size();
1354                let total_to_write = (*end_size - *start_size) as usize;
1355                let partial_write_limit = i % total_to_write;
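                // Persist only some of the newly added nodes (at most `partial_write_limit`);
                // the next iteration's init() must recover the journal to a consistent state.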
1356                mmr.simulate_partial_sync(partial_write_limit)
1357                    .await
1358                    .unwrap();
1359            }
1360
1361            let mmr = Mmr::init(context.with_label("final"), &mut hasher, test_config())
1362                .await
1363                .unwrap();
1364            mmr.destroy().await.unwrap();
1365        });
1366    }
1367
1368    #[test_traced]
1369    fn test_journaled_mmr_historical_range_proof_basic() {
1370        let executor = deterministic::Runner::default();
1371        executor.start(|context| async move {
1372            // Create MMR with 10 elements
1373            let mut hasher = Standard::<Sha256>::new();
1374            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1375                .await
1376                .unwrap()
1377                .into_dirty();
1378            let mut elements = Vec::new();
1379            let mut positions = Vec::new();
1380            for i in 0..10 {
1381                elements.push(test_digest(i));
1382                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1383            }
1384            let mmr = mmr.merkleize(&mut hasher);
1385            let original_leaves = mmr.leaves();
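            // `leaves()` reports the leaf count (a Location) while `size()` reports the node
            // count (a Position); `historical_range_proof` takes a historical leaf count plus
            // a range of leaf locations.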
1386
1387            // Historical proof should match the "regular" proof when the historical size equals the current MMR size.
1388            let historical_proof = mmr
1389                .historical_range_proof(
1390                    original_leaves,
1391                    Location::new_unchecked(2)..Location::new_unchecked(6),
1392                )
1393                .await
1394                .unwrap();
1395            assert_eq!(historical_proof.leaves, original_leaves);
1396            let root = mmr.root();
1397            assert!(historical_proof.verify_range_inclusion(
1398                &mut hasher,
1399                &elements[2..6],
1400                Location::new_unchecked(2),
1401                &root
1402            ));
1403            let regular_proof = mmr
1404                .range_proof(Location::new_unchecked(2)..Location::new_unchecked(6))
1405                .await
1406                .unwrap();
1407            assert_eq!(regular_proof.leaves, historical_proof.leaves);
1408            assert_eq!(regular_proof.digests, historical_proof.digests);
1409
1410            // Add more elements to the MMR
1411            let mut mmr = mmr.into_dirty();
1412            for i in 10..20 {
1413                elements.push(test_digest(i));
1414                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1415            }
1416            let mmr = mmr.merkleize(&mut hasher);
1417            let new_historical_proof = mmr
1418                .historical_range_proof(
1419                    original_leaves,
1420                    Location::new_unchecked(2)..Location::new_unchecked(6),
1421                )
1422                .await
1423                .unwrap();
1424            assert_eq!(new_historical_proof.leaves, historical_proof.leaves);
1425            assert_eq!(new_historical_proof.digests, historical_proof.digests);
1426
1427            mmr.destroy().await.unwrap();
1428        });
1429    }
1430
1431    #[test_traced]
1432    fn test_journaled_mmr_historical_range_proof_with_pruning() {
1433        let executor = deterministic::Runner::default();
1434        executor.start(|context| async move {
1435            let mut hasher = Standard::<Sha256>::new();
1436            let mmr = Mmr::init(context.with_label("main"), &mut hasher, test_config())
1437                .await
1438                .unwrap();
1439
1440            // Add many elements
1441            let mut elements = Vec::new();
1442            let mut positions = Vec::new();
1443            let mut mmr = mmr.into_dirty();
1444            for i in 0..50 {
1445                elements.push(test_digest(i));
1446                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1447            }
1448            let mut mmr = mmr.merkleize(&mut hasher);
1449
1450            // Prune to position 30
1451            let prune_pos = Position::new(30);
1452            mmr.prune_to_pos(prune_pos).await.unwrap();
1453
1454            // Create a reference MMR to obtain the historical size and root used for verification.
1455            let ref_mmr = Mmr::init(
1456                context.with_label("ref"),
1457                &mut hasher,
1458                Config {
1459                    journal_partition: "ref_journal_pruned".into(),
1460                    metadata_partition: "ref_metadata_pruned".into(),
1461                    items_per_blob: NZU64!(7),
1462                    write_buffer: NZUsize!(1024),
1463                    thread_pool: None,
1464                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1465                },
1466            )
1467            .await
1468            .unwrap();
1469
1470            let mut ref_mmr = ref_mmr.into_dirty();
1471            for elt in elements.iter().take(41) {
1472                ref_mmr.add(&mut hasher, elt).await.unwrap();
1473            }
1474            let ref_mmr = ref_mmr.merkleize(&mut hasher);
1475            let historical_leaves = ref_mmr.leaves();
1476            let historical_root = ref_mmr.root();
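            // The reference MMR holds only the first 41 leaves, so its leaf count and root
            // describe the full MMR's historical state at 41 leaves, which the proof below is
            // verified against.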
1477
1478            // Test proof at historical position after pruning
1479            let historical_proof = mmr
1480                .historical_range_proof(
1481                    historical_leaves,
1482                    Location::new_unchecked(35)..Location::new_unchecked(39), // starts after the prune point, ends within the historical size
1483                )
1484                .await
1485                .unwrap();
1486
1487            assert_eq!(historical_proof.leaves, historical_leaves);
1488
1489            // Verify proof works despite pruning
1490            assert!(historical_proof.verify_range_inclusion(
1491                &mut hasher,
1492                &elements[35..39],
1493                Location::new_unchecked(35),
1494                &historical_root
1495            ));
1496
1497            ref_mmr.destroy().await.unwrap();
1498            mmr.destroy().await.unwrap();
1499        });
1500    }
1501
1502    #[test_traced]
1503    fn test_journaled_mmr_historical_range_proof_large() {
1504        let executor = deterministic::Runner::default();
1505        executor.start(|context| async move {
1506            let mut hasher = Standard::<Sha256>::new();
1507
1508            let mmr = Mmr::init(
1509                context.with_label("server"),
1510                &mut hasher,
1511                Config {
1512                    journal_partition: "server_journal".into(),
1513                    metadata_partition: "server_metadata".into(),
1514                    items_per_blob: NZU64!(7),
1515                    write_buffer: NZUsize!(1024),
1516                    thread_pool: None,
1517                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1518                },
1519            )
1520            .await
1521            .unwrap();
1522
1523            let mut elements = Vec::new();
1524            let mut positions = Vec::new();
1525            let mut mmr = mmr.into_dirty();
1526            for i in 0..100 {
1527                elements.push(test_digest(i));
1528                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1529            }
1530            let mmr = mmr.merkleize(&mut hasher);
1531
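            // Prove leaf locations 30..61 against the historical state at 61 leaves, even
            // though the MMR above now holds 100 leaves.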
1532            let range = Location::new_unchecked(30)..Location::new_unchecked(61);
1533
1534            // Only apply elements up to `range.end` to the reference MMR.
1535            let ref_mmr = Mmr::init(
1536                context.with_label("client"),
1537                &mut hasher,
1538                Config {
1539                    journal_partition: "client_journal".into(),
1540                    metadata_partition: "client_metadata".into(),
1541                    items_per_blob: NZU64!(7),
1542                    write_buffer: NZUsize!(1024),
1543                    thread_pool: None,
1544                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1545                },
1546            )
1547            .await
1548            .unwrap();
1549
1550            // Add elements up to the end of the range to verify historical root
1551            let mut ref_mmr = ref_mmr.into_dirty();
1552            for elt in elements.iter().take(*range.end as usize) {
1553                ref_mmr.add(&mut hasher, elt).await.unwrap();
1554            }
1555            let ref_mmr = ref_mmr.merkleize(&mut hasher);
1556            let historical_leaves = ref_mmr.leaves();
1557            let expected_root = ref_mmr.root();
1558
1559            // Generate proof from full MMR
1560            let proof = mmr
1561                .historical_range_proof(historical_leaves, range.clone())
1562                .await
1563                .unwrap();
1564
1565            assert!(proof.verify_range_inclusion(
1566                &mut hasher,
1567                &elements[range.to_usize_range()],
1568                range.start,
1569                &expected_root // Compare to historical (reference) root
1570            ));
1571
1572            ref_mmr.destroy().await.unwrap();
1573            mmr.destroy().await.unwrap();
1574        });
1575    }
1576
1577    #[test_traced]
1578    fn test_journaled_mmr_historical_range_proof_singleton() {
1579        let executor = deterministic::Runner::default();
1580        executor.start(|context| async move {
1581            let mut hasher = Standard::<Sha256>::new();
1582            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1583                .await
1584                .unwrap()
1585                .into_dirty();
1586
1587            let element = test_digest(0);
1588            mmr.add(&mut hasher, &element).await.unwrap();
1589            let mmr = mmr.merkleize(&mut hasher);
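            // A single-leaf MMR contains exactly one node, so the 0..1 range below covers the
            // entire historical structure.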
1590
1591            // Test single element proof at historical position
1592            let single_proof = mmr
1593                .historical_range_proof(
1594                    Location::new_unchecked(1),
1595                    Location::new_unchecked(0)..Location::new_unchecked(1),
1596                )
1597                .await
1598                .unwrap();
1599
1600            let root = mmr.root();
1601            assert!(single_proof.verify_range_inclusion(
1602                &mut hasher,
1603                &[element],
1604                Location::new_unchecked(0),
1605                &root
1606            ));
1607
1608            mmr.destroy().await.unwrap();
1609        });
1610    }
1611
1612    // Test `init_sync` when there is no persisted data.
1613    #[test_traced]
1614    fn test_journaled_mmr_init_sync_empty() {
1615        let executor = deterministic::Runner::default();
1616        executor.start(|context| async move {
1617            let mut hasher = Standard::<Sha256>::new();
1618
1619            // Test fresh start scenario with completely new MMR (no existing data)
1620            let sync_cfg = SyncConfig::<sha256::Digest> {
1621                config: test_config(),
1622                range: Position::new(0)..Position::new(100),
1623                pinned_nodes: None,
1624            };
1625
1626            let sync_mmr = Mmr::init_sync(context.clone(), sync_cfg, &mut hasher)
1627                .await
1628                .unwrap();
1629
1630            // Should be a fresh MMR that starts empty.
1631            assert_eq!(sync_mmr.size(), 0);
1632            let bounds = sync_mmr.bounds();
1633            assert_eq!(bounds.start, 0);
1634            assert!(bounds.is_empty());
1635
1636            // Should be able to add new elements
1637            let new_element = test_digest(999);
1638            let mut sync_mmr = sync_mmr.into_dirty();
1639            sync_mmr.add(&mut hasher, &new_element).await.unwrap();
1640            let sync_mmr = sync_mmr.merkleize(&mut hasher);
1641
1642            // Root should be computable
1643            let _root = sync_mmr.root();
1644
1645            sync_mmr.destroy().await.unwrap();
1646        });
1647    }
1648
1649    // Test `init_sync` where the persisted MMR's nodes exactly match the sync boundaries.
1650    #[test_traced]
1651    fn test_journaled_mmr_init_sync_nonempty_exact_match() {
1652        let executor = deterministic::Runner::default();
1653        executor.start(|context| async move {
1654            let mut hasher = Standard::<Sha256>::new();
1655
1656            // Create initial MMR with elements.
1657            let mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
1658                .await
1659                .unwrap();
1660            let mut mmr = mmr.into_dirty();
1661            for i in 0..50 {
1662                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1663            }
1664            let mut mmr = mmr.merkleize(&mut hasher);
1665            mmr.sync().await.unwrap();
1666            let original_size = mmr.size();
1667            let original_leaves = mmr.leaves();
1668            let original_root = mmr.root();
1669
1670            // Sync with range.start <= existing_size <= range.end should reuse data
1671            let lower_bound_pos = mmr.bounds().start;
1672            let upper_bound_pos = mmr.size();
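            // Snapshot every retained node so we can check below that init_sync reuses the
            // persisted data verbatim.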
1673            let mut expected_nodes = BTreeMap::new();
1674            for i in *lower_bound_pos..*upper_bound_pos {
1675                expected_nodes.insert(
1676                    Position::new(i),
1677                    mmr.get_node(Position::new(i)).await.unwrap().unwrap(),
1678                );
1679            }
1680            let sync_cfg = SyncConfig::<sha256::Digest> {
1681                config: test_config(),
1682                range: lower_bound_pos..upper_bound_pos,
1683                pinned_nodes: None,
1684            };
1685
1686            mmr.sync().await.unwrap();
1687            drop(mmr);
1688
1689            let sync_mmr = Mmr::init_sync(context.with_label("sync"), sync_cfg, &mut hasher)
1690                .await
1691                .unwrap();
1692
1693            // Should have existing data in the sync range.
1694            assert_eq!(sync_mmr.size(), original_size);
1695            assert_eq!(sync_mmr.leaves(), original_leaves);
1696            let bounds = sync_mmr.bounds();
1697            assert_eq!(bounds.start, lower_bound_pos);
1698            assert!(!bounds.is_empty());
1699            assert_eq!(sync_mmr.root(), original_root);
1700            for pos in *lower_bound_pos..*upper_bound_pos {
1701                let pos = Position::new(pos);
1702                assert_eq!(
1703                    sync_mmr.get_node(pos).await.unwrap(),
1704                    expected_nodes.get(&pos).cloned()
1705                );
1706            }
1707
1708            sync_mmr.destroy().await.unwrap();
1709        });
1710    }
1711
1712    // Test `init_sync` where the persisted MMR's data partially overlaps with the sync boundaries.
1713    #[test_traced]
1714    fn test_journaled_mmr_init_sync_partial_overlap() {
1715        let executor = deterministic::Runner::default();
1716        executor.start(|context| async move {
1717            let mut hasher = Standard::<Sha256>::new();
1718
1719            // Create initial MMR with elements.
1720            let mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
1721                .await
1722                .unwrap();
1723            let mut mmr = mmr.into_dirty();
1724            for i in 0..30 {
1725                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1726            }
1727            let mut mmr = mmr.merkleize(&mut hasher);
1728            mmr.sync().await.unwrap();
1729            mmr.prune_to_pos(Position::new(10)).await.unwrap();
1730
1731            let original_size = mmr.size();
1732            let original_root = mmr.root();
1733            let original_pruned_to = mmr.bounds().start;
1734
1735            // Sync with boundaries that extend beyond existing data (partial overlap).
1736            let lower_bound_pos = original_pruned_to;
1737            let upper_bound_pos = original_size + 11; // Extend beyond existing data
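            // The range end exceeds the persisted size, so only the existing prefix can be
            // reused; the assertions below confirm it is neither rewound nor discarded.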
1738
1739            let mut expected_nodes = BTreeMap::new();
1740            for pos in *lower_bound_pos..*original_size {
1741                let pos = Position::new(pos);
1742                expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
1743            }
1744
1745            let sync_cfg = SyncConfig::<sha256::Digest> {
1746                config: test_config(),
1747                range: lower_bound_pos..upper_bound_pos,
1748                pinned_nodes: None,
1749            };
1750
1751            mmr.sync().await.unwrap();
1752            drop(mmr);
1753
1754            let sync_mmr = Mmr::init_sync(context.with_label("sync"), sync_cfg, &mut hasher)
1755                .await
1756                .unwrap();
1757
1758            // Should have existing data in the overlapping range.
1759            assert_eq!(sync_mmr.size(), original_size);
1760            let bounds = sync_mmr.bounds();
1761            assert_eq!(bounds.start, lower_bound_pos);
1762            assert!(!bounds.is_empty());
1763            assert_eq!(sync_mmr.root(), original_root);
1764
1765            // Check that existing nodes are preserved in the overlapping range.
1766            for pos in *lower_bound_pos..*original_size {
1767                let pos = Position::new(pos);
1768                assert_eq!(
1769                    sync_mmr.get_node(pos).await.unwrap(),
1770                    expected_nodes.get(&pos).cloned()
1771                );
1772            }
1773
1774            sync_mmr.destroy().await.unwrap();
1775        });
1776    }
1777
1778    // Regression test that MMR init() handles stale metadata (lower pruning boundary than journal).
1779    // Before the fix, this would panic with an assertion failure. After the fix, it returns a
1780    // MissingNode error (which is expected when metadata is corrupted and pinned nodes are lost).
1781    #[test_traced("WARN")]
1782    fn test_journaled_mmr_init_stale_metadata_returns_error() {
1783        let executor = deterministic::Runner::default();
1784        executor.start(|context| async move {
1785            let mut hasher = Standard::<Sha256>::new();
1786
1787            // Create an MMR with some data and prune it
1788            let mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
1789                .await
1790                .unwrap();
1791
1792            // Add 50 elements
1793            let mut mmr = mmr.into_dirty();
1794            for i in 0..50 {
1795                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1796            }
1797            let mut mmr = mmr.merkleize(&mut hasher);
1798            mmr.sync().await.unwrap();
1799
1800            // Prune to position 20 (this stores pinned nodes in metadata for position 20)
1801            let prune_pos = Position::new(20);
1802            mmr.prune_to_pos(prune_pos).await.unwrap();
1803            drop(mmr);
1804
1805            // Tamper with metadata to have a stale (lower) pruning boundary
1806            let meta_cfg = MConfig {
1807                partition: test_config().metadata_partition,
1808                codec_config: ((0..).into(), ()),
1809            };
1810            let mut metadata =
1811                Metadata::<_, U64, Vec<u8>>::init(context.with_label("meta_tamper"), meta_cfg)
1812                    .await
1813                    .unwrap();
1814
1815            // Set pruning boundary to 0 (stale)
1816            let key = U64::new(PRUNE_TO_POS_PREFIX, 0);
1817            metadata.put(key, 0u64.to_be_bytes().to_vec());
1818            metadata.sync().await.unwrap();
1819            drop(metadata);
1820
1821            // Reopen the MMR - before the fix, this would panic with an assertion failure.
1822            // After the fix, it returns a MissingNode error (pinned nodes for the lower
1823            // boundary don't exist, since they were pruned from the journal and were never
1824            // stored in metadata at that position).
1825            let result = CleanMmr::<_, Digest>::init(
1826                context.with_label("reopened"),
1827                &mut hasher,
1828                test_config(),
1829            )
1830            .await;
1831
1832            match result {
1833                Err(Error::MissingNode(_)) => {} // expected
1834                Ok(_) => panic!("expected MissingNode error, got Ok"),
1835                Err(e) => panic!("expected MissingNode error, got {:?}", e),
1836            }
1837        });
1838    }
1839
1840    // Test that MMR init() handles the case where metadata pruning boundary is ahead
1841    // of journal (crashed before journal prune completed). This should successfully
1842    // prune the journal to match metadata.
1843    #[test_traced("WARN")]
1844    fn test_journaled_mmr_init_metadata_ahead() {
1845        let executor = deterministic::Runner::default();
1846        executor.start(|context| async move {
1847            let mut hasher = Standard::<Sha256>::new();
1848
1849            // Create an MMR with some data
1850            let mut mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
1851                .await
1852                .unwrap()
1853                .into_dirty();
1854
1855            // Add 50 elements
1856            for i in 0..50 {
1857                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1858            }
1859            let mut mmr = mmr.merkleize(&mut hasher);
1860            mmr.sync().await.unwrap();
1861
1862            // Prune to position 30 (this stores pinned nodes and updates metadata)
1863            let prune_pos = Position::new(30);
1864            mmr.prune_to_pos(prune_pos).await.unwrap();
1865            let expected_root = mmr.root();
1866            let expected_size = mmr.size();
1867            drop(mmr);
1868
1869            // Reopen the MMR - should recover correctly with metadata ahead of
1870            // journal boundary (metadata says 30, journal is section-aligned to 28)
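            // Assuming test_config uses 7 items per blob (as the other configs in these tests
            // do), the journal can only prune at section boundaries, so its boundary lands at
            // 28 while the metadata records 30; init() prunes the journal up to the metadata
            // boundary on reopen.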
1871            let mmr = Mmr::init(context.with_label("reopened"), &mut hasher, test_config())
1872                .await
1873                .unwrap();
1874
1875            assert_eq!(mmr.bounds().start, prune_pos);
1876            assert_eq!(mmr.size(), expected_size);
1877            assert_eq!(mmr.root(), expected_root);
1878
1879            mmr.destroy().await.unwrap();
1880        });
1881    }
1882
1883    // Regression test: init_sync must compute pinned nodes BEFORE pruning the journal. Previously,
1884    // init_sync would prune the journal first, then try to read pinned nodes from the pruned
1885    // positions, causing MissingNode errors.
1886    //
1887    // Key setup: We create an MMR with data but DON'T prune it, so the metadata has no pinned
1888    // nodes. Then init_sync must read pinned nodes from the journal before pruning it.
1889    #[test_traced]
1890    fn test_journaled_mmr_init_sync_computes_pinned_nodes_before_pruning() {
1891        let executor = deterministic::Runner::default();
1892        executor.start(|context| async move {
1893            let mut hasher = Standard::<Sha256>::new();
1894
1895            // Use small items_per_blob to create many sections and trigger pruning.
1896            let cfg = Config {
1897                journal_partition: "mmr_journal".to_string(),
1898                metadata_partition: "mmr_metadata".to_string(),
1899                items_per_blob: NZU64!(7),
1900                write_buffer: NZUsize!(64),
1901                thread_pool: None,
1902                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1903            };
1904
1905            // Create MMR with enough elements to span multiple sections.
1906            let mmr = Mmr::init(context.with_label("init"), &mut hasher, cfg.clone())
1907                .await
1908                .unwrap();
1909            let mut mmr = mmr.into_dirty();
1910            for i in 0..100 {
1911                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1912            }
1913            let mut mmr = mmr.merkleize(&mut hasher);
1914            mmr.sync().await.unwrap();
1915
1916            // Don't prune - this ensures metadata has no pinned nodes. init_sync will need to
1917            // read pinned nodes from the journal.
1918            let original_size = mmr.size();
1919            let original_root = mmr.root();
1920            drop(mmr);
1921
1922            // Reopen via init_sync with range.start > 0. This will prune the journal, so
1923            // init_sync must read pinned nodes BEFORE pruning or they'll be lost.
1924            let prune_pos = Position::new(50);
1925            let sync_cfg = SyncConfig::<sha256::Digest> {
1926                config: cfg,
1927                range: prune_pos..Position::new(200),
1928                pinned_nodes: None, // Force init_sync to compute pinned nodes from journal
1929            };
1930
1931            let sync_mmr = Mmr::init_sync(context.with_label("sync"), sync_cfg, &mut hasher)
1932                .await
1933                .unwrap();
1934
1935            // Verify the MMR state is correct.
1936            assert_eq!(sync_mmr.size(), original_size);
1937            assert_eq!(sync_mmr.root(), original_root);
1938            assert_eq!(sync_mmr.bounds().start, prune_pos);
1939
1940            sync_mmr.destroy().await.unwrap();
1941        });
1942    }
1943}