net-mesh 0.27.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
//! Blob transfer over router streams (FairScheduler transport plan, T-1).
//!
//! On-demand cross-peer blob fetch that moves bytes over the router's
//! reliable, scheduled streams — NOT RedEX replication (a replication
//! primitive) and NOT nRPC (a request/reply primitive). See
//! `docs/plans/FAIRSCHEDULER_TRANSPORT_PLAN.md`.
//!
//! T-1 defines the subprotocol ID and the stream-allocation
//! convention. The control packet that initiates a transfer and the
//! bulk data both ride [`SUBPROTOCOL_BLOB_TRANSFER`]; transfer streams
//! draw their IDs from a reserved region of the shared `u64` stream-id
//! space so they never alias channel-publisher, subprotocol, or control
//! streams. Later slices build on this convention: the T-2
//! discovery→stream bridge, plus the T-3 serving handler and T-4 receive
//! reassembly implemented by [`BlobTransferEngine`] in this module.
//!
//! # Stream-id convention
//!
//! The substrate's stream-id space is shared (the session keys stream
//! state and the [`FairScheduler`](crate::adapter::net::router::FairScheduler)
//! keys queues by raw `stream_id`), with soft conventions per subsystem:
//!
//! - **Channel-publisher streams** always SET bit 48
//!   (`MeshNode::publish_stream_id` = `0x0001_0000_0000_0000 | hash`).
//! - **Subprotocol streams** use the small subprotocol-id value
//!   (at most `0x1100`, i.e. [`SUBPROTOCOL_BLOB_TRANSFER`] itself; bit 61 clear).
//! - **Control stream** is `u64::MAX` (bit 48 set).
//!
//! Transfer streams therefore use **bit 61 set AND bit 48 clear**:
//! distinct from channel/control streams (which always set bit 48) and
//! from subprotocol streams (which never set bit 61). The low 48 bits
//! carry a per-transfer nonce, so bit 48 stays clear by construction.

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};

use bytes::Bytes;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};

use super::error::BlobError;
use super::mesh::MeshBlobAdapter;
use crate::adapter::net::{MeshNode, Reliability, Stream, StreamConfig};

/// Subprotocol ID reserved for blob transfer: the first free family after
/// the existing `0x04xx..0x10xx` allocations (fold sits at `0x1000`). The
/// control packet that starts a transfer and every bulk-data packet carry
/// this ID, which is how inbound dispatch routes them to the transfer
/// handler (T-3).
pub const SUBPROTOCOL_BLOB_TRANSFER: u16 = 0x1100;

/// Bit 61 — the marker that identifies a blob-transfer stream ID. Paired
/// with bit 48 being clear (channel-publisher and control streams always
/// set bit 48), it keeps transfer IDs disjoint from all other streams.
const TRANSFER_STREAM_FLAG: u64 = 1u64 << 61;

/// Bit 48 — the channel-publisher discriminator. A transfer stream ID
/// never sets it, because its nonce lives entirely in bits 0..47.
const CHANNEL_STREAM_BIT: u64 = 1u64 << 48;

/// Selects bits 0..47, the per-transfer nonce field. Every bit below
/// [`CHANNEL_STREAM_BIT`] is set, so a masked nonce can never reach bit 48.
const TRANSFER_NONCE_MASK: u64 = CHANNEL_STREAM_BIT - 1;

/// Build the transfer stream ID for `nonce`. Bits of `nonce` at position
/// 48 and above are discarded.
pub fn transfer_stream_id(nonce: u64) -> u64 {
    let nonce_bits = nonce & TRANSFER_NONCE_MASK;
    nonce_bits | TRANSFER_STREAM_FLAG
}

/// Returns `true` when `stream_id` is a blob-transfer stream: the bit-61
/// marker is present and the bit-48 channel discriminator is absent.
/// Channel/control IDs (bit 48 set) and subprotocol IDs (bit 61 clear)
/// are rejected.
pub fn is_transfer_stream_id(stream_id: u64) -> bool {
    let marked = (stream_id & TRANSFER_STREAM_FLAG) == TRANSFER_STREAM_FLAG;
    let channel_bit_set = (stream_id & CHANNEL_STREAM_BIT) != 0;
    marked && !channel_bit_set
}

/// Process-wide nonce counter for transfer streams. Because only 48 bits
/// of the counter are used, a nonce repeats only after 2^48 total
/// allocations. Even then it can only collide with a transfer that is
/// *still open* at that point; closed streams are torn down.
static TRANSFER_STREAM_NONCE: AtomicU64 = AtomicU64::new(1);

/// Hand out a fresh transfer stream ID. IDs are unique within this
/// process across the next 2^48 calls.
pub fn next_transfer_stream_id() -> u64 {
    transfer_stream_id(TRANSFER_STREAM_NONCE.fetch_add(1, Ordering::Relaxed))
}

