commonware_storage/mmr/
journaled.rs

1//! An MMR backed by a fixed-item-length journal.
2//!
3//! A [crate::journal] is used to store all unpruned MMR nodes, and a [crate::metadata] store is
4//! used to preserve digests required for root and proof generation that would have otherwise been
5//! pruned.
6
7use crate::{
8    journal::{
9        contiguous::fixed::{Config as JConfig, Journal},
10        Error as JError,
11    },
12    metadata::{Config as MConfig, Metadata},
13    mmr::{
14        hasher::Hasher,
15        iterator::{nodes_to_pin, PeakIterator},
16        location::Location,
17        mem::{
18            Clean, CleanMmr as CleanMemMmr, Config as MemConfig, Dirty, DirtyMmr as DirtyMemMmr,
19            Mmr as MemMmr, State,
20        },
21        position::Position,
22        storage::Storage,
23        verification,
24        Error::{self, *},
25        Proof,
26    },
27    qmdb::any::unordered::sync::{init_journal, init_journal_at_size},
28};
29use commonware_codec::DecodeExt;
30use commonware_cryptography::Digest;
31use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
32use commonware_utils::sequence::prefixed_u64::U64;
33use core::ops::Range;
34use std::{
35    collections::BTreeMap,
36    num::{NonZeroU64, NonZeroUsize},
37};
38use tracing::{debug, error, warn};
39
/// A journaled MMR whose in-memory MMR is in the [Dirty] state. Convert it to a [CleanMmr] with
/// `merkleize`.
pub type DirtyMmr<E, D> = Mmr<E, D, Dirty>;
/// A journaled MMR whose in-memory MMR is in the [Clean] state. Convert it to a [DirtyMmr] with
/// `into_dirty`.
pub type CleanMmr<E, D> = Mmr<E, D, Clean<D>>;
42
/// Configuration for a journal-backed MMR.
///
/// Names the two storage partitions the MMR persists to (the node journal and the pinned-node
/// metadata) and carries the journal's blob sizing, write buffering, and caching parameters.
#[derive(Clone)]
pub struct Config {
    /// The name of the `commonware-runtime::Storage` storage partition used for the journal storing
    /// the MMR nodes.
    pub journal_partition: String,

    /// The name of the `commonware-runtime::Storage` storage partition used for the metadata
    /// containing pruned MMR nodes that are still required to calculate the root and generate
    /// proofs.
    pub metadata_partition: String,

    /// The maximum number of items to store in each blob in the backing journal.
    pub items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the backing journal.
    pub write_buffer: NonZeroUsize,

    /// Optional thread pool to use for parallelizing batch operations. `None` means no pool is
    /// handed to the MMR.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}
67
68/// Configuration for initializing a journaled MMR for synchronization.
69///
70/// Determines how to handle existing persistent data based on sync boundaries:
71/// - **Fresh Start**: Existing data < range start → discard and start fresh
72/// - **Prune and Reuse**: range contains existing data → prune and reuse
73/// - **Prune and Rewind**: existing data > range end → prune and rewind to range end
74pub struct SyncConfig<D: Digest> {
75    /// Base MMR configuration (journal, metadata, etc.)
76    pub config: Config,
77
78    /// Sync range - nodes outside this range are pruned/rewound.
79    pub range: std::ops::Range<Position>,
80
81    /// The pinned nodes the MMR needs at the pruning boundary (range start), in the order
82    /// specified by `nodes_to_pin`. If `None`, the pinned nodes are expected to already be in the
83    /// MMR's metadata/journal.
84    pub pinned_nodes: Option<Vec<D>>,
85}
86
/// An MMR backed by a fixed-item-length journal.
///
/// The type parameter `S` is the state of the memory-resident MMR and defaults to [Dirty].
pub struct Mmr<E: RStorage + Clock + Metrics, D: Digest, S: State<D> = Dirty> {
    /// A memory resident MMR used to build the MMR structure and cache updates. It caches all
    /// un-synced nodes, and the pinned node set as derived from both its own pruning boundary and
    /// the journaled MMR's pruning boundary.
    mem_mmr: MemMmr<D, S>,

    /// Stores all unpruned MMR nodes.
    journal: Journal<E, D>,

    /// The size of the journal irrespective of any pruned nodes or any un-synced nodes currently
    /// cached in the memory resident MMR.
    journal_size: Position,

    /// Stores all "pinned nodes" (pruned nodes required for proving & root generation) for the MMR,
    /// and the corresponding pruning boundary used to generate them. The metadata remains empty
    /// until pruning is invoked, and its contents change only when the pruning boundary moves.
    metadata: Metadata<E, U64, Vec<u8>>,

    /// The highest position for which this MMR has been pruned, or 0 if this MMR has never been
    /// pruned.
    pruned_to_pos: Position,

