commonware_storage/mmr/
journaled.rs

1//! An MMR backed by a fixed-item-length journal.
2//!
3//! A [crate::journal] is used to store all unpruned MMR nodes, and a [crate::metadata] store is
4//! used to preserve digests required for root and proof generation that would have otherwise been
5//! pruned.
6
7use crate::{
8    adb::any::fixed::sync::{init_journal, init_journal_at_size},
9    journal::{
10        fixed::{Config as JConfig, Journal},
11        Error as JError,
12    },
13    metadata::{Config as MConfig, Metadata},
14    mmr::{
15        iterator::PeakIterator,
16        mem::{Config as MemConfig, Mmr as MemMmr},
17        verification::Proof,
18        Builder, Error, Hasher,
19    },
20};
21use commonware_codec::DecodeExt;
22use commonware_cryptography::{Digest, Hasher as CHasher};
23use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
24use commonware_utils::sequence::prefixed_u64::U64;
25use std::{
26    collections::HashMap,
27    num::{NonZeroU64, NonZeroUsize},
28};
29use tracing::{debug, error, warn};
30
31/// Configuration for a journal-backed MMR.
32#[derive(Clone)]
33pub struct Config {
34    /// The name of the `commonware-runtime::Storage` storage partition used for the journal storing
35    /// the MMR nodes.
36    pub journal_partition: String,
37
38    /// The name of the `commonware-runtime::Storage` storage partition used for the metadata
39    /// containing pruned MMR nodes that are still required to calculate the root and generate
40    /// proofs.
41    pub metadata_partition: String,
42
43    /// The maximum number of items to store in each blob in the backing journal.
44    pub items_per_blob: NonZeroU64,
45
46    /// The size of the write buffer to use for each blob in the backing journal.
47    pub write_buffer: NonZeroUsize,
48
49    /// Optional thread pool to use for parallelizing batch operations.
50    pub thread_pool: Option<ThreadPool>,
51
52    /// The buffer pool to use for caching data.
53    pub buffer_pool: PoolRef,
54}
55
56/// Configuration for initializing a journaled MMR for synchronization.
57///
58/// Determines how to handle existing persistent data based on sync boundaries:
59/// - **Fresh Start**: Existing data < lower_bound → discard and start fresh
60/// - **Prune and Reuse**: lower_bound ≤ existing data ≤ upper_bound → prune and reuse
61/// - **Prune and Rewind**: existing data > upper_bound → prune and rewind to upper_bound
62pub struct SyncConfig<D: Digest> {
63    /// Base MMR configuration (journal, metadata, etc.)
64    pub config: Config,
65
66    /// Pruning boundary - operations below this are considered pruned.
67    pub lower_bound: u64,
68
69    /// Sync boundary - operations above this are rewound if present.
70    pub upper_bound: u64,
71
72    /// The pinned nodes the MMR needs at the pruning boundary given by
73    /// `lower_bound`, in the order specified by [Proof::nodes_to_pin].
74    /// If `None`, the pinned nodes are expected to already be in the MMR's metadata/journal.
75    pub pinned_nodes: Option<Vec<D>>,
76}
77
78/// A MMR backed by a fixed-item-length journal.
79pub struct Mmr<E: RStorage + Clock + Metrics, H: CHasher> {
80    /// A memory resident MMR used to build the MMR structure and cache updates. It caches all
81    /// un-synced nodes, and the pinned node set as derived from both its own pruning boundary and
82    /// the journaled MMR's pruning boundary.
83    mem_mmr: MemMmr<H>,
84
85    /// Stores all unpruned MMR nodes.
86    journal: Journal<E, H::Digest>,
87
88    /// The size of the journal irrespective of any pruned nodes or any un-synced nodes currently
89    /// cached in the memory resident MMR.
90    journal_size: u64,
91
92    /// Stores all "pinned nodes" (pruned nodes required for proving & root generation) for the MMR,
93    /// and the corresponding pruning boundary used to generate them. The metadata remains empty
94    /// until pruning is invoked, and its contents change only when the pruning boundary moves.
95    metadata: Metadata<E, U64, Vec<u8>>,
96
97    /// The highest position for which this MMR has been pruned, or 0 if this MMR has never been
98    /// pruned.
99    pruned_to_pos: u64,
100}
101
102impl<E: RStorage + Clock + Metrics, H: CHasher> Builder<H> for Mmr<E, H> {
103    async fn add(&mut self, hasher: &mut impl Hasher<H>, element: &[u8]) -> Result<u64, Error> {
104        self.add(hasher, element).await
105    }
106
107    fn root(&self, hasher: &mut impl Hasher<H>) -> H::Digest {
108        self.root(hasher)
109    }
110}
111
/// Key prefix under which pinned node digests are stored in the metadata; the node's position
/// forms the rest of the prefixed `U64` key.
const NODE_PREFIX: u8 = 0;