/// Maximum bytes carried by one raw data event.
///
/// Chosen to sit below `MAX_PAYLOAD_SIZE` (8108) once the event-frame
/// length prefix is added, so every data event fits in a single packet.
/// As a result, a `send_on_stream` of one event is exactly one packet and
/// cannot partially commit a batch under backpressure.
const DATA_FRAME_BYTES: usize = 8_000;

/// Tx-credit window for a serving transfer stream, in on-wire bytes.
///
/// Sized to `DEFAULT_MAX_PENDING` (32) data frames: `DEFAULT_MAX_PENDING ×
/// DATA_FRAME_BYTES` = 32 × 8000 = 256 000 bytes (≈ 250 KiB). Charged
/// on-wire bytes per packet exceed `DATA_FRAME_BYTES` (Net header + AEAD
/// tag + framing), so this admits *fewer* than 32 packets in flight.
///
/// Since H-1 the reliability **retransmit window auto-sizes from this
/// tx-window** (`ReliableStream::max_pending_for_window`), so the
/// "in-flight ≤ retransmit-window" invariant holds automatically for any
/// window value. Without it, an unacked packet aged past the retransmit
/// window would be evicted and unrecoverable; the retransmit window now
/// always covers the flow-control window. This constant therefore no
/// longer has to be *manually* coupled to the fixed 32 (pre-H-1 it did).
///
/// It's kept modest because a larger window buys nothing here: measured
/// single-stream throughput is bounded by per-datagram loopback latency,
/// not credit, and concurrent transfers keep the pipe full across
/// streams. (An earlier 5 MiB window pre-dated H-1's auto-sizing.
/// Against the then-fixed 32-packet retransmit window, it let concurrent
/// large transfers drop packets past recovery — see
/// tests/transfer_concurrency.rs.) A multi-MiB chunk refills this window
/// several times; a refill that finds no credit pays `send_with_retry`'s
/// backoff, which is negligible on the loopback-latency-bound path.
const TRANSFER_STREAM_WINDOW_BYTES: u32 =
    crate::adapter::net::ReliableStream::DEFAULT_MAX_PENDING as u32 * DATA_FRAME_BYTES as u32;

/// Largest `total_len` a receiver will accept in a `Found` header (16 MiB).
/// This stops a misbehaving holder from declaring a huge length and forcing
/// an enormous buffer reservation. It leaves comfortable headroom over the
/// 4 MiB single-chunk maximum.
const TRANSFER_MAX_CHUNK_BYTES: u64 = 16 << 20;

/// How long a requester waits for a transfer to finish before giving up.
/// Giving up lets the caller try another holder.
const TRANSFER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

/// Retry budget for a single stream send that hits backpressure.
const SEND_RETRIES: usize = 64;

/// How far past the next-expected sequence the receiver will buffer
/// out-of-order transfer packets.
///
/// A sender never has more than its tx window (`TRANSFER_STREAM_WINDOW_BYTES`,
/// about 32 frames) outstanding, so honest reordering stays well inside
/// this horizon. The extra headroom costs only a bounded reorder buffer.
/// A packet beyond the horizon is dropped, and the sender retransmits it
/// after the gap closes. The cap also limits memory when a misbehaving
/// holder sprays sparse, far-future sequences.
const MAX_REORDER_AHEAD: u64 = 1_024;

// ── Wire frames ────────────────────────────────────────────────────

/// Control frame, carried on a `SUBPROTOCOL_BLOB_TRANSFER` packet with
/// the transfer stream ID. Sent requester → holder to initiate.
///
/// Decoded with postcard in `BlobTransferEngine::on_request`. Postcard
/// identifies enum variants by declaration index, so variant and field
/// order are part of the wire format. Append new variants at the end;
/// never reorder existing ones.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransferControl {
    /// "Send me the chunk addressed by `hash` on this stream."
    Request {
        /// 32-byte BLAKE3 content address.
        hash: [u8; 32],
    },
}

/// First data-plane event on the transfer stream, holder → requester.
/// Subsequent events on the stream are raw chunk bytes.
///
/// Encoded with postcard by `serve_chunk` and decoded by
/// `BlobTransferEngine::process_event`. Postcard identifies enum variants
/// by declaration index, so the variant order here is part of the wire
/// format.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransferHeader {
    /// The holder has the chunk; `total_len` bytes follow as raw events.
    Found {
        /// Total chunk length the following raw events sum to. The
        /// receiver rejects values above `TRANSFER_MAX_CHUNK_BYTES`.
        total_len: u64,
    },
    /// The holder doesn't have the chunk (or couldn't read it locally);
    /// no bytes follow.
    NotFound,
}

// ── Engine ─────────────────────────────────────────────────────────

