dig-coinstore 0.1.0

DIG L2 global coin state database — persistent UTXO store with Merkle proofs, hint indexing, and rollback support
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
//! Sparse Merkle tree for state root computation.
//!
//! Maintains a 256-level sparse Merkle tree over all coin records.
//! The root hash is committed in every block header, enabling light
//! client proofs and state verification.
//!
//! # Domain-separated hashing
//!
//! - **Leaf hash:** `SHA256(0x00 || value)` — the `0x00` prefix prevents
//!   second-preimage attacks by distinguishing leaves from internal nodes.
//! - **Node hash:** `SHA256(0x01 || left || right)` — the `0x01` prefix
//!   distinguishes internal nodes from leaves.
//!
//! This convention matches the existing `l2_driver_state_channel` implementation
//! (see `utils/hash.rs`: `merkle_leaf_hash`, `merkle_node_hash`).
//!
//! # Deferred root recomputation
//!
//! Mutation methods (`batch_insert`, `batch_update`, `batch_remove`) mark the
//! tree as dirty but do NOT recompute the root. The root is recomputed lazily
//! on the next call to `root()`. This ensures at most one expensive tree
//! traversal per block, regardless of how many coins are modified.
//!
//! # Persistent internal nodes (MRK-003)
//!
//! After [`SparseMerkleTree::root`] recomputes from leaves, the implementation records which
//! `(level, path)` internal rows differ from the canonical empty subtree so
//! [`SparseMerkleTree::flush_to_batch`] can append `merkle_nodes` puts/deletes plus the metadata
//! root row in one [`crate::storage::WriteBatch`]. See [`persistent`](persistent) and
//! `docs/requirements/domains/merkle/specs/MRK-003.md`.
//!
//! # Requirements: STR-004, MRK-001, MRK-002, MRK-003, MRK-004, MRK-005, MRK-006
//! # Spec: docs/requirements/domains/merkle/specs/MRK-001.md, specs/MRK-004.md, specs/MRK-005.md, specs/MRK-006.md
//! # SPEC.md: Section 9 (Merkle Tree), Section 13.4 (Persistent Merkle Tree)
//!
//! ## MRK-006 leaf payloads vs domain-separated helpers
//!
//! [`coin_record_hash`](coin_record_hash) (MRK-006) hashes **only** the STO-008 bincode bytes of a
//! [`crate::types::CoinRecord`] with [`chia_sha2::Sha256`] — no `0x00` prefix. That digest is what
//! block application stores as the SMT leaf **value** keyed by `coin_id`. [`merkle_leaf_hash`] /
//! [`merkle_node_hash`] remain the **internal** domain-separated primitives for MRK-001/MRK-002
//! tree math over those 32-byte leaf values.

mod leaf_hash;
pub mod persistent;
pub mod proof;

pub use leaf_hash::coin_record_hash;
pub use proof::{verify_coin_proof, SparseMerkleProof};

use std::collections::HashMap;
use std::sync::OnceLock;

use chia_protocol::Bytes32;
use chia_sha2::Sha256;

use crate::storage::schema;
use crate::storage::{StorageBackend, WriteBatch};

pub use persistent::{MerkleNodePersistOp, MERKLE_STATE_ROOT_META_KEY};

// ─────────────────────────────────────────────────────────────────────────────
// Constants
// ─────────────────────────────────────────────────────────────────────────────

/// Height of the sparse Merkle tree: 256 levels for 256-bit (Bytes32) keys.
///
/// Each bit of a coin_id selects left (0) or right (1) at successive levels,
/// from bit 0 (MSB, root level) to bit 255 (LSB, leaf level).
pub const SMT_HEIGHT: usize = 256;

// ─────────────────────────────────────────────────────────────────────────────
// Domain-separated hashing
// ─────────────────────────────────────────────────────────────────────────────

/// Compute a leaf hash: `SHA256(0x00 || data)`.
///
/// The `0x00` domain separator prevents second-preimage attacks by ensuring
/// leaf hashes cannot collide with internal node hashes.
///
/// Matches `l2_driver_state_channel/src/utils/hash.rs:merkle_leaf_hash`.
#[inline]
pub fn merkle_leaf_hash(data: &[u8]) -> Bytes32 {
    let mut hasher = Sha256::new();
    hasher.update([0x00]);
    hasher.update(data);
    let result = hasher.finalize();
    let mut bytes = [0u8; 32];
    bytes.copy_from_slice(&result);
    Bytes32::from(bytes)
}