/// Key prefix of the single metadata entry that records the position the MMR was last pruned to.
const PRUNE_TO_POS_PREFIX: u8 = 1;
117
118impl<E: RStorage + Clock + Metrics, H: CHasher> Mmr<E, H> {
119    /// Initialize a new journaled MMR from an MMR's size and set of pinned nodes.
120    ///
121    /// This creates a journaled MMR that appears to have `mmr_size` elements, all of which
122    /// are pruned, leaving only the minimal set of `pinned_nodes` required for proof generation.
123    /// The next element added will be at position `mmr_size`.
124    ///
125    /// The returned MMR is functionally equivalent to a journaled MMR that was created,
126    /// populated, and then pruned up to its size.
127    ///
128    /// # Arguments
129    /// * `context` - Storage context
130    /// * `pinned_nodes` - Digest values in the order returned by `Proof::nodes_to_pin(mmr_size)`
131    /// * `mmr_size` - The logical size of the MMR (all elements before this are considered pruned)
132    /// * `config` - Journaled MMR configuration. Any data in the given journal and metadata
133    ///   partitions will be overwritten.
134    pub async fn init_from_pinned_nodes(
135        context: E,
136        pinned_nodes: Vec<H::Digest>,
137        mmr_size: u64,
138        config: Config,
139    ) -> Result<Self, Error> {
140        // Destroy any existing journal data
141        context.remove(&config.journal_partition, None).await.ok();
142        context.remove(&config.metadata_partition, None).await.ok();
143
144        // Create the journal with the desired size
145        let journal_cfg = JConfig {
146            partition: config.journal_partition.clone(),
147            items_per_blob: config.items_per_blob,
148            buffer_pool: config.buffer_pool.clone(),
149            write_buffer: config.write_buffer,
150        };
151        let journal =
152            init_journal_at_size(context.with_label("mmr_journal"), journal_cfg, mmr_size).await?;
153
154        // Initialize metadata
155        let metadata_cfg = MConfig {
156            partition: config.metadata_partition.clone(),
157            codec_config: ((0..).into(), ()),
158        };
159        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;
160
161        // Store the pruning boundary in metadata
162        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
163        metadata.put(pruning_boundary_key, mmr_size.to_be_bytes().into());
164
165        // Store the pinned nodes in metadata
166        let nodes_to_pin_positions = Proof::<H::Digest>::nodes_to_pin(mmr_size);
167        for (pos, digest) in nodes_to_pin_positions.zip(pinned_nodes.iter()) {
168            metadata.put(U64::new(NODE_PREFIX, pos), digest.to_vec());
169        }
170
171        // Sync metadata to disk
172        metadata.sync().await.map_err(Error::MetadataError)?;
173
174        // Create in-memory MMR in fully pruned state
175        let mem_mmr = MemMmr::init(MemConfig {
176            nodes: vec![],
177            pruned_to_pos: mmr_size,
178            pinned_nodes,
179            pool: config.thread_pool,
180        });
181
182        Ok(Self {
183            mem_mmr,
184            journal,
185            journal_size: mmr_size,
186            metadata,
187            pruned_to_pos: mmr_size,
188        })
189    }
190
191    /// Initialize a new `Mmr` instance.
192    pub async fn init(context: E, hasher: &mut impl Hasher<H>, cfg: Config) -> Result<Self, Error> {
193        let journal_cfg = JConfig {
194            partition: cfg.journal_partition,
195            items_per_blob: cfg.items_per_blob,
196            buffer_pool: cfg.buffer_pool,
197            write_buffer: cfg.write_buffer,
198        };
199        let mut journal =
200            Journal::<E, H::Digest>::init(context.with_label("mmr_journal"), journal_cfg).await?;
201        let mut journal_size = journal.size().await?;
202
203        let metadata_cfg = MConfig {
204            partition: cfg.metadata_partition,
205            codec_config: ((0..).into(), ()),
206        };
207        let metadata =
208            Metadata::<_, U64, Vec<u8>>::init(context.with_label("mmr_metadata"), metadata_cfg)
209                .await?;
210
211        if journal_size == 0 {
212            return Ok(Self {
213                mem_mmr: MemMmr::init(MemConfig {
214                    nodes: vec![],
215                    pruned_to_pos: 0,
216                    pinned_nodes: vec![],
217                    pool: cfg.thread_pool,
218                }),
219                journal,
220                journal_size,
221                metadata,
222                pruned_to_pos: 0,
223            });
224        }
225
226        // Make sure the journal's oldest retained node is as expected based on the last pruning
227        // boundary stored in metadata. If they don't match, prune the journal to the appropriate
228        // location.
229        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
230        let metadata_prune_pos = match metadata.get(&key) {
231            Some(bytes) => u64::from_be_bytes(
232                bytes
233                    .as_slice()
234                    .try_into()
235                    .expect("metadata prune position is not 8 bytes"),
236            ),
237            None => 0,
238        };
239        let oldest_retained_pos = journal.oldest_retained_pos().await?.unwrap_or(0);
240        if metadata_prune_pos != oldest_retained_pos {
241            assert!(metadata_prune_pos >= oldest_retained_pos);
242            // These positions may differ only due to blob boundary alignment, so this case isn't
243            // unusual.
244            journal.prune(metadata_prune_pos).await?;
245            if journal.oldest_retained_pos().await?.unwrap_or(0) != oldest_retained_pos {
246                // This should only happen in the event of some failure during the last attempt to
247                // prune the journal.
248                warn!(
249                    oldest_retained_pos,
250                    metadata_prune_pos, "journal pruned to match metadata"
251                );
252            }
253        }
254
255        let last_valid_size = PeakIterator::to_nearest_size(journal_size);
256        let mut orphaned_leaf: Option<H::Digest> = None;
257        if last_valid_size != journal_size {
258            warn!(
259                last_valid_size,
260                "encountered invalid MMR structure, recovering from last valid size"
261            );
262            // Check if there is an intact leaf following the last valid size, from which we can
263            // recover its missing parents.
264            let recovered_item = journal.read(last_valid_size).await;
265            if let Ok(item) = recovered_item {
266                orphaned_leaf = Some(item);
267            }
268            journal.rewind(last_valid_size).await?;
269            journal.sync().await?;
270            journal_size = last_valid_size
271        }
272
273        // Initialize the mem_mmr in the "prune_all" state.
274        let mut pinned_nodes = Vec::new();
275        for pos in Proof::<H::Digest>::nodes_to_pin(journal_size) {
276            let digest =
277                Mmr::<E, H>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
278            pinned_nodes.push(digest);
279        }
280        let mut mem_mmr = MemMmr::init(MemConfig {
281            nodes: vec![],
282            pruned_to_pos: journal_size,
283            pinned_nodes,
284            pool: cfg.thread_pool,
285        });
286        Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, metadata_prune_pos).await?;
287
288        let mut s = Self {
289            mem_mmr,
290            journal,
291            journal_size,
292            metadata,
293            pruned_to_pos: metadata_prune_pos,
294        };
295
296        if let Some(leaf) = orphaned_leaf {
297            // Recover the orphaned leaf and any missing parents.
298            let pos = s.mem_mmr.size();
299            warn!(pos, "recovering orphaned leaf");
300            s.mem_mmr.add_leaf_digest(hasher, leaf);
301            assert_eq!(pos, journal_size);
302            s.sync(hasher).await?;
303            assert_eq!(s.size(), s.journal.size().await?);
304        }
305
306        Ok(s)
307    }
308
309    /// Adds the pinned nodes based on `prune_pos` to `mem_mmr`.
310    async fn add_extra_pinned_nodes(
311        mem_mmr: &mut MemMmr<H>,
312        metadata: &Metadata<E, U64, Vec<u8>>,
313        journal: &Journal<E, H::Digest>,
314        prune_pos: u64,
315    ) -> Result<(), Error> {
316        let mut pinned_nodes = HashMap::new();
317        for pos in Proof::<H::Digest>::nodes_to_pin(prune_pos) {
318            let digest = Mmr::<E, H>::get_from_metadata_or_journal(metadata, journal, pos).await?;
319            pinned_nodes.insert(pos, digest);
320        }
321        mem_mmr.add_pinned_nodes(pinned_nodes);
322
323        Ok(())
324    }
325
326    /// Initialize an MMR for synchronization, reusing existing data if possible.
327    ///
328    /// Handles three sync scenarios based on existing journal data vs. the given sync boundaries.
329    ///
330    /// 1. **Fresh Start**: existing_size < lower_bound
331    ///    - Deletes existing data (if any)
332    ///    - Creates new [Journal] with pruning boundary and size `lower_bound`
333    ///
334    /// 2. **Prune and Reuse**: lower_bound ≤ existing_size ≤ upper_bound
335    ///    - Sets in-memory MMR size to `existing_size`
336    ///    - Prunes the journal to `lower_bound`
337    ///
338    /// 3. **Prune and Rewind**: existing_size > upper_bound
339    ///    - Rewinds the journal to size `upper_bound+1`
340    ///    - Sets in-memory MMR size to `upper_bound+1`
341    ///    - Prunes the journal to `lower_bound`
342    pub async fn init_sync(context: E, cfg: SyncConfig<H::Digest>) -> Result<Self, Error> {
343        let journal = init_journal(
344            context.with_label("mmr_journal"),
345            JConfig {
346                partition: cfg.config.journal_partition,
347                items_per_blob: cfg.config.items_per_blob,
348                write_buffer: cfg.config.write_buffer,
349                buffer_pool: cfg.config.buffer_pool.clone(),
350            },
351            cfg.lower_bound,
352            cfg.upper_bound,
353        )
354        .await?;
355        let journal_size = journal.size().await?;
356        assert!(journal_size <= cfg.upper_bound + 1);
357
358        // Open the metadata.
359        let metadata_cfg = MConfig {
360            partition: cfg.config.metadata_partition,
361            codec_config: ((0..).into(), ()),
362        };
363        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;
364
365        // Write the pruning boundary.
366        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
367        metadata.put(pruning_boundary_key, cfg.lower_bound.to_be_bytes().into());
368
369        // Write the required pinned nodes to metadata.
370        if let Some(pinned_nodes) = cfg.pinned_nodes {
371            let nodes_to_pin_persisted = Proof::<H::Digest>::nodes_to_pin(cfg.lower_bound);
372            for (pos, digest) in nodes_to_pin_persisted.zip(pinned_nodes.iter()) {
373                metadata.put(U64::new(NODE_PREFIX, pos), digest.to_vec());
374            }
375        }
376
377        // Create the in-memory MMR with the pinned nodes required for its size.
378        let nodes_to_pin_mem = Proof::<H::Digest>::nodes_to_pin(journal_size);
379        let mut mem_pinned_nodes = Vec::new();
380        for pos in nodes_to_pin_mem {
381            let digest =
382                Mmr::<E, H>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
383            mem_pinned_nodes.push(digest);
384        }
385        let mut mem_mmr = MemMmr::init(MemConfig {
386            nodes: vec![],
387            pruned_to_pos: journal_size,
388            pinned_nodes: mem_pinned_nodes,
389            pool: cfg.config.thread_pool,
390        });
391
392        // Add the additional pinned nodes required for the pruning boundary, if applicable.
393        if cfg.lower_bound < journal_size {
394            Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, cfg.lower_bound)
395                .await?;
396        }
397        metadata.sync().await?;
398
399        Ok(Self {
400            mem_mmr,
401            journal,
402            journal_size,
403            metadata,
404            pruned_to_pos: cfg.lower_bound,
405        })
406    }
407
408    /// Return the total number of nodes in the MMR, irrespective of any pruning. The next added
409    /// element's position will have this value.
410    pub fn size(&self) -> u64 {
411        self.mem_mmr.size()
412    }
413
414    /// Return the total number of leaves in the MMR.
415    pub fn leaves(&self) -> u64 {
416        self.mem_mmr.leaves()
417    }
418
419    /// Return the position of the last leaf in this MMR, or None if the MMR is empty.
420    pub fn last_leaf_pos(&self) -> Option<u64> {
421        self.mem_mmr.last_leaf_pos()
422    }
423
424    /// Returns whether there are pending updates.
425    pub fn is_dirty(&self) -> bool {
426        self.mem_mmr.is_dirty()
427    }
428
429    pub async fn get_node(&self, position: u64) -> Result<Option<H::Digest>, Error> {
430        if let Some(node) = self.mem_mmr.get_node(position) {
431            return Ok(Some(node));
432        }
433
434        match self.journal.read(position).await {
435            Ok(item) => Ok(Some(item)),
436            Err(JError::ItemPruned(_)) => Ok(None),
437            Err(e) => Err(Error::JournalError(e)),
438        }
439    }
440
441    /// Attempt to get a node from the metadata, with fallback to journal lookup if it fails.
442    /// Assumes the node should exist in at least one of these sources and returns a `MissingNode`
443    /// error otherwise.
444    async fn get_from_metadata_or_journal(
445        metadata: &Metadata<E, U64, Vec<u8>>,
446        journal: &Journal<E, H::Digest>,
447        pos: u64,
448    ) -> Result<H::Digest, Error> {
449        if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, pos)) {
450            debug!(pos, "read node from metadata");
451            let digest = H::Digest::decode(bytes.as_ref());
452            let Ok(digest) = digest else {
453                error!(
454                    pos,
455                    err = %digest.err().unwrap(),
456                    "could not convert node from metadata bytes to digest"
457                );
458                return Err(Error::MissingNode(pos));
459            };
460            return Ok(digest);
461        }
462
463        // If a node isn't found in the metadata, it might still be in the journal.
464        debug!(pos, "reading node from journal");
465        let node = journal.read(pos).await;
466        match node {
467            Ok(node) => Ok(node),
468            Err(JError::ItemPruned(_)) => {
469                error!(pos, "node is missing from metadata and journal");
470                Err(Error::MissingNode(pos))
471            }
472            Err(e) => Err(Error::JournalError(e)),
473        }
474    }
475
476    /// Add an element to the MMR and return its position in the MMR. Elements added to the MMR
477    /// aren't persisted to disk until `sync` is called.
478    ///
479    /// # Warning
480    ///
481    /// Panics if there are unprocessed updates.
482    pub async fn add(&mut self, h: &mut impl Hasher<H>, element: &[u8]) -> Result<u64, Error> {
483        Ok(self.mem_mmr.add(h, element))
484    }
485
486    /// Add an element to the MMR, delaying the computation of ancestor digests
487    /// until the next `sync`.
488    pub async fn add_batched(
489        &mut self,
490        h: &mut impl Hasher<H>,
491        element: &[u8],
492    ) -> Result<u64, Error> {
493        Ok(self.mem_mmr.add_batched(h, element))
494    }
495
496    /// Pop the given number of elements from the tip of the MMR assuming they exist, and otherwise
497    /// return Empty or ElementPruned errors. The backing journal is synced to disk before
498    /// returning.
499    ///
500    /// # Warning
501    ///
502    /// Panics if there are unprocessed batch updates.
503    pub async fn pop(&mut self, mut leaves_to_pop: usize) -> Result<(), Error> {
504        // See if the elements are still cached in which case we can just pop them from the in-mem
505        // MMR.
506        while leaves_to_pop > 0 {
507            match self.mem_mmr.pop() {
508                Ok(_) => {
509                    leaves_to_pop -= 1;
510                }
511                Err(Error::ElementPruned(_)) => break,
512                Err(Error::Empty) => {
513                    return Err(Error::Empty);
514                }
515                _ => unreachable!(),
516            }
517        }
518        if leaves_to_pop == 0 {
519            return Ok(());
520        }
521
522        let mut new_size = self.size();
523        while leaves_to_pop > 0 {
524            if new_size == 0 {
525                return Err(Error::Empty);
526            }
527            new_size -= 1;
528            if new_size < self.pruned_to_pos {
529                return Err(Error::ElementPruned(new_size));
530            }
531            if PeakIterator::check_validity(new_size) {
532                leaves_to_pop -= 1;
533            }
534        }
535
536        self.journal.rewind(new_size).await?;
537        self.journal.sync().await?;
538        self.journal_size = new_size;
539
540        // Reset the mem_mmr to one of the new_size in the "prune_all" state.
541        let mut pinned_nodes = Vec::new();
542        for pos in Proof::<H::Digest>::nodes_to_pin(new_size) {
543            let digest =
544                Mmr::<E, H>::get_from_metadata_or_journal(&self.metadata, &self.journal, pos)
545                    .await?;
546            pinned_nodes.push(digest);
547        }
548        self.mem_mmr = MemMmr::init(MemConfig {
549            nodes: vec![],
550            pruned_to_pos: new_size,
551            pinned_nodes,
552            pool: self.mem_mmr.thread_pool.take(),
553        });
554        Self::add_extra_pinned_nodes(
555            &mut self.mem_mmr,
556            &self.metadata,
557            &self.journal,
558            self.pruned_to_pos,
559        )
560        .await?;
561
562        Ok(())
563    }
564
565    /// Return the root of the MMR.
566    ///
567    /// # Warning
568    ///
569    /// Panics if there are unprocessed updates.
570    pub fn root(&self, h: &mut impl Hasher<H>) -> H::Digest {
571        self.mem_mmr.root(h)
572    }
573
574    /// Process all batched updates without syncing to disk.
575    pub fn process_updates(&mut self, h: &mut impl Hasher<H>) {
576        self.mem_mmr.sync(h)
577    }
578
579    /// Process all batched updates and sync the MMR to disk. If `pool` is non-null, then it will be
580    /// used to parallelize the sync.
581    pub async fn sync(&mut self, h: &mut impl Hasher<H>) -> Result<(), Error> {
582        self.process_updates(h);
583
584        // Write the nodes cached in the memory-resident MMR to the journal.
585        for i in self.journal_size..self.size() {
586            let node = *self.mem_mmr.get_node_unchecked(i);
587            self.journal.append(node).await?;
588        }
589        self.journal_size = self.size();
590        self.journal.sync().await?;
591        assert_eq!(self.journal_size, self.journal.size().await?);
592
593        // Recompute pinned nodes since we'll need to repopulate the cache after it is cleared by
594        // pruning the mem_mmr.
595        let mut pinned_nodes = HashMap::new();
596        for pos in Proof::<H::Digest>::nodes_to_pin(self.pruned_to_pos) {
597            let digest = self.mem_mmr.get_node_unchecked(pos);
598            pinned_nodes.insert(pos, *digest);
599        }
600
601        // Now that the pinned node set has been recomputed, it's safe to prune the mem_mmr and
602        // reinstate them.
603        self.mem_mmr.prune_all();
604        self.mem_mmr.add_pinned_nodes(pinned_nodes);
605
606        Ok(())
607    }
608
609    /// Compute and add required nodes for the given pruning point to the metadata, and write it to
610    /// disk. Return the computed set of required nodes.
611    async fn update_metadata(
612        &mut self,
613        prune_to_pos: u64,
614    ) -> Result<HashMap<u64, H::Digest>, Error> {
615        let mut pinned_nodes = HashMap::new();
616        for pos in Proof::<H::Digest>::nodes_to_pin(prune_to_pos) {
617            let digest = self.get_node(pos).await?.unwrap();
618            self.metadata
619                .put(U64::new(NODE_PREFIX, pos), digest.to_vec());
620            pinned_nodes.insert(pos, digest);
621        }
622
623        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
624        self.metadata.put(key, prune_to_pos.to_be_bytes().into());
625
626        self.metadata.sync().await.map_err(Error::MetadataError)?;
627
628        Ok(pinned_nodes)
629    }
630
631    /// Return an inclusion proof for the specified element, or ElementPruned error if some element
632    /// needed to generate the proof has been pruned.
633    ///
634    /// # Warning
635    ///
636    /// Panics if there are unprocessed updates.
637    pub async fn proof(&self, element_pos: u64) -> Result<Proof<H::Digest>, Error> {
638        self.range_proof(element_pos, element_pos).await
639    }
640
641    /// Return an inclusion proof for the specified range of elements, inclusive of both endpoints,
642    /// or ElementPruned error if some element needed to generate the proof has been pruned.
643    ///
644    /// # Warning
645    ///
646    /// Panics if there are unprocessed updates.
647    pub async fn range_proof(
648        &self,
649        start_element_pos: u64,
650        end_element_pos: u64,
651    ) -> Result<Proof<H::Digest>, Error> {
652        assert!(!self.mem_mmr.is_dirty());
653        Proof::<H::Digest>::range_proof::<Mmr<E, H>>(self, start_element_pos, end_element_pos).await
654    }
655
656    /// Analogous to range_proof but for a previous database state.
657    /// Specifically, the state when the MMR had `size` elements.
658    pub async fn historical_range_proof(
659        &self,
660        size: u64,
661        start_element_pos: u64,
662        end_element_pos: u64,
663    ) -> Result<Proof<H::Digest>, Error> {
664        assert!(!self.mem_mmr.is_dirty());
665        Proof::<H::Digest>::historical_range_proof::<Mmr<E, H>>(
666            self,
667            size,
668            start_element_pos,
669            end_element_pos,
670        )
671        .await
672    }
673
674    /// Prune as many nodes as possible, leaving behind at most items_per_blob nodes in the current
675    /// blob.
676    pub async fn prune_all(&mut self, h: &mut impl Hasher<H>) -> Result<(), Error> {
677        if self.size() != 0 {
678            self.prune_to_pos(h, self.size()).await?;
679            return Ok(());
680        }
681        Ok(())
682    }
683
684    /// Prune all nodes up to but not including the given position and update the pinned nodes.
685    ///
686    /// This implementation ensures that no failure can leave the MMR in an unrecoverable state,
687    /// requiring it sync the MMR to write any potential unprocessed updates.
688    pub async fn prune_to_pos(&mut self, h: &mut impl Hasher<H>, pos: u64) -> Result<(), Error> {
689        assert!(pos <= self.size());
690        if self.size() == 0 {
691            return Ok(());
692        }
693
694        // Flush items cached in the mem_mmr to disk to ensure the current state is recoverable.
695        self.sync(h).await?;
696
697        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
698        // event of a pruning failure.
699        let pinned_nodes = self.update_metadata(pos).await?;
700
701        self.journal.prune(pos).await?;
702        self.mem_mmr.add_pinned_nodes(pinned_nodes);
703        self.pruned_to_pos = pos;
704
705        Ok(())
706    }
707
708    /// The highest position for which this MMR has been pruned, or 0 if this MMR has never been
709    /// pruned.
710    pub fn pruned_to_pos(&self) -> u64 {
711        self.pruned_to_pos
712    }
713
714    /// Return the position of the oldest retained node in the MMR, not including pinned nodes.
715    pub fn oldest_retained_pos(&self) -> Option<u64> {
716        if self.pruned_to_pos == self.size() {
717            return None;
718        }
719
720        Some(self.pruned_to_pos)
721    }
722
723    /// Close the MMR, syncing any cached elements to disk and closing the journal.
724    pub async fn close(mut self, h: &mut impl Hasher<H>) -> Result<(), Error> {
725        self.sync(h).await?;
726        self.journal.close().await?;
727        self.metadata.close().await.map_err(Error::MetadataError)
728    }
729
730    /// Close and permanently remove any disk resources.
731    pub async fn destroy(self) -> Result<(), Error> {
732        self.journal.destroy().await?;
733        self.metadata.destroy().await?;
734
735        Ok(())
736    }
737
738    #[cfg(test)]
739    /// Sync elements to disk until `write_limit` elements have been written, then abort to simulate
740    /// a partial write for testing failure scenarios.
741    pub async fn simulate_partial_sync(
742        mut self,
743        hasher: &mut impl Hasher<H>,
744        write_limit: usize,
745    ) -> Result<(), Error> {
746        if write_limit == 0 {
747            return Ok(());
748        }
749
750        // Write the nodes cached in the memory-resident MMR to the journal, aborting after
751        // write_count nodes have been written.
752        let mut written_count = 0usize;
753        self.mem_mmr.sync(hasher);
754        for i in self.journal_size..self.size() {
755            let node = *self.mem_mmr.get_node_unchecked(i);
756            self.journal.append(node).await?;
757            written_count += 1;
758            if written_count >= write_limit {
759                break;
760            }
761        }
762        self.journal.sync().await?;
763
764        Ok(())
765    }
766
767    #[cfg(test)]
768    pub fn get_pinned_nodes(&self) -> HashMap<u64, H::Digest> {
769        self.mem_mmr.pinned_nodes.clone()
770    }
771
772    #[cfg(test)]
773    pub async fn simulate_pruning_failure(
774        mut self,
775        h: &mut impl Hasher<H>,
776        prune_to_pos: u64,
777    ) -> Result<(), Error> {
778        assert!(prune_to_pos <= self.size());
779
780        // Flush items cached in the mem_mmr to disk to ensure the current state is recoverable.
781        self.sync(h).await?;
782
783        // Update metadata to reflect the desired pruning boundary, allowing for recovery in the
784        // event of a pruning failure.
785        self.update_metadata(prune_to_pos).await?;
786
787        // Don't actually prune the journal to simulate failure
788        Ok(())
789    }
790}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795    use crate::mmr::{
796        hasher::Standard,
797        iterator::leaf_num_to_pos,
798        tests::{
799            build_and_check_test_roots_mmr, build_batched_and_check_test_roots_journaled, ROOTS,
800        },
801    };
802    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
803    use commonware_macros::test_traced;
804    use commonware_runtime::{buffer::PoolRef, deterministic, Blob as _, Runner};
805    use commonware_utils::{hex, NZUsize, NZU64};
806
807    fn test_digest(v: usize) -> Digest {
808        Sha256::hash(&v.to_be_bytes())
809    }
810
    /// First argument passed to `PoolRef::new` when building test buffer pools (presumably the
    /// page size in bytes — TODO confirm against `PoolRef`).
    const PAGE_SIZE: usize = 111;
    /// Second argument passed to `PoolRef::new` when building test buffer pools (presumably the
    /// number of pages the pool caches — TODO confirm against `PoolRef`).
    const PAGE_CACHE_SIZE: usize = 5;