/// One-shot completion channel for a requester-side transfer. It carries
/// the verified chunk bytes, or the error that ended the transfer.
type DoneTx = tokio::sync::oneshot::Sender<std::result::Result<Bytes, BlobError>>;

/// Result of feeding one in-sequence event into a reassembly.
///
/// `on_data` computes this while it holds the `pending` map guard. It acts
/// on the result only after releasing the guard, because acting on a
/// terminal step (`finish` / `finish_verified`) removes the entry from
/// the map.
enum ReassembleStep {
    /// The transfer is still in progress; keep feeding events.
    Continue,
    /// Every declared byte has arrived — verify the hash and deliver.
    Complete,
    /// The transfer cannot succeed: NotFound, an undecodable or over-cap
    /// header, or more bytes than the header declared.
    Fail(BlobError),
}

/// Requester-side in-flight transfer state, keyed by transfer stream id.
///
/// Created by `BlobTransferEngine::register_pending` and advanced by
/// `on_data`. It is removed from the engine's `pending` map when the
/// transfer settles (completion, failure, reset, or cancel).
struct PendingInbound {
    /// The peer we're fetching from — needed to close the receive-side
    /// stream once the transfer settles (otherwise streams leak: one
    /// per fetched chunk, reclaimed only at the 300 s idle timeout,
    /// which exhausts memory at directory scale).
    holder: u64,
    /// BLAKE3 content address the reassembled bytes must hash to before
    /// they are delivered.
    expected_hash: [u8; 32],
    /// `None` until the `TransferHeader` lands; then the declared length.
    total_len: Option<u64>,
    /// Chunk bytes reassembled so far, appended strictly in sequence order.
    buf: Vec<u8>,
    /// Next reliable sequence to process. The divert delivers packets in
    /// ARRIVAL order (the substrate's `on_receive` accepts out-of-order
    /// sequences for SACK), so the engine reorders by sequence: header is
    /// seq 0, data frames are seq 1..N in send order.
    next_seq: u64,
    /// Out-of-order packets buffered until their sequence becomes
    /// contiguous, keyed by sequence. Bounded by [`MAX_REORDER_AHEAD`].
    reorder: BTreeMap<u64, Vec<Bytes>>,
    /// Taken and fired once on completion (success / NotFound / error).
    /// On cancel it is dropped unsent, which fails the awaiting fetch.
    done: Option<DoneTx>,
}

impl PendingInbound {
    /// Capture this transfer's current progress for operator tooling.
    /// `stream_id` is passed in because it is the map key, not a field.
    /// Only in-sequence bytes count toward `bytes_received`; packets still
    /// waiting in the reorder buffer are not included.
    fn status(&self, stream_id: u64) -> TransferStatus {
        let bytes_received = self.buf.len() as u64;
        TransferStatus {
            stream_id,
            holder: self.holder,
            expected_hash: self.expected_hash,
            total_bytes: self.total_len,
            bytes_received,
        }
    }
}

/// A point-in-time snapshot of one **requester-side** in-flight transfer,
/// for operator introspection (the `blob.transfers` RPC behind
/// `net transfer ls` / `status`). Serving-side tasks are not tracked, so
/// this reflects what the node is currently *fetching*, not what it serves.
///
/// Produced by `BlobTransferEngine::list_pending` / `get_pending`. It
/// derives serde, so renaming or reordering fields may change its
/// serialized form. NOTE(review): the RPC's encoding isn't visible here;
/// confirm it before changing the layout.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TransferStatus {
    /// Transfer stream id — the transfer's identity / cancel handle.
    pub stream_id: u64,
    /// Peer node id the bytes are being fetched from.
    pub holder: u64,
    /// BLAKE3 content address being fetched.
    pub expected_hash: [u8; 32],
    /// Bytes reassembled so far (in-sequence only; out-of-order packets
    /// still in the reorder buffer are not counted).
    pub bytes_received: u64,
    /// Declared total once the transfer header arrived; `None` before then.
    pub total_bytes: Option<u64>,
}

/// Drives blob transfer over router streams (FairScheduler transport).
/// Installed on a node via
/// [`crate::adapter::net::MeshNode::serve_blob_transfer`]. Holds a
/// `Weak<MeshNode>` (to open reply streams without an adapter↔mesh
/// cycle) and the local [`MeshBlobAdapter`] (for content lookup), plus
/// the requester-side pending map.
pub struct BlobTransferEngine {
    /// Owning node. Weak, so the engine never keeps the node alive; each
    /// use upgrades it and silently skips the work if the node is gone.
    mesh: Weak<MeshNode>,
    /// Local content store consulted when serving a peer's request.
    adapter: Arc<MeshBlobAdapter>,
    /// Requester-side in-flight transfers, keyed by transfer stream id.
    pending: DashMap<u64, PendingInbound>,
}

