net-mesh 0.21.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
//! Read-path dispatch helpers — bridge raw event-payload bytes to
//! the inline / blob-ref distinction without requiring substrate
//! changes to the read APIs.
//!
//! Two shapes per `DATAFORTS_PLAN.md` § Phase 3 work-item 8:
//!
//! - [`classify_payload`] — peek the discriminator byte; return
//!   either the inline bytes view or a decoded [`BlobRef`]. Cheap;
//!   no async work, no adapter lookup.
//! - [`resolve_payload`] — transparent fetch path. Returns the
//!   resolved bytes for both inline payloads (passthrough) and
//!   blob-ref payloads (adapter fetch + hash verify). Callers that
//!   don't want to know which is which use this and treat every
//!   event payload uniformly.
//!
//! Routing by `adapter_id` is the caller's job — the plan's locked
//! decision picks per channel via `RedexFileConfig::blob_adapter_id`
//! (additive substrate change not yet shipped). For now,
//! [`resolve_payload`] takes the chosen adapter directly so callers
//! can build their own routing on top.

use bytes::Bytes;

use super::adapter::BlobAdapter;
use super::blob_ref::BlobRef;
use super::error::BlobError;

/// Classification of an event payload's wire shape, as produced by
/// [`classify_payload`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EventPayload<'a> {
    /// Plain inline payload — the bytes are the event content
    /// verbatim. This is a borrow of the buffer handed to
    /// [`classify_payload`], so the value cannot outlive that
    /// buffer; the caller owns the lifetime.
    Inline(&'a [u8]),
    /// Out-of-band content addressed by a decoded [`BlobRef`]. The
    /// caller resolves it via a [`BlobAdapter`]; going through
    /// [`resolve_payload`] additionally runs the substrate-side
    /// verification on whatever the adapter returns.
    Blob(BlobRef),
}

/// Classify `bytes` as either an inline payload or an encoded
/// [`BlobRef`], without performing any I/O or adapter lookup.
///
/// The decision is delegated to [`BlobRef::decode`]: when it
/// yields a reference the result is [`EventPayload::Blob`],
/// otherwise the input slice is handed back untouched as
/// [`EventPayload::Inline`] (a borrow, not a copy).
///
/// # Errors
///
/// Propagates whatever error [`BlobRef::decode`] reports for the
/// input bytes.
pub fn classify_payload(bytes: &[u8]) -> Result<EventPayload<'_>, BlobError> {
    let decoded = BlobRef::decode(bytes)?;
    Ok(decoded.map_or(EventPayload::Inline(bytes), EventPayload::Blob))
}

/// Resolve a payload to its content bytes. Inline payloads return
/// a `Vec<u8>` copy; blob-ref payloads validate the URI scheme
/// against the adapter's accepted-schemes list, fetch via
/// `adapter`, verify against the embedded BLAKE3 hash, and return
/// the verified bytes.
///
/// Scheme validation closes the publisher-controls-adapter-input
/// attack surface: a publisher with append rights on a channel
/// configured to use a privileged adapter (e.g. an FS adapter
/// with host-side authority) could otherwise stamp arbitrary
/// `s3://attacker/key` URIs that the FS adapter would still try
/// to resolve. The adapter's [`BlobAdapter::accepted_schemes`]
/// override drives the gate — empty default means "accept any."
///
/// Hash verification runs inside this function rather than inside
/// the adapter so an adversarial adapter cannot fake-verify by
/// returning bytes that match a hash it computed itself.
pub async fn resolve_payload<A: BlobAdapter + ?Sized>(
    bytes: &[u8],
    adapter: &A,
) -> Result<Bytes, BlobError> {
    match classify_payload(bytes)? {
        EventPayload::Inline(b) => Ok(Bytes::copy_from_slice(b)),
        EventPayload::Blob(blob) => {
            let accepted = adapter.accepted_schemes();
            if !accepted.is_empty() {
                let scheme = uri_scheme(blob.uri());
                if !accepted.contains(&scheme) {
                    return Err(BlobError::UnsupportedScheme(blob.uri().to_owned()));
                }
            }
            let fetched = adapter.fetch(&blob).await?;
            // Verification posture is different for the two BlobRef
            // shapes:
            //   * `Small` — single top-level BLAKE3 hash. The
            //     substrate runs `BlobRef::verify` here, independent
            //     of the adapter, so an adversarial / buggy adapter
            //     can't fake-verify by returning bytes that match a
            //     hash it computed itself.
            //   * `Manifest` — no top-level hash. The substrate
            //     runs the chunk-by-chunk verification here too:
            //     slice `fetched` at `chunks[i].size` boundaries,
            //     hash each region, compare against the manifest's
            //     `chunks[i].hash`. This used to be left to the
            //     adapter (e.g. `MeshBlobAdapter::fetch_chunk`
            //     internally), but `resolve_payload` accepts any
            //     `BlobAdapter`-impl and adapters that don't verify
            //     chunk-by-chunk (or are adversarial) could
            //     otherwise return tampered bytes here. Doing the
            //     check at the dispatch layer keeps the substrate-
            //     side guarantee uniform across both shapes.
            if !blob.is_chunked() {
                blob.verify(&fetched)?;
            } else {
                verify_manifest_chunks(&blob, &fetched)?;
            }
            Ok(fetched)
        }
    }
}