/// Compute an internal node hash: `SHA256(0x01 || left || right)`.
///
/// The `0x01` domain separator distinguishes internal nodes from leaves.
///
/// Matches `l2_driver_state_channel/src/utils/hash.rs:merkle_node_hash`.
#[inline]
pub fn merkle_node_hash(left: &Bytes32, right: &Bytes32) -> Bytes32 {
    let mut hasher = Sha256::new();
    hasher.update([0x01]);
    hasher.update(left.as_ref());
    hasher.update(right.as_ref());
    let result = hasher.finalize();
    let mut bytes = [0u8; 32];
    bytes.copy_from_slice(&result);
    Bytes32::from(bytes)
}

// ─────────────────────────────────────────────────────────────────────────────
// MRK-002: Memoized empty hashes
// ─────────────────────────────────────────────────────────────────────────────
// Pre-computed empty subtree hashes for all 257 levels (0..=256).
// Level 0 = empty leaf hash, Level 256 = root of entirely empty 256-level tree.
//
// Requirement: MRK-002
// Spec: docs/requirements/domains/merkle/specs/MRK-002.md

/// Sentinel value for an empty leaf. All zeros, 32 bytes.
const EMPTY_LEAF_SENTINEL: [u8; 32] = [0u8; 32];

/// Pre-computed empty hashes for sparse Merkle tree levels 0..=256.
///
/// - `EMPTY_HASHES[0]` = `merkle_leaf_hash(EMPTY_LEAF_SENTINEL)` (empty leaf)
/// - `EMPTY_HASHES[n]` = `merkle_node_hash(EMPTY_HASHES[n-1], EMPTY_HASHES[n-1])` for n > 0
/// - `EMPTY_HASHES[256]` = root hash of an entirely empty 256-level tree
///
/// **Storage shape:** fixed `[Bytes32; 257]` inside [`OnceLock`], not `Vec`, so the table is
/// stack-sized at init time and matches **MRK-002 / NORMATIVE** (`OnceLock<[Bytes32; 257]>`).
/// Lookup remains a single bounds-checked index — O(1), no heap per read, no recursive work
/// on the query path (see `docs/requirements/domains/merkle/specs/MRK-002.md`).
///
/// Initialized lazily via [`OnceLock::get_or_init`] on first access. All 257 values are computed
/// bottom-up in one pass. Rust guarantees concurrent first callers block until a single init
/// completes, then all observe the same static slice.
static EMPTY_HASHES: OnceLock<[Bytes32; 257]> = OnceLock::new();

/// Get the pre-computed empty hash at the given tree level.
///
/// Level 0 = empty leaf hash.
/// Level 256 = root of entirely empty tree.
///
/// # Panics
///
/// Panics if `level > SMT_HEIGHT` (256).
///
/// # Performance
///
/// O(1) — direct array index after one-time initialization.
#[inline]
pub fn empty_hash(level: usize) -> Bytes32 {
    assert!(
        level <= SMT_HEIGHT,
        "level {} exceeds SMT_HEIGHT {}",
        level,
        SMT_HEIGHT
    );
    get_empty_hashes()[level]
}

/// Access the full pre-computed empty hash table (257 entries, indices `0..=SMT_HEIGHT`).
fn get_empty_hashes() -> &'static [Bytes32; 257] {
    EMPTY_HASHES.get_or_init(|| {
        let mut hashes = [Bytes32::default(); SMT_HEIGHT + 1];

        // Level 0: domain-separated empty leaf (MRK-002 § leaf sentinel).
        hashes[0] = merkle_leaf_hash(&EMPTY_LEAF_SENTINEL);

        // Levels 1..256: parent = H(left || right) with both children the same empty subtree.
        for i in 1..=SMT_HEIGHT {
            hashes[i] = merkle_node_hash(&hashes[i - 1], &hashes[i - 1]);
        }

        hashes
    })
}

// ─────────────────────────────────────────────────────────────────────────────
// MerkleError
// ─────────────────────────────────────────────────────────────────────────────