    /// The thread pool to use for parallelization, if one was configured.
    pool: Option<ThreadPool>,
}
113
114impl<E: RStorage + Clock + Metrics, D: Digest> From<CleanMmr<E, D>> for DirtyMmr<E, D> {
115    fn from(clean: Mmr<E, D, Clean<D>>) -> Self {
116        Self {
117            mem_mmr: clean.mem_mmr.into(),
118            journal: clean.journal,
119            journal_size: clean.journal_size,
120            metadata: clean.metadata,
121            pruned_to_pos: clean.pruned_to_pos,
122            pool: clean.pool,
123        }
124    }
125}
126
/// Prefix used for nodes in the metadata's prefixed [U64] key. The remainder of the key is the
/// node's position.
const NODE_PREFIX: u8 = 0;

/// Prefix used for the key storing the prune_to_pos position in the metadata. This key is always
/// built with a value of 0, so the metadata holds a single pruning-boundary entry.
const PRUNE_TO_POS_PREFIX: u8 = 1;
132
133impl<E: RStorage + Clock + Metrics, D: Digest, S: State<D>> Mmr<E, D, S> {
134    /// Return the total number of nodes in the MMR, irrespective of any pruning. The next added
135    /// element's position will have this value.
136    pub fn size(&self) -> Position {
137        self.mem_mmr.size()
138    }
139
140    /// Return the total number of leaves in the MMR.
141    pub fn leaves(&self) -> Location {
142        self.mem_mmr.leaves()
143    }
144
145    /// Return the position of the last leaf in this MMR, or None if the MMR is empty.
146    pub fn last_leaf_pos(&self) -> Option<Position> {
147        self.mem_mmr.last_leaf_pos()
148    }
149
150    /// Attempt to get a node from the metadata, with fallback to journal lookup if it fails.
151    /// Assumes the node should exist in at least one of these sources and returns a `MissingNode`
152    /// error otherwise.
153    async fn get_from_metadata_or_journal(
154        metadata: &Metadata<E, U64, Vec<u8>>,
155        journal: &Journal<E, D>,
156        pos: Position,
157    ) -> Result<D, Error> {
158        if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
159            debug!(?pos, "read node from metadata");
160            let digest = D::decode(bytes.as_ref());
161            let Ok(digest) = digest else {
162                error!(
163                    ?pos,
164                    err = %digest.expect_err("digest is Err in else branch"),
165                    "could not convert node from metadata bytes to digest"
166                );
167                return Err(Error::MissingNode(pos));
168            };
169            return Ok(digest);
170        }
171
172        // If a node isn't found in the metadata, it might still be in the journal.
173        debug!(?pos, "reading node from journal");
174        let node = journal.read(*pos).await;
175        match node {
176            Ok(node) => Ok(node),
177            Err(JError::ItemPruned(_)) => {
178                error!(?pos, "node is missing from metadata and journal");
179                Err(Error::MissingNode(pos))
180            }
181            Err(e) => Err(Error::JournalError(e)),
182        }
183    }
184
185    /// Add an element to the MMR and return its position in the MMR. Elements added to the MMR
186    /// aren't persisted to disk until `sync` is called.
187    pub async fn add(&mut self, h: &mut impl Hasher<D>, element: &[u8]) -> Result<Position, Error> {
188        Ok(self.mem_mmr.add(h, element))
189    }
190
191    /// The highest position for which this MMR has been pruned, or 0 if this MMR has never been
192    /// pruned.
193    pub const fn pruned_to_pos(&self) -> Position {
194        self.pruned_to_pos
195    }
196
197    /// Return the position of the oldest retained node in the MMR, not including pinned nodes.
198    pub fn oldest_retained_pos(&self) -> Option<Position> {
199        if self.pruned_to_pos == self.size() {
200            return None;
201        }
202
203        Some(self.pruned_to_pos)
204    }
205
206    /// Adds the pinned nodes based on `prune_pos` to `mem_mmr`.
207    async fn add_extra_pinned_nodes(
208        mem_mmr: &mut MemMmr<D, S>,
209        metadata: &Metadata<E, U64, Vec<u8>>,
210        journal: &Journal<E, D>,
211        prune_pos: Position,
212    ) -> Result<(), Error> {
213        let mut pinned_nodes = BTreeMap::new();
214        for pos in nodes_to_pin(prune_pos) {
215            let digest =
216                Mmr::<E, D, Clean<D>>::get_from_metadata_or_journal(metadata, journal, pos).await?;
217            pinned_nodes.insert(pos, digest);
218        }
219        mem_mmr.add_pinned_nodes(pinned_nodes);
220
221        Ok(())
222    }
223}
224
225impl<E: RStorage + Clock + Metrics, D: Digest> CleanMmr<E, D> {
226    /// Initialize a new journaled MMR from an MMR's size and set of pinned nodes.
227    ///
228    /// This creates a journaled MMR that appears to have `mmr_size` elements, all of which
229    /// are pruned, leaving only the minimal set of `pinned_nodes` required for proof generation.
230    /// The next element added will be at position `mmr_size`.
231    ///
232    /// The returned MMR is functionally equivalent to a journaled MMR that was created,
233    /// populated, and then pruned up to its size.
234    ///
235    /// # Arguments
236    /// * `context` - Storage context
237    /// * `pinned_nodes` - Digest values in the order returned by `nodes_to_pin(mmr_size)`
238    /// * `mmr_size` - The logical size of the MMR (all elements before this are considered pruned)
239    /// * `config` - Journaled MMR configuration. Any data in the given journal and metadata
240    ///   partitions will be overwritten.
241    pub async fn init_from_pinned_nodes(
242        context: E,
243        pinned_nodes: Vec<D>,
244        mmr_size: Position,
245        config: Config,
246        hasher: &mut impl Hasher<D>,
247    ) -> Result<Self, Error> {
248        // Destroy any existing journal data
249        context.remove(&config.journal_partition, None).await.ok();
250        context.remove(&config.metadata_partition, None).await.ok();
251
252        // Create the journal with the desired size
253        let journal_cfg = JConfig {
254            partition: config.journal_partition.clone(),
255            items_per_blob: config.items_per_blob,
256            buffer_pool: config.buffer_pool.clone(),
257            write_buffer: config.write_buffer,
258        };
259        let journal =
260            init_journal_at_size(context.with_label("mmr_journal"), journal_cfg, *mmr_size).await?;
261
262        // Initialize metadata
263        let metadata_cfg = MConfig {
264            partition: config.metadata_partition.clone(),
265            codec_config: ((0..).into(), ()),
266        };
267        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;
268
269        // Store the pruning boundary in metadata
270        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
271        metadata.put(pruning_boundary_key, mmr_size.to_be_bytes().into());
272
273        // Store the pinned nodes in metadata
274        let nodes_to_pin_positions = nodes_to_pin(mmr_size);
275        for (pos, digest) in nodes_to_pin_positions.zip(pinned_nodes.iter()) {
276            metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
277        }
278
279        // Sync metadata to disk
280        metadata.sync().await.map_err(Error::MetadataError)?;
281
282        // Create in-memory MMR in fully pruned state
283        let mem_mmr = MemMmr::init(
284            MemConfig {
285                nodes: vec![],
286                pruned_to_pos: mmr_size,
287                pinned_nodes,
288            },
289            hasher,
290        )?;
291
292        Ok(Self {
293            mem_mmr,
294            journal,
295            journal_size: mmr_size,
296            metadata,
297            pruned_to_pos: mmr_size,
298            pool: config.thread_pool,
299        })
300    }
301
    /// Initialize a new `Mmr` instance from the journal and metadata partitions named in `cfg`.
    ///
    /// An empty journal yields an empty MMR with a pruning boundary of 0. Otherwise the persisted
    /// state is reconciled before the MMR is returned:
    ///
    /// - The journal is pruned to the boundary recorded in the metadata if the two disagree.
    /// - If the journal's size is not a valid MMR size, it is rewound to the nearest valid size.
    ///   An item readable at that position is treated as an orphaned leaf: it is re-added so its
    ///   missing parents are regenerated, and the result is synced.
    ///
    /// # Panics
    ///
    /// Panics if the stored pruning boundary is not exactly 8 bytes, or if it is lower than the
    /// journal's oldest retained position.
    pub async fn init(context: E, hasher: &mut impl Hasher<D>, cfg: Config) -> Result<Self, Error> {
        let journal_cfg = JConfig {
            partition: cfg.journal_partition,
            items_per_blob: cfg.items_per_blob,
            buffer_pool: cfg.buffer_pool,
            write_buffer: cfg.write_buffer,
        };
        let mut journal =
            Journal::<E, D>::init(context.with_label("mmr_journal"), journal_cfg).await?;
        let mut journal_size = Position::new(journal.size());

        let metadata_cfg = MConfig {
            partition: cfg.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let metadata =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("mmr_metadata"), metadata_cfg)
                .await?;

        // Nothing persisted in the journal: return an empty MMR without consulting the metadata.
        if journal_size == 0 {
            let mem_mmr = MemMmr::init(
                MemConfig {
                    nodes: vec![],
                    pruned_to_pos: Position::new(0),
                    pinned_nodes: vec![],
                },
                hasher,
            )?;
            return Ok(Self {
                mem_mmr,
                journal,
                journal_size,
                metadata,
                pruned_to_pos: Position::new(0),
                pool: cfg.thread_pool,
            });
        }

        // Make sure the journal's oldest retained node is as expected based on the last pruning
        // boundary stored in metadata. If they don't match, prune the journal to the appropriate
        // location.
        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
        // A missing entry means no pruning boundary was ever recorded, which is treated as 0.
        let metadata_prune_pos = metadata.get(&key).map_or(0, |bytes| {
            u64::from_be_bytes(
                bytes
                    .as_slice()
                    .try_into()
                    .expect("metadata prune position is not 8 bytes"),
            )
        });
        let oldest_retained_pos = journal.oldest_retained_pos().unwrap_or(0);
        if metadata_prune_pos != oldest_retained_pos {
            assert!(metadata_prune_pos >= oldest_retained_pos);
            // These positions may differ only due to blob boundary alignment, so this case isn't
            // unusual.
            journal.prune(metadata_prune_pos).await?;
            if journal.oldest_retained_pos().unwrap_or(0) != oldest_retained_pos {
                // This should only happen in the event of some failure during the last attempt to
                // prune the journal.
                warn!(
                    oldest_retained_pos,
                    metadata_prune_pos, "journal pruned to match metadata"
                );
            }
        }

        let last_valid_size = PeakIterator::to_nearest_size(journal_size);
        let mut orphaned_leaf: Option<D> = None;
        if last_valid_size != journal_size {
            warn!(
                ?last_valid_size,
                "encountered invalid MMR structure, recovering from last valid size"
            );
            // Check if there is an intact leaf following the last valid size, from which we can
            // recover its missing parents. Any read error is treated as "no leaf to recover".
            let recovered_item = journal.read(*last_valid_size).await;
            if let Ok(item) = recovered_item {
                orphaned_leaf = Some(item);
            }
            journal.rewind(*last_valid_size).await?;
            journal.sync().await?;
            journal_size = last_valid_size
        }

        // Initialize the mem_mmr in the "prune_all" state.
        let mut pinned_nodes = Vec::new();
        for pos in nodes_to_pin(journal_size) {
            let digest =
                Mmr::<E, D>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            pinned_nodes.push(digest);
        }
        let mut mem_mmr = MemMmr::init(
            MemConfig {
                nodes: vec![],
                pruned_to_pos: journal_size,
                pinned_nodes,
            },
            hasher,
        )?;
        // Also pin the nodes required for the journaled MMR's own pruning boundary.
        let prune_pos = Position::new(metadata_prune_pos);
        Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, prune_pos).await?;

        let mut s = Self {
            mem_mmr,
            journal,
            journal_size,
            metadata,
            pruned_to_pos: prune_pos,
            pool: cfg.thread_pool,
        };

        if let Some(leaf) = orphaned_leaf {
            // Recover the orphaned leaf and any missing parents.
            let pos = s.mem_mmr.size();
            warn!(?pos, "recovering orphaned leaf");
            s.mem_mmr.add_leaf_digest(hasher, leaf);
            assert_eq!(pos, journal_size);
            s.sync().await?;
            assert_eq!(s.size(), s.journal.size());
        }

        Ok(s)
    }