/// Verify a `Manifest`-shape `BlobRef` against the reassembled
/// `fetched` byte stream. Walks the manifest's `chunks` list,
/// slices `fetched` at the recorded chunk sizes, hashes each
/// slice with BLAKE3, and compares against the recorded
/// `chunks[i].hash`. Surface a typed error on any mismatch so the
/// caller surfaces a verification failure instead of returning
/// tampered bytes.
///
/// Independent of any adapter-side verification — adversarial or
/// buggy adapters that return manipulated bytes here will fail
/// the substrate-side check.
fn verify_manifest_chunks(
    blob: &super::blob_ref::BlobRef,
    fetched: &[u8],
) -> Result<(), BlobError> {
    use super::blob_ref::BlobRef;
    let chunks = match blob {
        BlobRef::Manifest { chunks, .. } => chunks,
        // The caller only invokes this on `is_chunked() == true`,
        // so this branch is unreachable in production. Guard
        // anyway for forward-compat with new BlobRef variants.
        BlobRef::Small { .. } => return Ok(()),
        // Tree blobs cannot be verified through the flat-buffer
        // verifier — the integrity invariant for Tree is
        // per-node BLAKE3 chain along the descent + per-chunk
        // verification at each leaf, all driven by the
        // tree-walker in `MeshBlobAdapter::walk_tree_range`. A
        // generic `BlobAdapter::fetch` that returned a flat
        // reassembled byte buffer for a Tree blob would bypass
        // that chain entirely — adversarial / buggy third-party
        // adapters could thus inject tampered bytes here without
        // detection. Reject explicitly so the only path that can
        // resolve a Tree blob is one that invokes the tree walk
        // (e.g. `MeshBlobAdapter::fetch_range(0..total_size)`),
        // which performs the per-chunk verification itself.
        BlobRef::Tree { .. } => {
            return Err(BlobError::Backend(
                "resolve_payload: BlobRef::Tree cannot be resolved through a flat \
                 fetch — route through a tree-walking fetch_range so per-chunk \
                 verification runs along the descent"
                    .to_owned(),
            ));
        }
    };
    // Single-pass walk per dataforts perf #177: validate the
    // running offset stays in bounds (overrun → typed error) AND
    // hash each chunk region in the same iteration. Pre-fix did
    // a `chunks.iter().sum()` to validate the total first, then
    // a second pass to hash — two passes over the same vec. The
    // total-equality check survives as the final `offset !=
    // fetched.len()` assertion: if every chunk fits without
    // overrun and the running offset lands exactly at
    // `fetched.len()`, the sums agree by construction. Same
    // correctness on tampered inputs, one walk.
    let mut offset: usize = 0;
    for chunk in chunks.iter() {
        // `checked_add` catches the wrap that `offset + chunk.size`
        // would silently produce on a hostile manifest with
        // `chunks` summing past `usize::MAX`; the bounds check
        // catches the more common case where the fetched buffer
        // is shorter than the declared chunk layout.
        let end = match offset.checked_add(chunk.size as usize) {
            Some(end) if end <= fetched.len() => end,
            _ => {
                return Err(BlobError::Backend(format!(
                    "manifest chunk layout overruns fetched buffer: \
                     offset {offset} + size {} > len {}",
                    chunk.size,
                    fetched.len()
                )));
            }
        };
        let region = &fetched[offset..end];
        let computed: [u8; 32] = blake3::hash(region).into();
        if computed != chunk.hash {
            return Err(BlobError::HashMismatch {
                expected: chunk.hash,
                actual: computed,
            });
        }
        offset = end;
    }
    if offset != fetched.len() {
        // All chunks consumed but bytes remain — declared layout
        // is shorter than the fetched buffer. Surface the same
        // error shape the legacy two-pass code used.
        return Err(BlobError::Backend(format!(
            "manifest reassembled length {} != sum of chunk sizes {}",
            fetched.len(),
            offset
        )));
    }
    Ok(())
}