/// Errors from sparse Merkle tree operations.
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum MerkleError {
    /// Attempted to insert a key that already exists in the tree.
    #[error("key already exists: {0}")]
    KeyAlreadyExists(Bytes32),

    /// Attempted to update or remove a key that does not exist.
    #[error("key not found: {0}")]
    KeyNotFound(Bytes32),

    /// `merkle_state_root` metadata row missing during [`SparseMerkleTree::load_from_store`].
    #[error("persisted merkle state root missing from metadata column family")]
    PersistedRootMissing,

    /// Metadata root bytes are not a 32-byte [`Bytes32`] wire encoding.
    #[error("persisted merkle state root has invalid length: {0} bytes (expected 32)")]
    InvalidPersistedRootLength(usize),

    /// Recomputed root from the provided leaf map does not match the single metadata read (data
    /// corruption or partial write).
    #[error("persisted merkle root mismatch: disk={disk:?} recomputed={recomputed:?}")]
    PersistedRootMismatch { disk: Bytes32, recomputed: Bytes32 },

    /// Backend I/O failure while loading persisted Merkle metadata.
    #[error("storage error during merkle load: {0}")]
    Storage(String),

    /// MRK-004 / NORMATIVE: [`SparseMerkleTree::get_coin_proof`] refuses to mint a proof while the
    /// MRK-001 deferred root is stale (`root_hash` unset with a non-empty leaf map). Callers must
    /// run [`SparseMerkleTree::root`] after batch mutations so the proof is tied to a single
    /// committed state root (same boundary as block application).
    #[error("cannot generate coin proof while tree is dirty; call root() after mutations")]
    ProofRequiresCleanTree,
}

// ─────────────────────────────────────────────────────────────────────────────
// MRK-001: SparseMerkleTree
// ─────────────────────────────────────────────────────────────────────────────

/// A 256-level sparse Merkle tree for coin state commitment.
///
/// Leaves are keyed by `Bytes32` (coin IDs) and hold the leaf hash of the
/// coin record. The tree uses deferred root recomputation: mutations mark
/// the tree as dirty, and the root is only recomputed when `root()` is called.
///
/// # Design
///
/// The tree stores only non-empty leaves in a `HashMap`. Internal nodes are
/// computed on-the-fly during root recomputation by recursively partitioning
/// leaves by their key bits at each level. Empty subtrees are replaced with
/// pre-computed empty hashes from [`empty_hash`] (MRK-002).
///
/// This is memory-efficient for sparse trees: a tree with 10M coins stores
/// only 10M leaf entries, not 2^256 nodes.
///
/// # Requirements: MRK-001, MRK-003, MRK-004
/// # Spec: docs/requirements/domains/merkle/specs/MRK-001.md, specs/MRK-003.md, specs/MRK-004.md
/// # SPEC.md: Section 9 (Merkle Tree)
///
/// # Reference
/// Derived from `l2_driver_state_channel/src/utils/merkle.rs:SparseMerkleTree`.
#[derive(Debug, Clone)]
pub struct SparseMerkleTree {
    /// Non-empty leaf values: coin_id -> leaf_hash.
    /// Only populated (non-empty) leaves are stored.
    leaves: HashMap<Bytes32, Bytes32>,

    /// Cached root hash. `None` when dirty (needs recomputation).
    root_hash: Option<Bytes32>,

    /// Pending `merkle_nodes` writes since the last successful [`Self::flush_to_batch`].
    ///
    /// **MRK-003:** Populated during [`Self::root`] recomputation (same traversal as MRK-001) so we
    /// never pay an extra tree walk for persistence. Keys are exactly [`schema::merkle_node_key`]
    /// outputs (`[u8; 33]`). Values are [`MerkleNodePersistOp::Put`] for non-empty-subtree digests
    /// or [`MerkleNodePersistOp::Delete`] when the canonical empty hash applies at that `(level,
    /// path)` — enabling empty-subtree pruning on disk per MRK-003 spec §Behavior item 2.
    dirty_merkle_nodes: HashMap<[u8; 33], MerkleNodePersistOp>,
}

impl Default for SparseMerkleTree {
    fn default() -> Self {
        Self::new()
    }
}

impl SparseMerkleTree {
    /// Create a new empty sparse Merkle tree.
    ///
    /// The root of an empty tree is `empty_hash(SMT_HEIGHT)` — the pre-computed
    /// root of a 256-level tree with all-empty leaves.
    pub fn new() -> Self {
        Self {
            leaves: HashMap::new(),
            root_hash: Some(empty_hash(SMT_HEIGHT)),
            dirty_merkle_nodes: HashMap::new(),
        }
    }