impl BlobTransferEngine {
    /// Construct an engine over the local node + adapter. Only a `Weak`
    /// handle to `mesh` is retained, so the engine does not keep the node
    /// alive.
    pub fn new(mesh: &Arc<MeshNode>, adapter: Arc<MeshBlobAdapter>) -> Self {
        Self {
            mesh: Arc::downgrade(mesh),
            adapter,
            pending: DashMap::new(),
        }
    }

    /// Register a requester-side pending transfer before the Request is
    /// sent, so the reply (header/data on `stream_id`) can be matched.
    /// `holder` is the serving peer, recorded so the receive-side stream
    /// can be closed when the transfer settles. Re-registering an existing
    /// `stream_id` replaces the previous entry.
    pub fn register_pending(
        &self,
        stream_id: u64,
        holder: u64,
        expected_hash: [u8; 32],
        done: DoneTx,
    ) {
        self.pending.insert(
            stream_id,
            PendingInbound {
                holder,
                expected_hash,
                total_len: None,
                buf: Vec::new(),
                next_seq: 0,
                reorder: BTreeMap::new(),
                done: Some(done),
            },
        );
    }

    /// Drop a pending transfer (timeout / give-up). Idempotent. Closes the
    /// receive-side stream too (mirroring the settle path) so a cancelled
    /// transfer doesn't linger as a live stream until the 300 s idle
    /// timeout — the same leak the completion path avoids.
    pub fn cancel_pending(&self, stream_id: u64) {
        self.remove_and_close(stream_id);
    }

    /// Remove a pending transfer and tear down its receive-side stream,
    /// reporting whether one was present. Shared by [`Self::cancel_pending`]
    /// and [`Self::cancel_pending_reporting`]. The stored `holder` lets us
    /// close the stream without re-deriving the peer. The entry's `done`
    /// sender is dropped unsent, which fails the awaiting fetch.
    fn remove_and_close(&self, stream_id: u64) -> bool {
        let Some((_, pending)) = self.pending.remove(&stream_id) else {
            return false;
        };
        self.close_receive_stream(pending.holder, stream_id);
        true
    }

    /// Snapshot every requester-side in-flight transfer (operator
    /// introspection — `net transfer ls`). Receiver-side only.
    pub fn list_pending(&self) -> Vec<TransferStatus> {
        self.pending
            .iter()
            .map(|e| e.value().status(*e.key()))
            .collect()
    }

    /// Snapshot one in-flight transfer by stream id, or `None` if it isn't
    /// pending (already settled, cancelled, or never existed).
    pub fn get_pending(&self, stream_id: u64) -> Option<TransferStatus> {
        self.pending
            .get(&stream_id)
            .map(|e| e.value().status(stream_id))
    }

    /// Like [`Self::cancel_pending`] but reports whether a transfer was
    /// actually removed — for the operator cancel surface, which
    /// distinguishes "cancelled" from "no such transfer". Closes the
    /// receive-side stream and drops the entry's `done` sender, failing the
    /// awaiting fetch.
    pub fn cancel_pending_reporting(&self, stream_id: u64) -> bool {
        self.remove_and_close(stream_id)
    }

    /// The holder's reliable layer gave up retransmitting this transfer
    /// stream (STREAM_RETRANSMIT H-3 reset). Fail the pending read now
    /// with a distinct error so the caller can fail over to another
    /// holder immediately instead of waiting for the 30 s timeout.
    /// No-op if the transfer already settled.
    pub fn on_reset(&self, stream_id: u64) {
        self.finish(
            stream_id,
            Err(BlobError::Backend(
                "transfer: holder reset stream (retransmit exhausted)".into(),
            )),
        );
    }