426
427    /// Initialize an MMR for synchronization, reusing existing data if possible.
428    ///
429    /// Handles three sync scenarios based on existing journal data vs. the given sync boundaries.
430    ///
431    /// 1. **Fresh Start**: existing_size < range.start
432    ///    - Deletes existing data (if any)
433    ///    - Creates new [Journal] with pruning boundary and size `range.start`
434    ///
435    /// 2. **Prune and Reuse**: range.start ≤ existing_size ≤ range.end
436    ///    - Sets in-memory MMR size to `existing_size`
437    ///    - Prunes the journal to `range.start`
438    ///
439    /// 3. **Prune and Rewind**: existing_size > range.end
440    ///    - Rewinds the journal to size `range.end`
441    ///    - Sets in-memory MMR size to `range.end`
442    ///    - Prunes the journal to `range.start`
443    pub async fn init_sync(
444        context: E,
445        cfg: SyncConfig<D>,
446        hasher: &mut impl Hasher<D>,
447    ) -> Result<Self, crate::qmdb::Error> {
448        let journal = init_journal(
449            context.with_label("mmr_journal"),
450            JConfig {
451                partition: cfg.config.journal_partition,
452                items_per_blob: cfg.config.items_per_blob,
453                write_buffer: cfg.config.write_buffer,
454                buffer_pool: cfg.config.buffer_pool.clone(),
455            },
456            *cfg.range.start..*cfg.range.end,
457        )
458        .await?;
459        let journal_size = Position::new(journal.size());
460        assert!(journal_size <= *cfg.range.end);
461
462        // Open the metadata.
463        let metadata_cfg = MConfig {
464            partition: cfg.config.metadata_partition,
465            codec_config: ((0..).into(), ()),
466        };
467        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;
468
469        // Write the pruning boundary.
470        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
471        metadata.put(
472            pruning_boundary_key,
473            (*cfg.range.start).to_be_bytes().into(),
474        );
475
476        // Write the required pinned nodes to metadata.
477        if let Some(pinned_nodes) = cfg.pinned_nodes {
478            let nodes_to_pin_persisted = nodes_to_pin(cfg.range.start);
479            for (pos, digest) in nodes_to_pin_persisted.zip(pinned_nodes.iter()) {
480                metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
481            }
482        }
483
484        // Create the in-memory MMR with the pinned nodes required for its size.
485        let nodes_to_pin_mem = nodes_to_pin(journal_size);
486        let mut mem_pinned_nodes = Vec::new();
487        for pos in nodes_to_pin_mem {
488            let digest =
489                Mmr::<E, D>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
490            mem_pinned_nodes.push(digest);
491        }
492        let mut mem_mmr = MemMmr::init(
493            MemConfig {
494                nodes: vec![],
495                pruned_to_pos: journal_size,
496                pinned_nodes: mem_pinned_nodes,
497            },
498            hasher,
499        )?;
500
501        // Add the additional pinned nodes required for the pruning boundary, if applicable.
502        if cfg.range.start < journal_size {
503            Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, cfg.range.start)
504                .await?;
505        }
506        metadata.sync().await?;
507
508        Ok(Self {
509            mem_mmr,
510            journal,
511            journal_size,
512            metadata,
513            pruned_to_pos: cfg.range.start,
514            pool: cfg.config.thread_pool,
515        })
516    }
517
518    /// Compute and add required nodes for the given pruning point to the metadata, and write it to
519    /// disk. Return the computed set of required nodes.
520    async fn update_metadata(
521        &mut self,
522        prune_to_pos: Position,
523    ) -> Result<BTreeMap<Position, D>, Error> {
524        assert!(prune_to_pos >= self.pruned_to_pos);
525
526        let mut pinned_nodes = BTreeMap::new();
527        for pos in nodes_to_pin(prune_to_pos) {
528            let digest = self.get_node(pos).await?.expect(
529                "pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
530            );
531            self.metadata
532                .put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
533            pinned_nodes.insert(pos, digest);
534        }
535
536        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
537        self.metadata.put(key, (*prune_to_pos).to_be_bytes().into());
538
539        self.metadata.sync().await.map_err(Error::MetadataError)?;
540
541        Ok(pinned_nodes)
542    }
543
544    pub async fn get_node(&self, position: Position) -> Result<Option<D>, Error> {
545        if let Some(node) = self.mem_mmr.get_node(position) {
546            return Ok(Some(node));
547        }
548
549        match self.journal.read(*position).await {
550            Ok(item) => Ok(Some(item)),
551            Err(JError::ItemPruned(_)) => Ok(None),
552            Err(e) => Err(Error::JournalError(e)),
553        }
554    }
555
556    /// Sync the MMR to disk.
557    pub async fn sync(&mut self) -> Result<(), Error> {
558        // Write the nodes cached in the memory-resident MMR to the journal.
559        for pos in *self.journal_size..*self.size() {
560            let pos = Position::new(pos);
561            let node = *self.mem_mmr.get_node_unchecked(pos);
562            self.journal.append(node).await?;
563        }
564        self.journal_size = self.size();
565        self.journal.sync().await?;
566        assert_eq!(self.journal_size, self.journal.size());
567
568        // Recompute pinned nodes since we'll need to repopulate the cache after it is cleared by
569        // pruning the mem_mmr.
570        let mut pinned_nodes = BTreeMap::new();
571        for pos in nodes_to_pin(self.pruned_to_pos) {
572            let digest = self.mem_mmr.get_node_unchecked(pos);
573            pinned_nodes.insert(pos, *digest);
574        }
575
576        // Now that the pinned node set has been recomputed, it's safe to prune the mem_mmr and
577        // reinstate them.
578        self.mem_mmr.prune_all();
579        self.mem_mmr.add_pinned_nodes(pinned_nodes);
580
581        Ok(())
582    }
583
584    /// Prune all nodes up to but not including the given position and update the pinned nodes.
585    ///
586    /// This implementation ensures that no failure can leave the MMR in an unrecoverable state,
587    /// requiring it sync the MMR to write any potential unmerkleized updates.
588    pub async fn prune_to_pos(&mut self, pos: Position) -> Result<(), Error> {
589        assert!(pos <= self.size());
590        if pos <= self.pruned_to_pos {
591            return Ok(());
592        }
593
594        // Flush items cached in the mem_mmr to disk to ensure the current state is recoverable.
595        self.sync().await?;
596
597        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
598        // event of a pruning failure.
599        let pinned_nodes = self.update_metadata(pos).await?;
600
601        self.journal.prune(*pos).await?;
602        self.mem_mmr.add_pinned_nodes(pinned_nodes);
603        self.pruned_to_pos = pos;
604
605        Ok(())
606    }
607
608    /// Pop the given number of elements from the tip of the MMR assuming they exist, and otherwise
609    /// return Empty or ElementPruned errors. The backing journal is synced to disk before
610    /// returning.
611    pub async fn pop(
612        &mut self,
613        hasher: &mut impl Hasher<D>,
614        mut leaves_to_pop: usize,
615    ) -> Result<(), Error> {
616        // See if the elements are still cached in which case we can just pop them from the in-mem
617        // MMR.
618        while leaves_to_pop > 0 {
619            match self.mem_mmr.pop(hasher) {
620                Ok(_) => {
621                    leaves_to_pop -= 1;
622                }
623                Err(ElementPruned(_)) => break,
624                Err(Empty) => {
625                    return Err(Error::Empty);
626                }
627                _ => unreachable!(),
628            }
629        }
630        if leaves_to_pop == 0 {
631            return Ok(());
632        }
633
634        let mut new_size = self.size();
635        while leaves_to_pop > 0 {
636            if new_size == 0 {
637                return Err(Error::Empty);
638            }
639            new_size -= 1;
640            if new_size < self.pruned_to_pos {
641                return Err(Error::ElementPruned(new_size));
642            }
643            if new_size.is_mmr_size() {
644                leaves_to_pop -= 1;
645            }
646        }
647
648        self.journal.rewind(*new_size).await?;
649        self.journal.sync().await?;
650        self.journal_size = new_size;
651
652        // Reset the mem_mmr to one of the new_size in the "prune_all" state.
653        let mut pinned_nodes = Vec::new();
654        for pos in nodes_to_pin(new_size) {
655            let digest =
656                Self::get_from_metadata_or_journal(&self.metadata, &self.journal, pos).await?;
657            pinned_nodes.push(digest);
658        }
659
660        self.mem_mmr = CleanMemMmr::from_components(hasher, vec![], new_size, pinned_nodes);
661        Self::add_extra_pinned_nodes(
662            &mut self.mem_mmr,
663            &self.metadata,
664            &self.journal,
665            self.pruned_to_pos,
666        )
667        .await?;
668
669        Ok(())
670    }
671
    /// Return the root of the MMR. The digest is copied from the memory-resident MMR, so no disk
    /// access is involved.
    pub const fn root(&self) -> D {
        *self.mem_mmr.root()
    }
676
677    /// Return an inclusion proof for the element at the location `loc`.
678    ///
679    /// # Errors
680    ///
681    /// Returns [Error::LocationOverflow] if `loc` exceeds [crate::mmr::MAX_LOCATION].
682    /// Returns [Error::ElementPruned] if some element needed to generate the proof has been pruned.
683    /// Returns [Error::Empty] if the range is empty.
684    pub async fn proof(&self, loc: Location) -> Result<Proof<D>, Error> {
685        if !loc.is_valid() {
686            return Err(Error::LocationOverflow(loc));
687        }
688        // loc is valid so it won't overflow from + 1
689        self.range_proof(loc..loc + 1).await
690    }
691
    /// Return an inclusion proof for the elements within the specified location range.
    ///
    /// This is a thin wrapper that passes this MMR and `range` to [verification::range_proof],
    /// which also validates the locations.
    ///
    /// # Errors
    ///
    /// Returns [Error::LocationOverflow] if any location in `range` exceeds [crate::mmr::MAX_LOCATION].
    /// Returns [Error::ElementPruned] if some element needed to generate the proof has been pruned.
    /// Returns [Error::Empty] if the range is empty.
    pub async fn range_proof(&self, range: Range<Location>) -> Result<Proof<D>, Error> {
        verification::range_proof(self, range).await
    }
704
    /// Analogous to range_proof but for a previous database state. Specifically, the state when the
    /// MMR had `size` nodes.
    ///
    /// This is a thin wrapper that passes this MMR, `size`, and `range` to
    /// [verification::historical_range_proof], which also validates the locations.
    ///
    /// # Errors
    ///
    /// Returns [Error::LocationOverflow] if any location in `range` exceeds [crate::mmr::MAX_LOCATION].
    /// Returns [Error::ElementPruned] if some element needed to generate the proof has been pruned.
    /// Returns [Error::Empty] if the range is empty.
    pub async fn historical_range_proof(
        &self,
        size: Position,
        range: Range<Location>,
    ) -> Result<Proof<D>, Error> {
        verification::historical_range_proof(self, size, range).await
    }
722
723    /// Prune as many nodes as possible, leaving behind at most items_per_blob nodes in the current
724    /// blob.
725    pub async fn prune_all(&mut self) -> Result<(), Error> {
726        if self.size() != 0 {
727            self.prune_to_pos(self.size()).await?;
728            return Ok(());
729        }
730        Ok(())
731    }
732
    /// Close the MMR, syncing any cached elements to disk and closing the journal.
    ///
    /// Consumes the MMR. The journal is closed before the metadata, and the first failure
    /// encountered is returned; a metadata failure is reported as `MetadataError`.
    pub async fn close(mut self) -> Result<(), Error> {
        self.sync().await?;
        self.journal.close().await?;
        self.metadata.close().await.map_err(Error::MetadataError)
    }
739
    /// Close and permanently remove any disk resources.
    ///
    /// Consumes the MMR and destroys the journal first, then the metadata. Nothing is synced
    /// beforehand. If destroying the journal fails, the metadata is left untouched.
    pub async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await?;
        self.metadata.destroy().await?;

        Ok(())
    }
747
    /// Convert this MMR into its dirty counterpart for batched updates.
    ///
    /// Delegates to the `From<CleanMmr>` implementation for [DirtyMmr].
    pub fn into_dirty(self) -> DirtyMmr<E, D> {
        self.into()
    }