    /// Insert new leaves into the tree. Does NOT recompute the root.
    ///
    /// Each entry is `(coin_id, leaf_hash)`. Inserting a key that already
    /// exists returns `MerkleError::KeyAlreadyExists`.
    ///
    /// Call `root()` after all mutations to get the updated root hash.
    ///
    /// # Requirement: MRK-001
    pub fn batch_insert(&mut self, entries: &[(Bytes32, Bytes32)]) -> Result<(), MerkleError> {
        for (key, _) in entries {
            if self.leaves.contains_key(key) {
                return Err(MerkleError::KeyAlreadyExists(*key));
            }
        }
        for (key, value) in entries {
            self.leaves.insert(*key, *value);
        }
        if !entries.is_empty() {
            self.root_hash = None; // Mark dirty
        }
        Ok(())
    }

    /// Update existing leaves with new values. Does NOT recompute the root.
    ///
    /// Updating a key that does not exist returns `MerkleError::KeyNotFound`.
    ///
    /// # Requirement: MRK-001
    pub fn batch_update(&mut self, entries: &[(Bytes32, Bytes32)]) -> Result<(), MerkleError> {
        for (key, _) in entries {
            if !self.leaves.contains_key(key) {
                return Err(MerkleError::KeyNotFound(*key));
            }
        }
        for (key, value) in entries {
            self.leaves.insert(*key, *value);
        }
        if !entries.is_empty() {
            self.root_hash = None; // Mark dirty
        }
        Ok(())
    }

    /// Remove leaves from the tree. Does NOT recompute the root.
    ///
    /// Removing a key that does not exist returns `MerkleError::KeyNotFound`.
    ///
    /// # Requirement: MRK-001
    pub fn batch_remove(&mut self, keys: &[Bytes32]) -> Result<(), MerkleError> {
        for key in keys {
            if !self.leaves.contains_key(key) {
                return Err(MerkleError::KeyNotFound(*key));
            }
        }
        for key in keys {
            self.leaves.remove(key);
        }
        if !keys.is_empty() {
            self.root_hash = None; // Mark dirty
        }
        Ok(())
    }

    /// Return the current Merkle root, recomputing if dirty.
    ///
    /// After this call, the tree is no longer dirty. Subsequent calls
    /// without intervening mutations return the cached root in O(1).
    ///
    /// # Algorithm
    ///
    /// Recursively partitions leaves by their key bits at each tree level.
    /// At each level, bit `depth` of the key determines left (0) or right (1).
    /// Empty subtrees are replaced with pre-computed `empty_hash(height)`.
    ///
    /// # Requirement: MRK-001, MRK-003 (dirty map refresh)
    pub fn root(&mut self) -> Bytes32 {
        if let Some(cached) = self.root_hash {
            return cached;
        }

        // Recompute from leaves and repopulate MRK-003 dirty set in the same traversal.
        let leaf_refs: Vec<(&Bytes32, &Bytes32)> = self.leaves.iter().collect();
        self.dirty_merkle_nodes.clear();
        let path_root = Bytes32::default();
        let root = Self::compute_subtree_hash_core(
            &leaf_refs,
            0,
            &path_root,
            &mut self.dirty_merkle_nodes,
            true,
        );
        self.root_hash = Some(root);
        root
    }

    /// Read the Merkle root without `&mut self` (for [`crate::coin_store::CoinStore::stats`] and other `&self` APIs).
    ///
    /// **Difference vs [`Self::root`]:** [`Self::root`] caches the digest in [`Self::root_hash`] after a
    /// dirty recompute. This method returns the same mathematical root but **does not** update the cache
    /// when dirty, so it can run on a shared `&` reference. The next [`Self::root`] call may still pay the
    /// full recompute cost once a writer mutates the tree.
    ///
    /// **Empty tree:** Returns [`empty_hash`](empty_hash)([`SMT_HEIGHT`]) — identical to [`Self::new`]'s
    /// initial root.
    ///
    /// # Requirement: API-007 (stats reads state root), MRK-001
    pub fn root_observed(&self) -> Bytes32 {
        if self.leaves.is_empty() {
            return empty_hash(SMT_HEIGHT);
        }
        if let Some(cached) = self.root_hash {
            return cached;
        }
        let leaf_refs: Vec<(&Bytes32, &Bytes32)> = self.leaves.iter().collect();
        let mut sink = HashMap::new();
        Self::compute_subtree_hash_core(&leaf_refs, 0, &Bytes32::default(), &mut sink, false)
    }