813
814    fn test_config() -> Config {
815        Config {
816            journal_partition: "journal_partition".into(),
817            metadata_partition: "metadata_partition".into(),
818            items_per_blob: NZU64!(7),
819            write_buffer: NZUsize!(1024),
820            thread_pool: None,
821            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
822        }
823    }
824
825    /// Test that the MMR root computation remains stable.
826    #[test]
827    fn test_journaled_mmr_root_stability() {
828        let executor = deterministic::Runner::default();
829        executor.start(|context| async move {
830            let mut mmr = Mmr::init(context.clone(), &mut Standard::new(), test_config())
831                .await
832                .unwrap();
833            build_and_check_test_roots_mmr(&mut mmr).await;
834            mmr.destroy().await.unwrap();
835        });
836    }
837
838    /// Test that the MMR root computation remains stable by comparing against previously computed
839    /// roots.
840    #[test]
841    fn test_journaled_mmr_root_stability_batched() {
842        let executor = deterministic::Runner::default();
843        executor.start(|context| async move {
844            let mut std_hasher = Standard::new();
845            let mut mmr = Mmr::init(context.clone(), &mut std_hasher, test_config())
846                .await
847                .unwrap();
848            build_batched_and_check_test_roots_journaled(&mut mmr).await;
849            mmr.destroy().await.unwrap();
850        });
851    }
852
    #[test_traced]
    /// Exercises the public API on an empty MMR, then adds and pops a single element and confirms
    /// the MMR is still empty after being re-initialized with the same config.
    fn test_journaled_mmr_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            // On a fresh MMR: no nodes are readable, pruning and syncing succeed, and popping
            // reports `Error::Empty`.
            assert_eq!(mmr.size(), 0);
            assert!(mmr.get_node(0).await.is_err());
            assert_eq!(mmr.oldest_retained_pos(), None);
            assert!(mmr.prune_all(&mut hasher).await.is_ok());
            assert_eq!(mmr.pruned_to_pos(), 0);
            assert!(mmr.prune_to_pos(&mut hasher, 0).await.is_ok());
            assert!(mmr.sync(&mut hasher).await.is_ok());
            assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));

            // Add one element, sync it, then pop it again and sync the now-empty MMR.
            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
            assert_eq!(mmr.size(), 1);
            mmr.sync(&mut hasher).await.unwrap();
            assert!(mmr.get_node(0).await.is_ok());
            assert!(mmr.pop(1).await.is_ok());
            assert_eq!(mmr.size(), 0);
            mmr.sync(&mut hasher).await.unwrap();

            // Re-initializing with the same config must yield an empty MMR.
            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);

            mmr.destroy().await.unwrap();
        });
    }