752
753    #[cfg(any(test, feature = "fuzzing"))]
754    /// Sync elements to disk until `write_limit` elements have been written, then abort to simulate
755    /// a partial write for testing failure scenarios.
756    pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error> {
757        if write_limit == 0 {
758            return Ok(());
759        }
760
761        // Write the nodes cached in the memory-resident MMR to the journal, aborting after
762        // write_count nodes have been written.
763        let mut written_count = 0usize;
764        for i in *self.journal_size..*self.size() {
765            let node = *self.mem_mmr.get_node_unchecked(Position::new(i));
766            self.journal.append(node).await?;
767            written_count += 1;
768            if written_count >= write_limit {
769                break;
770            }
771        }
772        self.journal.sync().await?;
773
774        Ok(())
775    }
776
    #[cfg(test)]
    /// Test-only accessor returning the pinned nodes reported by the memory-resident MMR, keyed
    /// by position.
    pub fn get_pinned_nodes(&self) -> BTreeMap<Position, D> {
        self.mem_mmr.pinned_nodes()
    }
781
    #[cfg(test)]
    /// Test-only helper that performs the first two steps of pruning to `prune_to_pos` (syncing
    /// the MMR and persisting the new boundary to metadata) but never prunes the journal,
    /// imitating a failure between the metadata update and the journal prune. Consumes the MMR.
    ///
    /// # Panics
    ///
    /// Panics if `prune_to_pos` is greater than the MMR's size.
    pub async fn simulate_pruning_failure(mut self, prune_to_pos: Position) -> Result<(), Error> {
        assert!(prune_to_pos <= self.size());

        // Flush items cached in the mem_mmr to disk to ensure the current state is recoverable.
        self.sync().await?;

        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
        // event of a pruning failure.
        self.update_metadata(prune_to_pos).await?;

        // Don't actually prune the journal to simulate failure
        Ok(())
    }