    /// Check if the tree has been modified since the last `root()` call.
    pub fn is_dirty(&self) -> bool {
        self.root_hash.is_none() && !self.leaves.is_empty()
    }

    /// Number of non-empty leaves in the tree.
    pub fn len(&self) -> usize {
        self.leaves.len()
    }

    /// Whether the tree has no leaves.
    pub fn is_empty(&self) -> bool {
        self.leaves.is_empty()
    }

    /// Check if a key exists in the tree.
    pub fn contains_key(&self, key: &Bytes32) -> bool {
        self.leaves.contains_key(key)
    }

    /// Get the leaf hash for a key, if it exists.
    pub fn get(&self, key: &Bytes32) -> Option<&Bytes32> {
        self.leaves.get(key)
    }

    /// MRK-003: read-only view of pending `merkle_nodes` writes (primarily tests and diagnostics).
    pub fn dirty_nodes(&self) -> &HashMap<[u8; 33], MerkleNodePersistOp> {
        &self.dirty_merkle_nodes
    }

    /// MRK-003: drop pending persistence rows without touching disk.
    ///
    /// Normal production flow uses [`Self::flush_to_batch`], which clears dirty after enqueueing
    /// ops. This helper exists for rollback simulations and tests that need a clean dirty slate
    /// without committing a batch.
    pub fn clear_dirty(&mut self) {
        self.dirty_merkle_nodes.clear();
    }

    /// MRK-003: enqueue dirty internal nodes plus the metadata state root into `batch`.
    ///
    /// **Atomicity:** Callers MUST pass the same [`WriteBatch`] they use for coin-record mutations
    /// so one [`StorageBackend::batch_write`] satisfies MRK-003 / BLK-014 “all-or-nothing” commits.
    ///
    /// **Ordering:** Invokes [`Self::root`] first so the dirty map matches the latest leaf multiset,
    /// then drains [`Self::dirty_merkle_nodes`] into `merkle_nodes` puts/deletes, then appends the
    /// metadata root row ([`MERKLE_STATE_ROOT_META_KEY`]).
    pub fn flush_to_batch(&mut self, batch: &mut WriteBatch) -> Result<(), MerkleError> {
        let root = self.root();
        for (key, op) in std::mem::take(&mut self.dirty_merkle_nodes) {
            match op {
                MerkleNodePersistOp::Put(h) => {
                    batch.put(schema::CF_MERKLE_NODES, &key, h.as_ref());
                }
                MerkleNodePersistOp::Delete => {
                    batch.delete(schema::CF_MERKLE_NODES, &key);
                }
            }
        }
        let meta_key = schema::metadata_key(MERKLE_STATE_ROOT_META_KEY);
        batch.put(schema::CF_METADATA, &meta_key, root.as_ref());
        Ok(())
    }

    /// MRK-003: single metadata `get` for the committed root, then validate against `leaves`.
    ///
    /// Performs **exactly one** storage read on [`schema::CF_METADATA`] (no `merkle_nodes` scan),
    /// matching NORMATIVE MRK-003. Recomputes from `leaves` via [`Self::root_observed`] to detect
    /// corruption before accepting the disk root into [`Self::root_hash`].
    pub fn load_from_store(
        store: &dyn StorageBackend,
        leaves: HashMap<Bytes32, Bytes32>,
    ) -> Result<Self, MerkleError> {
        let meta_key = schema::metadata_key(MERKLE_STATE_ROOT_META_KEY);
        let disk_bytes = store
            .get(schema::CF_METADATA, &meta_key)
            .map_err(|e| MerkleError::Storage(e.to_string()))?
            .ok_or(MerkleError::PersistedRootMissing)?;
        if disk_bytes.len() != 32 {
            return Err(MerkleError::InvalidPersistedRootLength(disk_bytes.len()));
        }
        let mut arr = [0u8; 32];
        arr.copy_from_slice(&disk_bytes);
        let disk_root = Bytes32::from(arr);

        let mut tree = Self {
            leaves,
            root_hash: None,
            dirty_merkle_nodes: HashMap::new(),
        };
        let recomputed = tree.root_observed();
        if recomputed != disk_root {
            return Err(MerkleError::PersistedRootMismatch {
                disk: disk_root,
                recomputed,
            });
        }
        tree.root_hash = Some(disk_root);
        Ok(tree)
    }