886
    #[test_traced]
    /// Checks `pop` against the reference `ROOTS` in three scenarios: nothing synced, partially
    /// synced, and partially synced then pruned.
    fn test_journaled_mmr_pop() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            let mut c_hasher = Sha256::new();
            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
            }
            assert_eq!(ROOTS[199], hex(&mmr.root(&mut hasher)));

            // Pop off one node at a time without syncing until empty, confirming the root is
            // still as expected.
            for i in (0..199u64).rev() {
                assert!(mmr.pop(1).await.is_ok());
                let root = mmr.root(&mut hasher);
                let expected_root = ROOTS[i as usize];
                assert_eq!(hex(&root), expected_root);
            }
            assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));
            assert!(mmr.pop(0).await.is_ok());

            // Repeat the test, but sync part of the way to tip to test crossing the boundary from
            // cached to uncached leaves, and pop 2 at a time instead of just 1.
            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
                if i == 101 {
                    mmr.sync(&mut hasher).await.unwrap();
                }
            }
            for i in (0..198u64).rev().step_by(2) {
                assert!(mmr.pop(2).await.is_ok());
                let root = mmr.root(&mut hasher);
                let expected_root = ROOTS[i as usize];
                assert_eq!(hex(&root), expected_root);
            }
            assert_eq!(mmr.size(), 1);
            assert!(mmr.pop(1).await.is_ok()); // pop the last element
            assert!(matches!(mmr.pop(99).await, Err(Error::Empty)));

            // Repeat one more time only after pruning the MMR first.
            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
                if i == 101 {
                    mmr.sync(&mut hasher).await.unwrap();
                }
            }
            let leaf_pos = leaf_num_to_pos(50);
            mmr.prune_to_pos(&mut hasher, leaf_pos).await.unwrap();
            // Pop enough nodes to cause the mem-mmr to be completely emptied, and then some.
            mmr.pop(80).await.unwrap();
            // Make sure the pinned node boundary is valid by generating a proof for the oldest item.
            mmr.proof(leaf_pos).await.unwrap();
            // Pop all remaining leaves 1 at a time until only pruned elements are left.
            while mmr.size() > leaf_pos {
                assert!(mmr.pop(1).await.is_ok());
            }
            // Popping into the pruned region must be rejected.
            assert!(matches!(mmr.pop(1).await, Err(Error::ElementPruned(_))));

            mmr.destroy().await.unwrap();
        });
    }
