Skip to main content

metamorphic_log/ingest/
mod.rs

1//! Storage-agnostic, **deterministic** ingestion primitives (Slice 7).
2//!
3//! This module is the OSS engine's contribution to the *write path*: the small,
4//! pure, byte-reproducible building blocks an operator needs to turn a stream of
5//! application records into an append-only tiled log. It is deliberately
6//! **storage-agnostic and I/O-free** — there is no Broadway/GenStage pipeline,
7//! no object-storage client, no network, and no persistence here. Those belong
8//! to the operator layer (the paid mosskeys app, per the #290 open-core
9//! boundary). What lives here is the deterministic *logic* that such a pipeline
10//! must run identically regardless of language or backend, so that two
11//! independent ingesters (e.g. this crate and a future Elixir consumer over the
12//! sibling NIF, #336) produce **the same bytes and the same tile geometry**.
13//!
14//! It provides four primitives:
15//!
16//! 1. [`Sequencer`] — a per-namespace **monotonic sequencer**. Each namespace
17//!    gets its own strictly-increasing `u64` position counter (the `seq` that a
18//!    record such as [`crate::leaf::key_history_v1`] commits). It is pure
19//!    in-memory state with an explicit [`Sequencer::resume_from`] so an operator
20//!    can rebuild it from durable storage after a restart without ever rewinding
21//!    (monotonicity is enforced, not assumed).
22//!
23//! 2. [`DedupKey`] — an **idempotent-append** deduplication key. A deterministic,
24//!    domain-separated, post-quantum (SHA3-512) digest of `(namespace, payload)`
25//!    so that re-submitting the same record — or the same client-supplied
26//!    idempotency token — maps to the same key and can be dropped before it
27//!    double-appends. Same input ⇒ same key, in any language.
28//!
29//! 3. [`plan_flush`] / [`tiles_to_flush`] / [`entry_bundles_to_flush`] — the
30//!    **tile-write/flush geometry**. Given the log's `old_size` and the
31//!    `new_size` after appending a batch, it computes exactly which C2SP
32//!    `tlog-tiles` coordinates changed and must be (re)written. It is defined
33//!    purely in terms of [`crate::tile`] and is byte-compatible with the audited
34//!    Layer-1 substrate: it never invents a tile [`crate::tile::tiles_for_size`]
35//!    would not, and never changes a canonical byte.
36//!
37//! 4. [`TileReader`] — the object-storage / CDN **read-path trait**. An
38//!    *interface only*: it describes how a verifier or monitor fetches immutable
39//!    tile and entry-bundle bytes by coordinate, with **no** backend
40//!    implementation in this crate. A logic-only bridge,
41//!    [`recompute_root_via`], reads the level-0 tiles through any `TileReader`
42//!    and recomputes the RFC 6962 root via [`crate::tile::recompute_root`],
43//!    proving the trait composes with the verification core without performing
44//!    any I/O itself.
45//!
46//! ## Throughput posture (honest framing)
47//!
48//! These are the *deterministic primitives*, not an end-to-end ingest pipeline.
49//! The `benches/ingestion.rs` benchmark measures them in isolation (sequencing +
50//! dedup-keying + flush planning) and they run far above the Tessera reference
51//! band of ~5k–18k entries/sec. **That number is not an end-to-end claim:**
52//! real throughput is bounded by the operator's pipeline (backpressure,
53//! batching) and the object-storage/CDN backend — both out of scope for this
54//! crate. The primitives are designed not to be the bottleneck.
55//!
56//! ## Byte / determinism discipline
57//!
58//! Like every other canonical encoding in this crate, the dedup key uses the
59//! fixed discipline (big-endian integers; `u32`-be length-prefixed variable
60//! fields; domain-separated context labels) so independent implementations
61//! recompute it byte-for-byte. The sequencer and flush geometry are fully
62//! determined by their inputs.
63
64use crate::error::{Error, Result};
65use crate::merkle::Hash;
66use crate::tile::{Tile, recompute_root, tiles_for_size};
67use std::collections::BTreeMap;
68use std::ops::Range;
69
70pub use crate::coniks::Namespace;
71
/// Domain-separation context for the content-derived idempotent-append key.
const DEDUP_CONTENT_CONTEXT: &str = "metamorphic-log/ingest-dedup-content/v1";
/// Domain-separation context for the token-derived idempotent-append key.
const DEDUP_TOKEN_CONTEXT: &str = "metamorphic-log/ingest-dedup-token/v1";