/// Return the scheme portion of `uri`: the text preceding the
/// first `:`. A string containing no `:` at all has no scheme and
/// yields `""`; so does a string that starts with `:`.
fn uri_scheme(uri: &str) -> &str {
    uri.split_once(':').map_or("", |(scheme, _rest)| scheme)
}

/// Store `bytes` through `adapter` under `uri` and hand back the
/// *encoded* [`BlobRef`] describing them.
///
/// The returned `Vec<u8>` is ready to be used as an event payload
/// (via `RedexFile::append`, `MeshNode::publish`, or any path that
/// takes raw event-payload bytes). This is a thin wrapper: the
/// hashing and the adapter write happen in [`publish_blob_ref`],
/// and the resulting reference is then serialised with
/// `BlobRef::encode`. Callers that want the structured reference
/// rather than its wire form should call [`publish_blob_ref`]
/// directly.
///
/// # Errors
///
/// Returns whatever [`publish_blob_ref`] returns on failure.
pub async fn publish_blob<A: BlobAdapter + ?Sized>(
    adapter: &A,
    uri: impl Into<String>,
    bytes: &[u8],
) -> Result<Vec<u8>, BlobError> {
    publish_blob_ref(adapter, uri, bytes)
        .await
        .map(|blob_ref| blob_ref.encode())
}

/// Store `bytes` through `adapter` under `uri` and return the
/// structured [`BlobRef`] describing them.
///
/// The reference is built here, not supplied by the caller: its
/// hash is the BLAKE3 digest of `bytes` and its size is
/// `bytes.len()`. It is always constructed with `BlobRef::small`.
/// Useful when the caller wants the URI / hash / size separately
/// (e.g. for telemetry or a side-channel index); use
/// [`publish_blob`] to get the encoded wire form instead.
///
/// # Errors
///
/// Propagates any error returned by `adapter.store`; in that case
/// no reference is returned.
pub async fn publish_blob_ref<A: BlobAdapter + ?Sized>(
    adapter: &A,
    uri: impl Into<String>,
    bytes: &[u8],
) -> Result<BlobRef, BlobError> {
    let size = bytes.len() as u64;
    let digest: [u8; 32] = blake3::hash(bytes).into();
    let blob_ref = BlobRef::small(uri, digest, size);
    adapter.store(&blob_ref, bytes).await?;
    Ok(blob_ref)
}

// Unit tests for the dispatch helpers: payload classification,
// inline passthrough, blob fetch + verification (Small, Manifest,
// Tree), scheme gating, and the publish-side round trip.
#[cfg(test)]
mod tests {
    use super::super::fs::FileSystemAdapter;
    use super::super::noop::NoopAdapter;
    use super::*;

    /// Build a `Small` blob ref for `payload` under a fixed
    /// `file:///dispatch` URI, with the hash and size derived from
    /// the payload itself.
    fn fixture_blob_ref(payload: &[u8]) -> BlobRef {
        BlobRef::small(
            "file:///dispatch",
            blake3::hash(payload).into(),
            payload.len() as u64,
        )
    }

    /// Ordinary bytes that are not an encoded blob ref classify as
    /// `Inline` and borrow the input unchanged.
    #[test]
    fn classify_inline_when_first_byte_is_not_discriminator() {
        let bytes = b"plain payload";
        match classify_payload(bytes).unwrap() {
            EventPayload::Inline(b) => assert_eq!(b, bytes),
            other => panic!("expected Inline, got {:?}", other),
        }
    }