    /// Serving side: a `TransferControl::Request` arrived on `stream_id`
    /// from `requester`. Spawn a task that reads the chunk locally and
    /// streams it back on the same (transfer) stream. An undecodable
    /// control frame is logged at debug level and ignored. If the node has
    /// already been dropped, the request is ignored silently.
    ///
    /// # Authorization model: possession-of-hash is the capability
    ///
    /// A transfer is **content-addressed** — the request names a 32-byte
    /// BLAKE3 hash, not a channel. A blob can belong to many channels (or
    /// none), so channel-scoped read-auth doesn't map onto a bare hash.
    /// The deliberate model (chosen over channel-auth / capability
    /// tokens) is **possession-of-hash**: a peer that presents a valid
    /// content hash may fetch the bytes that hash to it. The 256-bit
    /// BLAKE3 digest is an unguessable bearer capability — you cannot
    /// enumerate or forge it, so knowing it is itself the grant.
    ///
    /// Two substrate guarantees backstop this, both already enforced:
    /// 1. **Authenticated session.** This handler only runs for a packet
    ///    that AEAD-decrypted under an established session with a
    ///    resolved `requester` (the dispatch branch rejects `from_node
    ///    == 0`), so an unauthenticated/forged peer never reaches here.
    /// 2. **Established peer for the reply.** `serve_chunk` streams the
    ///    bytes via `MeshNode::open_stream(requester, …)`, which requires
    ///    `requester` to be a connected peer — bytes never flow to an
    ///    unknown origin.
    ///
    /// **Caveat (by design):** the hash is a *bearer* token — anyone who
    /// learns it can fetch the content from any holder. Callers that need
    /// stronger confinement must treat content hashes for sensitive blobs
    /// as secrets (don't log/publish them to parties who shouldn't read
    /// the content), or layer channel/capability auth above this transport.
    pub fn on_request(&self, requester: u64, stream_id: u64, payload: &[u8]) {
        let control: TransferControl = match postcard::from_bytes(payload) {
            Ok(c) => c,
            Err(e) => {
                tracing::debug!(error = %e, requester, "blob transfer: bad control frame");
                return;
            }
        };
        let TransferControl::Request { hash } = control;
        let Some(mesh) = self.mesh.upgrade() else {
            return;
        };
        let adapter = self.adapter.clone();
        tokio::spawn(async move {
            serve_chunk(mesh, adapter, requester, stream_id, hash).await;
        });
    }

    /// Requester side: a transfer packet at reliable sequence `seq` was
    /// diverted here. **Events arrive in transmission order only when the
    /// wire didn't reorder** — the substrate's `on_receive` accepts
    /// out-of-order sequences (for SACK), and the divert hands them over
    /// in arrival order, so this method reorders by `seq` itself: it
    /// buffers out-of-order packets and processes events strictly in
    /// sequence (header = seq 0, data = seq 1..N). Duplicates (seq already
    /// processed or buffered) and far-future seqs are dropped; the sender
    /// retransmits a dropped far-future packet once the gap closes.
    pub fn on_data(&self, stream_id: u64, seq: u64, events: Vec<Bytes>) {
        // Fold under the DashMap guard, but settle only after it is
        // released. Settling removes the entry, and mutating the map while
        // holding a `get_mut` guard on the same shard can deadlock.
        let outcome = {
            let Some(mut entry) = self.pending.get_mut(&stream_id) else {
                return; // already completed / cancelled
            };
            let inbound = &mut *entry;
            if !Self::admit(inbound, seq) {
                return;
            }
            inbound.reorder.insert(seq, events);
            Self::release_contiguous(inbound)
        };
        match outcome {
            ReassembleStep::Continue => {}
            ReassembleStep::Fail(err) => self.finish(stream_id, Err(err)),
            ReassembleStep::Complete => self.finish_verified(stream_id),
        }
    }

    /// Whether a packet at `seq` should be buffered. It must not already be
    /// consumed (`seq < next_seq`) or buffered (a duplicate), and it must lie
    /// inside the [`MAX_REORDER_AHEAD`] horizon.
    fn admit(inbound: &PendingInbound, seq: u64) -> bool {
        seq >= inbound.next_seq
            && seq < inbound.next_seq.saturating_add(MAX_REORDER_AHEAD)
            && !inbound.reorder.contains_key(&seq)
    }

    /// Process every buffered packet that is now contiguous with
    /// `next_seq`, in sequence order. Stops at the first terminal step
    /// and returns it; otherwise returns `Continue` once the next
    /// sequence is missing.
    fn release_contiguous(inbound: &mut PendingInbound) -> ReassembleStep {
        loop {
            let seq = inbound.next_seq;
            let Some(ready) = inbound.reorder.remove(&seq) else {
                return ReassembleStep::Continue;
            };
            inbound.next_seq += 1;
            for event in &ready {
                match Self::process_event(inbound, event) {
                    ReassembleStep::Continue => {}
                    step @ (ReassembleStep::Fail(_) | ReassembleStep::Complete) => return step,
                }
            }
        }
    }