959
    #[test_traced]
    /// Builds a 255-leaf MMR and checks that element and range proofs are generated correctly both
    /// before and after the nodes are synced to the journal.
    fn test_journaled_mmr_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            // Build a test MMR with 255 leaves
            const LEAF_COUNT: usize = 255;
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
                positions.push(pos);
            }
            // Nothing has been synced yet, so the journal is expected to still be empty.
            assert_eq!(mmr.size(), 502);
            assert_eq!(mmr.journal_size, 0);

            // Generate & verify proof from element that is not yet flushed to the journal.
            const TEST_ELEMENT: usize = 133;
            let test_element_pos = positions[TEST_ELEMENT];

            let proof = mmr.proof(test_element_pos).await.unwrap();
            let root = mmr.root(&mut hasher);
            assert!(proof.verify_element_inclusion(
                &mut hasher,
                &leaves[TEST_ELEMENT],
                test_element_pos,
                &root,
            ));

            // Sync the MMR, make sure it flushes the in-mem MMR as expected.
            mmr.sync(&mut hasher).await.unwrap();
            assert_eq!(mmr.journal_size, 502);
            assert_eq!(mmr.mem_mmr.oldest_retained_pos(), None);

            // Now that the element is flushed from the in-mem MMR, confirm its proof is still
            // generated correctly.
            let proof2 = mmr.proof(test_element_pos).await.unwrap();
            assert_eq!(proof, proof2);

            // Generate & verify a proof that spans flushed elements and the last element.
            let last_element = LEAF_COUNT - 1;
            let last_element_pos = positions[last_element];
            let proof = mmr
                .range_proof(test_element_pos, last_element_pos)
                .await
                .unwrap();
            assert!(proof.verify_range_inclusion(
                &mut hasher,
                &leaves[TEST_ELEMENT..last_element + 1],
                test_element_pos,
                &root
            ));

            mmr.destroy().await.unwrap();
        });
    }