    /// An encoded blob ref classifies as `Blob` and decodes back
    /// to the reference it was encoded from.
    #[test]
    fn classify_blob_when_first_byte_is_discriminator() {
        let payload = b"out of band";
        let blob = fixture_blob_ref(payload);
        let encoded = blob.encode();
        match classify_payload(&encoded).unwrap() {
            EventPayload::Blob(decoded) => assert_eq!(decoded, blob),
            other => panic!("expected Blob, got {:?}", other),
        }
    }

    #[test]
    fn classify_empty_payload_is_inline() {
        // Empty event payloads exist (heartbeats, ack frames).
        // First-byte peek returns None, so classify reports Inline.
        let bytes: &[u8] = &[];
        match classify_payload(bytes).unwrap() {
            EventPayload::Inline(b) => assert!(b.is_empty()),
            other => panic!("expected empty Inline, got {:?}", other),
        }
    }

    /// Inline payloads resolve to the same bytes regardless of
    /// which adapter is supplied.
    #[tokio::test]
    async fn resolve_passes_inline_through() {
        let adapter = NoopAdapter::default();
        let bytes = b"inline goes straight through";
        let resolved = resolve_payload(bytes, &adapter).await.unwrap();
        assert_eq!(resolved.as_ref(), bytes);
    }