/// Append `lp(bytes) = u32_be(len(bytes)) || bytes` to `out`.
///
/// The same `u32`-be length-prefix discipline used by [`crate::leaf`], kept
/// local so field boundaries are unambiguous and two distinct `(namespace,
/// payload)` pairs cannot collide by boundary confusion.
///
/// # Panics
/// Panics if `bytes` is longer than `u32::MAX` bytes. A wrapped (truncated)
/// length prefix would no longer describe the field that follows it, so an
/// oversized field is rejected loudly instead of being encoded incorrectly.
fn push_lp(out: &mut Vec<u8>, bytes: &[u8]) {
    // Checked conversion: an `as u32` cast would silently drop the high bits.
    let len = u32::try_from(bytes.len()).expect("length-prefixed field exceeds u32::MAX bytes");
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(bytes);
}
86
87/// A per-namespace **monotonic sequencer**.
88///
89/// Assigns strictly-increasing, gap-free `u64` positions *within each
90/// namespace*, starting at `0`. Namespaces are fully independent: assigning in
91/// one never affects another. The sequencer is pure in-memory state and holds
92/// no I/O — durability is the operator's concern. After a restart the operator
93/// rebuilds state with [`Sequencer::resume_from`] (typically `last_committed +
94/// 1`), which refuses to rewind so monotonicity survives crashes.
95///
96/// ```
97/// use metamorphic_log::ingest::{Namespace, Sequencer};
98///
99/// let ns = Namespace::parse("acme").unwrap();
100/// let mut seq = Sequencer::new();
101/// assert_eq!(seq.next(&ns), 0);
102/// assert_eq!(seq.next(&ns), 1);
103/// assert_eq!(seq.peek(&ns), 2); // next position that would be assigned
104/// ```
105#[derive(Debug, Clone, Default)]
106pub struct Sequencer {
107    next: BTreeMap<String, u64>,
108}
109
110impl Sequencer {
111    /// Create an empty sequencer (every namespace starts at position `0`).
112    #[must_use]
113    pub fn new() -> Self {
114        Self {
115            next: BTreeMap::new(),
116        }
117    }
118
119    /// The next position that [`Sequencer::next`] would assign for `namespace`
120    /// (without assigning it). A never-seen namespace peeks at `0`.
121    #[must_use]
122    pub fn peek(&self, namespace: &Namespace) -> u64 {
123        self.next.get(namespace.as_str()).copied().unwrap_or(0)
124    }
125
126    /// Assign and return the next monotonic position for `namespace`.
127    ///
128    /// # Panics
129    /// Panics only on `u64` position overflow (after `2^64` appends in a single
130    /// namespace), which is not reachable in practice.
131    pub fn next(&mut self, namespace: &Namespace) -> u64 {
132        let slot = self.next.entry(namespace.as_str().to_string()).or_insert(0);
133        let assigned = *slot;
134        *slot = assigned
135            .checked_add(1)
136            .expect("sequence position overflowed u64");
137        assigned
138    }
139
140    /// Reserve a contiguous block of `count` positions for `namespace`, returning
141    /// the half-open range `[start, start + count)`. Useful for batch ingest: the
142    /// operator assigns the whole batch in one step while preserving order and
143    /// gap-freeness. A `count` of `0` returns an empty range at the current
144    /// position and does not advance the counter.
145    ///
146    /// # Errors
147    /// Returns [`Error::SequenceOverflow`] if the block would overflow `u64`.
148    pub fn reserve(&mut self, namespace: &Namespace, count: u64) -> Result<Range<u64>> {
149        let slot = self.next.entry(namespace.as_str().to_string()).or_insert(0);
150        let start = *slot;
151        let end = start
152            .checked_add(count)
153            .ok_or_else(|| Error::SequenceOverflow {
154                namespace: namespace.as_str().to_string(),
155            })?;
156        *slot = end;
157        Ok(start..end)
158    }
159
160    /// Re-seat the next position for `namespace` from durable storage (e.g. on
161    /// restart, to `last_committed_position + 1`).
162    ///
163    /// This is **monotonic-safe**: it may only move the counter *forward* (or
164    /// leave it unchanged). An attempt to set it below the current in-memory
165    /// position is rejected rather than silently rewinding, because rewinding
166    /// would re-issue an already-assigned position and break append-only
167    /// ordering.
168    ///
169    /// # Errors
170    /// Returns [`Error::SequenceRegression`] if `next` is below the current
171    /// position for `namespace`.
172    pub fn resume_from(&mut self, namespace: &Namespace, next: u64) -> Result<()> {
173        let current = self.peek(namespace);
174        if next < current {
175            return Err(Error::SequenceRegression {
176                namespace: namespace.as_str().to_string(),
177                current,
178                requested: next,
179            });
180        }
181        self.next.insert(namespace.as_str().to_string(), next);
182        Ok(())
183    }
184}
185
186/// An **idempotent-append** deduplication key: a 64-byte SHA3-512 digest that is
187/// a deterministic function of `(namespace, payload)`.
188///
189/// An ingest pipeline keys in-flight and recently-committed records by their
190/// `DedupKey`; a re-submission (network retry, at-least-once delivery, a client
191/// replaying a request) produces the *same* key and can be dropped before it
192/// double-appends. The key is post-quantum (SHA3-512), domain-separated, and
193/// namespace-scoped, so keys never collide across namespaces or across the two
194/// derivation modes.
195///
196/// Two derivations are offered:
197///
198/// - [`DedupKey::from_record`] keys on the **content** itself (the canonical
199///   leaf bytes). Identical content in the same namespace is the same record.
200/// - [`DedupKey::from_token`] keys on a **client-supplied idempotency token**
201///   (e.g. a request UUID). Use this when the same logical submission may carry
202///   non-identical bytes (timestamps, nonces) yet must append at most once.
203///
204/// The key is opaque; compare keys for equality or use [`DedupKey::as_bytes`] /
205/// [`DedupKey::to_hex`] as a storage index.
206#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
207pub struct DedupKey([u8; 64]);
208
209impl DedupKey {
210    /// Derive a content-addressed dedup key from a namespace and the canonical
211    /// record bytes: `SHA3-512_ctx("…/content/v1", lp(ns) || lp(payload))`.
212    #[must_use]
213    pub fn from_record(namespace: &Namespace, payload: &[u8]) -> Self {
214        Self::derive(DEDUP_CONTENT_CONTEXT, namespace, payload)
215    }
216
217    /// Derive a dedup key from a namespace and a client-supplied idempotency
218    /// token: `SHA3-512_ctx("…/token/v1", lp(ns) || lp(token))`.
219    #[must_use]
220    pub fn from_token(namespace: &Namespace, token: &[u8]) -> Self {
221        Self::derive(DEDUP_TOKEN_CONTEXT, namespace, token)
222    }
223
224    fn derive(context: &str, namespace: &Namespace, payload: &[u8]) -> Self {
225        let mut input = Vec::with_capacity(8 + namespace.as_str().len() + payload.len());
226        push_lp(&mut input, namespace.as_str().as_bytes());
227        push_lp(&mut input, payload);
228        Self(metamorphic_crypto::hash::sha3_512_with_context(
229            context, &input,
230        ))
231    }
232
233    /// The raw 64-byte key.
234    #[must_use]
235    pub fn as_bytes(&self) -> &[u8; 64] {
236        &self.0
237    }
238
239    /// The key as lowercase hex (128 chars) — a convenient storage-index form.
240    #[must_use]
241    pub fn to_hex(&self) -> String {
242        crate::encoding::hex_encode(&self.0)
243    }
244}
245
246/// A deterministic **flush plan**: the exact set of C2SP `tlog-tiles`
247/// coordinates whose bytes change when a log grows from `old_size` to
248/// `new_size`, split into Merkle tiles and level-0 entry bundles.
249///
250/// This is what an operator's writer flushes after sequencing a batch: every
251/// other tile in the tree is already finalized and byte-identical, so only these
252/// need to be (re)written to object storage. Computed purely from
253/// [`crate::tile`] geometry — it changes no canonical bytes.
254#[derive(Debug, Clone, PartialEq, Eq)]
255pub struct FlushPlan {
256    /// Merkle tiles (`tile/<L>/<N>[.p/<W>]`) to (re)write, in
257    /// [`tiles_for_size`] order (level 0 first, then by index).
258    pub tiles: Vec<Tile>,
259    /// Level-0 entry-bundle tiles (`tile/entries/<N>[.p/<W>]`) to (re)write.
260    pub entry_bundles: Vec<Tile>,
261}
262
263/// The half-open leaf range `[start, end)` a tile covers, computed in `u128` to
264/// avoid overflow at high levels. `end` is the exclusive rightmost leaf index.
265fn tile_leaf_end(tile: &Tile) -> u128 {
266    let span = 256u128.pow(u32::from(tile.level()));
267    (u128::from(tile.index()) * 256 + u128::from(tile.width())) * span
268}
269
270/// Whether a tile in the `new_size` tree changed relative to `old_size`: it did
271/// iff it covers at least one leaf at or beyond `old_size` (a brand-new tile, or
272/// a partial tile that grew). Tiles entirely below `old_size` were finalized
273/// identically and need no rewrite.
274fn tile_is_dirty(tile: &Tile, old_size: u64) -> bool {
275    tile_leaf_end(tile) > u128::from(old_size)
276}
277
278/// The Merkle tiles that must be (re)written when the log grows from `old_size`
279/// to `new_size`.
280///
281/// Returns them in [`tiles_for_size`] order. Tiles entirely contained below
282/// `old_size` are omitted (already finalized, byte-identical).
283///
284/// # Errors
285/// Returns [`Error::SizeRegression`] if `new_size < old_size` — a tiled log is
286/// append-only and never shrinks.
287pub fn tiles_to_flush(old_size: u64, new_size: u64) -> Result<Vec<Tile>> {
288    if new_size < old_size {
289        return Err(Error::SizeRegression {
290            size1: old_size,
291            size2: new_size,
292        });
293    }
294    Ok(tiles_for_size(new_size)
295        .into_iter()
296        .filter(|t| tile_is_dirty(t, old_size))
297        .collect())
298}
299
300/// The level-0 **entry-bundle** tiles that must be (re)written when the log
301/// grows from `old_size` to `new_size`. These mirror the dirty level-0 Merkle
302/// tiles (the entries served alongside `tile/0/...`).
303///
304/// # Errors
305/// Returns [`Error::SizeRegression`] if `new_size < old_size`.
306pub fn entry_bundles_to_flush(old_size: u64, new_size: u64) -> Result<Vec<Tile>> {
307    Ok(tiles_to_flush(old_size, new_size)?
308        .into_iter()
309        .filter(|t| t.level() == 0)
310        .collect())
311}
312
313/// Compute the full [`FlushPlan`] for growing the log from `old_size` to
314/// `new_size`.
315///
316/// # Errors
317/// Returns [`Error::SizeRegression`] if `new_size < old_size`.
318pub fn plan_flush(old_size: u64, new_size: u64) -> Result<FlushPlan> {
319    let tiles = tiles_to_flush(old_size, new_size)?;
320    let entry_bundles = tiles.iter().filter(|t| t.level() == 0).copied().collect();
321    Ok(FlushPlan {
322        tiles,
323        entry_bundles,
324    })
325}
326
327/// The object-storage / CDN **read-path trait** — an interface only.
328///
329/// A tiled transparency log serves its tree as immutable, content-addressed
330/// objects fetched by coordinate (see [`crate::tile`]); a verifier or monitor
331/// recomputes roots and proofs from those bytes. This trait abstracts that fetch
332/// so the verification core composes with *any* backend (S3, GCS, a CDN edge, a
333/// local mirror, a test fixture). **This crate ships no implementation and
334/// performs no I/O** — backends live in the operator layer. The associated
335/// [`TileReader::Error`] lets implementations surface their own error type
336/// without this crate depending on any I/O or async machinery.
337pub trait TileReader {
338    /// The backend's fetch-error type.
339    type Error;
340
341    /// Fetch the raw bytes of the Merkle tile at `tile`'s coordinate
342    /// (`tile.path()`), i.e. `tile.width()` consecutive 32-byte hashes.
343    ///
344    /// # Errors
345    /// Returns the backend's error if the object cannot be fetched.
346    fn read_tile(&self, tile: &Tile) -> core::result::Result<Vec<u8>, Self::Error>;
347
348    /// Fetch the raw bytes of the level-0 entry bundle at `tile`'s coordinate
349    /// (`tile.entries_path()`).
350    ///
351    /// # Errors
352    /// Returns the backend's error if the object cannot be fetched.
353    fn read_entry_bundle(&self, tile: &Tile) -> core::result::Result<Vec<u8>, Self::Error>;
354}
355
356/// Error from the [`recompute_root_via`] read-path bridge: either the backend
357/// failed to fetch a tile, or a fetched tile was malformed.
358#[derive(Debug, Clone, PartialEq, Eq)]
359pub enum ReadPathError<E> {
360    /// The [`TileReader`] backend failed to fetch a tile.
361    Backend(E),
362    /// A fetched tile was structurally invalid (wrong byte length for its
363    /// declared width). Wraps a [`crate::Error`].
364    Tile(Error),
365}
366
367/// Logic-only bridge: read the level-0 tiles of a tree of `size` leaves through
368/// any [`TileReader`] and recompute the RFC 6962 root via
369/// [`crate::tile::recompute_root`].
370///
371/// This performs **no I/O itself** — every byte comes from the supplied reader.
372/// It exists to demonstrate (and let callers reuse) that the read-path trait
373/// composes directly with the verification core: feed the concatenated level-0
374/// leaf hashes through the same fixed RFC 6962 hashing and you reproduce the
375/// root a checkpoint commits to. `size == 0` yields the empty root.
376///
377/// # Errors
378/// Returns [`ReadPathError::Backend`] if the reader fails, or
379/// [`ReadPathError::Tile`] if a returned tile's byte length does not match its
380/// declared width.
381pub fn recompute_root_via<R: TileReader>(
382    reader: &R,
383    size: u64,
384) -> core::result::Result<Hash, ReadPathError<R::Error>> {
385    // Do not pre-allocate from the caller-supplied `size`: the reader may fail
386    // before producing any hash, so capacity grows with what is actually read.
387    let mut leaf_hashes = Vec::new();
388    for tile in tiles_for_size(size).into_iter().filter(|t| t.level() == 0) {
389        let bytes = reader.read_tile(&tile).map_err(ReadPathError::Backend)?;
390        let hashes = tile.hashes(&bytes).map_err(ReadPathError::Tile)?;
391        leaf_hashes.extend_from_slice(&hashes);
392    }
393    Ok(recompute_root(&leaf_hashes))
394}
395
/// Unit tests for the ingest primitives: sequencer monotonicity, dedup-key
/// determinism, flush geometry against the tile substrate, and the read-path
/// bridge driven by an in-memory reader.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use crate::merkle::MerkleTree;
    use std::collections::HashMap;

    /// Parse a namespace that is known to be valid in these tests.
    fn ns(s: &str) -> Namespace {
        Namespace::parse(s).unwrap()
    }

    #[test]
    fn sequencer_is_monotonic_and_per_namespace() {
        let mut seq = Sequencer::new();
        let a = ns("alpha");
        let b = ns("beta");
        assert_eq!(seq.next(&a), 0);
        assert_eq!(seq.next(&a), 1);
        assert_eq!(seq.next(&b), 0); // independent namespace
        assert_eq!(seq.next(&a), 2);
        assert_eq!(seq.peek(&a), 3);
        assert_eq!(seq.peek(&b), 1);
    }

    #[test]
    fn reserve_assigns_contiguous_block() {
        let mut seq = Sequencer::new();
        let a = ns("alpha");
        assert_eq!(seq.next(&a), 0);
        assert_eq!(seq.reserve(&a, 5).unwrap(), 1..6);
        assert_eq!(seq.next(&a), 6);
        // Zero-count reservation does not advance.
        assert_eq!(seq.reserve(&a, 0).unwrap(), 7..7);
        assert_eq!(seq.peek(&a), 7);
    }

    #[test]
    fn resume_from_is_monotonic_safe() {
        let mut seq = Sequencer::new();
        let a = ns("alpha");
        seq.next(&a);
        seq.next(&a); // peek now 2
        seq.resume_from(&a, 10).unwrap(); // jump forward (durable max+1)
        assert_eq!(seq.next(&a), 10);
        // Re-seating to the same position is allowed.
        seq.resume_from(&a, 11).unwrap();
        // Rewinding is rejected.
        assert!(matches!(
            seq.resume_from(&a, 5),
            Err(Error::SequenceRegression {
                current: 11,
                requested: 5,
                ..
            })
        ));
    }

    #[test]
    fn dedup_key_is_deterministic_and_scoped() {
        let a = ns("alpha");
        let b = ns("beta");
        let k1 = DedupKey::from_record(&a, b"hello");
        let k2 = DedupKey::from_record(&a, b"hello");
        assert_eq!(k1, k2); // same input => same key
        assert_ne!(k1, DedupKey::from_record(&a, b"world")); // content matters
        assert_ne!(k1, DedupKey::from_record(&b, b"hello")); // namespace-scoped
        // Token mode is domain-separated from content mode.
        assert_ne!(
            DedupKey::from_record(&a, b"x"),
            DedupKey::from_token(&a, b"x")
        );
        assert_eq!(k1.to_hex().len(), 128);
    }

    #[test]
    fn flush_geometry_matches_tile_substrate() {
        // From a fresh log, the flush set equals the full tile set for new_size.
        let plan = plan_flush(0, 70_000).unwrap();
        assert_eq!(plan.tiles, tiles_for_size(70_000));
        assert!(plan.entry_bundles.iter().all(|t| t.level() == 0));
        assert_eq!(
            plan.entry_bundles.len(),
            tiles_for_size(70_000)
                .iter()
                .filter(|t| t.level() == 0)
                .count()
        );
    }

    #[test]
    fn flush_only_touches_changed_tiles() {
        // Grow 256 -> 512. Level-0 tile 0 (leaves 0..256) is finalized and must
        // NOT be reflushed; tile 1 and the new partial level-1 tile must.
        let dirty = tiles_to_flush(256, 512).unwrap();
        assert!(
            !dirty
                .iter()
                .any(|t| t.level() == 0 && t.index() == 0 && !t.is_partial()),
            "finalized level-0 tile 0 must not be reflushed"
        );
        assert!(
            dirty.iter().any(|t| t.level() == 0 && t.index() == 1),
            "new level-0 tile 1 must be flushed"
        );
        assert!(
            dirty.iter().any(|t| t.level() == 1),
            "grown level-1 partial tile must be flushed"
        );
    }

    #[test]
    fn flush_noop_when_size_unchanged() {
        assert!(tiles_to_flush(1000, 1000).unwrap().is_empty());
        assert_eq!(
            plan_flush(1000, 1000).unwrap(),
            FlushPlan {
                tiles: vec![],
                entry_bundles: vec![]
            }
        );
    }

    #[test]
    fn flush_rejects_size_regression() {
        assert!(matches!(
            tiles_to_flush(500, 499),
            Err(Error::SizeRegression {
                size1: 500,
                size2: 499
            })
        ));
        assert!(plan_flush(500, 499).is_err());
        assert!(entry_bundles_to_flush(500, 499).is_err());
    }

    /// An in-memory [`TileReader`] used only to prove the read-path bridge
    /// composes with the verification core. NOT a storage backend.
    ///
    /// Objects are keyed by their storage path. [`MemReader::from_tree`] only
    /// populates level-0 Merkle tiles; it stores no entry bundles.
    struct MemReader {
        tiles: HashMap<String, Vec<u8>>,
    }

    impl MemReader {
        /// Build a reader holding the level-0 Merkle tiles of the first `size`
        /// leaves of `tree`, each stored under `tile.path()`.
        fn from_tree(tree: &MerkleTree, size: u64) -> Self {
            let mut tiles = HashMap::new();
            for tile in tiles_for_size(size).into_iter().filter(|t| t.level() == 0) {
                let start = tile.index() * u64::from(crate::tile::TILE_WIDTH);
                let mut bytes = Vec::new();
                for i in 0..u64::from(tile.width()) {
                    bytes.extend_from_slice(&tree.leaf_hash(start + i).unwrap());
                }
                tiles.insert(tile.path(), bytes);
            }
            Self { tiles }
        }
    }

    impl TileReader for MemReader {
        type Error = String;

        fn read_tile(&self, tile: &Tile) -> core::result::Result<Vec<u8>, String> {
            self.tiles
                .get(&tile.path())
                .cloned()
                .ok_or_else(|| format!("missing tile {}", tile.path()))
        }

        fn read_entry_bundle(&self, tile: &Tile) -> core::result::Result<Vec<u8>, String> {
            // Key on the entry-bundle path, matching the error message below.
            // Looking up `tile.path()` here would hand back Merkle-hash bytes
            // as if they were an entry bundle.
            self.tiles
                .get(&tile.entries_path())
                .cloned()
                .ok_or_else(|| format!("missing entries {}", tile.entries_path()))
        }
    }

    #[test]
    fn read_path_bridge_recomputes_checkpoint_root() {
        let mut tree = MerkleTree::new();
        for i in 0u32..1000 {
            tree.push(&i.to_be_bytes());
        }
        let reader = MemReader::from_tree(&tree, 1000);
        let root = recompute_root_via(&reader, 1000).unwrap();
        assert_eq!(root, tree.root());
    }

    #[test]
    fn read_path_bridge_empty_tree() {
        let reader = MemReader {
            tiles: HashMap::new(),
        };
        assert_eq!(
            recompute_root_via(&reader, 0).unwrap(),
            crate::merkle::empty_root()
        );
    }

    #[test]
    fn read_path_bridge_surfaces_backend_error() {
        // Reader with no tiles but a non-empty size => backend miss.
        let reader = MemReader {
            tiles: HashMap::new(),
        };
        assert!(matches!(
            recompute_root_via(&reader, 300),
            Err(ReadPathError::Backend(_))
        ));
    }

    #[test]
    fn read_path_bridge_surfaces_malformed_tile() {
        let mut tiles = HashMap::new();
        // Level-0 tile 0 for size 1 is a partial tile of width 1 (32 bytes);
        // give it the wrong length to trigger a Tile error.
        let t = tiles_for_size(1)[0];
        tiles.insert(t.path(), vec![0u8; 31]);
        let reader = MemReader { tiles };
        assert!(matches!(
            recompute_root_via(&reader, 1),
            Err(ReadPathError::Tile(_))
        ));
    }

    #[test]
    fn mem_reader_keeps_entry_bundles_separate_from_tiles() {
        // `from_tree` stores Merkle tiles only, so the same coordinate must
        // resolve as a tile but miss as an entry bundle.
        let mut tree = MerkleTree::new();
        tree.push(b"only");
        let reader = MemReader::from_tree(&tree, 1);
        let t = tiles_for_size(1)[0];
        assert!(reader.read_tile(&t).is_ok());
        assert!(reader.read_entry_bundle(&t).is_err());
    }
}