796}
797
798impl<E: RStorage + Clock + Metrics, D: Digest> DirtyMmr<E, D> {
799    /// Merkleize the MMR and compute the root digest.
800    pub fn merkleize(self, h: &mut impl Hasher<D>) -> CleanMmr<E, D> {
801        CleanMmr {
802            mem_mmr: self.mem_mmr.merkleize(h, self.pool.clone()),
803            journal: self.journal,
804            journal_size: self.journal_size,
805            metadata: self.metadata,
806            pruned_to_pos: self.pruned_to_pos,
807            pool: self.pool,
808        }
809    }
810
811    /// Pop elements while staying in Dirty state. No root recomputation occurs until merkleize.
812    pub async fn pop(&mut self, mut leaves_to_pop: usize) -> Result<(), Error> {
813        while leaves_to_pop > 0 {
814            match self.mem_mmr.pop() {
815                Ok(_) => leaves_to_pop -= 1,
816                Err(ElementPruned(_)) => break,
817                Err(Empty) => return Err(Error::Empty),
818                Err(e) => return Err(e),
819            }
820        }
821        if leaves_to_pop == 0 {
822            return Ok(());
823        }
824
825        let mut new_size = self.size();
826        while leaves_to_pop > 0 {
827            if new_size == 0 {
828                return Err(Error::Empty);
829            }
830            new_size -= 1;
831            if new_size < self.pruned_to_pos {
832                return Err(Error::ElementPruned(new_size));
833            }
834            if new_size.is_mmr_size() {
835                leaves_to_pop -= 1;
836            }
837        }
838
839        self.journal.rewind(*new_size).await?;
840        self.journal.sync().await?;
841        self.journal_size = new_size;
842
843        let mut pinned_nodes = Vec::new();
844        for pos in nodes_to_pin(new_size) {
845            let digest = Mmr::<E, D, Clean<D>>::get_from_metadata_or_journal(
846                &self.metadata,
847                &self.journal,
848                pos,
849            )
850            .await?;
851            pinned_nodes.push(digest);
852        }
853
854        self.mem_mmr = DirtyMemMmr::from_components(vec![], new_size, pinned_nodes);
855        Self::add_extra_pinned_nodes(
856            &mut self.mem_mmr,
857            &self.metadata,
858            &self.journal,
859            self.pruned_to_pos,
860        )
861        .await?;
862
863        Ok(())
864    }
865
866    #[cfg(any(test, feature = "fuzzing"))]
867    /// Sync elements to disk until `write_limit` elements have been written, then abort to simulate
868    /// a partial write for testing failure scenarios.
869    pub async fn simulate_partial_sync(
870        self,
871        hasher: &mut impl Hasher<D>,
872        write_limit: usize,
873    ) -> Result<(), Error> {
874        if write_limit == 0 {
875            return Ok(());
876        }
877
878        let mut clean_mmr = self.merkleize(hasher);
879
880        // Write the nodes cached in the memory-resident MMR to the journal, aborting after
881        // write_count nodes have been written.
882        let mut written_count = 0usize;
883        for i in *clean_mmr.journal_size..*clean_mmr.size() {
884            let node = *clean_mmr.mem_mmr.get_node_unchecked(Position::new(i));
885            clean_mmr.journal.append(node).await?;
886            written_count += 1;
887            if written_count >= write_limit {
888                break;
889            }
890        }
891        clean_mmr.journal.sync().await?;
892
893        Ok(())
894    }
895}
896
897impl<E: RStorage + Clock + Metrics, D: Digest> Storage<D> for Mmr<E, D, Clean<D>> {
898    fn size(&self) -> Position {
899        self.size()
900    }
901
902    async fn get_node(&self, position: Position) -> Result<Option<D>, Error> {
903        self.get_node(position).await
904    }
905}
906
907#[cfg(test)]
908mod tests {
909    use super::*;
910    use crate::mmr::{
911        hasher::Hasher as _, location::LocationRangeExt as _, stability::ROOTS, Location,
912        StandardHasher as Standard,
913    };
914    use commonware_cryptography::{
915        sha256::{self, Digest},
916        Hasher, Sha256,
917    };
918    use commonware_macros::test_traced;
919    use commonware_runtime::{buffer::PoolRef, deterministic, Blob as _, Runner};
920    use commonware_utils::{hex, NZUsize, NZU64};
921
922    fn test_digest(v: usize) -> Digest {
923        Sha256::hash(&v.to_be_bytes())
924    }
925
/// Page size passed as the first argument to `PoolRef::new` by the tests in this module.
const PAGE_SIZE: usize = 111;
/// Page-cache size passed as the second argument to `PoolRef::new` by the tests in this module.
const PAGE_CACHE_SIZE: usize = 5;
928
929    fn test_config() -> Config {
930        Config {
931            journal_partition: "journal_partition".into(),
932            metadata_partition: "metadata_partition".into(),
933            items_per_blob: NZU64!(7),
934            write_buffer: NZUsize!(1024),
935            thread_pool: None,
936            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
937        }
938    }
939
940    pub async fn build_batched_and_check_test_roots_journaled<E: RStorage + Clock + Metrics>(
941        journaled_mmr: CleanMmr<E, sha256::Digest>,
942    ) -> CleanMmr<E, sha256::Digest> {
943        let mut hasher: Standard<Sha256> = Standard::new();
944
945        // First element transitions Clean -> Dirty explicitly
946        let mut dirty_mmr = journaled_mmr.into_dirty();
947        hasher.inner().update(&0u64.to_be_bytes());
948        let element = hasher.inner().finalize();
949        dirty_mmr.add(&mut hasher, &element).await.unwrap();
950
951        // Subsequent elements keep it Dirty
952        for i in 1u64..199 {
953            hasher.inner().update(&i.to_be_bytes());
954            let element = hasher.inner().finalize();
955            dirty_mmr.add(&mut hasher, &element).await.unwrap();
956        }
957
958        let journaled_mmr = dirty_mmr.merkleize(&mut hasher);
959
960        assert_eq!(
961            hex(&journaled_mmr.root()),
962            ROOTS[199],
963            "Root after 200 elements"
964        );
965
966        journaled_mmr
967    }
968
/// Checks that the root computed by a journaled MMR still matches the recorded stable roots.
#[test]
fn test_journaled_mmr_root_stability() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let mut hasher = Standard::<Sha256>::new();
        let fresh = Mmr::init(context.clone(), &mut hasher, test_config())
            .await
            .unwrap();

        // The helper performs the actual root assertion.
        let populated = build_batched_and_check_test_roots_journaled(fresh).await;
        populated.destroy().await.unwrap();
    });
}
985
986    #[test_traced]
987    fn test_journaled_mmr_empty() {
988        let executor = deterministic::Runner::default();
989        executor.start(|context| async move {
990            let mut hasher: Standard<Sha256> = Standard::new();
991            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
992                .await
993                .unwrap();
994            assert_eq!(mmr.size(), 0);
995            assert!(mmr.get_node(Position::new(0)).await.is_err());
996            assert_eq!(mmr.oldest_retained_pos(), None);
997            assert!(mmr.prune_all().await.is_ok());
998            assert_eq!(mmr.pruned_to_pos(), 0);
999            assert!(mmr.prune_to_pos(Position::new(0)).await.is_ok());
1000            assert!(mmr.sync().await.is_ok());
1001            assert!(matches!(mmr.pop(&mut hasher, 1).await, Err(Error::Empty)));
1002
1003            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
1004            assert_eq!(mmr.size(), 1);
1005            mmr.sync().await.unwrap();
1006            assert!(mmr.get_node(Position::new(0)).await.is_ok());
1007            assert!(mmr.pop(&mut hasher, 1).await.is_ok());
1008            assert_eq!(mmr.size(), 0);
1009            mmr.sync().await.unwrap();
1010
1011            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1012                .await
1013                .unwrap();
1014            assert_eq!(mmr.size(), 0);
1015
1016            let empty_proof = Proof::default();
1017            let mut hasher: Standard<Sha256> = Standard::new();
1018            let root = mmr.root();
1019            assert!(empty_proof.verify_range_inclusion(
1020                &mut hasher,
1021                &[] as &[Digest],
1022                Location::new_unchecked(0),
1023                &root
1024            ));
1025            assert!(empty_proof.verify_multi_inclusion(
1026                &mut hasher,
1027                &[] as &[(Digest, Location)],
1028                &root
1029            ));
1030
1031            // Confirm empty proof no longer verifies after adding an element.
1032            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
1033            let root = mmr.root();
1034            assert!(!empty_proof.verify_range_inclusion(
1035                &mut hasher,
1036                &[] as &[Digest],
1037                Location::new_unchecked(0),
1038                &root
1039            ));
1040            assert!(!empty_proof.verify_multi_inclusion(
1041                &mut hasher,
1042                &[] as &[(Digest, Location)],
1043                &root
1044            ));
1045
1046            mmr.destroy().await.unwrap();
1047        });
1048    }
1049
1050    #[test_traced]
1051    fn test_journaled_mmr_pop() {
1052        let executor = deterministic::Runner::default();
1053        executor.start(|context| async move {
1054            let mut hasher: Standard<Sha256> = Standard::new();
1055            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1056                .await
1057                .unwrap();
1058
1059            let mut c_hasher = Sha256::new();
1060            for i in 0u64..199 {
1061                c_hasher.update(&i.to_be_bytes());
1062                let element = c_hasher.finalize();
1063                mmr.add(&mut hasher, &element).await.unwrap();
1064            }
1065            assert_eq!(ROOTS[199], hex(&mmr.root()));
1066
1067            // Pop off one node at a time without syncing until empty, confirming the root is still
1068            // is as expected.
1069            for i in (0..199u64).rev() {
1070                assert!(mmr.pop(&mut hasher, 1).await.is_ok());
1071                let root = mmr.root();
1072                let expected_root = ROOTS[i as usize];
1073                assert_eq!(hex(&root), expected_root);
1074            }
1075            assert!(matches!(mmr.pop(&mut hasher, 1).await, Err(Error::Empty)));
1076            assert!(mmr.pop(&mut hasher, 0).await.is_ok());
1077
1078            // Repeat the test though sync part of the way to tip to test crossing the boundary from
1079            // cached to uncached leaves, and pop 2 at a time instead of just 1.
1080            for i in 0u64..199 {
1081                c_hasher.update(&i.to_be_bytes());
1082                let element = c_hasher.finalize();
1083                mmr.add(&mut hasher, &element).await.unwrap();
1084                if i == 101 {
1085                    mmr.sync().await.unwrap();
1086                }
1087            }
1088            for i in (0..198u64).rev().step_by(2) {
1089                assert!(mmr.pop(&mut hasher, 2).await.is_ok(), "at position {i:?}");
1090                let root = mmr.root();
1091                let expected_root = ROOTS[i as usize];
1092                assert_eq!(hex(&root), expected_root, "at position {i:?}");
1093            }
1094            assert_eq!(mmr.size(), 1);
1095            assert!(mmr.pop(&mut hasher, 1).await.is_ok()); // pop the last element
1096            assert!(matches!(mmr.pop(&mut hasher, 99).await, Err(Error::Empty)));
1097
1098            // Repeat one more time only after pruning the MMR first.
1099            for i in 0u64..199 {
1100                c_hasher.update(&i.to_be_bytes());
1101                let element = c_hasher.finalize();
1102                mmr.add(&mut hasher, &element).await.unwrap();
1103                if i == 101 {
1104                    mmr.sync().await.unwrap();
1105                }
1106            }
1107            let leaf_pos = Position::try_from(Location::new_unchecked(50)).unwrap();
1108            mmr.prune_to_pos(leaf_pos).await.unwrap();
1109            // Pop enough nodes to cause the mem-mmr to be completely emptied, and then some.
1110            mmr.pop(&mut hasher, 80).await.unwrap();
1111            // Make sure the pinned node boundary is valid by generating a proof for the oldest item.
1112            mmr.proof(Location::try_from(leaf_pos).unwrap())
1113                .await
1114                .unwrap();
1115            // prune all remaining leaves 1 at a time.
1116            while mmr.size() > leaf_pos {
1117                assert!(mmr.pop(&mut hasher, 1).await.is_ok());
1118            }
1119            assert!(matches!(
1120                mmr.pop(&mut hasher, 1).await,
1121                Err(Error::ElementPruned(_))
1122            ));
1123
1124            // Make sure pruning to an older location is a no-op.
1125            assert!(mmr.prune_to_pos(leaf_pos - 1).await.is_ok());
1126            assert_eq!(mmr.pruned_to_pos(), leaf_pos);
1127
1128            mmr.destroy().await.unwrap();
1129        });
1130    }
1131
1132    #[test_traced]
1133    fn test_journaled_mmr_basic() {
1134        let executor = deterministic::Runner::default();
1135        executor.start(|context| async move {
1136            let mut hasher: Standard<Sha256> = Standard::new();
1137            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1138                .await
1139                .unwrap();
1140            // Build a test MMR with 255 leaves
1141            const LEAF_COUNT: usize = 255;
1142            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1143            let mut positions = Vec::with_capacity(LEAF_COUNT);
1144            for i in 0..LEAF_COUNT {
1145                let digest = test_digest(i);
1146                leaves.push(digest);
1147                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
1148                positions.push(pos);
1149            }
1150            assert_eq!(mmr.size(), Position::new(502));
1151            assert_eq!(mmr.journal_size, Position::new(0));
1152
1153            // Generate & verify proof from element that is not yet flushed to the journal.
1154            const TEST_ELEMENT: usize = 133;
1155            const TEST_ELEMENT_LOC: Location = Location::new_unchecked(TEST_ELEMENT as u64);
1156
1157            let proof = mmr.proof(TEST_ELEMENT_LOC).await.unwrap();
1158            let root = mmr.root();
1159            assert!(proof.verify_element_inclusion(
1160                &mut hasher,
1161                &leaves[TEST_ELEMENT],
1162                TEST_ELEMENT_LOC,
1163                &root,
1164            ));
1165
1166            // Sync the MMR, make sure it flushes the in-mem MMR as expected.
1167            mmr.sync().await.unwrap();
1168            assert_eq!(mmr.journal_size, Position::new(502));
1169            assert_eq!(mmr.mem_mmr.oldest_retained_pos(), None);
1170
1171            // Now that the element is flushed from the in-mem MMR, confirm its proof is still is
1172            // generated correctly.
1173            let proof2 = mmr.proof(TEST_ELEMENT_LOC).await.unwrap();
1174            assert_eq!(proof, proof2);
1175
1176            // Generate & verify a proof that spans flushed elements and the last element.
1177            let range = Location::new_unchecked(TEST_ELEMENT as u64)
1178                ..Location::new_unchecked(LEAF_COUNT as u64);
1179            let proof = mmr.range_proof(range.clone()).await.unwrap();
1180            assert!(proof.verify_range_inclusion(
1181                &mut hasher,
1182                &leaves[range.to_usize_range()],
1183                TEST_ELEMENT_LOC,
1184                &root
1185            ));
1186
1187            mmr.destroy().await.unwrap();
1188        });
1189    }
1190
1191    #[test_traced]
1192    /// Generates a stateful MMR, simulates various partial-write scenarios, and confirms we
1193    /// appropriately recover to a valid state.
1194    fn test_journaled_mmr_recovery() {
1195        let executor = deterministic::Runner::default();
1196        executor.start(|context| async move {
1197            let mut hasher: Standard<Sha256> = Standard::new();
1198            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1199                .await
1200                .unwrap();
1201            assert_eq!(mmr.size(), 0);
1202
1203            // Build a test MMR with 252 leaves
1204            const LEAF_COUNT: usize = 252;
1205            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1206            let mut positions = Vec::with_capacity(LEAF_COUNT);
1207            for i in 0..LEAF_COUNT {
1208                let digest = test_digest(i);
1209                leaves.push(digest);
1210                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
1211                positions.push(pos);
1212            }
1213            assert_eq!(mmr.size(), 498);
1214            let root = mmr.root();
1215            mmr.close().await.unwrap();
1216
1217            // The very last element we added (pos=495) resulted in new parents at positions 496 &
1218            // 497. Simulate a partial write by corrupting the last parent's checksum by truncating
1219            // the last blob by a single byte.
1220            let partition: String = "journal_partition".into();
1221            let (blob, len) = context
1222                .open(&partition, &71u64.to_be_bytes())
1223                .await
1224                .expect("Failed to open blob");
1225            assert_eq!(len, 36); // N+4 = 36 bytes per node, 1 node in the last blob
1226
1227            // truncate the blob by one byte to corrupt the checksum of the last parent node.
1228            blob.resize(len - 1).await.expect("Failed to corrupt blob");
1229            blob.sync().await.expect("Failed to sync blob");
1230
1231            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1232                .await
1233                .unwrap();
1234            // Since we didn't corrupt the leaf, the MMR is able to replay the leaf and recover to
1235            // the previous state.
1236            assert_eq!(mmr.size(), 498);
1237            assert_eq!(mmr.root(), root);
1238
1239            // Make sure closing it and re-opening it persists the recovered state.
1240            mmr.close().await.unwrap();
1241            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1242                .await
1243                .unwrap();
1244            assert_eq!(mmr.size(), 498);
1245            mmr.close().await.unwrap();
1246
1247            // Repeat partial write test though this time truncate the leaf itself not just some
1248            // parent. The leaf is in the *previous* blob so we'll have to delete the most recent
1249            // blob, then appropriately truncate the previous one.
1250            context
1251                .remove(&partition, Some(&71u64.to_be_bytes()))
1252                .await
1253                .expect("Failed to remove blob");
1254            let (blob, len) = context
1255                .open(&partition, &70u64.to_be_bytes())
1256                .await
1257                .expect("Failed to open blob");
1258            assert_eq!(len, 36 * 7); // this blob should be full.
1259
1260            // The last leaf should be in slot 5 of this blob, truncate last byte of its checksum.
1261            blob.resize(36 * 5 + 35)
1262                .await
1263                .expect("Failed to corrupt blob");
1264            blob.sync().await.expect("Failed to sync blob");
1265
1266            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1267                .await
1268                .unwrap();
1269            // Since the leaf was corrupted, it should not have been recovered, and the journal's
1270            // size will be the last-valid size.
1271            assert_eq!(mmr.size(), 495);
1272
1273            mmr.destroy().await.unwrap();
1274        });
1275    }
1276
1277    #[test_traced]
1278    fn test_journaled_mmr_pruning() {
1279        let executor = deterministic::Runner::default();
1280        executor.start(|context| async move {
1281            let mut hasher: Standard<Sha256> = Standard::new();
1282            // make sure pruning doesn't break root computation, adding of new nodes, etc.
1283            const LEAF_COUNT: usize = 2000;
1284            let cfg_pruned = test_config();
1285            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
1286                .await
1287                .unwrap();
1288            let cfg_unpruned = Config {
1289                journal_partition: "unpruned_journal_partition".into(),
1290                metadata_partition: "unpruned_metadata_partition".into(),
1291                items_per_blob: NZU64!(7),
1292                write_buffer: NZUsize!(1024),
1293                thread_pool: None,
1294                buffer_pool: cfg_pruned.buffer_pool.clone(),
1295            };
1296            let mut mmr = Mmr::init(context.clone(), &mut hasher, cfg_unpruned)
1297                .await
1298                .unwrap();
1299            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1300            let mut positions = Vec::with_capacity(LEAF_COUNT);
1301            for i in 0..LEAF_COUNT {
1302                let digest = test_digest(i);
1303                leaves.push(digest);
1304                let last_leaf = leaves.last().unwrap();
1305                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1306                positions.push(pos);
1307                pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
1308            }
1309            assert_eq!(mmr.size(), 3994);
1310            assert_eq!(pruned_mmr.size(), 3994);
1311
1312            // Prune the MMR in increments of 10 making sure the journal is still able to compute
1313            // roots and accept new elements.
1314            for i in 0usize..300 {
1315                let prune_pos = i as u64 * 10;
1316                pruned_mmr
1317                    .prune_to_pos(Position::new(prune_pos))
1318                    .await
1319                    .unwrap();
1320                assert_eq!(prune_pos, pruned_mmr.pruned_to_pos());
1321
1322                let digest = test_digest(LEAF_COUNT + i);
1323                leaves.push(digest);
1324                let last_leaf = leaves.last().unwrap();
1325                let pos = pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
1326                positions.push(pos);
1327                mmr.add(&mut hasher, last_leaf).await.unwrap();
1328                assert_eq!(pruned_mmr.root(), mmr.root());
1329            }
1330
1331            // Sync the MMRs.
1332            pruned_mmr.sync().await.unwrap();
1333            assert_eq!(pruned_mmr.root(), mmr.root());
1334
1335            // Close the MMR & reopen.
1336            pruned_mmr.close().await.unwrap();
1337            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
1338                .await
1339                .unwrap();
1340            assert_eq!(pruned_mmr.root(), mmr.root());
1341
1342            // Prune everything.
1343            let size = pruned_mmr.size();
1344            pruned_mmr.prune_all().await.unwrap();
1345            assert_eq!(pruned_mmr.root(), mmr.root());
1346            assert_eq!(pruned_mmr.oldest_retained_pos(), None);
1347            assert_eq!(pruned_mmr.pruned_to_pos(), size);
1348
1349            // Close MMR after adding a new node without syncing and make sure state is as expected
1350            // on reopening.
1351            mmr.add(&mut hasher, &test_digest(LEAF_COUNT))
1352                .await
1353                .unwrap();
1354            pruned_mmr
1355                .add(&mut hasher, &test_digest(LEAF_COUNT))
1356                .await
1357                .unwrap();
1358            assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
1359            pruned_mmr.close().await.unwrap();
1360            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
1361                .await
1362                .unwrap();
1363            assert_eq!(pruned_mmr.root(), mmr.root());
1364            assert_eq!(pruned_mmr.oldest_retained_pos(), Some(size));
1365            assert_eq!(pruned_mmr.pruned_to_pos(), size);
1366
1367            // Make sure pruning to older location is a no-op.
1368            assert!(pruned_mmr.prune_to_pos(size - 1).await.is_ok());
1369            assert_eq!(pruned_mmr.pruned_to_pos(), size);
1370
1371            // Add nodes until we are on a blob boundary, and confirm prune_all still removes all
1372            // retained nodes.
1373            while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
1374                pruned_mmr
1375                    .add(&mut hasher, &test_digest(LEAF_COUNT))
1376                    .await
1377                    .unwrap();
1378            }
1379            pruned_mmr.prune_all().await.unwrap();
1380            assert_eq!(pruned_mmr.oldest_retained_pos(), None);
1381
1382            pruned_mmr.destroy().await.unwrap();
1383            mmr.destroy().await.unwrap();
1384        });
1385    }
1386
1387    #[test_traced("WARN")]
1388    /// Simulate partial writes after pruning, making sure we recover to a valid state.
1389    fn test_journaled_mmr_recovery_with_pruning() {
1390        let executor = deterministic::Runner::default();
1391        executor.start(|context| async move {
1392            // Build MMR with 2000 leaves.
1393            let mut hasher: Standard<Sha256> = Standard::new();
1394            const LEAF_COUNT: usize = 2000;
1395            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1396                .await
1397                .unwrap();
1398            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1399            let mut positions = Vec::with_capacity(LEAF_COUNT);
1400            for i in 0..LEAF_COUNT {
1401                let digest = test_digest(i);
1402                leaves.push(digest);
1403                let last_leaf = leaves.last().unwrap();
1404                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1405                positions.push(pos);
1406            }
1407            assert_eq!(mmr.size(), 3994);
1408            mmr.close().await.unwrap();
1409
1410            // Prune the MMR in increments of 50, simulating a partial write after each prune.
1411            for i in 0usize..200 {
1412                let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1413                    .await
1414                    .unwrap();
1415                let start_size = mmr.size();
1416                let prune_pos = std::cmp::min(i as u64 * 50, *start_size);
1417                let prune_pos = Position::new(prune_pos);
1418                if i % 5 == 0 {
1419                    mmr.simulate_pruning_failure(prune_pos).await.unwrap();
1420                    continue;
1421                }
1422                mmr.prune_to_pos(prune_pos).await.unwrap();
1423
1424                // add 25 new elements, simulating a partial write after each.
1425                for j in 0..10 {
1426                    let digest = test_digest(100 * (i + 1) + j);
1427                    leaves.push(digest);
1428                    let last_leaf = leaves.last().unwrap();
1429                    let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1430                    positions.push(pos);
1431                    mmr.add(&mut hasher, last_leaf).await.unwrap();
1432                    assert_eq!(mmr.root(), mmr.root());
1433                    let digest = test_digest(LEAF_COUNT + i);
1434                    leaves.push(digest);
1435                    let last_leaf = leaves.last().unwrap();
1436                    let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1437                    positions.push(pos);
1438                    mmr.add(&mut hasher, last_leaf).await.unwrap();
1439                }
1440                let end_size = mmr.size();
1441                let total_to_write = (*end_size - *start_size) as usize;
1442                let partial_write_limit = i % total_to_write;
1443                mmr.simulate_partial_sync(partial_write_limit)
1444                    .await
1445                    .unwrap();
1446            }
1447
1448            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1449                .await
1450                .unwrap();
1451            mmr.destroy().await.unwrap();
1452        });
1453    }
1454
1455    #[test_traced]
1456    fn test_journaled_mmr_historical_range_proof_basic() {
1457        let executor = deterministic::Runner::default();
1458        executor.start(|context| async move {
1459            // Create MMR with 10 elements
1460            let mut hasher = Standard::<Sha256>::new();
1461            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1462                .await
1463                .unwrap();
1464            let mut elements = Vec::new();
1465            let mut positions = Vec::new();
1466            for i in 0..10 {
1467                elements.push(test_digest(i));
1468                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1469            }
1470            let original_size = mmr.size();
1471
1472            // Historical proof should match "regular" proof when historical size == current database size
1473            let historical_proof = mmr
1474                .historical_range_proof(
1475                    original_size,
1476                    Location::new_unchecked(2)..Location::new_unchecked(6),
1477                )
1478                .await
1479                .unwrap();
1480            assert_eq!(historical_proof.size, original_size);
1481            let root = mmr.root();
1482            assert!(historical_proof.verify_range_inclusion(
1483                &mut hasher,
1484                &elements[2..6],
1485                Location::new_unchecked(2),
1486                &root
1487            ));
1488            let regular_proof = mmr
1489                .range_proof(Location::new_unchecked(2)..Location::new_unchecked(6))
1490                .await
1491                .unwrap();
1492            assert_eq!(regular_proof.size, historical_proof.size);
1493            assert_eq!(regular_proof.digests, historical_proof.digests);
1494
1495            // Add more elements to the MMR
1496            for i in 10..20 {
1497                elements.push(test_digest(i));
1498                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1499            }
1500            let new_historical_proof = mmr
1501                .historical_range_proof(
1502                    original_size,
1503                    Location::new_unchecked(2)..Location::new_unchecked(6),
1504                )
1505                .await
1506                .unwrap();
1507            assert_eq!(new_historical_proof.size, historical_proof.size);
1508            assert_eq!(new_historical_proof.digests, historical_proof.digests);
1509
1510            mmr.destroy().await.unwrap();
1511        });
1512    }
1513
1514    #[test_traced]
1515    fn test_journaled_mmr_historical_range_proof_with_pruning() {
1516        let executor = deterministic::Runner::default();
1517        executor.start(|context| async move {
1518            let mut hasher = Standard::<Sha256>::new();
1519            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1520                .await
1521                .unwrap();
1522
1523            // Add many elements
1524            let mut elements = Vec::new();
1525            let mut positions = Vec::new();
1526            for i in 0..50 {
1527                elements.push(test_digest(i));
1528                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1529            }
1530
1531            // Prune to position 30
1532            let prune_pos = Position::new(30);
1533            mmr.prune_to_pos(prune_pos).await.unwrap();
1534
1535            // Create reference MMR for verification to get correct size
1536            let mut ref_mmr = Mmr::init(
1537                context.clone(),
1538                &mut hasher,
1539                Config {
1540                    journal_partition: "ref_journal_pruned".into(),
1541                    metadata_partition: "ref_metadata_pruned".into(),
1542                    items_per_blob: NZU64!(7),
1543                    write_buffer: NZUsize!(1024),
1544                    thread_pool: None,
1545                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1546                },
1547            )
1548            .await
1549            .unwrap();
1550
1551            for elt in elements.iter().take(41) {
1552                ref_mmr.add(&mut hasher, elt).await.unwrap();
1553            }
1554            let historical_size = ref_mmr.size();
1555            let historical_root = ref_mmr.root();
1556
1557            // Test proof at historical position after pruning
1558            let historical_proof = mmr
1559                .historical_range_proof(
1560                    historical_size,
1561                    Location::new_unchecked(35)..Location::new_unchecked(39), // Start after prune point to end at historical size
1562                )
1563                .await
1564                .unwrap();
1565
1566            assert_eq!(historical_proof.size, historical_size);
1567
1568            // Verify proof works despite pruning
1569            assert!(historical_proof.verify_range_inclusion(
1570                &mut hasher,
1571                &elements[35..39],
1572                Location::new_unchecked(35),
1573                &historical_root
1574            ));
1575
1576            ref_mmr.destroy().await.unwrap();
1577            mmr.destroy().await.unwrap();
1578        });
1579    }
1580
1581    #[test_traced]
1582    fn test_journaled_mmr_historical_range_proof_large() {
1583        let executor = deterministic::Runner::default();
1584        executor.start(|context| async move {
1585            let mut hasher = Standard::<Sha256>::new();
1586
1587            let mut mmr = Mmr::init(
1588                context.clone(),
1589                &mut hasher,
1590                Config {
1591                    journal_partition: "server_journal".into(),
1592                    metadata_partition: "server_metadata".into(),
1593                    items_per_blob: NZU64!(7),
1594                    write_buffer: NZUsize!(1024),
1595                    thread_pool: None,
1596                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1597                },
1598            )
1599            .await
1600            .unwrap();
1601
1602            let mut elements = Vec::new();
1603            let mut positions = Vec::new();
1604            for i in 0..100 {
1605                elements.push(test_digest(i));
1606                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1607            }
1608
1609            let range = Location::new_unchecked(30)..Location::new_unchecked(61);
1610
1611            // Only apply elements up to end_loc to the reference MMR.
1612            let mut ref_mmr = Mmr::init(
1613                context.clone(),
1614                &mut hasher,
1615                Config {
1616                    journal_partition: "client_journal".into(),
1617                    metadata_partition: "client_metadata".into(),
1618                    items_per_blob: NZU64!(7),
1619                    write_buffer: NZUsize!(1024),
1620                    thread_pool: None,
1621                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1622                },
1623            )
1624            .await
1625            .unwrap();
1626
1627            // Add elements up to the end of the range to verify historical root
1628            for elt in elements.iter().take(*range.end as usize) {
1629                ref_mmr.add(&mut hasher, elt).await.unwrap();
1630            }
1631            let historical_size = ref_mmr.size();
1632            let expected_root = ref_mmr.root();
1633
1634            // Generate proof from full MMR
1635            let proof = mmr
1636                .historical_range_proof(historical_size, range.clone())
1637                .await
1638                .unwrap();
1639
1640            assert!(proof.verify_range_inclusion(
1641                &mut hasher,
1642                &elements[range.to_usize_range()],
1643                range.start,
1644                &expected_root // Compare to historical (reference) root
1645            ));
1646
1647            ref_mmr.destroy().await.unwrap();
1648            mmr.destroy().await.unwrap();
1649        });
1650    }
1651
1652    #[test_traced]
1653    fn test_journaled_mmr_historical_range_proof_singleton() {
1654        let executor = deterministic::Runner::default();
1655        executor.start(|context| async move {
1656            let mut hasher = Standard::<Sha256>::new();
1657            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1658                .await
1659                .unwrap();
1660
1661            let element = test_digest(0);
1662            mmr.add(&mut hasher, &element).await.unwrap();
1663
1664            // Test single element proof at historical position
1665            let single_proof = mmr
1666                .historical_range_proof(
1667                    Position::new(1),
1668                    Location::new_unchecked(0)..Location::new_unchecked(1),
1669                )
1670                .await
1671                .unwrap();
1672
1673            let root = mmr.root();
1674            assert!(single_proof.verify_range_inclusion(
1675                &mut hasher,
1676                &[element],
1677                Location::new_unchecked(0),
1678                &root
1679            ));
1680
1681            mmr.destroy().await.unwrap();
1682        });
1683    }
1684
1685    #[test_traced]
1686    fn test_journaled_mmr_init_from_pinned_nodes() {
1687        let executor = deterministic::Runner::default();
1688        executor.start(|context| async move {
1689            let mut hasher = Standard::<Sha256>::new();
1690
1691            // Create an in-memory MMR with some elements
1692            let mut original_mmr = Mmr::init(
1693                context.clone(),
1694                &mut hasher,
1695                Config {
1696                    journal_partition: "original_journal".into(),
1697                    metadata_partition: "original_metadata".into(),
1698                    items_per_blob: NZU64!(7),
1699                    write_buffer: NZUsize!(1024),
1700                    thread_pool: None,
1701                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1702                },
1703            )
1704            .await
1705            .unwrap();
1706
1707            // Add some elements and prune to the size of the MMR
1708            const NUM_ELEMENTS: u64 = 1_000;
1709            for i in 0..NUM_ELEMENTS {
1710                original_mmr
1711                    .add(&mut hasher, &test_digest(i as usize))
1712                    .await
1713                    .unwrap();
1714            }
1715            original_mmr.sync().await.unwrap();
1716            let original_size = original_mmr.size();
1717            original_mmr.prune_to_pos(original_size).await.unwrap();
1718
1719            // Get the journal digest
1720            let mut hasher = Standard::<Sha256>::new();
1721            let original_journal_digest = original_mmr.root();
1722
1723            // Get the pinned nodes
1724            let pinned_nodes_map = original_mmr.get_pinned_nodes();
1725            let pinned_nodes: Vec<_> = nodes_to_pin(original_size)
1726                .map(|pos| pinned_nodes_map[&pos])
1727                .collect();
1728
1729            // Create a journaled MMR from the pinned nodes
1730            let new_mmr_config = Config {
1731                journal_partition: "new_journal".into(),
1732                metadata_partition: "new_metadata".into(),
1733                items_per_blob: NZU64!(7),
1734                write_buffer: NZUsize!(1024),
1735                thread_pool: None,
1736                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1737            };
1738            let mut new_mmr =
1739                Mmr::<_, sha256::Digest, Clean<sha256::Digest>>::init_from_pinned_nodes(
1740                    context.clone(),
1741                    pinned_nodes,
1742                    original_size,
1743                    new_mmr_config.clone(),
1744                    &mut hasher,
1745                )
1746                .await
1747                .unwrap();
1748
1749            // Verify the journaled MMR has the same properties as the original MMR
1750            assert_eq!(new_mmr.size(), original_size);
1751            assert_eq!(new_mmr.pruned_to_pos(), original_size);
1752            assert_eq!(new_mmr.oldest_retained_pos(), None);
1753            // Verify the roots match
1754            let new_journal_digest = new_mmr.root();
1755            assert_eq!(new_journal_digest, original_journal_digest);
1756
1757            // Insert a new element into the new journaled MMR and the original MMR
1758            let new_element = test_digest(10);
1759
1760            let original_mmr_pos = original_mmr.add(&mut hasher, &new_element).await.unwrap();
1761            assert_eq!(original_mmr_pos, original_size);
1762
1763            let new_mmr_pos = new_mmr.add(&mut hasher, &new_element).await.unwrap();
1764            assert_eq!(new_mmr_pos, original_size); // New element is added at the end
1765
1766            // Verify the roots still match
1767            let original_mmr_root = original_mmr.root();
1768            let new_mmr_root = new_mmr.root();
1769            assert_eq!(new_mmr_root, original_mmr_root);
1770
1771            // Close and re-open the journaled MMR
1772            new_mmr.close().await.unwrap();
1773            let new_mmr = Mmr::<_, sha256::Digest, Clean<sha256::Digest>>::init(
1774                context.clone(),
1775                &mut hasher,
1776                new_mmr_config,
1777            )
1778            .await
1779            .unwrap();
1780
1781            // Root should be unchanged
1782            let new_mmr_root = new_mmr.root();
1783            assert_eq!(new_mmr_root, original_mmr_root);
1784
1785            // Size and other metadata should be unchanged
1786            assert_eq!(new_mmr.size(), original_size + 1); // +1 for element we just added
1787            assert_eq!(new_mmr.pruned_to_pos(), original_size);
1788            assert_eq!(new_mmr.oldest_retained_pos(), Some(original_size)); // Element we just added is the oldest retained
1789
1790            // Proofs generated from the journaled MMR should be the same as the proofs generated from the original MMR
1791            let proof = new_mmr
1792                .proof(Location::new_unchecked(NUM_ELEMENTS))
1793                .await
1794                .unwrap();
1795            let original_proof = original_mmr
1796                .proof(Location::new_unchecked(NUM_ELEMENTS))
1797                .await
1798                .unwrap();
1799            assert_eq!(proof.digests, original_proof.digests);
1800            assert_eq!(proof.size, original_proof.size);
1801
1802            original_mmr.destroy().await.unwrap();
1803            new_mmr.destroy().await.unwrap();
1804        });
1805    }
1806
1807    #[test_traced]
1808    fn test_journaled_mmr_init_from_pinned_nodes_edge_cases() {
1809        use crate::mmr::mem::CleanMmr as CleanMemMmr;
1810
1811        let executor = deterministic::Runner::default();
1812        executor.start(|context| async move {
1813            let mut hasher = Standard::<Sha256>::new();
1814
1815            // === TEST 1: Empty MMR (size 0) ===
1816            let mut empty_mmr =
1817                Mmr::<_, sha256::Digest, Clean<sha256::Digest>>::init_from_pinned_nodes(
1818                    context.clone(),
1819                    vec![],           // No pinned nodes
1820                    Position::new(0), // Size 0
1821                    Config {
1822                        journal_partition: "empty_journal".into(),
1823                        metadata_partition: "empty_metadata".into(),
1824                        items_per_blob: NZU64!(7),
1825                        write_buffer: NZUsize!(1024),
1826                        thread_pool: None,
1827                        buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1828                    },
1829                    &mut hasher,
1830                )
1831                .await
1832                .unwrap();
1833
1834            assert_eq!(empty_mmr.size(), 0);
1835            assert_eq!(empty_mmr.pruned_to_pos(), Position::new(0));
1836            assert_eq!(empty_mmr.oldest_retained_pos(), None);
1837
1838            // Should be able to add first element at position 0
1839            let pos = empty_mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
1840            assert_eq!(pos, 0);
1841            assert_eq!(empty_mmr.size(), 1);
1842
1843            empty_mmr.destroy().await.unwrap();
1844
1845            // === TEST 2: Single element MMR ===
1846            let mut single_mem_mmr = CleanMemMmr::new(&mut hasher);
1847            single_mem_mmr.add(&mut hasher, &test_digest(42));
1848            let single_size = single_mem_mmr.size();
1849            let single_root = single_mem_mmr.root();
1850            let single_pinned = single_mem_mmr.node_digests_to_pin(single_size);
1851
1852            let single_journaled_mmr =
1853                Mmr::<_, sha256::Digest, Clean<sha256::Digest>>::init_from_pinned_nodes(
1854                    context.clone(),
1855                    single_pinned,
1856                    single_size,
1857                    Config {
1858                        journal_partition: "single_journal".into(),
1859                        metadata_partition: "single_metadata".into(),
1860                        items_per_blob: NZU64!(7),
1861                        write_buffer: NZUsize!(1024),
1862                        thread_pool: None,
1863                        buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1864                    },
1865                    &mut hasher,
1866                )
1867                .await
1868                .unwrap();
1869
1870            assert_eq!(single_journaled_mmr.size(), single_size);
1871            assert_eq!(single_journaled_mmr.root(), *single_root);
1872
1873            single_journaled_mmr.destroy().await.unwrap();
1874        });
1875    }
1876    // Test `init_sync` when there is no persisted data.
1877    #[test_traced]
1878    fn test_journaled_mmr_init_sync_empty() {
1879        let executor = deterministic::Runner::default();
1880        executor.start(|context| async move {
1881            let mut hasher = Standard::<Sha256>::new();
1882
1883            // Test fresh start scenario with completely new MMR (no existing data)
1884            let sync_cfg = SyncConfig::<sha256::Digest> {
1885                config: test_config(),
1886                range: Position::new(0)..Position::new(100),
1887                pinned_nodes: None,
1888            };
1889
1890            let sync_mmr = Mmr::init_sync(context.clone(), sync_cfg, &mut hasher)
1891                .await
1892                .unwrap();
1893
1894            // Should be fresh MMR starting empty
1895            assert_eq!(sync_mmr.size(), 0);
1896            assert_eq!(sync_mmr.pruned_to_pos(), 0);
1897            assert_eq!(sync_mmr.oldest_retained_pos(), None);
1898
1899            // Should be able to add new elements
1900            let mut sync_mmr = sync_mmr;
1901            let new_element = test_digest(999);
1902            sync_mmr.add(&mut hasher, &new_element).await.unwrap();
1903
1904            // Root should be computable
1905            let _root = sync_mmr.root();
1906
1907            sync_mmr.destroy().await.unwrap();
1908        });
1909    }
1910
1911    // Test `init_sync` where the persisted MMR's persisted nodes match the sync boundaries.
1912    #[test_traced]
1913    fn test_journaled_mmr_init_sync_nonempty_exact_match() {
1914        let executor = deterministic::Runner::default();
1915        executor.start(|context| async move {
1916            let mut hasher = Standard::<Sha256>::new();
1917
1918            // Create initial MMR with elements.
1919            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1920                .await
1921                .unwrap();
1922            for i in 0..50 {
1923                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1924            }
1925            mmr.sync().await.unwrap();
1926            let original_size = mmr.size();
1927            let original_leaves = mmr.leaves();
1928            let original_root = mmr.root();
1929
1930            // Sync with range.start ≤ existing_size ≤ range.end should reuse data
1931            let lower_bound_pos = mmr.pruned_to_pos();
1932            let upper_bound_pos = mmr.size();
1933            let mut expected_nodes = BTreeMap::new();
1934            for i in *lower_bound_pos..*upper_bound_pos {
1935                expected_nodes.insert(
1936                    Position::new(i),
1937                    mmr.get_node(Position::new(i)).await.unwrap().unwrap(),
1938                );
1939            }
1940            let sync_cfg = SyncConfig::<sha256::Digest> {
1941                config: test_config(),
1942                range: lower_bound_pos..upper_bound_pos,
1943                pinned_nodes: None,
1944            };
1945
1946            mmr.close().await.unwrap();
1947
1948            let sync_mmr = Mmr::init_sync(context.clone(), sync_cfg, &mut hasher)
1949                .await
1950                .unwrap();
1951
1952            // Should have existing data in the sync range.
1953            assert_eq!(sync_mmr.size(), original_size);
1954            assert_eq!(sync_mmr.leaves(), original_leaves);
1955            assert_eq!(sync_mmr.pruned_to_pos(), lower_bound_pos);
1956            assert_eq!(sync_mmr.oldest_retained_pos(), Some(lower_bound_pos));
1957            assert_eq!(sync_mmr.root(), original_root);
1958            for pos in *lower_bound_pos..*upper_bound_pos {
1959                let pos = Position::new(pos);
1960                assert_eq!(
1961                    sync_mmr.get_node(pos).await.unwrap(),
1962                    expected_nodes.get(&pos).cloned()
1963                );
1964            }
1965
1966            sync_mmr.destroy().await.unwrap();
1967        });
1968    }
1969
1970    // Test `init_sync` where the persisted MMR's data partially overlaps with the sync boundaries.
1971    #[test_traced]
1972    fn test_journaled_mmr_init_sync_partial_overlap() {
1973        let executor = deterministic::Runner::default();
1974        executor.start(|context| async move {
1975            let mut hasher = Standard::<Sha256>::new();
1976
1977            // Create initial MMR with elements.
1978            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1979                .await
1980                .unwrap();
1981            for i in 0..30 {
1982                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1983            }
1984            mmr.sync().await.unwrap();
1985            mmr.prune_to_pos(Position::new(10)).await.unwrap();
1986
1987            let original_size = mmr.size();
1988            let original_root = mmr.root();
1989            let original_pruned_to = mmr.pruned_to_pos();
1990
1991            // Sync with boundaries that extend beyond existing data (partial overlap).
1992            let lower_bound_pos = original_pruned_to;
1993            let upper_bound_pos = original_size + 11; // Extend beyond existing data
1994
1995            let mut expected_nodes = BTreeMap::new();
1996            for pos in *lower_bound_pos..*original_size {
1997                let pos = Position::new(pos);
1998                expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
1999            }
2000
2001            let sync_cfg = SyncConfig::<sha256::Digest> {
2002                config: test_config(),
2003                range: lower_bound_pos..upper_bound_pos,
2004                pinned_nodes: None,
2005            };
2006
2007            mmr.close().await.unwrap();
2008
2009            let sync_mmr = Mmr::init_sync(context.clone(), sync_cfg, &mut hasher)
2010                .await
2011                .unwrap();
2012
2013            // Should have existing data in the overlapping range.
2014            assert_eq!(sync_mmr.size(), original_size);
2015            assert_eq!(sync_mmr.pruned_to_pos(), lower_bound_pos);
2016            assert_eq!(sync_mmr.oldest_retained_pos(), Some(lower_bound_pos));
2017            assert_eq!(sync_mmr.root(), original_root);
2018
2019            // Check that existing nodes are preserved in the overlapping range.
2020            for pos in *lower_bound_pos..*original_size {
2021                let pos = Position::new(pos);
2022                assert_eq!(
2023                    sync_mmr.get_node(pos).await.unwrap(),
2024                    expected_nodes.get(&pos).cloned()
2025                );
2026            }
2027
2028            sync_mmr.destroy().await.unwrap();
2029        });
2030    }
2031}