1021
    #[test_traced]
    /// Generates a stateful MMR, simulates various partial-write scenarios, and confirms we
    /// appropriately recover to a valid state.
    ///
    /// The blob names and byte offsets below depend on `test_config()` using 7 items per blob and
    /// on each journal entry occupying 36 bytes, as asserted in the body.
    fn test_journaled_mmr_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);

            // Build a test MMR with 252 leaves
            const LEAF_COUNT: usize = 252;
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
                positions.push(pos);
            }
            assert_eq!(mmr.size(), 498);
            let root = mmr.root(&mut hasher);
            mmr.close(&mut hasher).await.unwrap();

            // The very last element we added (pos=495) resulted in new parents at positions 496 &
            // 497. Simulate a partial write by corrupting the last parent's checksum by truncating
            // the last blob by a single byte.
            let partition: String = "journal_partition".into();
            let (blob, len) = context
                .open(&partition, &71u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(len, 36); // N+4 = 36 bytes per node, 1 node in the last blob

            // truncate the blob by one byte to corrupt the checksum of the last parent node.
            blob.resize(len - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            // Since we didn't corrupt the leaf, the MMR is able to replay the leaf and recover to
            // the previous state.
            assert_eq!(mmr.size(), 498);
            assert_eq!(mmr.root(&mut hasher), root);

            // Make sure closing it and re-opening it persists the recovered state.
            mmr.close(&mut hasher).await.unwrap();
            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 498);
            mmr.close(&mut hasher).await.unwrap();

            // Repeat partial write test though this time truncate the leaf itself not just some
            // parent. The leaf is in the *previous* blob so we'll have to delete the most recent
            // blob, then appropriately truncate the previous one.
            context
                .remove(&partition, Some(&71u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let (blob, len) = context
                .open(&partition, &70u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(len, 36 * 7); // this blob should be full.

            // The last leaf should be in slot 5 of this blob, truncate last byte of its checksum.
            blob.resize(36 * 5 + 35)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            // Since the leaf was corrupted, it should not have been recovered, and the journal's
            // size will be the last-valid size.
            assert_eq!(mmr.size(), 495);

            mmr.destroy().await.unwrap();
        });
    }
1107
    #[test_traced]
    /// Grows a pruned and an unpruned MMR in lock-step and confirms that pruning (incremental,
    /// complete, and across close/re-open) never changes the root.
    fn test_journaled_mmr_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            // make sure pruning doesn't break root computation, adding of new nodes, etc.
            const LEAF_COUNT: usize = 2000;
            let cfg_pruned = test_config();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            // The unpruned reference MMR lives in its own partitions but shares the buffer pool.
            let cfg_unpruned = Config {
                journal_partition: "unpruned_journal_partition".into(),
                metadata_partition: "unpruned_metadata_partition".into(),
                items_per_blob: NZU64!(7),
                write_buffer: NZUsize!(1024),
                thread_pool: None,
                buffer_pool: cfg_pruned.buffer_pool.clone(),
            };
            let mut mmr = Mmr::init(context.clone(), &mut hasher, cfg_unpruned)
                .await
                .unwrap();
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            // NOTE(review): `positions` is only ever pushed to in this test, never read.
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
                pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
            }
            assert_eq!(mmr.size(), 3994);
            assert_eq!(pruned_mmr.size(), 3994);

            // Prune the MMR in increments of 10 making sure the journal is still able to compute
            // roots and accept new elements.
            for i in 0usize..300 {
                let prune_pos = i as u64 * 10;
                pruned_mmr
                    .prune_to_pos(&mut hasher, prune_pos)
                    .await
                    .unwrap();
                assert_eq!(prune_pos, pruned_mmr.pruned_to_pos());

                let digest = test_digest(LEAF_COUNT + i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
                mmr.add(&mut hasher, last_leaf).await.unwrap();
                assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));
            }

            // Sync the MMRs.
            pruned_mmr.sync(&mut hasher).await.unwrap();
            assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));

            // Close the MMR & reopen.
            pruned_mmr.close(&mut hasher).await.unwrap();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));

            // Prune everything.
            let size = pruned_mmr.size();
            pruned_mmr.prune_all(&mut hasher).await.unwrap();
            assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));
            assert_eq!(pruned_mmr.oldest_retained_pos(), None);
            assert_eq!(pruned_mmr.pruned_to_pos(), size);

            // Close MMR after adding a new node without syncing and make sure state is as expected
            // on reopening.
            mmr.add(&mut hasher, &test_digest(LEAF_COUNT))
                .await
                .unwrap();
            pruned_mmr
                .add(&mut hasher, &test_digest(LEAF_COUNT))
                .await
                .unwrap();
            // The scenario below is only meaningful when the size is not on a blob boundary.
            assert!(pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
            pruned_mmr.close(&mut hasher).await.unwrap();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));
            assert_eq!(pruned_mmr.oldest_retained_pos(), Some(size));
            assert_eq!(pruned_mmr.pruned_to_pos(), size);

            // Add nodes until we are on a blob boundary, and confirm prune_all still removes all
            // retained nodes.
            while pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
                pruned_mmr
                    .add(&mut hasher, &test_digest(LEAF_COUNT))
                    .await
                    .unwrap();
            }
            pruned_mmr.prune_all(&mut hasher).await.unwrap();
            assert_eq!(pruned_mmr.oldest_retained_pos(), None);

            pruned_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }
1213
1214    #[test_traced("WARN")]
1215    /// Simulate partial writes after pruning, making sure we recover to a valid state.
1216    fn test_journaled_mmr_recovery_with_pruning() {
1217        let executor = deterministic::Runner::default();
1218        executor.start(|context| async move {
1219            // Build MMR with 2000 leaves.
1220            let mut hasher: Standard<Sha256> = Standard::new();
1221            const LEAF_COUNT: usize = 2000;
1222            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1223                .await
1224                .unwrap();
1225            let mut leaves = Vec::with_capacity(LEAF_COUNT);
1226            let mut positions = Vec::with_capacity(LEAF_COUNT);
1227            for i in 0..LEAF_COUNT {
1228                let digest = test_digest(i);
1229                leaves.push(digest);
1230                let last_leaf = leaves.last().unwrap();
1231                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1232                positions.push(pos);
1233            }
1234            assert_eq!(mmr.size(), 3994);
1235            mmr.close(&mut hasher).await.unwrap();
1236
1237            // Prune the MMR in increments of 50, simulating a partial write after each prune.
1238            for i in 0usize..200 {
1239                let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1240                    .await
1241                    .unwrap();
1242                let start_size = mmr.size();
1243                let prune_pos = std::cmp::min(i as u64 * 50, start_size);
1244                if i % 5 == 0 {
1245                    mmr.simulate_pruning_failure(&mut hasher, prune_pos)
1246                        .await
1247                        .unwrap();
1248                    continue;
1249                }
1250                mmr.prune_to_pos(&mut hasher, prune_pos).await.unwrap();
1251
1252                // add 25 new elements, simulating a partial write after each.
1253                for j in 0..10 {
1254                    let digest = test_digest(100 * (i + 1) + j);
1255                    leaves.push(digest);
1256                    let last_leaf = leaves.last().unwrap();
1257                    let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1258                    positions.push(pos);
1259                    mmr.add(&mut hasher, last_leaf).await.unwrap();
1260                    assert_eq!(mmr.root(&mut hasher), mmr.root(&mut hasher));
1261                    let digest = test_digest(LEAF_COUNT + i);
1262                    leaves.push(digest);
1263                    let last_leaf = leaves.last().unwrap();
1264                    let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
1265                    positions.push(pos);
1266                    mmr.add(&mut hasher, last_leaf).await.unwrap();
1267                }
1268                let end_size = mmr.size();
1269                let total_to_write = (end_size - start_size) as usize;
1270                let partial_write_limit = i % total_to_write;
1271                mmr.simulate_partial_sync(&mut hasher, partial_write_limit)
1272                    .await
1273                    .unwrap();
1274            }
1275
1276            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1277                .await
1278                .unwrap();
1279            mmr.destroy().await.unwrap();
1280        });
1281    }
1282
1283    #[test_traced]
1284    fn test_journaled_mmr_historical_range_proof_basic() {
1285        let executor = deterministic::Runner::default();
1286        executor.start(|context| async move {
1287            // Create MMR with 10 elements
1288            let mut hasher = Standard::<Sha256>::new();
1289            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1290                .await
1291                .unwrap();
1292            let mut elements = Vec::new();
1293            let mut positions = Vec::new();
1294            for i in 0..10 {
1295                elements.push(test_digest(i));
1296                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1297            }
1298            let original_size = mmr.size();
1299
1300            // Historical proof should match "regular" proof when historical size == current database size
1301            let historical_proof = mmr
1302                .historical_range_proof(original_size, positions[2], positions[5])
1303                .await
1304                .unwrap();
1305            assert_eq!(historical_proof.size, original_size);
1306            let root = mmr.root(&mut hasher);
1307            assert!(historical_proof.verify_range_inclusion(
1308                &mut hasher,
1309                &elements[2..=5],
1310                positions[2],
1311                &root
1312            ));
1313            let regular_proof = mmr.range_proof(positions[2], positions[5]).await.unwrap();
1314            assert_eq!(regular_proof.size, historical_proof.size);
1315            assert_eq!(regular_proof.digests, historical_proof.digests);
1316
1317            // Add more elements to the MMR
1318            for i in 10..20 {
1319                elements.push(test_digest(i));
1320                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
1321            }
1322            let new_historical_proof = mmr
1323                .historical_range_proof(original_size, positions[2], positions[5])
1324                .await
1325                .unwrap();
1326            assert_eq!(new_historical_proof.size, historical_proof.size);
1327            assert_eq!(new_historical_proof.digests, historical_proof.digests);
1328
1329            mmr.destroy().await.unwrap();
1330        });
1331    }
1332
    #[test_traced]
    /// Confirms a historical range proof can still be generated and verified after the MMR has
    /// been pruned, using a separate 41-element reference MMR to obtain the historical size and
    /// root.
    fn test_journaled_mmr_historical_range_proof_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            // Add many elements
            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..50 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }

            // Prune to position 30
            let prune_pos = 30;
            mmr.prune_to_pos(&mut hasher, prune_pos).await.unwrap();

            // Create reference MMR for verification to get correct size
            let mut ref_mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "ref_journal_pruned".into(),
                    metadata_partition: "ref_metadata_pruned".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            // The reference MMR holds only the first 41 elements, giving the historical state.
            for elt in elements.iter().take(41) {
                ref_mmr.add(&mut hasher, elt).await.unwrap();
            }
            let historical_size = ref_mmr.size();
            let historical_root = ref_mmr.root(&mut hasher);

            // Test proof at historical position after pruning
            let historical_proof = mmr
                .historical_range_proof(
                    historical_size,
                    positions[35], // Start after prune point
                    positions[38], // End before historical size
                )
                .await
                .unwrap();

            assert_eq!(historical_proof.size, historical_size);

            // Verify proof works despite pruning
            assert!(historical_proof.verify_range_inclusion(
                &mut hasher,
                &elements[35..=38],
                positions[35],
                &historical_root
            ));

            ref_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }
1400
    #[test_traced]
    /// Generates a historical range proof over elements 30..=60 from a 100-element MMR and
    /// verifies it against the root of a reference MMR that contains only the first 61 elements.
    fn test_journaled_mmr_historical_range_proof_large() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mut mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "server_journal".into(),
                    metadata_partition: "server_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..100 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }

            // These are indices into `elements` / `positions`, not MMR node positions.
            let start_pos = 30;
            let end_pos = 60;

            // Only apply elements up to end_pos to the reference MMR.
            let mut ref_mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "client_journal".into(),
                    metadata_partition: "client_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            // Add elements up to end_pos to verify historical root
            for elt in elements.iter().take(end_pos + 1) {
                ref_mmr.add(&mut hasher, elt).await.unwrap();
            }
            let historical_size = ref_mmr.size();
            let expected_root = ref_mmr.root(&mut hasher);

            // Generate proof from full MMR
            let proof = mmr
                .historical_range_proof(historical_size, positions[start_pos], positions[end_pos])
                .await
                .unwrap();

            assert!(proof.verify_range_inclusion(
                &mut hasher,
                &elements[start_pos..=end_pos],
                positions[start_pos],
                &expected_root // Compare to historical (reference) root
            ));

            ref_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }
1472
1473    #[test_traced]
1474    fn test_journaled_mmr_historical_range_proof_singleton() {
1475        let executor = deterministic::Runner::default();
1476        executor.start(|context| async move {
1477            let mut hasher = Standard::<Sha256>::new();
1478            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1479                .await
1480                .unwrap();
1481
1482            let element = test_digest(0);
1483            let position = mmr.add(&mut hasher, &element).await.unwrap();
1484
1485            // Test single element proof at historical position
1486            let single_proof = mmr
1487                .historical_range_proof(
1488                    position + 1, // Historical size after first element
1489                    position,
1490                    position,
1491                )
1492                .await
1493                .unwrap();
1494
1495            let root = mmr.root(&mut hasher);
1496            assert!(single_proof.verify_range_inclusion(&mut hasher, &[element], position, &root));
1497
1498            mmr.destroy().await.unwrap();
1499        });
1500    }
1501
1502    #[test_traced]
1503    fn test_journaled_mmr_init_from_pinned_nodes() {
1504        let executor = deterministic::Runner::default();
1505        executor.start(|context| async move {
1506            let mut hasher = Standard::<Sha256>::new();
1507
1508            // Create an in-memory MMR with some elements
1509            let mut original_mmr = Mmr::init(
1510                context.clone(),
1511                &mut hasher,
1512                Config {
1513                    journal_partition: "original_journal".into(),
1514                    metadata_partition: "original_metadata".into(),
1515                    items_per_blob: NZU64!(7),
1516                    write_buffer: NZUsize!(1024),
1517                    thread_pool: None,
1518                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1519                },
1520            )
1521            .await
1522            .unwrap();
1523
1524            // Add some elements and prune to the size of the MMR
1525            for i in 0..1_000 {
1526                original_mmr
1527                    .add(&mut hasher, &test_digest(i))
1528                    .await
1529                    .unwrap();
1530            }
1531            original_mmr.sync(&mut hasher).await.unwrap();
1532            let original_size = original_mmr.size();
1533            original_mmr
1534                .prune_to_pos(&mut hasher, original_size)
1535                .await
1536                .unwrap();
1537
1538            // Get the journal digest
1539            let mut hasher = Standard::<Sha256>::new();
1540            let original_journal_digest = original_mmr.root(&mut hasher);
1541
1542            // Get the pinned nodes
1543            let pinned_nodes_map = original_mmr.get_pinned_nodes();
1544            let pinned_nodes: Vec<_> = Proof::<Digest>::nodes_to_pin(original_size)
1545                .map(|pos| pinned_nodes_map[&pos])
1546                .collect();
1547
1548            // Create a journaled MMR from the pinned nodes
1549            let new_mmr_config = Config {
1550                journal_partition: "new_journal".into(),
1551                metadata_partition: "new_metadata".into(),
1552                items_per_blob: NZU64!(7),
1553                write_buffer: NZUsize!(1024),
1554                thread_pool: None,
1555                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1556            };
1557            let mut new_mmr = Mmr::<_, Sha256>::init_from_pinned_nodes(
1558                context.clone(),
1559                pinned_nodes,
1560                original_size,
1561                new_mmr_config.clone(),
1562            )
1563            .await
1564            .unwrap();
1565
1566            // Verify the journaled MMR has the same properties as the original MMR
1567            assert_eq!(new_mmr.size(), original_size);
1568            assert_eq!(new_mmr.pruned_to_pos(), original_size);
1569            assert_eq!(new_mmr.oldest_retained_pos(), None);
1570            // Verify the roots match
1571            let new_journal_digest = new_mmr.root(&mut hasher);
1572            assert_eq!(new_journal_digest, original_journal_digest);
1573
1574            // Insert a new element into the new journaled MMR and the original MMR
1575            let new_element = test_digest(10);
1576
1577            let original_mmr_pos = original_mmr.add(&mut hasher, &new_element).await.unwrap();
1578            assert_eq!(original_mmr_pos, original_size);
1579
1580            let new_mmr_pos = new_mmr.add(&mut hasher, &new_element).await.unwrap();
1581            assert_eq!(new_mmr_pos, original_size); // New element is added at the end
1582
1583            // Verify the roots still match
1584            let original_mmr_root = original_mmr.root(&mut hasher);
1585            let new_mmr_root = new_mmr.root(&mut hasher);
1586            assert_eq!(new_mmr_root, original_mmr_root);
1587
1588            // Close and re-open the journaled MMR
1589            new_mmr.close(&mut hasher).await.unwrap();
1590            let new_mmr = Mmr::<_, Sha256>::init(context.clone(), &mut hasher, new_mmr_config)
1591                .await
1592                .unwrap();
1593
1594            // Root should be unchanged
1595            let new_mmr_root = new_mmr.root(&mut hasher);
1596            assert_eq!(new_mmr_root, original_mmr_root);
1597
1598            // Size and other metadata should be unchanged
1599            assert_eq!(new_mmr.size(), original_size + 1); // +1 for element we just added
1600            assert_eq!(new_mmr.pruned_to_pos(), original_size);
1601            assert_eq!(new_mmr.oldest_retained_pos(), Some(original_size)); // Element we just added is the oldest retained
1602
1603            // Proofs generated from the journaled MMR should be the same as the proofs generated from the original MMR
1604            let proof = new_mmr.proof(original_size).await.unwrap();
1605            let original_proof = original_mmr.proof(original_size).await.unwrap();
1606            assert_eq!(proof.digests, original_proof.digests);
1607            assert_eq!(proof.size, original_proof.size);
1608
1609            original_mmr.destroy().await.unwrap();
1610            new_mmr.destroy().await.unwrap();
1611        });
1612    }
1613
1614    #[test_traced]
1615    fn test_journaled_mmr_init_from_pinned_nodes_edge_cases() {
1616        let executor = deterministic::Runner::default();
1617        executor.start(|context| async move {
1618            let mut hasher = Standard::<Sha256>::new();
1619
1620            // === TEST 1: Empty MMR (size 0) ===
1621            let mut empty_mmr = Mmr::<_, Sha256>::init_from_pinned_nodes(
1622                context.clone(),
1623                vec![], // No pinned nodes
1624                0,      // Size 0
1625                Config {
1626                    journal_partition: "empty_journal".into(),
1627                    metadata_partition: "empty_metadata".into(),
1628                    items_per_blob: NZU64!(7),
1629                    write_buffer: NZUsize!(1024),
1630                    thread_pool: None,
1631                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1632                },
1633            )
1634            .await
1635            .unwrap();
1636
1637            assert_eq!(empty_mmr.size(), 0);
1638            assert_eq!(empty_mmr.pruned_to_pos(), 0);
1639            assert_eq!(empty_mmr.oldest_retained_pos(), None);
1640
1641            // Should be able to add first element at position 0
1642            let pos = empty_mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
1643            assert_eq!(pos, 0);
1644            assert_eq!(empty_mmr.size(), 1);
1645
1646            empty_mmr.destroy().await.unwrap();
1647
1648            // === TEST 2: Single element MMR ===
1649            let mut single_mem_mmr = MemMmr::new();
1650            single_mem_mmr.add(&mut hasher, &test_digest(42));
1651            let single_size = single_mem_mmr.size();
1652            let single_root = single_mem_mmr.root(&mut hasher);
1653            let single_pinned = single_mem_mmr.node_digests_to_pin(single_size);
1654
1655            let single_journaled_mmr = Mmr::<_, Sha256>::init_from_pinned_nodes(
1656                context.clone(),
1657                single_pinned,
1658                single_size,
1659                Config {
1660                    journal_partition: "single_journal".into(),
1661                    metadata_partition: "single_metadata".into(),
1662                    items_per_blob: NZU64!(7),
1663                    write_buffer: NZUsize!(1024),
1664                    thread_pool: None,
1665                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1666                },
1667            )
1668            .await
1669            .unwrap();
1670
1671            assert_eq!(single_journaled_mmr.size(), single_size);
1672            assert_eq!(single_journaled_mmr.root(&mut hasher), single_root);
1673
1674            single_journaled_mmr.destroy().await.unwrap();
1675        });
1676    }
1677    // Test `init_sync` when there is no persisted data.
1678    #[test_traced]
1679    fn test_journaled_mmr_init_sync_empty() {
1680        let executor = deterministic::Runner::default();
1681        executor.start(|context| async move {
1682            let mut hasher = Standard::<Sha256>::new();
1683
1684            // Test fresh start scenario with completely new MMR (no existing data)
1685            let sync_cfg = SyncConfig::<Digest> {
1686                config: test_config(),
1687                lower_bound: 0,
1688                upper_bound: 100,
1689                pinned_nodes: None,
1690            };
1691
1692            let sync_mmr = Mmr::<_, Sha256>::init_sync(context.clone(), sync_cfg)
1693                .await
1694                .unwrap();
1695
1696            // Should be fresh MMR starting empty
1697            assert_eq!(sync_mmr.size(), 0);
1698            assert_eq!(sync_mmr.pruned_to_pos(), 0);
1699            assert_eq!(sync_mmr.oldest_retained_pos(), None);
1700
1701            // Should be able to add new elements
1702            let mut sync_mmr = sync_mmr;
1703            let new_element = test_digest(999);
1704            sync_mmr.add(&mut hasher, &new_element).await.unwrap();
1705
1706            // Root should be computable
1707            let _root = sync_mmr.root(&mut hasher);
1708
1709            sync_mmr.destroy().await.unwrap();
1710        });
1711    }
1712
1713    // Test `init_sync` where the persisted MMR's persisted nodes match the sync boundaries.
1714    #[test_traced]
1715    fn test_journaled_mmr_init_sync_nonempty_exact_match() {
1716        let executor = deterministic::Runner::default();
1717        executor.start(|context| async move {
1718            let mut hasher = Standard::<Sha256>::new();
1719
1720            // Create initial MMR with elements.
1721            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1722                .await
1723                .unwrap();
1724            for i in 0..50 {
1725                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1726            }
1727            mmr.sync(&mut hasher).await.unwrap();
1728            let original_size = mmr.size();
1729            let original_root = mmr.root(&mut hasher);
1730
1731            // Sync with lower_bound ≤ existing_size ≤ upper_bound should reuse data
1732            let lower_bound = mmr.pruned_to_pos();
1733            let upper_bound = mmr.size() - 1;
1734            let mut expected_nodes = HashMap::new();
1735            for i in lower_bound..=upper_bound {
1736                expected_nodes.insert(i, mmr.get_node(i).await.unwrap().unwrap());
1737            }
1738            let sync_cfg = SyncConfig::<Digest> {
1739                config: test_config(),
1740                lower_bound,
1741                upper_bound,
1742                pinned_nodes: None,
1743            };
1744
1745            mmr.close(&mut hasher).await.unwrap();
1746
1747            let sync_mmr = Mmr::<_, Sha256>::init_sync(context.clone(), sync_cfg)
1748                .await
1749                .unwrap();
1750
1751            // Should have existing data in the sync range.
1752            assert_eq!(sync_mmr.size(), original_size);
1753            assert_eq!(sync_mmr.pruned_to_pos(), lower_bound);
1754            assert_eq!(sync_mmr.oldest_retained_pos(), Some(lower_bound));
1755            assert_eq!(sync_mmr.root(&mut hasher), original_root);
1756            for i in lower_bound..=upper_bound {
1757                assert_eq!(
1758                    sync_mmr.get_node(i).await.unwrap(),
1759                    expected_nodes.get(&i).cloned()
1760                );
1761            }
1762
1763            sync_mmr.destroy().await.unwrap();
1764        });
1765    }
1766
1767    // Test `init_sync` where the persisted MMR's data partially overlaps with the sync boundaries.
1768    #[test_traced]
1769    fn test_journaled_mmr_init_sync_partial_overlap() {
1770        let executor = deterministic::Runner::default();
1771        executor.start(|context| async move {
1772            let mut hasher = Standard::<Sha256>::new();
1773
1774            // Create initial MMR with elements.
1775            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
1776                .await
1777                .unwrap();
1778            for i in 0..30 {
1779                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
1780            }
1781            mmr.sync(&mut hasher).await.unwrap();
1782            mmr.prune_to_pos(&mut hasher, 10).await.unwrap();
1783
1784            let original_size = mmr.size();
1785            let original_root = mmr.root(&mut hasher);
1786            let original_pruned_to = mmr.pruned_to_pos();
1787
1788            // Sync with boundaries that extend beyond existing data (partial overlap).
1789            let lower_bound = original_pruned_to;
1790            let upper_bound = original_size + 10; // Extend beyond existing data
1791
1792            let mut expected_nodes = HashMap::new();
1793            for i in lower_bound..original_size {
1794                expected_nodes.insert(i, mmr.get_node(i).await.unwrap().unwrap());
1795            }
1796
1797            let sync_cfg = SyncConfig::<Digest> {
1798                config: test_config(),
1799                lower_bound,
1800                upper_bound,
1801                pinned_nodes: None,
1802            };
1803
1804            mmr.close(&mut hasher).await.unwrap();
1805
1806            let sync_mmr = Mmr::<_, Sha256>::init_sync(context.clone(), sync_cfg)
1807                .await
1808                .unwrap();
1809
1810            // Should have existing data in the overlapping range.
1811            assert_eq!(sync_mmr.size(), original_size);
1812            assert_eq!(sync_mmr.pruned_to_pos(), lower_bound);
1813            assert_eq!(sync_mmr.oldest_retained_pos(), Some(lower_bound));
1814            assert_eq!(sync_mmr.root(&mut hasher), original_root);
1815
1816            // Check that existing nodes are preserved in the overlapping range.
1817            for i in lower_bound..original_size {
1818                assert_eq!(
1819                    sync_mmr.get_node(i).await.unwrap(),
1820                    expected_nodes.get(&i).cloned()
1821                );
1822            }
1823
1824            sync_mmr.destroy().await.unwrap();
1825        });
1826    }
1827}