    /// Fold one in-sequence event into the pending reassembly.
    ///
    /// Until the header has been seen, the event is decoded as a
    /// [`TransferHeader`]. After that, events are raw chunk bytes appended
    /// in order. A header that is `NotFound`, undecodable, or over
    /// [`TRANSFER_MAX_CHUNK_BYTES`] fails the transfer, as do bytes beyond
    /// the declared length.
    fn process_event(entry: &mut PendingInbound, event: &Bytes) -> ReassembleStep {
        match entry.total_len {
            None => match postcard::from_bytes::<TransferHeader>(event) {
                Ok(TransferHeader::NotFound) => {
                    ReassembleStep::Fail(BlobError::NotFound("transfer: holder NotFound".into()))
                }
                Ok(TransferHeader::Found { total_len }) if total_len > TRANSFER_MAX_CHUNK_BYTES => {
                    ReassembleStep::Fail(BlobError::Backend(format!(
                        "transfer: total_len {total_len} exceeds cap"
                    )))
                }
                Ok(TransferHeader::Found { total_len }) => {
                    entry.total_len = Some(total_len);
                    // Bounded by the cap guard above, so the reservation is
                    // at most TRANSFER_MAX_CHUNK_BYTES.
                    entry
                        .buf
                        .reserve(total_len.min(TRANSFER_MAX_CHUNK_BYTES) as usize);
                    if total_len == 0 {
                        ReassembleStep::Complete
                    } else {
                        ReassembleStep::Continue
                    }
                }
                Err(e) => {
                    ReassembleStep::Fail(BlobError::Backend(format!("transfer: bad header: {e}")))
                }
            },
            Some(total) => {
                let received = entry.buf.len() as u64;
                if received.saturating_add(event.len() as u64) > total {
                    return ReassembleStep::Fail(BlobError::Backend(
                        "transfer: holder sent more than total_len".into(),
                    ));
                }
                entry.buf.extend_from_slice(event);
                if entry.buf.len() as u64 >= total {
                    ReassembleStep::Complete
                } else {
                    ReassembleStep::Continue
                }
            }
        }
    }

    /// Remove the pending entry and fire its oneshot with `result`.
    /// No-op if the transfer already settled.
    fn finish(&self, stream_id: u64, result: Result<Bytes, BlobError>) {
        if let Some((_, pending)) = self.pending.remove(&stream_id) {
            self.deliver(stream_id, pending, result);
        }
    }

    /// Remove the pending entry, verify the assembled bytes against the
    /// expected hash, and fire its oneshot. No-op if already settled.
    fn finish_verified(&self, stream_id: u64) {
        let Some((_, mut pending)) = self.pending.remove(&stream_id) else {
            return;
        };
        let assembled = std::mem::take(&mut pending.buf);
        let result = Self::verify_assembled(assembled, pending.expected_hash);
        self.deliver(stream_id, pending, result);
    }

    /// Hash the reassembled bytes. Returns them if the BLAKE3 digest matches
    /// `expected`; otherwise returns `HashMismatch` with both digests.
    fn verify_assembled(bytes: Vec<u8>, expected: [u8; 32]) -> Result<Bytes, BlobError> {
        let actual: [u8; 32] = blake3::hash(&bytes).into();
        if actual == expected {
            Ok(Bytes::from(bytes))
        } else {
            Err(BlobError::HashMismatch { expected, actual })
        }
    }

    /// Settle an already-removed entry: send `result` to the awaiting
    /// fetch (if its sender is still present), then close the stream.
    fn deliver(&self, stream_id: u64, mut pending: PendingInbound, result: Result<Bytes, BlobError>) {
        if let Some(tx) = pending.done.take() {
            // The requester may have stopped waiting (e.g. timed out); a
            // closed receiver is not an error here.
            let _ = tx.send(result);
        }
        self.close_receive_stream(pending.holder, stream_id);
    }

    /// Tear down the receive-side stream once a transfer settles. The
    /// data is fully received (or the transfer failed), so no more
    /// packets are expected. Reclaiming the stream keeps a high-file-count
    /// directory pull from accumulating one live stream per chunk
    /// until the 300 s idle timeout (which exhausts memory at scale). A
    /// late retransmit after close is harmless — it re-creates an empty
    /// stream that finds no pending entry and idles out.
    fn close_receive_stream(&self, holder: u64, stream_id: u64) {
        if let Some(mesh) = self.mesh.upgrade() {
            mesh.close_stream(holder, stream_id);
        }
    }
}