    // ─────────────────────────────────────────────────────────────────────
    // Internal: recursive subtree hash computation
    // ─────────────────────────────────────────────────────────────────────

    /// Recursive subtree hash with optional MRK-003 dirty recording.
    ///
    /// `path` carries the MSB-first prefix (bits `0..depth-1`) identifying the position of the
    /// current node in the global 256-bit key space. When `record_dirty` is true, every visited
    /// internal level records a [`MerkleNodePersistOp`] into `dirty_out` (keyed by
    /// [`schema::merkle_node_key`]). Callers that only need the digest (e.g. MRK-004 sibling
    /// recomputation) pass `record_dirty: false` and a throwaway map so both recursive children
    /// can share one writer without moving `Option<&mut HashMap<…>>` twice.
    fn compute_subtree_hash_core(
        leaves: &[(&Bytes32, &Bytes32)],
        depth: usize,
        path: &Bytes32,
        dirty_out: &mut HashMap<[u8; 33], MerkleNodePersistOp>,
        record_dirty: bool,
    ) -> Bytes32 {
        if leaves.is_empty() {
            let h = empty_hash(SMT_HEIGHT - depth);
            if record_dirty {
                record_merkle_persist_op(dirty_out, depth, path, h);
            }
            return h;
        }

        if depth == SMT_HEIGHT {
            return *leaves[0].1;
        }

        let (left, right): (Vec<_>, Vec<_>) = leaves
            .iter()
            .partition(|(key, _)| !Self::get_bit(key, depth));

        let path_left = child_path(path, depth, false);
        let path_right = child_path(path, depth, true);

        let left_hash =
            Self::compute_subtree_hash_core(&left, depth + 1, &path_left, dirty_out, record_dirty);
        let right_hash = Self::compute_subtree_hash_core(
            &right,
            depth + 1,
            &path_right,
            dirty_out,
            record_dirty,
        );

        let node_hash = merkle_node_hash(&left_hash, &right_hash);
        if record_dirty {
            record_merkle_persist_op(dirty_out, depth, path, node_hash);
        }
        node_hash
    }

    /// Get bit `n` of a Bytes32 key (MSB-first ordering).
    ///
    /// Bit 0 = MSB of byte 0 (root level decision).
    /// Bit 255 = LSB of byte 31 (leaf level decision).
    #[inline]
    fn get_bit(key: &Bytes32, n: usize) -> bool {
        let byte_index = n / 8;
        let bit_index = 7 - (n % 8); // MSB-first within each byte
        (key.as_ref()[byte_index] >> bit_index) & 1 == 1
    }
}

/// Child path for the sparse walk: copy `base` bits `0..depth-1`, clear suffix, then set bit `depth`.
///
/// **Invariant:** All keys in the recursive `leaves` slice at `(depth, path)` agree on the first
/// `depth` bits; `go_right` selects the `1` branch at bit index `depth` (MRK-001 MSB-first rule).
fn child_path(base: &Bytes32, depth: usize, go_right: bool) -> Bytes32 {
    let mut arr: [u8; 32] = base.as_ref().try_into().expect("Bytes32 is 32 bytes");
    for bit in depth..256 {
        let bi = bit / 8;
        let bj = 7 - (bit % 8);
        arr[bi] &= !(1 << bj);
    }
    let bi = depth / 8;
    let bj = 7 - (depth % 8);
    if go_right {
        arr[bi] |= 1 << bj;
    }
    Bytes32::from(arr)
}

/// Queue one `merkle_nodes` row for flush (MRK-003 empty-subtree pruning).
fn record_merkle_persist_op(
    dirty: &mut HashMap<[u8; 33], MerkleNodePersistOp>,
    depth: usize,
    path: &Bytes32,
    hash: Bytes32,
) {
    if depth >= SMT_HEIGHT {
        return;
    }
    let key = schema::merkle_node_key(depth as u8, path);
    let empty = empty_hash(SMT_HEIGHT - depth);
    if hash == empty {
        dirty.insert(key, MerkleNodePersistOp::Delete);
    } else {
        dirty.insert(key, MerkleNodePersistOp::Put(hash));
    }
}