    /// Happy path for a `Small` ref: store through the FS adapter,
    /// then resolve the encoded ref back to the original bytes.
    #[tokio::test]
    async fn resolve_fetches_and_verifies_blob() {
        // Per-process, per-timestamp temp root so parallel test
        // runs do not collide.
        let root = std::env::temp_dir().join(format!(
            "net-blob-resolve-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let adapter = FileSystemAdapter::new("resolve-test", &root);
        let payload = b"this content lives out of band";
        let blob = fixture_blob_ref(payload);
        adapter.store(&blob, payload).await.unwrap();

        let encoded = blob.encode();
        let resolved = resolve_payload(&encoded, &adapter).await.unwrap();
        assert_eq!(resolved.as_ref(), payload);

        // Best-effort cleanup; a failure here must not fail the test.
        let _ = std::fs::remove_dir_all(&root);
    }

    /// Write side and read side agree: what `publish_blob` emits,
    /// `resolve_payload` turns back into the original content.
    #[tokio::test]
    async fn publish_blob_round_trips_through_resolve_payload() {
        let root = std::env::temp_dir().join(format!(
            "net-blob-publish-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let adapter = FileSystemAdapter::new("publish-test", &root);
        let payload = b"write side equivalent of resolve_payload";

        // publish_blob returns the encoded BlobRef as bytes.
        let encoded = publish_blob(&adapter, "file:///published", payload)
            .await
            .unwrap();
        // First four bytes are the BlobRef magic.
        assert_eq!(
            &encoded[..4],
            &crate::adapter::net::dataforts::blob::BLOB_REF_MAGIC,
        );

        // resolve_payload turns the encoded form back into the
        // original bytes via fetch + verify.
        let resolved = resolve_payload(&encoded, &adapter).await.unwrap();
        assert_eq!(resolved.as_ref(), payload);

        let _ = std::fs::remove_dir_all(&root);
    }

    /// `publish_blob_ref` returns a structured ref whose hash,
    /// size and URI match the published content, and the stored
    /// bytes can be fetched and verified against it.
    #[tokio::test]
    async fn publish_blob_ref_returns_structured_ref() {
        let root = std::env::temp_dir().join(format!(
            "net-blob-publish-ref-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let adapter = FileSystemAdapter::new("publish-ref", &root);
        let payload = b"explicit ref shape";

        let blob = publish_blob_ref(&adapter, "file:///structured", payload)
            .await
            .unwrap();
        // Hash is BLAKE3 of the payload.
        let expected: [u8; 32] = blake3::hash(payload).into();
        assert_eq!(blob.small_hash(), Some(&expected));
        assert_eq!(blob.size(), payload.len() as u64);
        assert_eq!(blob.uri(), "file:///structured");

        // Stored content is fetchable + verifies.
        let fetched = adapter.fetch(&blob).await.unwrap();
        assert_eq!(fetched.as_ref(), payload);
        blob.verify(&fetched).unwrap();

        let _ = std::fs::remove_dir_all(&root);
    }

    /// `BlobRef::verify` hard-errors on the Manifest arm because
    /// Manifest has no top-level hash. resolve_payload must skip
    /// the top-level verify for Manifest, otherwise every chunked
    /// payload is un-fetchable through the documented helper.
    ///
    /// The per-chunk hashes are still checked at the dispatch
    /// layer; this test supplies chunk bytes that match the
    /// manifest, so it pins the accepting path only.
    #[tokio::test]
    async fn resolve_passes_chunked_manifest_through_without_top_level_verify() {
        use super::super::adapter::BlobAdapter;
        use super::super::blob_ref::{ChunkRef, Encoding, BLOB_CHUNK_SIZE_BYTES};

        // Adapter that returns the same payload for any Manifest
        // fetch. resolve_payload must NOT try to BlobRef::verify
        // those bytes against the (non-existent) top-level hash.
        #[derive(Debug)]
        struct StubManifestAdapter(Vec<u8>);
        #[async_trait::async_trait]
        impl BlobAdapter for StubManifestAdapter {
            fn adapter_id(&self) -> &str {
                "stub-manifest"
            }
            async fn store(&self, _: &BlobRef, _: &[u8]) -> Result<(), BlobError> {
                Ok(())
            }
            async fn fetch(&self, _: &BlobRef) -> Result<Bytes, BlobError> {
                Ok(Bytes::from(self.0.clone()))
            }
            async fn fetch_range(
                &self,
                _: &BlobRef,
                range: std::ops::Range<u64>,
            ) -> Result<Bytes, BlobError> {
                Ok(Bytes::copy_from_slice(
                    &self.0[range.start as usize..range.end as usize],
                ))
            }
            async fn exists(&self, _: &BlobRef) -> Result<bool, BlobError> {
                Ok(true)
            }
        }

        // One full-size chunk plus a 16-byte tail, all 0x5A, so the
        // concatenation of the two chunks equals `payload`.
        let payload = vec![0x5A; (BLOB_CHUNK_SIZE_BYTES as usize) + 16];
        let chunk_1 = vec![0x5A; BLOB_CHUNK_SIZE_BYTES as usize];
        let chunk_2 = vec![0x5A; 16];
        let chunks = vec![
            ChunkRef {
                hash: blake3::hash(&chunk_1).into(),
                size: BLOB_CHUNK_SIZE_BYTES as u32,
            },
            ChunkRef {
                hash: blake3::hash(&chunk_2).into(),
                size: 16,
            },
        ];
        let blob = BlobRef::manifest("mesh://chunked-resolve", Encoding::Replicated, chunks)
            .expect("manifest construct");
        let encoded = blob.encode();
        let adapter = StubManifestAdapter(payload.clone());
        let resolved = resolve_payload(&encoded, &adapter)
            .await
            .expect("resolve must accept Manifest without top-level verify");
        assert_eq!(resolved.as_ref(), payload.as_slice());
    }

    /// Review P1 regression: a chunked Manifest fetched via
    /// `resolve_payload` must verify each chunk against the
    /// manifest's recorded hashes — adversarial adapters that
    /// return tampered chunk bytes must fail with `HashMismatch`
    /// at the dispatch layer (independent of any adapter-side
    /// verification, which third-party impls may or may not do).
    #[tokio::test]
    async fn resolve_rejects_chunked_manifest_with_tampered_chunk_bytes() {
        use super::super::adapter::BlobAdapter;
        use super::super::blob_ref::{ChunkRef, Encoding, BLOB_CHUNK_SIZE_BYTES};

        // Stub that returns the legitimate first chunk but flips
        // the second chunk's payload — simulates an adversarial
        // / buggy adapter.
        #[derive(Debug)]
        struct TamperingAdapter {
            payload: Vec<u8>,
        }
        #[async_trait::async_trait]
        impl BlobAdapter for TamperingAdapter {
            fn adapter_id(&self) -> &str {
                "tampering"
            }
            async fn store(&self, _: &BlobRef, _: &[u8]) -> Result<(), BlobError> {
                Ok(())
            }
            async fn fetch(&self, _: &BlobRef) -> Result<Bytes, BlobError> {
                Ok(Bytes::from(self.payload.clone()))
            }
            async fn fetch_range(
                &self,
                _: &BlobRef,
                range: std::ops::Range<u64>,
            ) -> Result<Bytes, BlobError> {
                Ok(Bytes::copy_from_slice(
                    &self.payload[range.start as usize..range.end as usize],
                ))
            }
            async fn exists(&self, _: &BlobRef) -> Result<bool, BlobError> {
                Ok(true)
            }
        }

        // Legitimate manifest: first chunk all 0xAA, second chunk
        // all 0xBB. Hashes are recorded against the legitimate
        // bytes.
        let legit_chunk_1 = vec![0xAA; BLOB_CHUNK_SIZE_BYTES as usize];
        let legit_chunk_2 = vec![0xBB; 16];
        let chunks = vec![
            ChunkRef {
                hash: blake3::hash(&legit_chunk_1).into(),
                size: BLOB_CHUNK_SIZE_BYTES as u32,
            },
            ChunkRef {
                hash: blake3::hash(&legit_chunk_2).into(),
                size: 16,
            },
        ];
        let blob = BlobRef::manifest("mesh://tampered", Encoding::Replicated, chunks)
            .expect("manifest construct");
        let encoded = blob.encode();

        // Tampered payload: first chunk matches the manifest's
        // recorded hash; second chunk is flipped to all 0xCC.
        let mut tampered = legit_chunk_1.clone();
        tampered.extend(vec![0xCC; 16]);
        let adapter = TamperingAdapter { payload: tampered };
        let err = resolve_payload(&encoded, &adapter)
            .await
            .expect_err("tampered chunk must fail verification");
        assert!(
            matches!(err, BlobError::HashMismatch { .. }),
            "expected HashMismatch on tampered chunk 2, got {:?}",
            err
        );
    }

    /// Pin dataforts perf #177: the one-pass verifier rejects
    /// a manifest whose declared chunk layout doesn't fit the
    /// fetched buffer. Pre-fix this was caught by a separate
    /// `chunks.iter().sum() != fetched.len()` pass; post-fix it
    /// falls out of the per-chunk `checked_add` + bounds check.
    /// A regression that drops the bounds check would let the
    /// `&fetched[offset..end]` slice panic at runtime — this
    /// test would surface as a panic instead of the typed error.
    #[tokio::test]
    async fn resolve_rejects_manifest_when_fetched_buffer_is_short() {
        use super::super::adapter::BlobAdapter;
        use super::super::blob_ref::{ChunkRef, Encoding, BLOB_CHUNK_SIZE_BYTES};

        // Stub whose `fetch` returns fewer bytes than the manifest
        // declares.
        #[derive(Debug)]
        struct ShortAdapter {
            payload: Vec<u8>,
        }
        #[async_trait::async_trait]
        impl BlobAdapter for ShortAdapter {
            fn adapter_id(&self) -> &str {
                "short"
            }
            async fn store(&self, _: &BlobRef, _: &[u8]) -> Result<(), BlobError> {
                Ok(())
            }
            async fn fetch(&self, _: &BlobRef) -> Result<Bytes, BlobError> {
                Ok(Bytes::from(self.payload.clone()))
            }
            async fn fetch_range(
                &self,
                _: &BlobRef,
                range: std::ops::Range<u64>,
            ) -> Result<Bytes, BlobError> {
                let end = (range.end as usize).min(self.payload.len());
                Ok(Bytes::copy_from_slice(
                    &self.payload[range.start as usize..end],
                ))
            }
            async fn exists(&self, _: &BlobRef) -> Result<bool, BlobError> {
                Ok(true)
            }
        }

        // Manifest declares two chunks totaling BLOB_CHUNK_SIZE_BYTES + 16 bytes.
        let legit_chunk_1 = vec![0xAA; BLOB_CHUNK_SIZE_BYTES as usize];
        let legit_chunk_2 = vec![0xBB; 16];
        let chunks = vec![
            ChunkRef {
                hash: blake3::hash(&legit_chunk_1).into(),
                size: BLOB_CHUNK_SIZE_BYTES as u32,
            },
            ChunkRef {
                hash: blake3::hash(&legit_chunk_2).into(),
                size: 16,
            },
        ];
        let blob = BlobRef::manifest("mesh://short", Encoding::Replicated, chunks)
            .expect("manifest construct");
        let encoded = blob.encode();

        // But the adapter returns only the first chunk's bytes
        // (16 bytes short of the declared layout — the whole
        // second chunk is missing). The per-chunk bounds check
        // must reject before any `&fetched[...]` index panic
        // could fire.
        let adapter = ShortAdapter {
            payload: legit_chunk_1,
        };
        let err = resolve_payload(&encoded, &adapter)
            .await
            .expect_err("short buffer must reject");
        let msg = err.to_string();
        assert!(
            msg.contains("overruns") || msg.contains("!="),
            "expected layout-overrun / length-mismatch error; got: {msg}",
        );
    }

    /// Companion to the tampered-chunk rejection test: a Manifest
    /// fetched via `resolve_payload` whose adapter returns the
    /// genuine bytes must still succeed (the chunk-by-chunk
    /// verifier accepts every matching chunk hash).
    #[tokio::test]
    async fn resolve_accepts_chunked_manifest_with_matching_chunk_bytes() {
        use super::super::adapter::BlobAdapter;
        use super::super::blob_ref::{ChunkRef, Encoding, BLOB_CHUNK_SIZE_BYTES};

        // Stub that returns exactly the bytes it was built with.
        #[derive(Debug)]
        struct LegitAdapter(Vec<u8>);
        #[async_trait::async_trait]
        impl BlobAdapter for LegitAdapter {
            fn adapter_id(&self) -> &str {
                "legit"
            }
            async fn store(&self, _: &BlobRef, _: &[u8]) -> Result<(), BlobError> {
                Ok(())
            }
            async fn fetch(&self, _: &BlobRef) -> Result<Bytes, BlobError> {
                Ok(Bytes::from(self.0.clone()))
            }
            async fn fetch_range(
                &self,
                _: &BlobRef,
                range: std::ops::Range<u64>,
            ) -> Result<Bytes, BlobError> {
                Ok(Bytes::copy_from_slice(
                    &self.0[range.start as usize..range.end as usize],
                ))
            }
            async fn exists(&self, _: &BlobRef) -> Result<bool, BlobError> {
                Ok(true)
            }
        }

        // Two chunks with distinct fill bytes; `full` is their
        // concatenation and is what the adapter serves.
        let chunk_1 = vec![0x11; BLOB_CHUNK_SIZE_BYTES as usize];
        let chunk_2 = vec![0x22; 32];
        let mut full = chunk_1.clone();
        full.extend(&chunk_2);
        let chunks = vec![
            ChunkRef {
                hash: blake3::hash(&chunk_1).into(),
                size: BLOB_CHUNK_SIZE_BYTES as u32,
            },
            ChunkRef {
                hash: blake3::hash(&chunk_2).into(),
                size: 32,
            },
        ];
        let blob = BlobRef::manifest("mesh://legit", Encoding::Replicated, chunks)
            .expect("manifest construct");
        let encoded = blob.encode();
        let adapter = LegitAdapter(full.clone());
        let resolved = resolve_payload(&encoded, &adapter)
            .await
            .expect("legitimate manifest must verify");
        assert_eq!(resolved.as_ref(), full.as_slice());
    }

    #[tokio::test]
    async fn resolve_rejects_uri_with_unaccepted_scheme() {
        // FileSystemAdapter only accepts `file:` URIs. An event
        // payload whose BlobRef carries `s3://attacker/key` must
        // reject with UnsupportedScheme before the adapter is
        // asked to fetch anything.
        let root = std::env::temp_dir().join(format!(
            "net-blob-scheme-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let adapter = FileSystemAdapter::new("scheme-test", &root);
        let payload = b"unused";
        let blob = BlobRef::small(
            "s3://attacker/key",
            blake3::hash(payload).into(),
            payload.len() as u64,
        );
        let encoded = blob.encode();
        let err = resolve_payload(&encoded, &adapter).await.unwrap_err();
        assert!(matches!(err, BlobError::UnsupportedScheme(_)));
        let _ = std::fs::remove_dir_all(&root);
    }

    /// `BlobRef::Tree` cannot be resolved through `resolve_payload`
    /// because the trait's `fetch` returns flat bytes that bypass
    /// the per-node BLAKE3 chain along the tree descent. The fix
    /// must reject Tree with a typed error rather than passing
    /// adapter-supplied bytes through unverified.
    #[tokio::test]
    async fn resolve_rejects_tree_blob_to_force_tree_walking_fetch_range() {
        use super::super::blob_ref::Encoding;
        use async_trait::async_trait;
        use std::ops::Range;

        // Stub returns arbitrary bytes for any fetch — pre-fix
        // these passed through `verify_manifest_chunks`'s Ok(()) for
        // Tree branch unverified.
        struct UnverifiedTreeAdapter(Vec<u8>);
        #[async_trait]
        impl BlobAdapter for UnverifiedTreeAdapter {
            fn adapter_id(&self) -> &str {
                "unverified-tree"
            }
            async fn store(&self, _: &BlobRef, _: &[u8]) -> Result<(), BlobError> {
                Ok(())
            }
            async fn fetch(&self, _: &BlobRef) -> Result<Bytes, BlobError> {
                Ok(Bytes::from(self.0.clone()))
            }
            async fn fetch_range(
                &self,
                _: &BlobRef,
                _range: Range<u64>,
            ) -> Result<Bytes, BlobError> {
                Ok(Bytes::from(self.0.clone()))
            }
            async fn exists(&self, _: &BlobRef) -> Result<bool, BlobError> {
                Ok(true)
            }
        }

        // Plausible Tree BlobRef — the wire bytes are valid even
        // though no chunk store carries the root.
        let tree = BlobRef::tree("test://tree", Encoding::Replicated, [0xAB; 32], 64, 1)
            .expect("tree ref");
        let encoded = tree.encode();
        let adapter = UnverifiedTreeAdapter(vec![0u8; 64]);
        let err = resolve_payload(&encoded, &adapter).await.unwrap_err();
        // Matched on the message text because the rejection is
        // surfaced as a string-carrying error variant.
        let msg = err.to_string();
        assert!(
            msg.contains("BlobRef::Tree cannot be resolved through a flat fetch"),
            "expected Tree-rejected error; got: {msg}",
        );
    }

    #[tokio::test]
    async fn resolve_rejects_blob_with_corrupted_content() {
        // Pin that the substrate-side verify defends against an
        // adversarial adapter that returns bytes whose hash
        // doesn't match the advertised BlobRef. Built on a stub
        // adapter that doesn't verify on store (the production FS
        // adapter does, so we can't use it to forge mismatched
        // content).
        use async_trait::async_trait;
        use std::ops::Range;

        struct AdversarialAdapter {
            id: String,
            bytes: Vec<u8>,
        }
        #[async_trait]
        impl BlobAdapter for AdversarialAdapter {
            fn adapter_id(&self) -> &str {
                &self.id
            }
            async fn store(&self, _blob_ref: &BlobRef, _bytes: &[u8]) -> Result<(), BlobError> {
                Ok(())
            }
            async fn fetch(&self, _blob_ref: &BlobRef) -> Result<Bytes, BlobError> {
                Ok(Bytes::from(self.bytes.clone()))
            }
            async fn fetch_range(
                &self,
                _blob_ref: &BlobRef,
                _range: Range<u64>,
            ) -> Result<Bytes, BlobError> {
                Ok(Bytes::from(self.bytes.clone()))
            }
            async fn exists(&self, _blob_ref: &BlobRef) -> Result<bool, BlobError> {
                Ok(true)
            }
        }

        // The ref advertises the hash of `advertised`, but the
        // adapter serves `actual`.
        let advertised = b"the truth";
        let actual: &[u8] = b"a different lie";
        let blob = BlobRef::small(
            "test://tamper",
            blake3::hash(advertised).into(),
            advertised.len() as u64,
        );
        let adapter = AdversarialAdapter {
            id: "tamper".into(),
            bytes: actual.to_vec(),
        };
        let encoded = blob.encode();
        let err = resolve_payload(&encoded, &adapter).await.unwrap_err();
        assert!(matches!(err, BlobError::HashMismatch { .. }));
    }
}