/// Serving-side: read `hash` locally and stream it to `requester` on
/// `stream_id` over a reliable, scheduled stream (FairScheduler).
///
/// Wire sequence, in order:
/// 1. One [`TransferHeader`] event: `Found { total_len }` if the chunk
///    was read locally, otherwise `NotFound`.
/// 2. For `Found`, the chunk bytes in [`DATA_FRAME_BYTES`] frames.
/// 3. A graceful close.
///
/// Send failures stop the transfer early; they are logged by `send_one`.
async fn serve_chunk(
    mesh: Arc<MeshNode>,
    adapter: Arc<MeshBlobAdapter>,
    requester: u64,
    stream_id: u64,
    hash: [u8; 32],
) {
    let cfg = StreamConfig::new()
        .with_reliability(Reliability::Reliable)
        .with_scheduled(true)
        .with_window_bytes(TRANSFER_STREAM_WINDOW_BYTES)
        .with_fairness_weight(1);
    // `open_stream` requires `requester` to be a connected peer (an
    // established, authenticated session), so this is also the
    // authorization gate for the possession-of-hash model (see
    // `BlobTransferEngine::on_request`): bytes only ever flow to a peer
    // we have a live session with, and only for the exact hash it asked.
    let stream = match mesh.open_stream(requester, stream_id, cfg) {
        Ok(s) => s,
        Err(e) => {
            tracing::debug!(error = %e, requester, "blob transfer: open reply stream failed");
            return;
        }
    };

    // `fetch_chunk` here is the local content-addressed read (this
    // branch has no peer-fetch fallback on the adapter), so a serving
    // node always answers from its own store — no recursion risk.
    let local = adapter.fetch_chunk(&hash).await;
    match local {
        Ok(bytes) => {
            let header = TransferHeader::Found {
                total_len: bytes.len() as u64,
            };
            if send_one(&mesh, &stream, postcard_event(&header))
                .await
                .is_ok()
            {
                // One reliable event per `DATA_FRAME_BYTES` frame. The tx
                // window (`TRANSFER_STREAM_WINDOW_BYTES`, about 32 frames)
                // is smaller than a multi-MiB chunk, so large chunks refill
                // credit several times mid-transfer. A send that finds no
                // credit waits out `send_with_retry`'s backoff until the
                // receiver's grant lands.
                // Sending per event (not batched) keeps each
                // `send_with_retry` independently safe: a one-packet call
                // can't partially commit and then resend a duplicate under
                // a fresh sequence on retry.
                for frame in bytes.chunks(DATA_FRAME_BYTES) {
                    if send_one(&mesh, &stream, Bytes::copy_from_slice(frame))
                        .await
                        .is_err()
                    {
                        break;
                    }
                }
            }
        }
        Err(_) => {
            // Absent locally or local read error → NotFound (never serve
            // suspect bytes). The requester fails over to another holder.
            let _ = send_one(&mesh, &stream, postcard_event(&TransferHeader::NotFound)).await;
        }
    }

    // Close gracefully (H-7): wait until the receiver has acked every
    // sent byte (so NACK-driven resends can still fill gaps) before
    // tearing down the retransmit window — closing eagerly strands a lost
    // tail packet on a lossy link. Bounded by `TRANSFER_TIMEOUT` so a
    // vanished receiver can't pin the stream; reclaiming it also stops
    // directory-scale fan-out from leaking one live stream per chunk.
    mesh.close_stream_graceful(requester, stream_id, TRANSFER_TIMEOUT)
        .await;
}

/// Serialize a wire frame with postcard into a single event payload.
///
/// If serialization fails, the result is an empty payload rather than a
/// panic. A receiver expecting a header then rejects it as a bad header.
fn postcard_event<T: Serialize>(value: &T) -> Bytes {
    match postcard::to_allocvec(value) {
        Ok(encoded) => Bytes::from(encoded),
        Err(_) => Bytes::new(),
    }
}

async fn send_one(mesh: &Arc<MeshNode>, stream: &Stream, event: Bytes) -> Result<(), ()> {
    mesh.send_with_retry(stream, std::slice::from_ref(&event), SEND_RETRIES)
        .await
        .map_err(|e| {
            tracing::debug!(error = %e, "blob transfer: stream send failed");
        })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn transfer_ids_are_disjoint_from_channel_and_control_streams() {
        // Channel-publisher streams always set bit 48.
        let channel_like = CHANNEL_STREAM_BIT | 0xDEAD_BEEF_CAFE;
        assert!(!is_transfer_stream_id(channel_like));
        // Control stream is u64::MAX (bit 48 set).
        assert!(!is_transfer_stream_id(u64::MAX));
        // Subprotocol streams are small (bit 61 clear).
        assert!(!is_transfer_stream_id(SUBPROTOCOL_BLOB_TRANSFER as u64));
        assert!(!is_transfer_stream_id(0x1000));
    }

    #[test]
    fn transfer_ids_round_trip_and_self_identify() {
        for nonce in [1u64, 42, 0xFFFF, (1 << 48) - 1] {
            let id = transfer_stream_id(nonce);
            assert!(is_transfer_stream_id(id), "id {id:#x} must self-identify");
            // bit 48 clear by construction.
            assert_eq!(id & CHANNEL_STREAM_BIT, 0);
            // bit 61 set.
            assert_ne!(id & TRANSFER_STREAM_FLAG, 0);
        }
    }

    #[test]
    fn allocator_yields_distinct_transfer_ids() {
        let a = next_transfer_stream_id();
        let b = next_transfer_stream_id();
        assert_ne!(a, b);
        assert!(is_transfer_stream_id(a) && is_transfer_stream_id(b));
    }

    /// Spin up a bare local node and an engine over an empty adapter. The
    /// node is returned so it outlives the engine's `Weak` handle for the
    /// duration of the test.
    async fn test_engine() -> (Arc<MeshNode>, BlobTransferEngine) {
        use crate::adapter::net::identity::EntityKeypair;
        use crate::adapter::net::redex::Redex;
        use crate::adapter::net::MeshNodeConfig;

        let addr = "127.0.0.1:0".parse().expect("addr");
        let node = Arc::new(
            MeshNode::new(
                EntityKeypair::generate(),
                MeshNodeConfig::new(addr, [0x17u8; 32]),
            )
            .await
            .expect("node"),
        );
        let adapter = Arc::new(MeshBlobAdapter::new("t", Arc::new(Redex::new())));
        let engine = BlobTransferEngine::new(&node, adapter);
        (node, engine)
    }

    /// The introspection accessors behind `blob.transfers` (list / get /
    /// cancel-reporting) must reflect `register_pending` and report removal.
    #[tokio::test]
    async fn engine_accessors_report_and_cancel_pending() {
        let (_node, engine) = test_engine().await;

        assert!(engine.list_pending().is_empty());

        let sid = transfer_stream_id(99);
        let (tx, _rx) = tokio::sync::oneshot::channel();
        engine.register_pending(sid, 7, [0xABu8; 32], tx);

        let listed = engine.list_pending();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].stream_id, sid);
        assert_eq!(listed[0].holder, 7);
        assert_eq!(listed[0].expected_hash, [0xABu8; 32]);
        assert_eq!(listed[0].bytes_received, 0);
        assert_eq!(listed[0].total_bytes, None);

        let got = engine.get_pending(sid).expect("pending present");
        assert_eq!(got.holder, 7);
        assert!(engine.get_pending(transfer_stream_id(1234)).is_none());

        // Cancel reports existence once, then is idempotently false.
        assert!(engine.cancel_pending_reporting(sid));
        assert!(!engine.cancel_pending_reporting(sid));
        assert!(engine.list_pending().is_empty());
    }

    /// Packets delivered out of sequence order must be buffered,
    /// reassembled strictly in order, hash-verified, and delivered.
    /// Duplicate sequences must be ignored.
    #[tokio::test]
    async fn on_data_reorders_out_of_sequence_packets() {
        let (_node, engine) = test_engine().await;
        let data: Vec<u8> = (0..DATA_FRAME_BYTES + 123).map(|i| (i % 251) as u8).collect();
        let hash: [u8; 32] = blake3::hash(&data).into();
        let sid = transfer_stream_id(500);
        let (tx, rx) = tokio::sync::oneshot::channel();
        engine.register_pending(sid, 7, hash, tx);

        let header = postcard_event(&TransferHeader::Found {
            total_len: data.len() as u64,
        });
        let (first, second) = data.split_at(DATA_FRAME_BYTES);

        // Tail frame first, then the header; the middle frame is missing.
        engine.on_data(sid, 2, vec![Bytes::copy_from_slice(second)]);
        engine.on_data(sid, 0, vec![header]);
        let mid = engine.get_pending(sid).expect("still pending");
        assert_eq!(mid.total_bytes, Some(data.len() as u64));
        assert_eq!(mid.bytes_received, 0);

        // A duplicate of an already-buffered sequence is ignored, then the
        // gap closes and the whole chunk completes.
        engine.on_data(sid, 2, vec![Bytes::copy_from_slice(second)]);
        engine.on_data(sid, 1, vec![Bytes::copy_from_slice(first)]);

        let Ok(Ok(got)) = rx.await else {
            panic!("transfer did not complete successfully");
        };
        assert_eq!(&got[..], &data[..]);
        assert!(engine.get_pending(sid).is_none());
    }

    /// A `NotFound` header fails the fetch with `BlobError::NotFound` and
    /// removes the pending entry.
    #[tokio::test]
    async fn on_data_not_found_header_fails_fetch() {
        let (_node, engine) = test_engine().await;
        let sid = transfer_stream_id(501);
        let (tx, rx) = tokio::sync::oneshot::channel();
        engine.register_pending(sid, 7, [0u8; 32], tx);

        engine.on_data(sid, 0, vec![postcard_event(&TransferHeader::NotFound)]);

        assert!(matches!(rx.await, Ok(Err(BlobError::NotFound(_)))));
        assert!(engine.get_pending(sid).is_none());
    }

    /// Bytes that don't hash to the expected address are reported as a
    /// `HashMismatch` rather than delivered. Header and data travel in one
    /// packet here.
    #[tokio::test]
    async fn on_data_hash_mismatch_is_reported() {
        let (_node, engine) = test_engine().await;
        let sid = transfer_stream_id(502);
        let (tx, rx) = tokio::sync::oneshot::channel();
        engine.register_pending(sid, 7, [0u8; 32], tx);

        let header = postcard_event(&TransferHeader::Found { total_len: 3 });
        engine.on_data(sid, 0, vec![header, Bytes::from_static(b"abc")]);

        assert!(matches!(rx.await, Ok(Err(BlobError::HashMismatch { .. }))));
        assert!(engine.get_pending(sid).is_none());
    }
}