Skip to main content

net/adapter/net/cortex/
rpc.rs

1//! nRPC — request/response on top of CortEX folds.
2//!
3//! See `docs/misc/NRPC_DESIGN.md` for the full architectural framing.
4//! In short: an RPC server is a `RedexFold` whose state is the
5//! in-flight call set, whose events are typed `(REQUEST, RESPONSE,
6//! CANCEL, DEADLINE_EXCEEDED)`, whose `EventMeta::seq_or_ts` is the
7//! correlation id, and whose `EventMeta::origin_hash` is the
8//! AEAD-verified caller. The mesh-channel layer's queue-group
9//! subscription mode (see `channel::SubscriptionMode`) does the
10//! one-of-N work distribution across replica servers.
11//!
12//! This module is the **wire codec layer**: dispatch constants for
13//! `EventMeta::dispatch`, payload structs for `RpcRequestPayload` /
14//! `RpcResponsePayload`, and the `RpcStatus` enumeration. The fold
15//! types and the `Mesh::serve_rpc` / `Mesh::call` glue layer build
16//! on top.
17
18use bytes::{Buf, BufMut, Bytes};
19use parking_lot::Mutex;
20use std::collections::HashMap;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use tokio::sync::Notify;
24
25use super::super::redex::{RedexError, RedexEvent, RedexFold};
26use super::meta::{EventMeta, EVENT_META_SIZE};
27
28// ============================================================================
29// `EventMeta::dispatch` byte assignments for nRPC.
30//
31// All four values live in the cortex-internal range (`0x00..0x7F`).
32// Application/vendor dispatches stay in `0x80..0xFF`. Adapters that
33// don't care about RPC ignore unknown dispatches as they ignore any
34// other.
35// ============================================================================
36
/// Opening frame of an RPC call, sent caller → server.
///
/// `EventMeta::seq_or_ts` carries the `call_id` generated by the
/// caller and `EventMeta::origin_hash` is the AEAD-verified caller.
/// The payload is an [`RpcRequestPayload`].
pub const DISPATCH_RPC_REQUEST: u8 = 0x10;

/// Response frame, sent server → caller; for a unary call it is the
/// terminal frame. `EventMeta::seq_or_ts` equals the `call_id` of the
/// request being answered. The payload is an [`RpcResponsePayload`].
pub const DISPATCH_RPC_RESPONSE: u8 = 0x11;

/// Cancellation signal, sent caller → server.
///
/// The payload is empty: the dispatch byte together with the matching
/// `call_id` in `EventMeta::seq_or_ts` is the entire message. The
/// server's fold drops the in-flight entry and, for cooperative
/// handlers, flips the handler's `CancellationToken`.
pub const DISPATCH_RPC_CANCEL: u8 = 0x12;

/// Deadline-exceeded signal, sent server → caller, with an empty
/// payload.
///
/// Emitted when the server's fold observes
/// `now_ns() > request.deadline_ns` before the handler starts, or
/// (optionally) when the deadline timer aborts a long-running
/// handler. `EventMeta::seq_or_ts` equals the request's `call_id`.
pub const DISPATCH_RPC_DEADLINE_EXCEEDED: u8 = 0x13;

/// Response-direction stream credit grant, sent caller → server.
///
/// After `EventMeta` the payload holds a 4-byte big-endian `u32`:
/// how many additional response chunks the caller will accept for
/// the streaming call named by `EventMeta::seq_or_ts`.
///
/// NOTE(review): the codec section of this module states that all
/// wire integers are little-endian — confirm the grant's byte order
/// against the stream-grant codec.
///
/// Grants only matter when the caller opted into flow control with
/// the `nrpc-stream-window-initial` request header
/// ([`HEADER_NRPC_STREAM_WINDOW_INITIAL`]). On such a stream the
/// server's pump task waits for one credit per chunk; when the
/// header is absent the server ignores every GRANT.
///
/// Phase 3.
pub const DISPATCH_RPC_STREAM_GRANT: u8 = 0x14;

/// Upload continuation chunk of a client-streaming or duplex
/// REQUEST, sent caller → server.
///
/// An [`RpcRequestChunkPayload`] follows the `EventMeta` prefix and
/// `EventMeta::seq_or_ts` equals the `call_id` of the initial
/// REQUEST. Every chunk but the last has
/// `flags & FLAG_RPC_REQUEST_END == 0`; the final upload chunk sets
/// [`FLAG_RPC_REQUEST_END`].
///
/// Chunks are only meaningful when the initial REQUEST set
/// [`FLAG_RPC_CLIENT_STREAMING_REQUEST`]. For any other call the
/// server silently drops the chunk (a caller bug with no observable
/// effect).
pub const DISPATCH_RPC_REQUEST_CHUNK: u8 = 0x15;

/// Request-direction stream credit grant, sent server → caller; the
/// upload-side counterpart of [`DISPATCH_RPC_STREAM_GRANT`].
///
/// An [`RpcRequestGrantPayload`] (a `call_id` plus a `u32` credit
/// count) follows `EventMeta`. `EventMeta::seq_or_ts` repeats the
/// call's `call_id`; that duplicates the payload field but keeps
/// this dispatch symmetric with the rest of the family.
///
/// Grants only matter when the caller opted into request-direction
/// flow control with the `nrpc-request-window-initial` header
/// ([`HEADER_NRPC_REQUEST_WINDOW_INITIAL`]). The caller's sink then
/// waits for one credit per `REQUEST_CHUNK`; without the header
/// credit is unbounded and the sink emits as fast as the publish
/// path accepts.
pub const DISPATCH_RPC_REQUEST_GRANT: u8 = 0x16;
102
103// ============================================================================
104// `RpcRequestPayload::flags` bit assignments.
105// ============================================================================
106
107// Bit 0 (`1 << 0`) is RESERVED — was previously documented as
108// `FLAG_RPC_IDEMPOTENT`, but the server-side replay-cache (LRU of
109// `(origin_hash, call_id) -> RpcResponsePayload`) was never landed,
110// so the flag silently no-op'd despite a load-bearing contract in
111// its doc-string. Removed to avoid shipping a documented behavior
112// the runtime doesn't implement; reservation kept so a future
113// re-add (with the LRU) preserves wire compatibility.
114
/// Streaming-response opt-in: the server may emit more than one
/// `DISPATCH_RPC_RESPONSE` event for the call.
///
/// When the bit is clear, the first response ends the call. When it
/// is set, every response before the last carries
/// `headers["nrpc-streaming"] = b"continue"`, and the last one
/// carries either `b"end"` (success) or a non-`Ok` status.
pub const FLAG_RPC_STREAMING_RESPONSE: u16 = 1 << 1;

/// The request carries W3C Trace Context headers (`traceparent`,
/// `tracestate`), which the server propagates into its own span
/// emission. Phase 3.
pub const FLAG_RPC_PROPAGATE_TRACE: u16 = 1 << 2;

// Bit `1 << 3` is reserved, mirroring the reserved bit 0 above: it
// leaves room for a future protocol-level flag without shifting any
// bit that is already live.

/// Marks an initial REQUEST that will be followed by one or more
/// [`DISPATCH_RPC_REQUEST_CHUNK`] events.
///
/// It lets the server's fold tell client-streaming / duplex calls
/// from unary ones on the very first frame, so it opens a
/// request-side stream rather than treating the REQUEST as complete.
///
/// Set together with [`FLAG_RPC_STREAMING_RESPONSE`] on the same
/// REQUEST, it means full duplex.
///
/// Bidi streaming plan (Phase A).
pub const FLAG_RPC_CLIENT_STREAMING_REQUEST: u16 = 1 << 4;

/// Marks the terminal upload frame of a client-streaming or duplex
/// call. May appear on a [`DISPATCH_RPC_REQUEST_CHUNK`] or on the
/// initial REQUEST itself.
///
/// Once it arrives, the server's request-side stream yields `None`
/// and the handler moves on to its terminal response.
///
/// On the initial REQUEST it is the degenerate "client-streaming
/// with exactly one item" case, which saves a round-trip.
///
/// Bidi streaming plan (Phase A).
pub const FLAG_RPC_REQUEST_END: u16 = 1 << 5;
155
156// Bits `6..=15` reserved; producers MUST write zero, consumers MUST
157// ignore unknown bits (forward-compat with future flags).
158
159// ============================================================================
160// `RpcResponsePayload::status` enumeration.
161// ============================================================================
162
/// Result code of an nRPC call.
///
/// The numbering is Net-native; each variant's doc names the closest
/// gRPC code. Numeric stability: callers and servers across versions
/// agree on `0x0000..=0x7FFF`, while `0x8000..=0xFFFF` is the
/// application-defined range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum RpcStatus {
    /// The call succeeded and the payload is the application
    /// response. Terminal for unary calls; for streaming responses
    /// it may be one of many (see the streaming flag).
    /// gRPC equivalent: `OK` (0).
    Ok = 0x0000,
    /// The server has no service registered under the requested
    /// name.
    /// gRPC equivalent: `NOT_FOUND` (5).
    NotFound = 0x0001,
    /// The caller's token does not list the requested service in
    /// scope, or the channel-level capability check failed.
    /// gRPC equivalent: `PERMISSION_DENIED` (7).
    Unauthorized = 0x0002,
    /// The server saw `now_ns() > deadline_ns` before starting work.
    /// (The in-flight case, after the handler has started, is
    /// covered by [`DISPATCH_RPC_DEADLINE_EXCEEDED`].)
    /// gRPC equivalent: `DEADLINE_EXCEEDED` (4).
    Timeout = 0x0003,
    /// The server's per-service queue is full (`max_in_flight`).
    /// gRPC equivalent: `RESOURCE_EXHAUSTED` (8).
    Backpressure = 0x0004,
    /// The caller sent `DISPATCH_RPC_CANCEL` before the server
    /// finished.
    /// gRPC equivalent: `CANCELLED` (1).
    Cancelled = 0x0005,
    /// The handler panicked or returned an error that fits none of
    /// the other codes. The payload is a UTF-8 diagnostic.
    /// gRPC equivalent: `INTERNAL` (13).
    Internal = 0x0006,
    /// The server does not support the request payload version.
    /// Subprotocol-version negotiation should normally catch this
    /// earlier; the in-payload guard is the floor.
    /// gRPC equivalent: `UNIMPLEMENTED` (12).
    UnknownVersion = 0x0007,
    /// v0.4 capability-auth: the target's `CapabilityAnnouncement`
    /// either omits the requested `nrpc:<service>` tag or lists it
    /// with allow-lists the caller does not match. See
    /// `docs/plans/CAPABILITY_AUTH_PLAN.md` §3. Kept separate from
    /// `Unauthorized` (channel-auth / token-scope failures) so audit
    /// logs can distinguish the two enforcement surfaces.
    /// gRPC equivalent: `PERMISSION_DENIED` (7) — the same outward
    /// shape as `Unauthorized`, but a separate substrate code.
    CapabilityDenied = 0x0008,
    /// Application-defined status. The raw `u16` travels on the wire
    /// and its meaning is agreed out of band.
    Application(u16),
}

impl RpcStatus {
    /// Returns the `u16` written to the wire for this status.
    ///
    /// `Application(v)` is emitted as `v` verbatim, whatever its
    /// value.
    pub fn to_wire(self) -> u16 {
        match self {
            Self::Application(raw) => raw,
            Self::Ok => 0x0000,
            Self::NotFound => 0x0001,
            Self::Unauthorized => 0x0002,
            Self::Timeout => 0x0003,
            Self::Backpressure => 0x0004,
            Self::Cancelled => 0x0005,
            Self::Internal => 0x0006,
            Self::UnknownVersion => 0x0007,
            Self::CapabilityDenied => 0x0008,
        }
    }

    /// Maps a wire `u16` back to a status. Never fails.
    ///
    /// Codes `0x0000..=0x0008` yield their named variant. Everything
    /// else, including the reserved range `0x0009..=0x7FFF`, becomes
    /// `Application(v)`, so a status assigned in a future version
    /// still decodes.
    pub fn from_wire(v: u16) -> Self {
        // Named variants, indexed by their wire code.
        const NAMED: [RpcStatus; 9] = [
            RpcStatus::Ok,
            RpcStatus::NotFound,
            RpcStatus::Unauthorized,
            RpcStatus::Timeout,
            RpcStatus::Backpressure,
            RpcStatus::Cancelled,
            RpcStatus::Internal,
            RpcStatus::UnknownVersion,
            RpcStatus::CapabilityDenied,
        ];
        NAMED
            .get(usize::from(v))
            .copied()
            .unwrap_or(Self::Application(v))
    }

    /// Returns `true` only for the `Ok` variant; a shorthand for the
    /// hot caller-side success-or-error branch. `Application(0)` is a
    /// different variant and therefore not "ok".
    #[inline]
    pub fn is_ok(self) -> bool {
        self == Self::Ok
    }
}
260
261// ============================================================================
262// Request / response payloads.
263//
264// These ride in the bytes AFTER the 24-byte `EventMeta` prefix on a
265// CortEX-adapted event. The cortex adapter handles meta + tail
266// concatenation; this codec produces only the tail.
267// ============================================================================
268
/// One header: a name paired with its value. Headers carry things
/// like trace context, an idempotency key, or content-type hints.
/// Names are case-sensitive UTF-8 and values are opaque bytes.
///
/// NOTE(review): `extract_trace_context` in this module matches
/// `traceparent` / `tracestate` ASCII-case-insensitively, so name
/// case sensitivity is not uniform across consumers — confirm the
/// intended rule.
pub type RpcHeader = (String, Vec<u8>);

/// Upper bound on the service-name length on the wire. Matches
/// `MAX_CHANNEL_NAME_LEN` upstream and is a reasonable ceiling for a
/// human-readable identifier.
pub const MAX_RPC_SERVICE_NAME_LEN: usize = 255;

/// Upper bound on the header count of one request or response. It
/// guards against a pathological `headers.len()` from a malformed
/// peer; legitimate callers stay far below it.
pub const MAX_RPC_HEADERS: usize = 32;

/// Upper bound on one header name, in UTF-8 bytes.
pub const MAX_RPC_HEADER_NAME_LEN: usize = 64;

/// Upper bound on one header value, in bytes.
pub const MAX_RPC_HEADER_VALUE_LEN: usize = 4096;

/// Upper bound on a request or response body (4 MiB). Anything
/// larger has to use streaming responses (Phase 3) or be chunked at
/// the application layer. Comparable to gRPC's default
/// `max_message_size` of 4 MiB; tuned downward to match RedEX's
/// `MAX_REDEX_HEAP_PAYLOAD` ceiling.
pub const MAX_RPC_BODY_LEN: usize = 4 * 1024 * 1024;
296
297/// nRPC request payload. Lives after the 24-byte `EventMeta` prefix
298/// in a `DISPATCH_RPC_REQUEST` event.
299#[derive(Debug, Clone, PartialEq, Eq)]
300pub struct RpcRequestPayload {
301    /// Service-name dispatch key. The server's fold looks this up
302    /// in its `serve_rpc` registry and routes to the registered
303    /// handler.
304    pub service: String,
305    /// Absolute deadline (unix nanos). `0` means no deadline; the
306    /// caller will cancel via `DISPATCH_RPC_CANCEL` if it changes
307    /// its mind.
308    pub deadline_ns: u64,
309    /// Bitfield of `FLAG_RPC_*` constants.
310    pub flags: u16,
311    /// Headers (trace context, idempotency key, content-type, etc.).
312    /// Capped at `MAX_RPC_HEADERS` entries, name <= `MAX_RPC_HEADER_NAME_LEN`,
313    /// value <= `MAX_RPC_HEADER_VALUE_LEN`.
314    pub headers: Vec<RpcHeader>,
315    /// Application-defined request body. Caller and server agree on
316    /// the codec out-of-band; nRPC doesn't interpret these bytes.
317    ///
318    /// Held as [`Bytes`] so [`Self::decode`] can zero-copy `slice_ref`
319    /// the body out of the inbound event's `Bytes` payload — pre-fix
320    /// perf #84 in `docs/performance/net-perf-analysis.md` this was
321    /// `Vec<u8>` and every decode did a `data[body_start..body_end].to_vec()`
322    /// (a memcpy per frame). For high-RPS systems doing 100K+ RPCs/sec
323    /// with 1 KB+ bodies that was 100+ MB/sec of pure memcpy.
324    pub body: Bytes,
325}
326
327/// Continuation chunk for a client-streaming or duplex REQUEST.
328/// Lives after the 24-byte `EventMeta` prefix in a
329/// [`DISPATCH_RPC_REQUEST_CHUNK`] event.
330///
331/// Unlike the initial [`RpcRequestPayload`] there is no
332/// `service` field (server already routed by service at the
333/// initial REQUEST) and no `deadline_ns` (the initial REQUEST's
334/// deadline applies to the whole call). The `call_id` field is
335/// redundant with `EventMeta::seq_or_ts` but kept on the
336/// payload so the codec is self-contained — a reader handed a
337/// chunk's bytes without the meta header can still recover its
338/// correlation id.
339///
340/// Bidi streaming plan (Phase A).
341#[derive(Debug, Clone, PartialEq, Eq)]
342pub struct RpcRequestChunkPayload {
343    /// Matches `EventMeta::seq_or_ts` and the original REQUEST's
344    /// `call_id`. Kept on the payload so the codec round-trips
345    /// in isolation.
346    pub call_id: u64,
347    /// Bitfield of `FLAG_RPC_*` constants. The only flag that
348    /// makes sense on a chunk today is [`FLAG_RPC_REQUEST_END`];
349    /// other flags MUST be zero on the wire so future protocol
350    /// extensions can claim them without colliding with
351    /// existing chunks.
352    pub flags: u16,
353    /// Per-chunk metadata. Typically empty; reserved for
354    /// trace-span continuity across long uploads, content-type
355    /// changes mid-stream, or other rare per-chunk concerns.
356    /// Capped at `MAX_RPC_HEADERS` entries with the same
357    /// per-field caps as `RpcRequestPayload::headers`.
358    pub headers: Vec<RpcHeader>,
359    /// Application-defined chunk body. Cap is `MAX_RPC_BODY_LEN`
360    /// (4 MiB), same as the initial REQUEST body — clients that
361    /// need >4 MiB total payload chunk their upload across
362    /// multiple `REQUEST_CHUNK` events.
363    ///
364    /// See [`RpcRequestPayload::body`] for the `Bytes`-vs-`Vec<u8>`
365    /// rationale.
366    pub body: Bytes,
367}
368
/// Body of a [`DISPATCH_RPC_REQUEST_GRANT`] event: a
/// request-direction credit grant, found after the 24-byte
/// `EventMeta` prefix.
///
/// It mirrors the response-direction [`encode_stream_grant`] /
/// [`decode_stream_grant`] pair, except that the `call_id` is also
/// written into the payload rather than living only in
/// `EventMeta::seq_or_ts`. That keeps the codec self-contained, for
/// the same reason as [`RpcRequestChunkPayload::call_id`].
///
/// Bidi streaming plan (Phase A).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RpcRequestGrantPayload {
    /// The `call_id` of the call being granted credit.
    pub call_id: u64,
    /// Number of further REQUEST_CHUNK frames the server will admit
    /// on top of the credit already outstanding. The server's
    /// incoming-credit counter is capped defensively (see PHASE-B
    /// server fold) so a misbehaving grant cannot overflow it.
    pub credits: u32,
}
388
389/// nRPC response payload. Lives after the 24-byte `EventMeta`
390/// prefix in a `DISPATCH_RPC_RESPONSE` event.
391#[derive(Debug, Clone, PartialEq, Eq)]
392pub struct RpcResponsePayload {
393    /// Outcome of the call. Decoded on the caller side via
394    /// [`RpcStatus::from_wire`].
395    pub status: RpcStatus,
396    /// Headers (trace context, content-type, content-encoding,
397    /// etc.). Same caps as `RpcRequestPayload::headers`.
398    pub headers: Vec<RpcHeader>,
399    /// For `status == Ok`: the application response body.
400    /// For non-`Ok` statuses: a UTF-8 diagnostic string (callers
401    /// `String::from_utf8_lossy` for display; the bytes are not
402    /// guaranteed to be valid UTF-8 against a malicious server).
403    ///
404    /// See [`RpcRequestPayload::body`] for the `Bytes`-vs-`Vec<u8>`
405    /// rationale.
406    pub body: Bytes,
407}
408
409// ============================================================================
410// Codec.
411//
412// All wire integers are little-endian. Lengths are u32_le where the
413// upper bound exceeds u16, u16_le where it fits, u8 where it fits.
414// ============================================================================
415
416/// Errors from the request / response codecs.
417#[derive(Debug, thiserror::Error)]
418pub enum RpcCodecError {
419    /// Buffer ended mid-field.
420    #[error("truncated payload at {0}")]
421    Truncated(&'static str),
422    /// Length prefix exceeds the configured maximum.
423    #[error("length {actual} exceeds limit {limit} for {field}")]
424    TooLarge {
425        /// Field name whose declared length exceeded the cap (e.g.
426        /// `"body"`, `"headers"`, `"service"`). Stable strings —
427        /// callers may match on them for diagnostics.
428        field: &'static str,
429        /// The length the wire claimed for the field.
430        actual: usize,
431        /// The maximum the codec accepts (one of the `MAX_RPC_*`
432        /// constants).
433        limit: usize,
434    },
435    /// String field contains non-UTF-8 bytes.
436    #[error("non-utf8 string in {0}")]
437    InvalidUtf8(&'static str),
438}
439
440impl RpcRequestPayload {
441    /// Compute the encoded byte length WITHOUT actually encoding.
442    /// Used by [`request_wire_size`] and any caller that needs to
443    /// budget event size at the bus layer (e.g., to refuse a
444    /// request that wouldn't fit in the configured packet budget)
445    /// without paying the encode cost.
446    pub fn encoded_len(&self) -> usize {
447        // service: u8 length + bytes
448        1 + self.service.len()
449            // deadline_ns: u64
450            + 8
451            // flags: u16
452            + 2
453            // headers: u8 count + per-header (u8 name_len + name + u16 value_len + value)
454            + 1
455            + self
456                .headers
457                .iter()
458                .map(|(n, v)| 1 + n.len() + 2 + v.len())
459                .sum::<usize>()
460            // body: u32 length + bytes
461            + 4
462            + self.body.len()
463    }
464
465    /// Encode to the wire format. The result is the bytes that
466    /// follow the 24-byte `EventMeta` prefix in the RedEX payload.
467    ///
468    /// **Encoder bounds:** every field that has a `MAX_RPC_*` cap
469    /// is asserted against that cap. In debug builds an oversize
470    /// field panics with a useful diagnostic so the programmer
471    /// notices in tests; in release builds the assert is dropped
472    /// (the decoder side still enforces the cap, so a malformed
473    /// frame would be rejected by the receiver — but constructing
474    /// one is always a caller bug).
475    pub fn encode(&self) -> Vec<u8> {
476        let mut buf = Vec::with_capacity(self.encoded_len());
477        // service
478        let svc = self.service.as_bytes();
479        debug_assert!(
480            svc.len() <= MAX_RPC_SERVICE_NAME_LEN,
481            "service name {} exceeds MAX_RPC_SERVICE_NAME_LEN ({})",
482            svc.len(),
483            MAX_RPC_SERVICE_NAME_LEN,
484        );
485        buf.put_u8(svc.len() as u8);
486        buf.extend_from_slice(svc);
487        // deadline_ns
488        buf.put_u64_le(self.deadline_ns);
489        // flags
490        buf.put_u16_le(self.flags);
491        // headers
492        encode_headers(&self.headers, &mut buf);
493        // body
494        debug_assert!(
495            self.body.len() <= MAX_RPC_BODY_LEN,
496            "body length {} exceeds MAX_RPC_BODY_LEN ({})",
497            self.body.len(),
498            MAX_RPC_BODY_LEN,
499        );
500        buf.put_u32_le(self.body.len() as u32);
501        buf.extend_from_slice(&self.body);
502        buf
503    }
504
505    /// Decode from the wire bytes following the `EventMeta` prefix.
506    /// All length fields are bounded by the `MAX_RPC_*` constants;
507    /// over-cap inputs error rather than allocate unbounded
508    /// buffers.
509    ///
510    /// Takes [`Bytes`] (not `&[u8]`) so the decoded `body` field
511    /// can be a zero-copy `data.slice(..)` instead of an owned
512    /// `to_vec` — see perf #84.
513    pub fn decode(data: Bytes) -> Result<Self, RpcCodecError> {
514        let mut cur = std::io::Cursor::new(data.as_ref());
515        // service
516        if cur.remaining() < 1 {
517            return Err(RpcCodecError::Truncated("service length"));
518        }
519        let svc_len = cur.get_u8() as usize;
520        if svc_len == 0 {
521            return Err(RpcCodecError::Truncated("empty service name"));
522        }
523        if svc_len > MAX_RPC_SERVICE_NAME_LEN {
524            return Err(RpcCodecError::TooLarge {
525                field: "service",
526                actual: svc_len,
527                limit: MAX_RPC_SERVICE_NAME_LEN,
528            });
529        }
530        if cur.remaining() < svc_len {
531            return Err(RpcCodecError::Truncated("service bytes"));
532        }
533        let svc_start = cur.position() as usize;
534        let svc_end = svc_start + svc_len;
535        let service = std::str::from_utf8(&data[svc_start..svc_end])
536            .map_err(|_| RpcCodecError::InvalidUtf8("service"))?
537            .to_string();
538        cur.set_position(svc_end as u64);
539        // deadline_ns
540        if cur.remaining() < 8 {
541            return Err(RpcCodecError::Truncated("deadline_ns"));
542        }
543        let deadline_ns = cur.get_u64_le();
544        // flags
545        if cur.remaining() < 2 {
546            return Err(RpcCodecError::Truncated("flags"));
547        }
548        let flags = cur.get_u16_le();
549        // headers
550        let headers = decode_headers(&mut cur, &data)?;
551        // body
552        if cur.remaining() < 4 {
553            return Err(RpcCodecError::Truncated("body length"));
554        }
555        let body_len = cur.get_u32_le() as usize;
556        if body_len > MAX_RPC_BODY_LEN {
557            return Err(RpcCodecError::TooLarge {
558                field: "body",
559                actual: body_len,
560                limit: MAX_RPC_BODY_LEN,
561            });
562        }
563        if cur.remaining() < body_len {
564            return Err(RpcCodecError::Truncated("body bytes"));
565        }
566        let body_start = cur.position() as usize;
567        let body_end = body_start + body_len;
568        // Zero-copy slice over the input — refcount bump only.
569        let body = data.slice(body_start..body_end);
570        Ok(Self {
571            service,
572            deadline_ns,
573            flags,
574            headers,
575            body,
576        })
577    }
578}
579
580impl RpcRequestChunkPayload {
581    /// Compute the encoded byte length WITHOUT actually encoding.
582    /// See [`RpcRequestPayload::encoded_len`] for the rationale.
583    pub fn encoded_len(&self) -> usize {
584        // call_id: u64
585        8
586            // flags: u16
587            + 2
588            // headers: u8 count + per-header (u8 name_len + name + u16 value_len + value)
589            + 1
590            + self
591                .headers
592                .iter()
593                .map(|(n, v)| 1 + n.len() + 2 + v.len())
594                .sum::<usize>()
595            // body: u32 length + bytes
596            + 4
597            + self.body.len()
598    }
599
600    /// Encode to the wire bytes that follow the 24-byte `EventMeta`
601    /// prefix in a [`DISPATCH_RPC_REQUEST_CHUNK`] event. Same
602    /// encoder-bounds policy as [`RpcRequestPayload::encode`]:
603    /// oversize fields panic in debug, the decoder enforces in
604    /// release.
605    pub fn encode(&self) -> Vec<u8> {
606        let mut buf = Vec::with_capacity(self.encoded_len());
607        // call_id
608        buf.put_u64_le(self.call_id);
609        // flags
610        buf.put_u16_le(self.flags);
611        // headers
612        encode_headers(&self.headers, &mut buf);
613        // body
614        debug_assert!(
615            self.body.len() <= MAX_RPC_BODY_LEN,
616            "body length {} exceeds MAX_RPC_BODY_LEN ({})",
617            self.body.len(),
618            MAX_RPC_BODY_LEN,
619        );
620        buf.put_u32_le(self.body.len() as u32);
621        buf.extend_from_slice(&self.body);
622        buf
623    }
624
625    /// Decode from the wire bytes following the `EventMeta` prefix.
626    /// Bounded by the same `MAX_RPC_*` caps as the initial REQUEST.
627    /// Takes [`Bytes`] for zero-copy `body` slicing — see perf #84.
628    pub fn decode(data: Bytes) -> Result<Self, RpcCodecError> {
629        let mut cur = std::io::Cursor::new(data.as_ref());
630        // call_id
631        if cur.remaining() < 8 {
632            return Err(RpcCodecError::Truncated("call_id"));
633        }
634        let call_id = cur.get_u64_le();
635        // flags
636        if cur.remaining() < 2 {
637            return Err(RpcCodecError::Truncated("flags"));
638        }
639        let flags = cur.get_u16_le();
640        // headers
641        let headers = decode_headers(&mut cur, &data)?;
642        // body
643        if cur.remaining() < 4 {
644            return Err(RpcCodecError::Truncated("body length"));
645        }
646        let body_len = cur.get_u32_le() as usize;
647        if body_len > MAX_RPC_BODY_LEN {
648            return Err(RpcCodecError::TooLarge {
649                field: "body",
650                actual: body_len,
651                limit: MAX_RPC_BODY_LEN,
652            });
653        }
654        if cur.remaining() < body_len {
655            return Err(RpcCodecError::Truncated("body bytes"));
656        }
657        let body_start = cur.position() as usize;
658        let body_end = body_start + body_len;
659        let body = data.slice(body_start..body_end);
660        Ok(Self {
661            call_id,
662            flags,
663            headers,
664            body,
665        })
666    }
667}
668
669impl RpcResponsePayload {
670    /// Compute the encoded byte length WITHOUT actually encoding.
671    /// See [`RpcRequestPayload::encoded_len`].
672    pub fn encoded_len(&self) -> usize {
673        // status: u16
674        2
675            // headers: u8 count + per-header
676            + 1
677            + self
678                .headers
679                .iter()
680                .map(|(n, v)| 1 + n.len() + 2 + v.len())
681                .sum::<usize>()
682            // body: u32 length + bytes
683            + 4
684            + self.body.len()
685    }
686
687    /// Encode to the wire format. The result is the bytes that
688    /// follow the 24-byte `EventMeta` prefix in the RedEX payload.
689    /// Same encoder-bounds policy as
690    /// [`RpcRequestPayload::encode`] — see that method's doc.
691    pub fn encode(&self) -> Vec<u8> {
692        let mut buf = Vec::with_capacity(self.encoded_len());
693        buf.put_u16_le(self.status.to_wire());
694        encode_headers(&self.headers, &mut buf);
695        debug_assert!(
696            self.body.len() <= MAX_RPC_BODY_LEN,
697            "body length {} exceeds MAX_RPC_BODY_LEN ({})",
698            self.body.len(),
699            MAX_RPC_BODY_LEN,
700        );
701        buf.put_u32_le(self.body.len() as u32);
702        buf.extend_from_slice(&self.body);
703        buf
704    }
705
706    /// Decode from the wire bytes following the `EventMeta` prefix.
707    /// Takes [`Bytes`] for zero-copy `body` slicing — see perf #84.
708    pub fn decode(data: Bytes) -> Result<Self, RpcCodecError> {
709        let mut cur = std::io::Cursor::new(data.as_ref());
710        if cur.remaining() < 2 {
711            return Err(RpcCodecError::Truncated("status"));
712        }
713        let status = RpcStatus::from_wire(cur.get_u16_le());
714        let headers = decode_headers(&mut cur, &data)?;
715        if cur.remaining() < 4 {
716            return Err(RpcCodecError::Truncated("body length"));
717        }
718        let body_len = cur.get_u32_le() as usize;
719        if body_len > MAX_RPC_BODY_LEN {
720            return Err(RpcCodecError::TooLarge {
721                field: "body",
722                actual: body_len,
723                limit: MAX_RPC_BODY_LEN,
724            });
725        }
726        if cur.remaining() < body_len {
727            return Err(RpcCodecError::Truncated("body bytes"));
728        }
729        let body_start = cur.position() as usize;
730        let body_end = body_start + body_len;
731        let body = data.slice(body_start..body_end);
732        Ok(Self {
733            status,
734            headers,
735            body,
736        })
737    }
738}
739
740/// Pull `traceparent` / `tracestate` out of `headers` if present.
741/// Caller-side helper: callers building an `RpcRequestPayload`
742/// with a `TraceContext` use [`build_trace_headers`] to emit the
743/// matching headers; this is the inverse on the server side.
744///
745/// Returns `Some(TraceContext)` if `traceparent` is present;
746/// `None` otherwise. `tracestate` defaults to empty when absent
747/// — W3C says tracestate is optional even when traceparent is
748/// set.
749pub fn extract_trace_context(headers: &[RpcHeader]) -> Option<TraceContext> {
750    let mut traceparent: Option<String> = None;
751    let mut tracestate = String::new();
752    for (name, value) in headers {
753        // Header names are case-insensitive (matches W3C and HTTP
754        // convention) — same comparison style as `parse_stream_
755        // window_initial` for consistency. The wire spec doesn't
756        // mandate case so a peer that emits `Traceparent` (capital
757        // T) shouldn't be silently ignored.
758        if name.eq_ignore_ascii_case("traceparent") {
759            if let Ok(s) = std::str::from_utf8(value) {
760                traceparent = Some(s.to_string());
761            }
762        } else if name.eq_ignore_ascii_case("tracestate") {
763            if let Ok(s) = std::str::from_utf8(value) {
764                tracestate = s.to_string();
765            }
766        }
767    }
768    traceparent.map(|tp| TraceContext {
769        traceparent: tp,
770        tracestate,
771    })
772}
773
774/// Build the headers a caller appends to its
775/// `RpcRequestPayload::headers` to propagate the trace context
776/// across the call. Set `RpcRequestPayload::flags |= FLAG_RPC_PROPAGATE_TRACE`
777/// alongside this so the server's fold knows to extract them.
778///
779/// Always emits `traceparent`. Emits `tracestate` only when
780/// non-empty (matches the W3C convention of skipping empty
781/// tracestate values on the wire).
782pub fn build_trace_headers(ctx: &TraceContext) -> Vec<RpcHeader> {
783    let mut headers = Vec::with_capacity(2);
784    headers.push((
785        "traceparent".to_string(),
786        ctx.traceparent.clone().into_bytes(),
787    ));
788    if !ctx.tracestate.is_empty() {
789        headers.push((
790            "tracestate".to_string(),
791            ctx.tracestate.clone().into_bytes(),
792        ));
793    }
794    headers
795}
796
797fn encode_headers(headers: &[RpcHeader], buf: &mut Vec<u8>) {
798    debug_assert!(
799        headers.len() <= MAX_RPC_HEADERS,
800        "headers count {} exceeds MAX_RPC_HEADERS ({})",
801        headers.len(),
802        MAX_RPC_HEADERS,
803    );
804    buf.put_u8(headers.len() as u8);
805    for (name, value) in headers {
806        let nbytes = name.as_bytes();
807        debug_assert!(
808            nbytes.len() <= MAX_RPC_HEADER_NAME_LEN,
809            "header name {} exceeds MAX_RPC_HEADER_NAME_LEN ({})",
810            nbytes.len(),
811            MAX_RPC_HEADER_NAME_LEN,
812        );
813        debug_assert!(
814            value.len() <= MAX_RPC_HEADER_VALUE_LEN,
815            "header value {} exceeds MAX_RPC_HEADER_VALUE_LEN ({})",
816            value.len(),
817            MAX_RPC_HEADER_VALUE_LEN,
818        );
819        buf.put_u8(nbytes.len() as u8);
820        buf.extend_from_slice(nbytes);
821        buf.put_u16_le(value.len() as u16);
822        buf.extend_from_slice(value);
823    }
824}
825
826fn decode_headers(
827    cur: &mut std::io::Cursor<&[u8]>,
828    data: &[u8],
829) -> Result<Vec<RpcHeader>, RpcCodecError> {
830    if cur.remaining() < 1 {
831        return Err(RpcCodecError::Truncated("headers count"));
832    }
833    let count = cur.get_u8() as usize;
834    if count > MAX_RPC_HEADERS {
835        return Err(RpcCodecError::TooLarge {
836            field: "headers",
837            actual: count,
838            limit: MAX_RPC_HEADERS,
839        });
840    }
841    let mut headers = Vec::with_capacity(count);
842    for _ in 0..count {
843        if cur.remaining() < 1 {
844            return Err(RpcCodecError::Truncated("header name length"));
845        }
846        let name_len = cur.get_u8() as usize;
847        if name_len == 0 {
848            return Err(RpcCodecError::Truncated("empty header name"));
849        }
850        if name_len > MAX_RPC_HEADER_NAME_LEN {
851            return Err(RpcCodecError::TooLarge {
852                field: "header name",
853                actual: name_len,
854                limit: MAX_RPC_HEADER_NAME_LEN,
855            });
856        }
857        if cur.remaining() < name_len {
858            return Err(RpcCodecError::Truncated("header name bytes"));
859        }
860        let nstart = cur.position() as usize;
861        let nend = nstart + name_len;
862        let name = std::str::from_utf8(&data[nstart..nend])
863            .map_err(|_| RpcCodecError::InvalidUtf8("header name"))?
864            .to_string();
865        cur.set_position(nend as u64);
866
867        if cur.remaining() < 2 {
868            return Err(RpcCodecError::Truncated("header value length"));
869        }
870        let value_len = cur.get_u16_le() as usize;
871        if value_len > MAX_RPC_HEADER_VALUE_LEN {
872            return Err(RpcCodecError::TooLarge {
873                field: "header value",
874                actual: value_len,
875                limit: MAX_RPC_HEADER_VALUE_LEN,
876            });
877        }
878        if cur.remaining() < value_len {
879            return Err(RpcCodecError::Truncated("header value bytes"));
880        }
881        let vstart = cur.position() as usize;
882        let vend = vstart + value_len;
883        let value = data[vstart..vend].to_vec();
884        cur.set_position(vend as u64);
885        headers.push((name, value));
886    }
887    Ok(headers)
888}
889
890/// Convenience: the byte layout of an `RpcRequestPayload` that lands
891/// after the `EventMeta` prefix in a `DISPATCH_RPC_REQUEST` event.
892/// Exposed so callers can budget the total event size at the bus
893/// layer without doing the encode first.
894pub fn request_wire_size(payload: &RpcRequestPayload) -> usize {
895    EVENT_META_SIZE + payload.encoded_len()
896}
897
898/// Same for `RpcResponsePayload` after the `EventMeta` prefix in a
899/// `DISPATCH_RPC_RESPONSE` event.
900pub fn response_wire_size(payload: &RpcResponsePayload) -> usize {
901    EVENT_META_SIZE + payload.encoded_len()
902}
903
904// ============================================================================
905// Mesh inbound dispatch hook.
906//
907// `MeshNode::dispatch_packet` normally pushes inbound channel
908// events onto a per-shard `inbound` queue keyed by `shard_id`. The
909// channel name / hash is stripped on the way in — by the time the
910// event lands in the queue, only the payload remains.
911//
912// RPC needs per-channel routing (events for `<service>.requests`
913// drive the server fold; events for `<service>.replies.<origin>`
914// drive the client fold). Without channel info on the queued
915// event, we can't filter from the consumer side.
916//
917// The hook below adds a per-channel-hash dispatcher map that the
918// mesh's inbound dispatch consults BEFORE pushing to the shard
919// queue. If a dispatcher is registered for the event's
920// canonical [`ChannelHash`], the event is routed there directly
921// (bypassing the shard queue); otherwise the existing shard-queue
922// path runs.
923//
924// **Collision posture.** The dispatch event carries the canonical
925// 32-bit [`ChannelHash`] (joint-collision threshold ~65 K
926// channels, well above realistic deployment); the wire
927// `NetHeader::channel_hash` is `u16` and may bucket-collide at
928// scale, so the mesh's inbound dispatch indexes by the wire `u16`
929// and dispatches every canonical entry registered in that bucket
930// (the canonical match resolves on the dispatcher side). At
931// typical sizing this is a single entry per bucket.
932// ============================================================================
933
/// One inbound event delivered to a registered RPC dispatcher.
///
/// Carries the channel the event arrived on, the sender's routing
/// and session identities, and the raw event payload.
#[derive(Debug, Clone)]
pub struct RpcInboundEvent {
    /// Canonical [`ChannelHash`](crate::adapter::net::channel::ChannelHash)
    /// (u32) of the channel this event arrived on — widened from the
    /// per-packet wire `u16` `NetHeader::channel_hash` via the
    /// registered-dispatcher table at receive time.
    /// Collision-resistant at realistic scale; the wire `u16` may
    /// bucket-collide but the canonical hash uniquely identifies the
    /// registered dispatcher target.
    pub channel_hash: super::super::channel::ChannelHash,
    /// Caller's `origin_hash` from the packet header — the full
    /// 64-bit `EntityKeypair::origin_hash()` mirroring the wire
    /// field's width post-`WIRE_ORIGIN_HASH_64BIT`. The dispatcher
    /// should treat this as routing metadata, not identity
    /// authentication (the AEAD-verified [`Self::from_node`] field
    /// below carries that).
    pub origin_hash: u64,
    /// Wire-session peer's `NodeId` resolved at packet receive
    /// time from the AEAD-verified session_id. Distinct from
    /// `origin_hash`: this is the full 64-bit network identity
    /// of the peer that delivered the packet. Used by
    /// `RpcClientPending::deliver`
    /// to reject spoofed RESPONSE frames whose call_id happens
    /// to match an in-flight request but whose session peer
    /// isn't the recorded target.
    ///
    /// Set to `0` on test / loopback paths that don't have a
    /// session to resolve against; callers that register
    /// pending entries with `target_node = 0` opt out of the
    /// binding gate (and trust the call_id randomness alone).
    ///
    /// **Production wire-path invariant**: real over-the-wire
    /// inbound delivery MUST NOT produce `from_node = 0`. The
    /// dispatcher in `mesh.rs` (`handle_inbound_user_payload`)
    /// drops the event when the wire session has no resolvable
    /// `NodeId`, rather than forwarding under the sentinel — see
    /// the explicit drop + warn at the
    /// `dropping cortex-RPC event: wire session has no resolvable NodeId`
    /// log site. The v0.4 capability-auth callee-side gate in
    /// `MeshNode::serve_rpc`'s bridge relies on this: it skips
    /// permissively when `from_node == 0` (loopback compat), so
    /// a wire-path leak of the sentinel would silently re-open
    /// the gate. If you change the dispatcher to fall back to 0
    /// instead of dropping, you ALSO have to teach the bridge
    /// to deny on the sentinel.
    pub from_node: super::super::behavior::placement::NodeId,
    /// Event payload bytes — the same bytes that would have been
    /// pushed onto the shard inbound queue. For RPC events these
    /// start with a 24-byte `EventMeta` followed by the
    /// `RpcRequestPayload` / `RpcResponsePayload` encoding.
    pub payload: bytes::Bytes,
}
987
/// Type-erased callback fired by the mesh's inbound dispatch
/// when an event arrives for a registered `channel_hash`.
///
/// Receives the [`RpcInboundEvent`] by value. The callback runs on
/// the mesh's dispatch task, so the body should be quick (push the
/// event onto an mpsc / fold consumer rather than do real work).
/// Shared behind an `Arc` and required to be `Send + Sync + 'static`.
pub type RpcInboundDispatcher = Arc<dyn Fn(RpcInboundEvent) + Send + Sync + 'static>;
994
995// ============================================================================
996// Streaming-response protocol markers.
997//
998// When a caller sets `FLAG_RPC_STREAMING_RESPONSE` on the request,
999// the server emits multiple `DISPATCH_RPC_RESPONSE` events for the
1000// same `call_id`. Non-terminal chunks carry the
1001// `nrpc-streaming = continue` header; the terminal chunk carries
1002// `nrpc-streaming = end` (or any non-`Ok` status, which is also
1003// terminal). The client-side stream collects chunks until it sees
1004// a terminal marker.
1005// ============================================================================
1006
/// Header name nRPC uses to mark streaming-response chunks.
/// Present on every chunk of a streaming response, with one of the
/// two values defined below ([`HEADER_NRPC_STREAMING_CONTINUE`] or
/// [`HEADER_NRPC_STREAMING_END`]).
pub const HEADER_NRPC_STREAMING: &str = "nrpc-streaming";

/// `nrpc-streaming` value on a non-terminal chunk. The client-side
/// stream yields the chunk's body and continues waiting for more.
pub const HEADER_NRPC_STREAMING_CONTINUE: &[u8] = b"continue";

/// `nrpc-streaming` value on the terminal chunk. The client-side
/// stream yields the chunk's body (if non-empty) and then closes.
/// A non-`Ok` status is also terminal, regardless of header — the
/// stream yields the error and closes.
pub const HEADER_NRPC_STREAMING_END: &[u8] = b"end";
1021
/// Header on a streaming REQUEST that opts into flow control with
/// the given initial credit window. Value is the ASCII decimal
/// representation of a `u32` (e.g. `"32"`); it is read back with
/// [`parse_stream_window_initial`]. When present, the
/// server's streaming fold creates a per-call semaphore initialized
/// to that count and the pump awaits one credit per emitted chunk.
/// The caller refills via [`DISPATCH_RPC_STREAM_GRANT`] events.
///
/// Absent → unbounded credit (the pump emits chunks as fast as
/// the publish path can take them). Long-running streams that
/// could outpace a slow consumer SHOULD opt into flow control —
/// without it, the server's sink mpsc grows unbounded under a
/// stalled caller.
pub const HEADER_NRPC_STREAM_WINDOW_INITIAL: &str = "nrpc-stream-window-initial";
1035
/// Serialize a stream-grant payload: the additional credit `amount`
/// as exactly 4 big-endian bytes. The server side reads it back
/// with [`decode_stream_grant`].
pub fn encode_stream_grant(amount: u32) -> Vec<u8> {
    let wire: [u8; 4] = amount.to_be_bytes();
    Vec::from(wire)
}
1042
/// Parse a stream-grant payload produced by [`encode_stream_grant`]:
/// 4 bytes holding a big-endian `u32` credit amount.
///
/// Returns `None` unless `payload` is exactly 4 bytes long — this
/// defends the server fold against malformed grants without killing
/// the cortex adapter.
pub fn decode_stream_grant(payload: &[u8]) -> Option<u32> {
    match payload {
        &[b0, b1, b2, b3] => Some(u32::from_be_bytes([b0, b1, b2, b3])),
        _ => None,
    }
}
1054
1055/// Parse the `nrpc-stream-window-initial` header from a request's
1056/// header list. Returns `Some(window)` if a valid u32 ASCII-decimal
1057/// value is present, else `None` (no header / malformed value /
1058/// non-utf8 — all treated as "no flow control").
1059pub fn parse_stream_window_initial(headers: &[RpcHeader]) -> Option<u32> {
1060    for (name, value) in headers {
1061        if name.eq_ignore_ascii_case(HEADER_NRPC_STREAM_WINDOW_INITIAL) {
1062            return std::str::from_utf8(value).ok()?.parse::<u32>().ok();
1063        }
1064    }
1065    None
1066}
1067
/// Header on the initial REQUEST of a client-streaming or duplex
/// call that opts the upload direction into flow control with the
/// given initial credit window. Value is the ASCII decimal
/// representation of a `u32`; it is read back with
/// [`parse_request_window_initial`]. When present, the server's
/// streaming-request fold creates a per-call semaphore and the
/// caller's sink awaits one credit per `REQUEST_CHUNK`. The server
/// refills via [`DISPATCH_RPC_REQUEST_GRANT`] events.
///
/// Absent → unbounded credit (caller's sink emits as fast as the
/// publish path can take it). Long client-streaming calls that
/// could outpace a slow handler SHOULD opt into flow control —
/// without it, the server's chunk mpsc grows unbounded under a
/// stalled handler.
///
/// Bidi streaming plan (Phase A).
pub const HEADER_NRPC_REQUEST_WINDOW_INITIAL: &str = "nrpc-request-window-initial";
1084
/// Serialize a request-grant payload: 12 bytes made of `call_id`
/// (u64 little-endian) followed by the additional `credits`
/// (u32 big-endian).
///
/// The mixed endianness is deliberate: big-endian on the credit
/// field matches [`encode_stream_grant`], little-endian on
/// `call_id` matches the rest of the RPC codec's u64 fields.
///
/// The caller side reads it back with [`decode_request_grant`].
pub fn encode_request_grant(call_id: u64, credits: u32) -> Vec<u8> {
    let mut out = Vec::with_capacity(12);
    out.extend_from_slice(&call_id.to_le_bytes());
    out.extend_from_slice(&credits.to_be_bytes());
    out
}
1097
1098/// Decode a request-grant payload. Returns `None` if the slice is
1099/// not exactly 12 bytes — defends the caller's fold against
1100/// malformed grants without killing the cortex adapter.
1101pub fn decode_request_grant(payload: &[u8]) -> Option<RpcRequestGrantPayload> {
1102    if payload.len() != 12 {
1103        return None;
1104    }
1105    let mut cid = [0u8; 8];
1106    cid.copy_from_slice(&payload[..8]);
1107    let call_id = u64::from_le_bytes(cid);
1108    let mut credits = [0u8; 4];
1109    credits.copy_from_slice(&payload[8..]);
1110    Some(RpcRequestGrantPayload {
1111        call_id,
1112        credits: u32::from_be_bytes(credits),
1113    })
1114}
1115
1116/// Parse the `nrpc-request-window-initial` header from a request's
1117/// header list. Same semantics as [`parse_stream_window_initial`]
1118/// but for the upload direction.
1119pub fn parse_request_window_initial(headers: &[RpcHeader]) -> Option<u32> {
1120    for (name, value) in headers {
1121        if name.eq_ignore_ascii_case(HEADER_NRPC_REQUEST_WINDOW_INITIAL) {
1122            return std::str::from_utf8(value).ok()?.parse::<u32>().ok();
1123        }
1124    }
1125    None
1126}
1127
/// How a `RpcResponsePayload` should be treated by the client-side
/// fold, as decided by [`classify_streaming_chunk`]: a non-terminal
/// streaming chunk (`continue`), a terminal streaming chunk (`end`
/// OR non-`Ok` status), or a unary response (no streaming header at
/// all). Lets the fold demux streaming vs unary responses without
/// needing a separate flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingChunkKind {
    /// Non-terminal chunk — yield body, continue waiting.
    Continue,
    /// Terminal chunk — yield body (if any), close stream.
    Terminal,
    /// Not a streaming response — unary semantics apply.
    Unary,
}
1143
1144/// Classify a response per the streaming-protocol markers.
1145pub fn classify_streaming_chunk(resp: &RpcResponsePayload) -> StreamingChunkKind {
1146    // Non-Ok status is always terminal regardless of header — the
1147    // stream surfaces the error and closes.
1148    if !resp.status.is_ok() {
1149        return StreamingChunkKind::Terminal;
1150    }
1151    // Walk headers for the streaming marker. Absence = unary
1152    // semantics (caller used `call`, not `call_streaming`).
1153    for (name, value) in &resp.headers {
1154        if name == HEADER_NRPC_STREAMING {
1155            return if value.as_slice() == HEADER_NRPC_STREAMING_END {
1156                StreamingChunkKind::Terminal
1157            } else if value.as_slice() == HEADER_NRPC_STREAMING_CONTINUE {
1158                StreamingChunkKind::Continue
1159            } else {
1160                // Unknown marker value — be defensive, treat as
1161                // terminal so a misbehaving server doesn't keep
1162                // a stream open forever.
1163                StreamingChunkKind::Terminal
1164            };
1165        }
1166    }
1167    StreamingChunkKind::Unary
1168}
1169
1170// ============================================================================
1171// Server-side fold.
1172//
1173// `RpcServerFold` is the `RedexFold` half of the server. It sees
1174// REQUEST events on the channel the cortex adapter is opened against,
1175// spawns the user handler in a tokio task, and emits the RESPONSE
1176// via a callback the `Mesh::serve_rpc` glue layer wires up. The
1177// fold itself is small and pure — all I/O happens in the spawned
1178// task and the emitter callback.
1179//
1180// Cancellation: each in-flight call gets an `RpcCancellationToken`
1181// that the handler can `select!` on. CANCEL events flip the
1182// matching token; the handler observes `cancellation.cancelled()`
1183// firing and aborts cooperatively.
1184// ============================================================================
1185
/// Cancellation signal for an in-flight RPC handler.
///
/// Created when the fold dispatches a REQUEST; cloned into the
/// handler's `RpcContext` and held in the fold's in-flight map. A
/// matching CANCEL event flips the token; handlers observe via
/// either [`Self::is_cancelled`] (synchronous probe) or
/// [`Self::cancelled`] (await for the signal).
///
/// Clones share one underlying state through an `Arc`, so cancelling
/// any clone is visible to all of them.
#[derive(Clone, Default)]
pub struct RpcCancellationToken {
    /// Shared flag + wake-up primitive behind every clone.
    inner: Arc<RpcCancellationInner>,
}
1197
/// Shared state behind an [`RpcCancellationToken`] and its clones.
#[derive(Default)]
struct RpcCancellationInner {
    /// `true` once the token has been cancelled; never reset by the
    /// token's own methods.
    fired: AtomicBool,
    /// Wakes tasks awaiting `RpcCancellationToken::cancelled`.
    notify: Notify,
}
1203
1204impl RpcCancellationToken {
1205    /// Construct a fresh, un-fired token.
1206    pub fn new() -> Self {
1207        Self::default()
1208    }
1209
1210    /// Flip the token. Idempotent — repeated calls are no-ops.
1211    /// Wakes any task currently in [`Self::cancelled`].
1212    pub fn cancel(&self) {
1213        // Release pairs with the Acquire load in `is_cancelled`
1214        // so a handler that observes `is_cancelled() == true` is
1215        // guaranteed to see every prior write the canceller did.
1216        self.inner.fired.store(true, Ordering::Release);
1217        self.inner.notify.notify_waiters();
1218    }
1219
1220    /// Synchronous probe. `true` once `cancel()` has been called.
1221    #[inline]
1222    pub fn is_cancelled(&self) -> bool {
1223        self.inner.fired.load(Ordering::Acquire)
1224    }
1225
1226    /// Await the cancellation. Returns immediately if already
1227    /// cancelled. Otherwise registers as a waiter and returns when
1228    /// `cancel()` is called.
1229    ///
1230    /// Race-safe: registering the `notified()` future BEFORE the
1231    /// `is_cancelled` check means a `cancel()` racing this method
1232    /// either (a) is observed by the post-register check and we
1233    /// return immediately, OR (b) lands after we register and wakes
1234    /// our future. Either way we don't sleep past a cancellation.
1235    pub async fn cancelled(&self) {
1236        let notified = self.inner.notify.notified();
1237        if self.is_cancelled() {
1238            return;
1239        }
1240        notified.await;
1241    }
1242}
1243
/// W3C Trace Context — `traceparent` and `tracestate` headers
/// propagated through nRPC for distributed-tracing systems.
///
/// `traceparent` carries the trace id, parent span id, and flags;
/// `tracestate` carries vendor-specific tracing extensions. nRPC
/// is **transport-only** for these — it doesn't parse or generate
/// IDs, doesn't emit spans, doesn't talk to any tracing backend.
/// Application code (typically via `tracing-opentelemetry` or a
/// Datadog client) reads these on the server side and continues
/// the trace.
///
/// Converted to and from RPC headers by [`build_trace_headers`] and
/// [`extract_trace_context`].
///
/// See <https://www.w3.org/TR/trace-context/> for the wire format
/// of each field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TraceContext {
    /// `traceparent` header value (e.g.
    /// `"00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"`).
    /// Required by the W3C spec; nRPC carries it as an opaque,
    /// unvalidated string.
    pub traceparent: String,
    /// `tracestate` header value — vendor-specific extensions.
    /// Optional in W3C; empty string when absent.
    pub tracestate: String,
}
1267
/// Context handed to a `RpcHandler::call`. Carries everything the
/// handler needs to fulfill the request: the AEAD-verified caller
/// identity, the request payload, a cancellation token, and any
/// propagated trace context.
pub struct RpcContext {
    /// AEAD-verified caller `origin_hash`. The bus sets this from
    /// the verified peer; not self-claimable from the request body.
    ///
    /// NOTE(review): `RpcInboundEvent::origin_hash` is documented as
    /// routing metadata rather than authenticated identity — confirm
    /// which guarantee actually holds for this field.
    pub caller_origin: u64,
    /// Caller-generated correlation id. Same value on the matching
    /// CANCEL or RESPONSE.
    pub call_id: u64,
    /// Decoded request payload.
    pub payload: RpcRequestPayload,
    /// Cancellation signal. Handlers should `select!` on
    /// `cancellation.cancelled()` if their work is async-cancellable;
    /// long-running synchronous handlers should periodically check
    /// `cancellation.is_cancelled()`.
    pub cancellation: RpcCancellationToken,
    /// W3C Trace Context propagated from the caller, if the
    /// caller set `FLAG_RPC_PROPAGATE_TRACE` and supplied
    /// `traceparent` / `tracestate` headers in the request. The
    /// server's handler reads this to continue the distributed
    /// trace. `None` for calls that didn't propagate trace
    /// context.
    pub trace_context: Option<TraceContext>,
}
1293
/// Handler-side error that doesn't fit the application's normal
/// `Ok(RpcResponsePayload)` channel. The fold maps these onto a
/// failure-status `RpcResponsePayload` for the caller.
///
/// Returned as the `Err` side of `RpcHandler::call`.
#[derive(Debug, thiserror::Error)]
pub enum RpcHandlerError {
    /// Application-defined error. The fold encodes this as
    /// `RpcStatus::Application(code)` with `message` as the body.
    #[error("application error {code:#06x}: {message}")]
    Application {
        /// Application error code; surfaces as `RpcStatus::Application(code)`
        /// to the caller. Use `0x8000..=0xFFFF` to avoid the
        /// reserved canonical range.
        code: u16,
        /// Human-readable diagnostic. Becomes the response body
        /// (UTF-8 bytes).
        message: String,
    },
    /// Catch-all for handler-internal failures. The fold encodes this
    /// as `RpcStatus::Internal` with the contained message as the
    /// body.
    #[error("internal: {0}")]
    Internal(String),
}
1315
/// User-supplied handler. Implementors typically wrap their state
/// (or an `Arc<Mutex<State>>`) and route to the appropriate logic
/// based on `ctx.payload.service` or per-handler dispatch.
///
/// Each `Mesh::serve_rpc` registration installs its own handler, so
/// a given handler usually serves exactly one service.
///
/// Implementations must be `Send + Sync + 'static`.
#[async_trait::async_trait]
pub trait RpcHandler: Send + Sync + 'static {
    /// Process one request and return the response payload, or an
    /// [`RpcHandlerError`] that the fold maps onto a failure-status
    /// response. The fold spawns this in a tokio task; the fold
    /// itself doesn't block on it. Handlers should respect
    /// `ctx.cancellation` for cooperative early-abort.
    async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError>;
}
1331
/// Callback the fold invokes to publish a response back to the
/// caller. Wired up by `Mesh::serve_rpc` to publish on
/// `<service>.replies.<caller_origin>`. Type-erased so the fold
/// doesn't depend on the mesh layer directly.
///
/// Arguments, in order: `(caller_origin, call_id, response_payload)`.
/// The callback is synchronous and returns nothing.
pub type RpcResponseEmitter = Arc<dyn Fn(u64, u64, RpcResponsePayload) + Send + Sync + 'static>;
1339
/// Async counterpart of [`RpcResponseEmitter`] used by the
/// streaming fold's pump task to serialize per-call publishes.
///
/// Takes the same `(caller_origin, call_id, response_payload)`
/// arguments but returns a boxed `'static` future to await.
///
/// The streaming pump awaits each emit before reading the next
/// chunk from the sink — this guarantees that chunks for one
/// `call_id` reach the network publish path in the order the
/// handler emitted them. (The unary fold has no such requirement
/// — it emits exactly one RESPONSE per call — so it sticks with
/// the simpler sync `RpcResponseEmitter`.)
pub type RpcAsyncResponseEmitter = Arc<
    dyn Fn(u64, u64, RpcResponsePayload) -> futures::future::BoxFuture<'static, ()>
        + Send
        + Sync
        + 'static,
>;
1355
/// Server-side fold. Sees REQUEST events on the configured channel,
/// dispatches to the user-supplied handler, emits RESPONSE events
/// via the supplied emitter. CANCEL events flip the matching
/// in-flight token.
///
/// State `()` — the user's state lives on whatever the `RpcHandler`
/// captures (typically `Arc<Mutex<S>>`). The fold's own state (the
/// in-flight map) lives on `&mut self` and is shared with spawned
/// handler tasks via `Arc<Mutex<...>>` so the task can self-clean
/// on completion.
pub struct RpcServerFold {
    /// User-supplied handler invoked for each dispatched REQUEST.
    handler: Arc<dyn RpcHandler>,
    /// Callback that publishes a RESPONSE back to the caller.
    emit: RpcResponseEmitter,
    /// (caller_origin, call_id) → cancellation token for the
    /// in-flight handler. Inserted on REQUEST, removed by either
    /// the spawned handler task on completion or by the fold on
    /// CANCEL. Wrapped in `Arc<Mutex<...>>` so spawned tasks can
    /// remove their own entries without going back through the
    /// fold.
    in_flight: Arc<Mutex<HashMap<(u64, u64), RpcCancellationToken>>>,
    /// Optional per-service metrics handle. When `Some`, the
    /// spawned handler task bumps `handler_invocations_total` /
    /// `handler_in_flight` / `handler_panics_total` and records
    /// per-task wall-clock durations. `None` → no metrics
    /// (test-only path; production `Mesh::serve_rpc` always
    /// supplies one).
    metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
    /// Optional clock override for tests. `None` → real wall-clock
    /// `unix_nanos`. `Some(...)` → fixed value, lets tests pin
    /// deadline-already-passed behavior without sleeping.
    /// Only present in test builds.
    #[cfg(test)]
    test_now_ns: Option<u64>,
}
1389
1390impl RpcServerFold {
1391    /// Construct a server fold around `handler`. `emit` is the
1392    /// callback that publishes RESPONSE events to the caller's
1393    /// reply channel — `Mesh::serve_rpc` wires this to the
1394    /// publisher for `<service>.replies.<caller_origin>`.
1395    /// Constructed without a metrics handle; production callers
1396    /// chain `.with_metrics(...)` to opt into per-service
1397    /// counters.
1398    pub fn new(handler: Arc<dyn RpcHandler>, emit: RpcResponseEmitter) -> Self {
1399        Self {
1400            handler,
1401            emit,
1402            in_flight: Arc::new(Mutex::new(HashMap::new())),
1403            metrics: None,
1404            #[cfg(test)]
1405            test_now_ns: None,
1406        }
1407    }
1408
1409    /// Attach a per-service metrics handle. Hooks the spawned
1410    /// handler task to bump `handler_invocations_total`, balance
1411    /// `handler_in_flight`, count panics, and record handler
1412    /// duration into the histogram.
1413    pub fn with_metrics(
1414        mut self,
1415        metrics: Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>,
1416    ) -> Self {
1417        self.metrics = Some(metrics);
1418        self
1419    }
1420
1421    /// Test-only: pin the clock the fold uses for deadline
1422    /// short-circuit. Lets a unit test exercise the
1423    /// deadline-already-passed branch without waiting for wall
1424    /// time.
1425    #[cfg(test)]
1426    pub fn with_test_now_ns(mut self, now_ns: u64) -> Self {
1427        self.test_now_ns = Some(now_ns);
1428        self
1429    }
1430
1431    /// Test-only: snapshot of the in-flight call set.
1432    #[cfg(test)]
1433    pub fn in_flight_keys(&self) -> Vec<(u64, u64)> {
1434        self.in_flight.lock().keys().copied().collect()
1435    }
1436
1437    fn now_ns(&self) -> u64 {
1438        #[cfg(test)]
1439        if let Some(t) = self.test_now_ns {
1440            return t;
1441        }
1442        std::time::SystemTime::now()
1443            .duration_since(std::time::UNIX_EPOCH)
1444            .map(|d| d.as_nanos() as u64)
1445            .unwrap_or(0)
1446    }
1447
1448    /// `true` if the request's deadline has already elapsed at
1449    /// the server's current wall-clock — accounting for a small
1450    /// tolerance window that absorbs clock skew between caller
1451    /// and server. Without the tolerance, a request from a peer
1452    /// whose clock is a few hundred ms ahead of the server's
1453    /// would be timed out before the handler even saw it. Matches
1454    /// gRPC's default deadline-clock-skew tolerance shape (gRPC
1455    /// uses ~10 s).
1456    fn deadline_already_passed(&self, deadline_ns: u64) -> bool {
1457        if deadline_ns == 0 {
1458            return false;
1459        }
1460        self.now_ns().saturating_sub(DEADLINE_SKEW_TOLERANCE_NS) > deadline_ns
1461    }
1462}
1463
/// Clock-skew allowance, in nanoseconds, applied before the server
/// short-circuits a request whose `deadline_ns` appears to lie in
/// the past.
///
/// The short-circuit test is `now_ns - SKEW > deadline_ns`, so a
/// deadline only counts as expired once it is more than `SKEW`
/// behind the server's clock. Because the deadline is an absolute
/// timestamp supplied by the caller, this keeps a caller whose
/// clock lags ours by up to `SKEW` off the short-circuit path.
///
/// NOTE(review): the 10 s figure was originally justified as
/// matching gRPC's default and as comfortably exceeding the drift
/// of an NTP-disciplined cluster — confirm against the design doc.
pub const DEADLINE_SKEW_TOLERANCE_NS: u64 = 10 * 1_000_000_000; // 10 s × ns-per-second
1472
1473impl RedexFold<()> for RpcServerFold {
1474    fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
1475        // Decode the meta header. A garbled meta means the event
1476        // doesn't even claim to be an RPC packet — log and skip
1477        // rather than killing the fold. Returning `Err(Decode)`
1478        // here would stop the entire cortex adapter for one
1479        // malformed event, which is wrong for an RPC server that
1480        // needs to keep serving.
1481        let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
1482            EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
1483        } else {
1484            None
1485        }) else {
1486            tracing::warn!(
1487                payload_len = ev.payload.len(),
1488                "rpc server fold: event payload too short for EventMeta; skipping",
1489            );
1490            return Ok(());
1491        };
1492        let key = (meta.origin_hash, meta.seq_or_ts);
1493        match meta.dispatch {
1494            DISPATCH_RPC_REQUEST => {
1495                let payload = match RpcRequestPayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
1496                    Ok(p) => p,
1497                    Err(e) => {
1498                        // Malformed request payload. Surface as
1499                        // `UnknownVersion` to the caller — they sent
1500                        // bytes we couldn't parse, which usually
1501                        // means a wire-format mismatch (the most
1502                        // common cause). Log so operators can
1503                        // diagnose.
1504                        tracing::warn!(
1505                            error = %e,
1506                            caller_origin = format!("{:#x}", meta.origin_hash),
1507                            call_id = meta.seq_or_ts,
1508                            "rpc server fold: malformed request payload",
1509                        );
1510                        let resp = RpcResponsePayload {
1511                            status: RpcStatus::UnknownVersion,
1512                            headers: vec![],
1513                            body: Bytes::from(format!("malformed request: {e}")),
1514                        };
1515                        (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
1516                        return Ok(());
1517                    }
1518                };
1519                // Fast deadline-already-passed short-circuit.
1520                // Server-side `Timeout` without invoking the
1521                // handler. Includes a clock-skew tolerance window
1522                // so a peer with a slightly-fast clock isn't
1523                // prematurely timed out — see
1524                // `deadline_already_passed`.
1525                if self.deadline_already_passed(payload.deadline_ns) {
1526                    let resp = RpcResponsePayload {
1527                        status: RpcStatus::Timeout,
1528                        headers: vec![],
1529                        body: Bytes::from_static(b"deadline already passed when request landed"),
1530                    };
1531                    (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
1532                    return Ok(());
1533                }
1534                // Refuse a duplicate REQUEST with the same
1535                // `(origin_hash, call_id)` — see streaming fold for
1536                // the full rationale. For the unary fold this would
1537                // spawn a second handler under the same key, and
1538                // whichever handler completes first removes the
1539                // in-flight entry — leaving the second handler's
1540                // CANCEL handling broken (CANCEL events look up
1541                // the now-missing key and no-op). Cleaner to refuse.
1542                {
1543                    let in_flight = self.in_flight.lock();
1544                    if in_flight.contains_key(&key) {
1545                        drop(in_flight);
1546                        tracing::warn!(
1547                            caller_origin = format!("{:#x}", meta.origin_hash),
1548                            call_id = meta.seq_or_ts,
1549                            "rpc server fold: duplicate REQUEST for in-flight call_id; refusing",
1550                        );
1551                        let resp = RpcResponsePayload {
1552                            status: RpcStatus::Internal,
1553                            headers: vec![],
1554                            body: Bytes::from_static(
1555                                b"duplicate REQUEST for already-in-flight call_id",
1556                            ),
1557                        };
1558                        (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
1559                        return Ok(());
1560                    }
1561                }
1562                let cancellation = RpcCancellationToken::new();
1563                self.in_flight.lock().insert(key, cancellation.clone());
1564                let handler = self.handler.clone();
1565                let emit = self.emit.clone();
1566                let in_flight = self.in_flight.clone();
1567                let caller_origin = meta.origin_hash;
1568                let call_id = meta.seq_or_ts;
1569                // Decode the W3C Trace Context if the caller
1570                // signaled it via `FLAG_RPC_PROPAGATE_TRACE` and
1571                // included the `traceparent` / `tracestate`
1572                // headers. nRPC is transport-only — application
1573                // code reads `ctx.trace_context` to continue the
1574                // trace via whatever backend it has wired up.
1575                let trace_context = if payload.flags & FLAG_RPC_PROPAGATE_TRACE != 0 {
1576                    extract_trace_context(&payload.headers)
1577                } else {
1578                    None
1579                };
1580                let metrics = self.metrics.clone();
1581                // Keep a probe handle so the spawned task can detect
1582                // a CANCEL that fired during handler execution and
1583                // override its response with `RpcStatus::Cancelled`.
1584                let cancel_probe = cancellation.clone();
1585                tokio::spawn(async move {
1586                    // Server-side metrics: count this invocation;
1587                    // bump in_flight; time the handler; tally
1588                    // panics. Only fires when a metrics handle was
1589                    // attached via `with_metrics(...)` — test-only
1590                    // folds construct without one.
1591                    if let Some(m) = metrics.as_ref() {
1592                        m.handler_invocations_total
1593                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1594                        m.handler_in_flight
1595                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1596                    }
1597                    let handler_started = std::time::Instant::now();
1598                    let ctx = RpcContext {
1599                        caller_origin,
1600                        call_id,
1601                        payload,
1602                        cancellation,
1603                        trace_context,
1604                    };
1605                    // Catch panics so a misbehaving handler can't
1606                    // take down the runtime. `AssertUnwindSafe` is
1607                    // load-bearing because `RpcHandler::call`
1608                    // returns a future that may borrow non-
1609                    // `UnwindSafe` types from the handler; we
1610                    // accept the assertion because the handler's
1611                    // state is untouched on panic (we just don't
1612                    // observe its in-progress mutations).
1613                    let outcome = futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(
1614                        handler.call(ctx),
1615                    ))
1616                    .await;
1617                    if let Some(m) = metrics.as_ref() {
1618                        m.handler_in_flight
1619                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1620                        m.record_handler_duration(handler_started.elapsed());
1621                        if outcome.is_err() {
1622                            m.handler_panics_total
1623                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1624                        }
1625                    }
1626                    // CANCEL-wins ordering: if the cancellation
1627                    // token fired at any point during handler
1628                    // execution, override the handler's outcome
1629                    // with `RpcStatus::Cancelled` so the caller
1630                    // (or hedge primary, retry layer, etc.) sees
1631                    // the documented `Cancelled` status code rather
1632                    // than whatever the handler happened to return
1633                    // before / despite cancellation. A cooperative
1634                    // handler that observes the token and bails
1635                    // early gets the same Cancelled framing as a
1636                    // handler that ignored cancellation and ran to
1637                    // completion — the caller's view is uniform.
1638                    let resp = if cancel_probe.is_cancelled() {
1639                        RpcResponsePayload {
1640                            status: RpcStatus::Cancelled,
1641                            headers: vec![],
1642                            body: Bytes::from_static(
1643                                b"server observed CANCEL during handler execution",
1644                            ),
1645                        }
1646                    } else {
1647                        match outcome {
1648                            Ok(Ok(payload)) => payload,
1649                            Ok(Err(RpcHandlerError::Application { code, message })) => {
1650                                RpcResponsePayload {
1651                                    status: RpcStatus::Application(code),
1652                                    headers: vec![],
1653                                    body: Bytes::from(message),
1654                                }
1655                            }
1656                            Ok(Err(RpcHandlerError::Internal(message))) => RpcResponsePayload {
1657                                status: RpcStatus::Internal,
1658                                headers: vec![],
1659                                body: Bytes::from(message),
1660                            },
1661                            Err(panic) => {
1662                                let panic_msg = panic
1663                                    .downcast_ref::<&'static str>()
1664                                    .map(|s| s.to_string())
1665                                    .or_else(|| panic.downcast_ref::<String>().cloned())
1666                                    .unwrap_or_else(|| "<non-string panic>".into());
1667                                tracing::error!(
1668                                    caller_origin = format!("{:#x}", caller_origin),
1669                                    call_id,
1670                                    panic = %panic_msg,
1671                                    "rpc server handler panicked",
1672                                );
1673                                RpcResponsePayload {
1674                                    status: RpcStatus::Internal,
1675                                    headers: vec![],
1676                                    body: Bytes::from(format!("handler panicked: {panic_msg}")),
1677                                }
1678                            }
1679                        }
1680                    };
1681                    in_flight.lock().remove(&key);
1682                    emit(caller_origin, call_id, resp);
1683                });
1684            }
1685            DISPATCH_RPC_CANCEL => {
1686                if let Some(token) = self.in_flight.lock().remove(&key) {
1687                    token.cancel();
1688                }
1689                // Idempotent — CANCEL for an unknown call_id (e.g.
1690                // a CANCEL that races the handler's completion) is
1691                // a no-op rather than an error. The spawned handler
1692                // task observes `cancel_probe.is_cancelled()` after
1693                // its future resolves and overrides the response
1694                // with `RpcStatus::Cancelled` so the caller sees a
1695                // documented status code rather than the handler's
1696                // accidental Ok / Internal payload.
1697            }
1698            // RESPONSE / DEADLINE_EXCEEDED are server-emitted; if
1699            // the server's own fold sees them (e.g. from a replay)
1700            // there's nothing to do.
1701            _ => {}
1702        }
1703        Ok(())
1704    }
1705}
1706
1707// ============================================================================
1708// Streaming server-side: handler trait + sink + fold.
1709// ============================================================================
1710
1711/// Sink the handler writes to in order to emit streaming-response
1712/// chunks. Each `send` produces one non-terminal `RESPONSE` event
1713/// to the caller. The terminal frame is emitted automatically when
1714/// the sink is dropped — the handler returning `Ok(())` drops the
1715/// sink, which closes the stream cleanly. Returning
1716/// `Err(RpcHandlerError)` drops the sink and emits the error as a
1717/// terminal non-`Ok` RESPONSE.
1718///
1719/// `send` is best-effort and infallible: the underlying mpsc is
1720/// **bounded** at [`STREAMING_PUMP_CAPACITY`] chunks. If the pump
1721/// can't keep up (publish path is congested, caller hasn't granted
1722/// flow-control credits), `send` discards on overflow — same
1723/// observable shape as a closed receiver (caller cancelled mid-
1724/// stream). Counts the drop in `streaming_chunks_dropped_total` so
1725/// operators can see backpressure occurring. Cooperative
1726/// cancellation via `ctx.cancellation` is the right way for the
1727/// handler to notice the consumer is gone; opt-in flow control via
1728/// `CallOptions::stream_window_initial` is the right way to
1729/// throttle a fast handler against a slow consumer.
1730pub struct RpcResponseSink {
1731    inner: tokio::sync::mpsc::Sender<bytes::Bytes>,
1732    /// Optional metrics handle so a dropped-on-full chunk bumps the
1733    /// `streaming_chunks_dropped_total` counter. `None` for unit-
1734    /// test folds that construct without metrics.
1735    metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
1736}
1737
1738impl RpcResponseSink {
1739    /// Emit one non-terminal chunk. Cheap (`try_send` on a
1740    /// [`STREAMING_PUMP_CAPACITY`]-bounded mpsc); never blocks. On
1741    /// overflow OR receiver-closed, the chunk is dropped and (when
1742    /// metrics are wired) `streaming_chunks_dropped_total` is
1743    /// incremented for the service.
1744    pub fn send(&self, body: impl Into<bytes::Bytes>) {
1745        if self.inner.try_send(body.into()).is_err() {
1746            if let Some(m) = self.metrics.as_ref() {
1747                m.streaming_chunks_dropped_total
1748                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1749            }
1750        }
1751    }
1752}
1753
/// Maximum number of response chunks the streaming pump's internal
/// mpsc will hold.
///
/// Once a handler has this many chunks queued ahead of the publish
/// path, further chunks are dropped (and counted via
/// `streaming_chunks_dropped_total`) instead of piling up without
/// bound. 1024 is generous for typical streaming patterns; opt-in
/// flow control via `CallOptions::stream_window_initial` is the
/// right primitive for strict throttling.
pub const STREAMING_PUMP_CAPACITY: usize = 1 << 10;
1763
/// Maximum number of inbound request chunks the client-streaming
/// server fold queues per call.
///
/// Upload-direction counterpart of [`STREAMING_PUMP_CAPACITY`].
/// When a caller emits REQUEST_CHUNKs faster than the handler
/// drains them, chunks beyond this many are dropped (and counted
/// via `streaming_request_chunks_dropped_total` when metrics are
/// wired) rather than queued. Opt-in flow control via the
/// `nrpc-request-window-initial` header is the right primitive for
/// strict throttling on the upload side.
///
/// Bidi streaming plan (Phase B).
pub const STREAMING_REQUEST_PUMP_CAPACITY: usize = 1 << 10;
1776
1777// ============================================================================
1778// Phase B — server-side primitives for client-streaming.
1779// ============================================================================
1780
/// Per-call context handed to [`RpcClientStreamingHandler::call`]
/// and [`RpcDuplexHandler::call`].
///
/// Same shape as [`RpcContext`] minus the eager `payload` (the
/// request stream delivers chunk bodies on the fly) and plus the
/// per-call `deadline_ns` (which would otherwise have ridden on
/// the eager payload).
///
/// Bidi streaming plan (Phase B).
pub struct RpcStreamingContext {
    /// AEAD-verified caller `origin_hash`. Same source as
    /// [`RpcContext::caller_origin`].
    pub caller_origin: u64,
    /// Caller-generated correlation id. Matches the initial
    /// REQUEST's `call_id` and every subsequent REQUEST_CHUNK /
    /// CANCEL / REQUEST_GRANT for this call.
    pub call_id: u64,
    /// Absolute deadline (unix nanos) from the initial REQUEST.
    /// `0` means no deadline. The fold does NOT auto-cancel a
    /// running handler when the deadline passes — handlers
    /// self-supervise via tokio timers, same contract as the unary
    /// fold.
    pub deadline_ns: u64,
    /// Metadata headers carried on the initial REQUEST.
    /// Headers on individual REQUEST_CHUNK frames are NOT surfaced
    /// at the substrate layer — the typed SDK veneer (Phase E) is
    /// where header inspection across chunks lives (if it lands at
    /// all; the plan defers per-chunk headers as opt-in raw-path
    /// access).
    pub headers: Vec<RpcHeader>,
    /// Cancellation signal. Flipped by the fold when a
    /// `DISPATCH_RPC_CANCEL` arrives for this call's `call_id`.
    /// Long-running handlers should `select!` on
    /// `cancellation.cancelled()`. The request stream also
    /// terminates on cancellation, but the token is the
    /// authoritative signal because the stream's end-of-stream is
    /// shared with the clean REQUEST_END path.
    pub cancellation: RpcCancellationToken,
    /// W3C Trace Context propagated from the caller's initial
    /// REQUEST. Same semantics as [`RpcContext::trace_context`].
    pub trace_context: Option<TraceContext>,
}
1819
/// Type-erased callback the fold uses to publish a
/// [`DISPATCH_RPC_REQUEST_GRANT`] event back to the caller.
///
/// The `Mesh` glue (Phase C) supplies the implementation, which
/// publishes on the caller's reply channel; keeping it behind a
/// `dyn Fn` means the fold has no direct dependency on the mesh
/// layer.
///
/// Called as `(caller_origin, call_id, credits)`. The call is
/// synchronous and fire-and-forget: nothing is returned, so a grant
/// that fails to publish is not reported to the fold.
/// NOTE(review): the design treats a lost grant as a latency wobble
/// rather than a correctness problem (later grants or the initial
/// window refill the caller's budget) — confirm against the
/// caller-side sink.
///
/// Bidi streaming plan (Phase B).
pub type RpcRequestGrantEmitter = Arc<dyn Fn(u64, u64, u32) + Send + Sync>;
1835
1836/// Server-side stream of inbound request chunk bodies for one
1837/// client-streaming (or duplex) call. Yields one `Bytes` per
1838/// `DISPATCH_RPC_REQUEST` / `DISPATCH_RPC_REQUEST_CHUNK` frame
1839/// (including empty bodies — the semantics of "empty bytes" are
1840/// the application's concern, not the substrate's). Closes on
1841/// `FLAG_RPC_REQUEST_END` or on CANCEL.
1842///
1843/// **Stream item ordering convention**: the first item this
1844/// stream yields corresponds to the initial REQUEST body; every
1845/// subsequent item corresponds to a REQUEST_CHUNK body, in the
1846/// order the chunks were received from the wire. The substrate
1847/// does not tag items with their frame kind — the SDK veneer
1848/// (Phase E) is responsible for the Init / Data classification
1849/// via its `Chunk<T>` enum.
1850///
1851/// **Auto-grant behavior**: when the caller opted into
1852/// request-direction flow control via
1853/// [`HEADER_NRPC_REQUEST_WINDOW_INITIAL`], every successful
1854/// `poll_next()` fires one credit back to the caller via the
1855/// captured `grant_emitter`. This keeps the in-flight window
1856/// at the caller's initial value as the handler drains the
1857/// stream. When the caller did NOT opt in (no header), the
1858/// `grant_emitter` is `None` and the auto-grant path is a no-op
1859/// (caller is on the unbounded-credit fast path).
1860///
1861/// Bidi streaming plan (Phase B).
1862pub struct RequestStream {
1863    inner: tokio::sync::mpsc::Receiver<bytes::Bytes>,
1864    grant_emitter: Option<RpcRequestGrantEmitter>,
1865    caller_origin: u64,
1866    call_id: u64,
1867}
1868
1869impl RequestStream {
1870    /// Visible to the fold (and only the fold) for constructing
1871    /// a stream tied to a specific receiver + caller. The
1872    /// `grant_emitter` is `None` when the caller didn't opt into
1873    /// flow control; `Some(...)` when they did.
1874    pub(crate) fn new(
1875        inner: tokio::sync::mpsc::Receiver<bytes::Bytes>,
1876        grant_emitter: Option<RpcRequestGrantEmitter>,
1877        caller_origin: u64,
1878        call_id: u64,
1879    ) -> Self {
1880        Self {
1881            inner,
1882            grant_emitter,
1883            caller_origin,
1884            call_id,
1885        }
1886    }
1887}
1888
1889impl futures::Stream for RequestStream {
1890    type Item = bytes::Bytes;
1891
1892    fn poll_next(
1893        mut self: std::pin::Pin<&mut Self>,
1894        cx: &mut std::task::Context<'_>,
1895    ) -> std::task::Poll<Option<Self::Item>> {
1896        match self.inner.poll_recv(cx) {
1897            std::task::Poll::Ready(Some(bytes)) => {
1898                // Auto-grant fires on every successful pull when
1899                // flow control was opted into. Cheap and
1900                // fire-and-forget; missed grants are recovered
1901                // by subsequent pulls.
1902                if let Some(emit) = self.grant_emitter.as_ref() {
1903                    emit(self.caller_origin, self.call_id, 1);
1904                }
1905                std::task::Poll::Ready(Some(bytes))
1906            }
1907            other => other,
1908        }
1909    }
1910}
1911
/// User-supplied handler for a client-streaming RPC: many request
/// chunks in, one response out.
///
/// Receives an [`RpcStreamingContext`] (caller identity, deadline,
/// cancellation, trace context, initial REQUEST headers) plus a
/// [`RequestStream`] of chunk bodies. Returns one terminal
/// [`RpcResponsePayload`] — the fold publishes it as the call's
/// single RESPONSE frame.
///
/// **Cancellation contract.** Long-running handlers should
/// `select!` on `ctx.cancellation.cancelled()` so a caller-side
/// drop / deadline correctly stops the handler. The request
/// stream also terminates on cancellation (yields `None`), but
/// the token is the authoritative signal — the stream's `None`
/// is shared with the clean REQUEST_END path, so handlers can't
/// distinguish "caller finished cleanly" from "caller cancelled"
/// without consulting the token.
///
/// **Auto-grant.** When the caller opted into request-direction
/// flow control via [`HEADER_NRPC_REQUEST_WINDOW_INITIAL`], every
/// `stream.next().await` that yields `Some` fires one
/// REQUEST_GRANT back to the caller, maintaining the in-flight
/// window at the caller's initial value. Handlers don't need to
/// think about credit management for the common case.
///
/// Bidi streaming plan (Phase B).
#[async_trait::async_trait]
pub trait RpcClientStreamingHandler: Send + Sync + 'static {
    /// Process one client-streaming call.
    ///
    /// Drain `requests`, then return the single terminal response
    /// payload.
    ///
    /// # Errors
    ///
    /// Return an [`RpcHandlerError`] to have the failure mapped to
    /// a non-`Ok` terminal RESPONSE instead of a payload.
    async fn call(
        &self,
        ctx: RpcStreamingContext,
        requests: RequestStream,
    ) -> Result<RpcResponsePayload, RpcHandlerError>;
}
1946
/// User-supplied handler for a duplex RPC — many requests in,
/// many responses out, interleaved.
///
/// Receives an [`RpcStreamingContext`] plus a [`RequestStream`] of
/// chunk bodies plus an [`RpcResponseSink`] for emitting response
/// chunks. The handler's return value is its terminal status, NOT
/// a final payload: `Ok(())` closes the response stream cleanly
/// with a terminal `Ok` frame, `Err(RpcHandlerError)` closes with
/// the matching error status.
///
/// **Composition.** A duplex handler is a hybrid of an
/// [`RpcClientStreamingHandler`] (drains request chunks) and an
/// [`RpcStreamingHandler`] (emits response chunks). The two
/// directions are independent — a handler can finish emitting
/// responses before reading all requests, or vice versa. The
/// server fold serializes RESPONSE chunk publishes per call_id
/// so wire order matches handler order.
///
/// **Cancellation contract.** Identical to
/// [`RpcClientStreamingHandler`]: long-running work should
/// `select!` on `ctx.cancellation.cancelled()`.
///
/// **Auto-grant.** Identical to [`RpcClientStreamingHandler`]:
/// every successful `requests.next().await` emits one
/// REQUEST_GRANT back to the caller (when the caller opted in).
///
/// Bidi streaming plan (Phase D).
#[async_trait::async_trait]
pub trait RpcDuplexHandler: Send + Sync + 'static {
    /// Process one duplex call.
    ///
    /// Drain inbound chunks via `requests.next().await`; emit
    /// outbound chunks via `responses.send(...)`. Return `Ok(())`
    /// for a clean close.
    ///
    /// # Errors
    ///
    /// Return an [`RpcHandlerError`] to close the response stream
    /// with the matching error status.
    async fn call(
        &self,
        ctx: RpcStreamingContext,
        requests: RequestStream,
        responses: RpcResponseSink,
    ) -> Result<(), RpcHandlerError>;
}
1986
/// User-supplied handler for a server-streaming RPC: one request
/// in, many response chunks out.
///
/// Receives the same [`RpcContext`] as a unary handler plus an
/// [`RpcResponseSink`] for emitting chunks. Returning `Ok(())`
/// closes the stream cleanly with a terminal `Ok` RESPONSE;
/// `Err(RpcHandlerError)` closes the stream with a terminal
/// non-`Ok` RESPONSE carrying the diagnostic.
///
/// **Cancellation contract.** Long-running streams should
/// `select!` on `ctx.cancellation.cancelled()` so a caller-side
/// drop / deadline correctly stops the handler. Continuing to
/// `send` after cancellation is harmless (sink discards) but
/// wastes work.
#[async_trait::async_trait]
pub trait RpcStreamingHandler: Send + Sync + 'static {
    /// Process one streaming request. Emit chunks via
    /// `sink.send(...)`; drop the sink (or return) to close the
    /// stream.
    ///
    /// # Errors
    ///
    /// Return an [`RpcHandlerError`] to terminate the stream with
    /// a non-`Ok` RESPONSE.
    async fn call(&self, ctx: RpcContext, sink: RpcResponseSink) -> Result<(), RpcHandlerError>;
}
2004
/// Shared map of per-call flow-control semaphores.
///
/// Keyed on `(caller_origin_hash, call_id)`. Each value is a tokio
/// `Semaphore` shared between the pump task (which awaits
/// permits) and the fold's `apply()` method handling
/// STREAM_GRANT events (which add permits). The outer
/// `Arc<Mutex<..>>` lets both sides reach the same map.
type FlowControlMap = Arc<Mutex<HashMap<(u64, u64), Arc<tokio::sync::Semaphore>>>>;
2011
/// Server-side fold for streaming RPC. Parallel to `RpcServerFold`
/// but multi-fire emit: each handler invocation may produce many
/// `RESPONSE` events for the same `call_id`, marked
/// non-terminal/terminal via the `nrpc-streaming` header.
///
/// State `()` — like the unary fold, the handler owns user state
/// via captured `Arc<Mutex<S>>`. The fold's own state (in-flight
/// cancellation tokens, flow-control semaphores) lives on
/// `&mut self`.
pub struct RpcServerStreamingFold {
    /// User-supplied streaming handler invoked for each REQUEST.
    handler: Arc<dyn RpcStreamingHandler>,
    /// Async callback that publishes chunks and the terminal frame
    /// on the caller's reply channel.
    emit: RpcAsyncResponseEmitter,
    /// Cancellation tokens for calls currently being handled, keyed
    /// on `(origin_hash, call_id)`.
    in_flight: Arc<Mutex<HashMap<(u64, u64), RpcCancellationToken>>>,
    /// Per-call flow-control semaphores (only for callers that
    /// opted in). An entry for a `(origin, call_id)` key means
    /// "pump must `acquire().await` one permit per chunk before
    /// emitting; STREAM_GRANT events `add_permits(n)`". Absence of
    /// an entry means unbounded credit (no flow control — pump
    /// emits as fast as the publish path can take chunks).
    flow_control: FlowControlMap,
    /// Optional per-service metrics handle. Same shape as
    /// `RpcServerFold::metrics`; the streaming fold ALSO bumps
    /// `streaming_chunks_emitted_total` from the pump task on
    /// every chunk.
    metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
}
2037
2038impl RpcServerStreamingFold {
2039    /// Construct a streaming server fold. `emit` publishes
2040    /// individual chunks (and the terminal frame) on the caller's
2041    /// reply channel.
2042    ///
2043    /// Uses the **async** emitter variant so the pump task can
2044    /// serialize per-call publishes — without that ordering
2045    /// guarantee, two chunks emitted in succession can race into
2046    /// the publish path and arrive at the caller out of order
2047    /// (or be eclipsed by the terminal frame and lost entirely).
2048    pub fn new(handler: Arc<dyn RpcStreamingHandler>, emit: RpcAsyncResponseEmitter) -> Self {
2049        Self {
2050            handler,
2051            emit,
2052            in_flight: Arc::new(Mutex::new(HashMap::new())),
2053            flow_control: Arc::new(Mutex::new(HashMap::new())),
2054            metrics: None,
2055        }
2056    }
2057
2058    /// Attach a per-service metrics handle. Hooks the spawned
2059    /// handler task to bump `handler_invocations_total` /
2060    /// `handler_in_flight` / `handler_panics_total` /
2061    /// `handler_duration_*`, and the pump task to bump
2062    /// `streaming_chunks_emitted_total` per emitted chunk.
2063    pub fn with_metrics(
2064        mut self,
2065        metrics: Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>,
2066    ) -> Self {
2067        self.metrics = Some(metrics);
2068        self
2069    }
2070
2071    /// Test-only: snapshot of the in-flight call set.
2072    #[cfg(test)]
2073    pub fn in_flight_keys(&self) -> Vec<(u64, u64)> {
2074        self.in_flight.lock().keys().copied().collect()
2075    }
2076}
2077
2078impl RedexFold<()> for RpcServerStreamingFold {
2079    fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
2080        let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
2081            EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
2082        } else {
2083            None
2084        }) else {
2085            tracing::warn!(
2086                payload_len = ev.payload.len(),
2087                "rpc streaming server fold: event payload too short for EventMeta",
2088            );
2089            return Ok(());
2090        };
2091        let key = (meta.origin_hash, meta.seq_or_ts);
2092        match meta.dispatch {
2093            DISPATCH_RPC_REQUEST => {
2094                let payload = match RpcRequestPayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
2095                    Ok(p) => p,
2096                    Err(e) => {
2097                        tracing::warn!(
2098                            error = %e,
2099                            caller_origin = format!("{:#x}", meta.origin_hash),
2100                            call_id = meta.seq_or_ts,
2101                            "rpc streaming server fold: malformed request payload",
2102                        );
2103                        // Surface as a terminal error chunk. Spawn
2104                        // because the apply method is sync and the
2105                        // emit is async; this is a one-shot publish
2106                        // so ordering doesn't matter here.
2107                        let resp = RpcResponsePayload {
2108                            status: RpcStatus::UnknownVersion,
2109                            headers: vec![(
2110                                HEADER_NRPC_STREAMING.to_string(),
2111                                HEADER_NRPC_STREAMING_END.to_vec(),
2112                            )],
2113                            body: Bytes::from(format!("malformed request: {e}")),
2114                        };
2115                        let emit = self.emit.clone();
2116                        let caller_origin = meta.origin_hash;
2117                        let call_id = meta.seq_or_ts;
2118                        tokio::spawn(async move {
2119                            emit(caller_origin, call_id, resp).await;
2120                        });
2121                        return Ok(());
2122                    }
2123                };
2124                // Refuse a duplicate REQUEST with the same
2125                // `(origin_hash, call_id)`. Without this, a retry
2126                // that arrives while the first attempt's pump is
2127                // still draining will overwrite the prior
2128                // semaphore Arc in `flow_control`, leaving the
2129                // first pump awaiting an orphaned semaphore (the
2130                // terminal cleanup keys on `key` and removes the
2131                // *new* entry, so the orphan never gets dropped
2132                // and the first handler hangs forever).
2133                //
2134                // Idempotent for the caller: we emit a terminal
2135                // `Internal` chunk so the duplicate sender sees a
2136                // clean refusal rather than waiting on a stream
2137                // that will never produce output.
2138                {
2139                    let in_flight = self.in_flight.lock();
2140                    if in_flight.contains_key(&key) {
2141                        drop(in_flight);
2142                        tracing::warn!(
2143                            caller_origin = format!("{:#x}", meta.origin_hash),
2144                            call_id = meta.seq_or_ts,
2145                            "rpc streaming server fold: duplicate REQUEST for in-flight call_id; refusing",
2146                        );
2147                        let resp = RpcResponsePayload {
2148                            status: RpcStatus::Internal,
2149                            headers: vec![(
2150                                HEADER_NRPC_STREAMING.to_string(),
2151                                HEADER_NRPC_STREAMING_END.to_vec(),
2152                            )],
2153                            body: Bytes::from_static(
2154                                b"duplicate REQUEST for already-in-flight call_id",
2155                            ),
2156                        };
2157                        let emit = self.emit.clone();
2158                        let caller_origin = meta.origin_hash;
2159                        let call_id = meta.seq_or_ts;
2160                        tokio::spawn(async move {
2161                            emit(caller_origin, call_id, resp).await;
2162                        });
2163                        return Ok(());
2164                    }
2165                }
2166                // Cancellation token + in-flight bookkeeping —
2167                // identical to the unary fold's pattern.
2168                let cancellation = RpcCancellationToken::new();
2169                self.in_flight.lock().insert(key, cancellation.clone());
2170                // Flow-control opt-in: parse the
2171                // `nrpc-stream-window-initial` header. When
2172                // present, install a per-call semaphore the pump
2173                // task will await per chunk; subsequent
2174                // STREAM_GRANT events refill it. When absent, no
2175                // entry → pump skips the await (back-compat).
2176                let flow_sem = parse_stream_window_initial(&payload.headers).map(|n| {
2177                    let sem = Arc::new(tokio::sync::Semaphore::new(n as usize));
2178                    self.flow_control.lock().insert(key, sem.clone());
2179                    sem
2180                });
2181                let handler = self.handler.clone();
2182                let emit = self.emit.clone();
2183                let in_flight = self.in_flight.clone();
2184                let flow_control = self.flow_control.clone();
2185                let caller_origin = meta.origin_hash;
2186                let call_id = meta.seq_or_ts;
2187                let trace_context = if payload.flags & FLAG_RPC_PROPAGATE_TRACE != 0 {
2188                    extract_trace_context(&payload.headers)
2189                } else {
2190                    None
2191                };
2192                let metrics = self.metrics.clone();
2193                // See unary fold for rationale — clone the
2194                // cancellation handle so the spawned task can probe
2195                // it after the handler returns and override the
2196                // terminal frame with `RpcStatus::Cancelled`.
2197                let cancel_probe = cancellation.clone();
2198                tokio::spawn(async move {
2199                    if let Some(m) = metrics.as_ref() {
2200                        m.handler_invocations_total
2201                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2202                        m.handler_in_flight
2203                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2204                    }
2205                    let handler_started = std::time::Instant::now();
2206                    let ctx = RpcContext {
2207                        caller_origin,
2208                        call_id,
2209                        payload,
2210                        cancellation,
2211                        trace_context,
2212                    };
2213                    // Build the sink + receive end. Spawn a
2214                    // pump that forwards each chunk to the emit
2215                    // closure. The handler's `sink.send(...)`
2216                    // calls show up here as items on the receiver.
2217                    // **Bounded** at STREAMING_PUMP_CAPACITY: a
2218                    // runaway handler that produces chunks faster
2219                    // than the publish path can drain stops
2220                    // blocking the runtime past this many queued
2221                    // chunks; additional chunks are dropped and
2222                    // counted via streaming_chunks_dropped_total.
2223                    let (tx, mut rx) =
2224                        tokio::sync::mpsc::channel::<bytes::Bytes>(STREAMING_PUMP_CAPACITY);
2225                    let sink = RpcResponseSink {
2226                        inner: tx,
2227                        metrics: metrics.clone(),
2228                    };
2229                    let pump_emit = emit.clone();
2230                    let pump_metrics = metrics.clone();
2231                    let pump_flow = flow_sem.clone();
2232                    let pump = tokio::spawn(async move {
2233                        while let Some(chunk) = rx.recv().await {
2234                            // Flow control: when the caller opted
2235                            // in, await one semaphore permit per
2236                            // chunk before publishing. The semaphore
2237                            // starts at the caller's `initial_window`
2238                            // and refills when the caller sends
2239                            // STREAM_GRANT events. `forget()`
2240                            // consumes the slot — each chunk uses
2241                            // exactly one credit, never returned.
2242                            // No-op when `pump_flow` is None
2243                            // (back-compat path).
2244                            if let Some(sem) = pump_flow.as_ref() {
2245                                let permit = match sem.clone().acquire_owned().await {
2246                                    Ok(p) => p,
2247                                    Err(_) => {
2248                                        // Semaphore was closed —
2249                                        // shouldn't happen during
2250                                        // normal operation; bail.
2251                                        break;
2252                                    }
2253                                };
2254                                permit.forget();
2255                            }
2256                            if let Some(m) = pump_metrics.as_ref() {
2257                                m.streaming_chunks_emitted_total
2258                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2259                            }
2260                            let resp = RpcResponsePayload {
2261                                status: RpcStatus::Ok,
2262                                headers: vec![(
2263                                    HEADER_NRPC_STREAMING.to_string(),
2264                                    HEADER_NRPC_STREAMING_CONTINUE.to_vec(),
2265                                )],
2266                                body: chunk.clone(),
2267                            };
2268                            // Await per-chunk publish so chunks for
2269                            // one call_id reach the network in send
2270                            // order. Without this, two chunks emitted
2271                            // in tight succession can race into the
2272                            // publish path and arrive out of order
2273                            // (or be eclipsed by the terminal frame
2274                            // and lost entirely on the caller side).
2275                            pump_emit(caller_origin, call_id, resp).await;
2276                        }
2277                    });
2278                    // Run the handler. Catch panics so a
2279                    // misbehaving handler can't take down the
2280                    // runtime — same shape as the unary fold.
2281                    let outcome = futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(
2282                        handler.call(ctx, sink),
2283                    ))
2284                    .await;
2285                    // The handler dropped the sink (either by
2286                    // returning or by panicking through the
2287                    // catch_unwind). Wait for the pump to drain
2288                    // any final in-flight chunks.
2289                    let _ = pump.await;
2290                    if let Some(m) = metrics.as_ref() {
2291                        m.handler_in_flight
2292                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2293                        m.record_handler_duration(handler_started.elapsed());
2294                        if outcome.is_err() {
2295                            m.handler_panics_total
2296                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2297                        }
2298                    }
2299                    // Emit the terminal frame. CANCEL-wins ordering
2300                    // matches the unary fold: if the cancellation
2301                    // token fired during execution, override the
2302                    // handler's terminal with `RpcStatus::Cancelled`.
2303                    let terminal = if cancel_probe.is_cancelled() {
2304                        RpcResponsePayload {
2305                            status: RpcStatus::Cancelled,
2306                            headers: vec![],
2307                            body: Bytes::from_static(
2308                                b"server observed CANCEL during streaming handler execution",
2309                            ),
2310                        }
2311                    } else {
2312                        match outcome {
2313                            Ok(Ok(())) => RpcResponsePayload {
2314                                status: RpcStatus::Ok,
2315                                headers: vec![(
2316                                    HEADER_NRPC_STREAMING.to_string(),
2317                                    HEADER_NRPC_STREAMING_END.to_vec(),
2318                                )],
2319                                body: Bytes::new(),
2320                            },
2321                            Ok(Err(RpcHandlerError::Application { code, message })) => {
2322                                RpcResponsePayload {
2323                                    status: RpcStatus::Application(code),
2324                                    headers: vec![],
2325                                    body: Bytes::from(message),
2326                                }
2327                            }
2328                            Ok(Err(RpcHandlerError::Internal(message))) => RpcResponsePayload {
2329                                status: RpcStatus::Internal,
2330                                headers: vec![],
2331                                body: Bytes::from(message),
2332                            },
2333                            Err(panic) => {
2334                                let panic_msg = panic
2335                                    .downcast_ref::<&'static str>()
2336                                    .map(|s| s.to_string())
2337                                    .or_else(|| panic.downcast_ref::<String>().cloned())
2338                                    .unwrap_or_else(|| "<non-string panic>".into());
2339                                tracing::error!(
2340                                    caller_origin = format!("{:#x}", caller_origin),
2341                                    call_id,
2342                                    panic = %panic_msg,
2343                                    "rpc streaming server handler panicked",
2344                                );
2345                                RpcResponsePayload {
2346                                    status: RpcStatus::Internal,
2347                                    headers: vec![],
2348                                    body: Bytes::from(format!("handler panicked: {panic_msg}")),
2349                                }
2350                            }
2351                        }
2352                    };
2353                    in_flight.lock().remove(&key);
2354                    // Drop the per-call flow-control semaphore
2355                    // (if any) so a stale GRANT arriving after
2356                    // termination is silently dropped — the entry
2357                    // is gone, lookup misses.
2358                    flow_control.lock().remove(&key);
2359                    // Await the terminal frame's publish too so it
2360                    // arrives strictly AFTER the last chunk on the
2361                    // wire (the pump has already drained, but the
2362                    // emit itself is still async and we must await
2363                    // it before the spawned task ends).
2364                    emit(caller_origin, call_id, terminal).await;
2365                });
2366            }
2367            DISPATCH_RPC_CANCEL => {
2368                if let Some(token) = self.in_flight.lock().remove(&key) {
2369                    token.cancel();
2370                }
2371                // Also drop the flow-control entry — the spawned
2372                // task's terminal cleanup will run too, but doing
2373                // it here makes the CANCEL path immediately stop
2374                // refilling the pump (the pending `acquire().await`
2375                // will resolve once the semaphore is dropped or
2376                // when the task exits).
2377                self.flow_control.lock().remove(&key);
2378            }
2379            DISPATCH_RPC_STREAM_GRANT => {
2380                // Add credit to the per-call semaphore. Silently
2381                // drop GRANT events for unknown / non-flow-
2382                // controlled calls — server can't tell whether
2383                // the caller is racing a terminal vs. sending a
2384                // grant for a non-flow-controlled stream, and
2385                // both are harmless to ignore.
2386                let amount = match decode_stream_grant(&ev.payload[EVENT_META_SIZE..]) {
2387                    Some(n) => n,
2388                    None => {
2389                        tracing::debug!(
2390                            caller_origin = format!("{:#x}", meta.origin_hash),
2391                            call_id = meta.seq_or_ts,
2392                            "rpc streaming server fold: malformed STREAM_GRANT payload",
2393                        );
2394                        return Ok(());
2395                    }
2396                };
2397                if amount == 0 {
2398                    return Ok(());
2399                }
2400                if let Some(sem) = self.flow_control.lock().get(&key).cloned() {
2401                    // Tokio's `Semaphore::add_permits` is bounded
2402                    // by `MAX_PERMITS = usize::MAX >> 3`. A
2403                    // misbehaving caller flooding huge grants
2404                    // would eventually saturate; cap defensively.
2405                    let safe = (amount as usize).min(usize::MAX >> 4);
2406                    sem.add_permits(safe);
2407                }
2408            }
2409            _ => {}
2410        }
2411        Ok(())
2412    }
2413}
2414
2415// ============================================================================
2416// Phase B — server-side fold for client-streaming.
2417//
2418// `RpcStreamingRequestFold` mirrors `RpcServerStreamingFold` but
2419// flipped on the data-direction axis: the SERVER consumes a
2420// stream of REQUEST_CHUNK events and the handler produces ONE
2421// terminal RESPONSE (vs. the response-side fold where one REQUEST
2422// drives many RESPONSE chunks).
2423//
2424// Wire shape it handles:
2425//   DISPATCH_RPC_REQUEST       (FLAG_RPC_CLIENT_STREAMING_REQUEST)
2426//   DISPATCH_RPC_REQUEST_CHUNK (zero or more)
2427//   DISPATCH_RPC_REQUEST_CHUNK (FLAG_RPC_REQUEST_END)
2428//   DISPATCH_RPC_CANCEL        (any time; flips token + closes stream)
2429//
2430// Wire shape it EMITS (via callbacks):
2431//   DISPATCH_RPC_RESPONSE        (one terminal frame; via RpcResponseEmitter)
2432//   DISPATCH_RPC_REQUEST_GRANT   (one per consumed chunk when flow
2433//                                 control is opted in; via
2434//                                 RpcRequestGrantEmitter)
2435//
2436// Each service binds to exactly one fold shape (unary, server-
2437// streaming, or client-streaming) at `serve_rpc*` registration.
2438// A REQUEST without FLAG_RPC_CLIENT_STREAMING_REQUEST that lands
2439// on the client-streaming fold is a caller bug — the fold emits a
2440// terminal `Internal` and drops the call.
2441// ============================================================================
2442
2443/// Per-call request-direction sender map type. Keyed on
2444/// `(caller_origin_hash, call_id)`; value is the bounded mpsc
2445/// sender the fold's `apply()` pushes REQUEST_CHUNK bodies into.
2446/// The matching receiver lives inside the handler's
2447/// [`RequestStream`]; dropping the sender (on REQUEST_END or
2448/// CANCEL) closes the stream.
2449type RequestChunkSenders = Arc<Mutex<HashMap<(u64, u64), tokio::sync::mpsc::Sender<bytes::Bytes>>>>;
2450
2451/// Shared REQUEST_CHUNK handling used by both
2452/// [`RpcStreamingRequestFold`] and [`RpcDuplexFold`]. Decodes the
2453/// payload, validates the call_id agreement, looks up the per-call
2454/// sender, pushes the body (skipping the empty-body FLAG_END
2455/// terminator), and removes the sender on FLAG_END so the
2456/// handler's stream observes EOF.
2457///
2458/// `diag_tag` selects the log prefix ("client-streaming" or
2459/// "duplex") so the two call sites surface identically-shaped
2460/// diagnostics with the correct fold name. The behavior is
2461/// otherwise identical — both folds carry the same wire format
2462/// and the same per-call mpsc + sender-map contract.
2463fn apply_request_chunk_to_senders(
2464    payload_bytes: Bytes,
2465    meta: &EventMeta,
2466    senders: &RequestChunkSenders,
2467    diag_tag: &'static str,
2468) {
2469    let payload = match RpcRequestChunkPayload::decode(payload_bytes) {
2470        Ok(p) => p,
2471        Err(e) => {
2472            tracing::warn!(
2473                error = %e,
2474                caller_origin = format!("{:#x}", meta.origin_hash),
2475                call_id = meta.seq_or_ts,
2476                tag = diag_tag,
2477                "rpc server fold: malformed REQUEST_CHUNK payload",
2478            );
2479            return;
2480        }
2481    };
2482    if payload.call_id != meta.seq_or_ts {
2483        tracing::warn!(
2484            caller_origin = format!("{:#x}", meta.origin_hash),
2485            meta_call_id = meta.seq_or_ts,
2486            payload_call_id = payload.call_id,
2487            tag = diag_tag,
2488            "rpc server fold: REQUEST_CHUNK payload call_id does not match EventMeta",
2489        );
2490        return;
2491    }
2492    let key = (meta.origin_hash, meta.seq_or_ts);
2493    let is_end = payload.flags & FLAG_RPC_REQUEST_END != 0;
2494    let sender = senders.lock().get(&key).cloned();
2495    let Some(sender) = sender else {
2496        // Unknown call — either the initial REQUEST hasn't
2497        // arrived yet (out-of-order delivery is possible on the
2498        // bus) or the handler already completed and the entry is
2499        // gone. Drop silently.
2500        tracing::debug!(
2501            caller_origin = format!("{:#x}", meta.origin_hash),
2502            call_id = meta.seq_or_ts,
2503            tag = diag_tag,
2504            "rpc server fold: REQUEST_CHUNK for unknown call_id; dropping",
2505        );
2506        return;
2507    };
2508    let is_pure_terminator = is_end && payload.body.is_empty();
2509    if !is_pure_terminator && sender.try_send(payload.body).is_err() {
2510        tracing::debug!(
2511            caller_origin = format!("{:#x}", meta.origin_hash),
2512            call_id = meta.seq_or_ts,
2513            tag = diag_tag,
2514            "rpc server fold: request-chunk mpsc full or closed; dropping",
2515        );
2516    }
2517    if is_end {
2518        // Drop the sender from the map → its clone here goes out
2519        // of scope at end of function → the receiver in the
2520        // handler's RequestStream sees EOF on the next poll.
2521        senders.lock().remove(&key);
2522    }
2523}
2524
/// Server-side fold for client-streaming RPC. Parallel to
/// [`RpcServerStreamingFold`] but consumes REQUEST_CHUNK on the
/// input side and produces one terminal RESPONSE on the output
/// side (vs. one REQUEST in / many RESPONSE chunks out).
///
/// State `()` — like the other folds, application state lives in
/// the handler's captured `Arc<Mutex<S>>`. The fold's own state
/// (in-flight cancellation tokens + per-call request-chunk
/// senders) lives on `&mut self` via `Arc<Mutex<...>>` so spawned
/// handler tasks can self-clean on completion.
///
/// Part of the bidi streaming plan (Phase B).
pub struct RpcStreamingRequestFold {
    /// User-supplied client-streaming handler this fold serves.
    handler: Arc<dyn RpcClientStreamingHandler>,
    /// Sync response emitter, called as
    /// `emit(origin_hash, call_id, response)`. `apply` uses it
    /// directly to refuse malformed, mis-flagged, or duplicate
    /// REQUESTs with a terminal RESPONSE.
    emit: RpcResponseEmitter,
    /// Optional request-direction grant emitter. `Some(...)`
    /// when the surrounding mesh glue is wired to publish
    /// REQUEST_GRANT events; `None` in unit tests / contexts
    /// without a real publish path. When `None`, the auto-grant
    /// path on every `RequestStream::poll_next` becomes a no-op
    /// (callers that opted into flow control will see no
    /// refill and stall once their initial window is exhausted —
    /// honest behavior for a fold not wired up for grants).
    grant_emit: Option<RpcRequestGrantEmitter>,
    /// Cancellation tokens of in-flight calls, keyed on
    /// `(origin_hash, call_id)`. `apply` consults this map to refuse
    /// a REQUEST whose key is already present.
    in_flight: Arc<Mutex<HashMap<(u64, u64), RpcCancellationToken>>>,
    /// Per-call request-chunk senders, keyed on
    /// `(origin_hash, call_id)`; see [`RequestChunkSenders`].
    senders: RequestChunkSenders,
    /// Optional per-service metrics handle. Same shape as the
    /// other folds. Reuses the response-side counters where they
    /// apply (handler_invocations / handler_panics / etc.) and
    /// would gain request-side counters (e.g.
    /// `streaming_request_chunks_dropped_total`) in a follow-up.
    metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
}
2558
2559impl RpcStreamingRequestFold {
2560    /// Construct a client-streaming server fold. `emit` publishes
2561    /// the terminal RESPONSE on the caller's reply channel.
2562    ///
2563    /// Use the sync [`RpcResponseEmitter`] here — there's only
2564    /// one RESPONSE per call (the terminal frame), so the
2565    /// per-call serialization the async emitter buys for the
2566    /// response-side fold is not needed here.
2567    pub fn new(handler: Arc<dyn RpcClientStreamingHandler>, emit: RpcResponseEmitter) -> Self {
2568        Self {
2569            handler,
2570            emit,
2571            grant_emit: None,
2572            in_flight: Arc::new(Mutex::new(HashMap::new())),
2573            senders: Arc::new(Mutex::new(HashMap::new())),
2574            metrics: None,
2575        }
2576    }
2577
2578    /// Attach the request-direction grant emitter. Hands every
2579    /// `RequestStream::poll_next` a hook to fire one REQUEST_GRANT
2580    /// back to the caller after a chunk is consumed. Optional —
2581    /// folds constructed without it still work, callers that
2582    /// opted into flow control just won't be refilled.
2583    pub fn with_grant_emitter(mut self, grant_emit: RpcRequestGrantEmitter) -> Self {
2584        self.grant_emit = Some(grant_emit);
2585        self
2586    }
2587
2588    /// Attach a per-service metrics handle. Hooks the spawned
2589    /// handler task to bump `handler_invocations_total` /
2590    /// `handler_in_flight` / `handler_panics_total` /
2591    /// `handler_duration_*`. Symmetric with `RpcServerFold` and
2592    /// `RpcServerStreamingFold`.
2593    pub fn with_metrics(
2594        mut self,
2595        metrics: Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>,
2596    ) -> Self {
2597        self.metrics = Some(metrics);
2598        self
2599    }
2600
2601    /// Test-only: snapshot of the in-flight call set.
2602    #[cfg(test)]
2603    pub fn in_flight_keys(&self) -> Vec<(u64, u64)> {
2604        self.in_flight.lock().keys().copied().collect()
2605    }
2606
2607    /// Test-only: snapshot of the in-flight per-call senders.
2608    /// Useful for tests that need to assert a call's sender has
2609    /// been dropped after REQUEST_END / CANCEL.
2610    #[cfg(test)]
2611    pub fn sender_keys(&self) -> Vec<(u64, u64)> {
2612        self.senders.lock().keys().copied().collect()
2613    }
2614}
2615
2616impl RedexFold<()> for RpcStreamingRequestFold {
2617    fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
2618        let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
2619            EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
2620        } else {
2621            None
2622        }) else {
2623            tracing::warn!(
2624                payload_len = ev.payload.len(),
2625                "rpc client-streaming server fold: event payload too short for EventMeta",
2626            );
2627            return Ok(());
2628        };
2629        let key = (meta.origin_hash, meta.seq_or_ts);
2630        match meta.dispatch {
2631            DISPATCH_RPC_REQUEST => {
2632                let payload = match RpcRequestPayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
2633                    Ok(p) => p,
2634                    Err(e) => {
2635                        tracing::warn!(
2636                            error = %e,
2637                            caller_origin = format!("{:#x}", meta.origin_hash),
2638                            call_id = meta.seq_or_ts,
2639                            "rpc client-streaming server fold: malformed request payload",
2640                        );
2641                        let resp = RpcResponsePayload {
2642                            status: RpcStatus::UnknownVersion,
2643                            headers: vec![],
2644                            body: Bytes::from(format!("malformed request: {e}")),
2645                        };
2646                        (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
2647                        return Ok(());
2648                    }
2649                };
2650                // A REQUEST without the client-streaming flag on
2651                // this fold is a caller bug — the service was
2652                // registered as client-streaming. Refuse cleanly.
2653                if payload.flags & FLAG_RPC_CLIENT_STREAMING_REQUEST == 0 {
2654                    tracing::warn!(
2655                        caller_origin = format!("{:#x}", meta.origin_hash),
2656                        call_id = meta.seq_or_ts,
2657                        flags = format!("{:#06x}", payload.flags),
2658                        "rpc client-streaming server fold: REQUEST missing FLAG_RPC_CLIENT_STREAMING_REQUEST",
2659                    );
2660                    let resp = RpcResponsePayload {
2661                        status: RpcStatus::Internal,
2662                        headers: vec![],
2663                        body: Bytes::from_static(
2664                            b"REQUEST on a client-streaming service must set FLAG_RPC_CLIENT_STREAMING_REQUEST",
2665                        ),
2666                    };
2667                    (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
2668                    return Ok(());
2669                }
2670                // Refuse a duplicate REQUEST with the same
2671                // `(origin_hash, call_id)` — same rationale as
2672                // the response-side fold: a retry that arrives
2673                // while the first attempt is still in-flight
2674                // would overwrite the prior sender and orphan the
2675                // existing handler.
2676                {
2677                    let in_flight = self.in_flight.lock();
2678                    if in_flight.contains_key(&key) {
2679                        drop(in_flight);
2680                        tracing::warn!(
2681                            caller_origin = format!("{:#x}", meta.origin_hash),
2682                            call_id = meta.seq_or_ts,
2683                            "rpc client-streaming server fold: duplicate REQUEST for in-flight call_id; refusing",
2684                        );
2685                        let resp = RpcResponsePayload {
2686                            status: RpcStatus::Internal,
2687                            headers: vec![],
2688                            body: Bytes::from_static(
2689                                b"duplicate REQUEST for already-in-flight call_id",
2690                            ),
2691                        };
2692                        (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
2693                        return Ok(());
2694                    }
2695                }
2696                let cancellation = RpcCancellationToken::new();
2697                self.in_flight.lock().insert(key, cancellation.clone());
2698                // Build the per-call request-chunk mpsc. Bounded
2699                // capacity — overflow on the sender side drops the
2700                // chunk (caller can re-send or, if flow-control is
2701                // wired, will naturally not push past the credit
2702                // window).
2703                let (tx, rx) =
2704                    tokio::sync::mpsc::channel::<bytes::Bytes>(STREAMING_REQUEST_PUMP_CAPACITY);
2705                // Terminator-semantics rule: an empty body
2706                // combined with FLAG_REQUEST_END is a pure
2707                // terminator — the caller's `finish()` emits it
2708                // to close the stream without yielding a phantom
2709                // empty item to the handler. A non-empty body on
2710                // a FLAG_END frame IS a final item (used by the
2711                // "single-item degenerate path": initial REQUEST
2712                // with FLAG_END + a real body sends one item +
2713                // closes in a single frame).
2714                let end_on_initial = payload.flags & FLAG_RPC_REQUEST_END != 0;
2715                let is_pure_terminator = end_on_initial && payload.body.is_empty();
2716                if !is_pure_terminator {
2717                    // Fresh `mpsc::channel(STREAMING_REQUEST_PUMP_CAPACITY)`
2718                    // with a live receiver — try_send cannot fail.
2719                    // debug_assert surfaces the invariant break in
2720                    // tests; release logs at error level rather than
2721                    // silently swallowing the first request body.
2722                    if tx.try_send(payload.body).is_err() {
2723                        debug_assert!(
2724                            false,
2725                            "fresh client-streaming request mpsc rejected initial body"
2726                        );
2727                        tracing::error!(
2728                            caller_origin = format!("{:#x}", meta.origin_hash),
2729                            call_id = meta.seq_or_ts,
2730                            "rpc client-streaming server fold: fresh mpsc rejected initial REQUEST body (invariant break)",
2731                        );
2732                    }
2733                }
2734                // If the initial REQUEST also set FLAG_REQUEST_END,
2735                // close the stream immediately — degenerate case of
2736                // "one-item upload" where the caller didn't bother
2737                // with a trailing REQUEST_CHUNK. Don't even insert
2738                // the sender into the map; just drop it here.
2739                if !end_on_initial {
2740                    self.senders.lock().insert(key, tx);
2741                }
2742                // Build the handler's context + stream. Auto-grant
2743                // is opted into when the caller set the request
2744                // window header AND the fold was wired with a
2745                // grant emitter; both must be present for grants
2746                // to actually fly.
2747                let grant_emitter = if parse_request_window_initial(&payload.headers).is_some() {
2748                    self.grant_emit.clone()
2749                } else {
2750                    None
2751                };
2752                let request_stream =
2753                    RequestStream::new(rx, grant_emitter, meta.origin_hash, meta.seq_or_ts);
2754                let trace_context = if payload.flags & FLAG_RPC_PROPAGATE_TRACE != 0 {
2755                    extract_trace_context(&payload.headers)
2756                } else {
2757                    None
2758                };
2759                let deadline_ns = payload.deadline_ns;
2760                let ctx = RpcStreamingContext {
2761                    caller_origin: meta.origin_hash,
2762                    call_id: meta.seq_or_ts,
2763                    deadline_ns,
2764                    headers: payload.headers,
2765                    cancellation: cancellation.clone(),
2766                    trace_context,
2767                };
2768                let handler = self.handler.clone();
2769                let emit = self.emit.clone();
2770                let in_flight = self.in_flight.clone();
2771                let senders = self.senders.clone();
2772                let caller_origin = meta.origin_hash;
2773                let call_id = meta.seq_or_ts;
2774                let cancel_probe = cancellation.clone();
2775                let cancel_for_deadline = cancellation.clone();
2776                let metrics = self.metrics.clone();
2777                tokio::spawn(async move {
2778                    if let Some(m) = metrics.as_ref() {
2779                        m.handler_invocations_total
2780                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2781                        m.handler_in_flight
2782                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2783                    }
2784                    let handler_started = std::time::Instant::now();
2785                    // Deadline guard: if the caller declared
2786                    // `deadline_ns`, force-drop the handler future
2787                    // after it elapses so an orphaned request stream
2788                    // (caller-side network partition before
2789                    // REQUEST_END arrives) can never hang the call
2790                    // indefinitely. `deadline_ns = 0` means "no
2791                    // deadline" — caller's responsibility.
2792                    let call_fut = futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(
2793                        handler.call(ctx, request_stream),
2794                    ));
2795                    let outcome = if deadline_ns > 0 {
2796                        let now_ns = std::time::SystemTime::now()
2797                            .duration_since(std::time::UNIX_EPOCH)
2798                            .map(|d| d.as_nanos() as u64)
2799                            .unwrap_or(0);
2800                        let remaining = deadline_ns.saturating_sub(now_ns);
2801                        if remaining == 0 {
2802                            cancel_for_deadline.cancel();
2803                            Ok(Err(RpcHandlerError::Internal(
2804                                "handler deadline_ns already expired at spawn".to_string(),
2805                            )))
2806                        } else {
2807                            match tokio::time::timeout(
2808                                std::time::Duration::from_nanos(remaining),
2809                                call_fut,
2810                            )
2811                            .await
2812                            {
2813                                Ok(o) => o,
2814                                Err(_) => {
2815                                    cancel_for_deadline.cancel();
2816                                    Ok(Err(RpcHandlerError::Internal(
2817                                        "handler deadline_ns exceeded".to_string(),
2818                                    )))
2819                                }
2820                            }
2821                        }
2822                    } else {
2823                        call_fut.await
2824                    };
2825                    if let Some(m) = metrics.as_ref() {
2826                        m.handler_in_flight
2827                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2828                        m.record_handler_duration(handler_started.elapsed());
2829                        if outcome.is_err() {
2830                            m.handler_panics_total
2831                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2832                        }
2833                    }
2834                    // CANCEL-wins ordering: if the cancellation
2835                    // token fired during execution, override the
2836                    // handler's terminal with Cancelled.
2837                    let terminal = if cancel_probe.is_cancelled() {
2838                        RpcResponsePayload {
2839                            status: RpcStatus::Cancelled,
2840                            headers: vec![],
2841                            body: Bytes::from_static(
2842                                b"server observed CANCEL during client-streaming handler execution",
2843                            ),
2844                        }
2845                    } else {
2846                        match outcome {
2847                            Ok(Ok(resp)) => resp,
2848                            Ok(Err(RpcHandlerError::Application { code, message })) => {
2849                                RpcResponsePayload {
2850                                    status: RpcStatus::Application(code),
2851                                    headers: vec![],
2852                                    body: Bytes::from(message),
2853                                }
2854                            }
2855                            Ok(Err(RpcHandlerError::Internal(message))) => RpcResponsePayload {
2856                                status: RpcStatus::Internal,
2857                                headers: vec![],
2858                                body: Bytes::from(message),
2859                            },
2860                            Err(panic) => {
2861                                let panic_msg = panic
2862                                    .downcast_ref::<&'static str>()
2863                                    .map(|s| s.to_string())
2864                                    .or_else(|| panic.downcast_ref::<String>().cloned())
2865                                    .unwrap_or_else(|| "<non-string panic>".into());
2866                                tracing::error!(
2867                                    caller_origin = format!("{:#x}", caller_origin),
2868                                    call_id,
2869                                    panic = %panic_msg,
2870                                    "rpc client-streaming server handler panicked",
2871                                );
2872                                RpcResponsePayload {
2873                                    status: RpcStatus::Internal,
2874                                    headers: vec![],
2875                                    body: Bytes::from(format!("handler panicked: {panic_msg}")),
2876                                }
2877                            }
2878                        }
2879                    };
2880                    in_flight.lock().remove(&key);
2881                    // Drop the per-call request-chunk sender too
2882                    // (idempotent — already gone if REQUEST_END
2883                    // arrived; defensive otherwise so a handler
2884                    // that returned without consuming all chunks
2885                    // doesn't leak the entry).
2886                    senders.lock().remove(&key);
2887                    (emit)(caller_origin, call_id, terminal);
2888                });
2889            }
2890            DISPATCH_RPC_REQUEST_CHUNK => {
2891                apply_request_chunk_to_senders(
2892                    ev.payload.slice(EVENT_META_SIZE..),
2893                    &meta,
2894                    &self.senders,
2895                    "client-streaming",
2896                );
2897            }
2898            DISPATCH_RPC_CANCEL => {
2899                if let Some(token) = self.in_flight.lock().remove(&key) {
2900                    token.cancel();
2901                }
2902                // Drop the per-call sender so the handler's
2903                // RequestStream yields None on the next poll
2904                // (handler observes cancel via the token OR via
2905                // the stream's EOF; the cancel_probe in the
2906                // spawned task ensures the terminal RESPONSE is
2907                // Cancelled regardless of which the handler
2908                // checks first).
2909                self.senders.lock().remove(&key);
2910            }
2911            _ => {}
2912        }
2913        Ok(())
2914    }
2915}
2916
2917// ============================================================================
2918// Phase D — server-side fold for full duplex.
2919//
2920// `RpcDuplexFold` is the hybrid of `RpcStreamingRequestFold`
2921// (Phase B — request side) and `RpcServerStreamingFold` (existing
2922// — response side). The handler trait takes BOTH a `RequestStream`
2923// AND an `RpcResponseSink`; the fold spawns one handler task per
2924// REQUEST and one pump task per call_id, then emits a terminal
2925// RESPONSE on handler return.
2926//
2927// Wire shape it consumes:
//   DISPATCH_RPC_REQUEST       (FLAG_RPC_CLIENT_STREAMING_REQUEST + FLAG_RPC_STREAMING_RESPONSE)
//   DISPATCH_RPC_REQUEST_CHUNK (zero or more, with FLAG_RPC_REQUEST_END on the last)
2930//   DISPATCH_RPC_CANCEL        (flips token + closes both directions)
2931//
2932// Wire shape it produces:
2933//   DISPATCH_RPC_RESPONSE        (multi-fire; nrpc-streaming: continue / end)
2934//   DISPATCH_RPC_REQUEST_GRANT   (one per consumed request-chunk when flow
2935//                                 control is opted in)
2936//
2937// Bidi streaming plan (Phase D).
2938// ============================================================================
2939
2940/// Server-side fold for duplex RPC. Composes Phase B's request
2941/// stream + per-call request-chunk senders with the existing
2942/// response-side pump + multi-fire RESPONSE emit.
2943///
2944/// State `()` — same as the sibling folds.
2945///
2946/// Bidi streaming plan (Phase D).
2947pub struct RpcDuplexFold {
2948    handler: Arc<dyn RpcDuplexHandler>,
2949    /// Async emitter for response chunks (per-call ordering via
2950    /// awaited emits — same rationale as `RpcServerStreamingFold`).
2951    emit: RpcAsyncResponseEmitter,
2952    /// Optional request-direction grant emitter.
2953    grant_emit: Option<RpcRequestGrantEmitter>,
2954    in_flight: Arc<Mutex<HashMap<(u64, u64), RpcCancellationToken>>>,
2955    senders: RequestChunkSenders,
2956    metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
2957}
2958
2959impl RpcDuplexFold {
2960    /// Construct a duplex server fold. `emit` publishes individual
2961    /// response chunks AND the terminal frame on the caller's
2962    /// reply channel (uses the async emitter for per-call
2963    /// ordering, same as `RpcServerStreamingFold`).
2964    pub fn new(handler: Arc<dyn RpcDuplexHandler>, emit: RpcAsyncResponseEmitter) -> Self {
2965        Self {
2966            handler,
2967            emit,
2968            grant_emit: None,
2969            in_flight: Arc::new(Mutex::new(HashMap::new())),
2970            senders: Arc::new(Mutex::new(HashMap::new())),
2971            metrics: None,
2972        }
2973    }
2974
2975    /// Attach the request-direction grant emitter. See
2976    /// [`RpcStreamingRequestFold::with_grant_emitter`] for the
2977    /// auto-grant behavior. When unset, callers that opted into
2978    /// flow control simply won't get refilled.
2979    pub fn with_grant_emitter(mut self, grant_emit: RpcRequestGrantEmitter) -> Self {
2980        self.grant_emit = Some(grant_emit);
2981        self
2982    }
2983
2984    /// Attach a per-service metrics handle. Bumps
2985    /// handler_invocations / handler_in_flight / handler_panics /
2986    /// handler_duration_* + the response pump's
2987    /// streaming_chunks_emitted_total per emitted chunk.
2988    pub fn with_metrics(
2989        mut self,
2990        metrics: Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>,
2991    ) -> Self {
2992        self.metrics = Some(metrics);
2993        self
2994    }
2995
2996    /// Test-only: snapshot of the in-flight call set.
2997    #[cfg(test)]
2998    pub fn in_flight_keys(&self) -> Vec<(u64, u64)> {
2999        self.in_flight.lock().keys().copied().collect()
3000    }
3001
3002    /// Test-only: snapshot of the in-flight per-call senders.
3003    #[cfg(test)]
3004    pub fn sender_keys(&self) -> Vec<(u64, u64)> {
3005        self.senders.lock().keys().copied().collect()
3006    }
3007}
3008
3009impl RedexFold<()> for RpcDuplexFold {
3010    fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
3011        let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
3012            EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
3013        } else {
3014            None
3015        }) else {
3016            tracing::warn!(
3017                payload_len = ev.payload.len(),
3018                "rpc duplex server fold: event payload too short for EventMeta",
3019            );
3020            return Ok(());
3021        };
3022        let key = (meta.origin_hash, meta.seq_or_ts);
3023        match meta.dispatch {
3024            DISPATCH_RPC_REQUEST => {
3025                let payload = match RpcRequestPayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
3026                    Ok(p) => p,
3027                    Err(e) => {
3028                        tracing::warn!(
3029                            error = %e,
3030                            caller_origin = format!("{:#x}", meta.origin_hash),
3031                            call_id = meta.seq_or_ts,
3032                            "rpc duplex server fold: malformed request payload",
3033                        );
3034                        let resp = RpcResponsePayload {
3035                            status: RpcStatus::UnknownVersion,
3036                            headers: vec![(
3037                                HEADER_NRPC_STREAMING.to_string(),
3038                                HEADER_NRPC_STREAMING_END.to_vec(),
3039                            )],
3040                            body: Bytes::from(format!("malformed request: {e}")),
3041                        };
3042                        let emit = self.emit.clone();
3043                        let caller_origin = meta.origin_hash;
3044                        let call_id = meta.seq_or_ts;
3045                        tokio::spawn(async move {
3046                            emit(caller_origin, call_id, resp).await;
3047                        });
3048                        return Ok(());
3049                    }
3050                };
3051                // Caller-bug guard: a duplex REQUEST must set
3052                // BOTH the client-streaming flag (we'll receive
3053                // request chunks) AND the streaming-response flag
3054                // (we'll emit response chunks). Missing flags →
3055                // refuse cleanly.
3056                let required = FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_STREAMING_RESPONSE;
3057                if payload.flags & required != required {
3058                    tracing::warn!(
3059                        caller_origin = format!("{:#x}", meta.origin_hash),
3060                        call_id = meta.seq_or_ts,
3061                        flags = format!("{:#06x}", payload.flags),
3062                        "rpc duplex server fold: REQUEST missing required flags",
3063                    );
3064                    let resp = RpcResponsePayload {
3065                        status: RpcStatus::Internal,
3066                        headers: vec![(
3067                            HEADER_NRPC_STREAMING.to_string(),
3068                            HEADER_NRPC_STREAMING_END.to_vec(),
3069                        )],
3070                        body: Bytes::from_static(
3071                            b"REQUEST on a duplex service must set FLAG_RPC_CLIENT_STREAMING_REQUEST and FLAG_RPC_STREAMING_RESPONSE",
3072                        ),
3073                    };
3074                    let emit = self.emit.clone();
3075                    let caller_origin = meta.origin_hash;
3076                    let call_id = meta.seq_or_ts;
3077                    tokio::spawn(async move {
3078                        emit(caller_origin, call_id, resp).await;
3079                    });
3080                    return Ok(());
3081                }
3082                // Duplicate-REQUEST refusal.
3083                {
3084                    let in_flight = self.in_flight.lock();
3085                    if in_flight.contains_key(&key) {
3086                        drop(in_flight);
3087                        tracing::warn!(
3088                            caller_origin = format!("{:#x}", meta.origin_hash),
3089                            call_id = meta.seq_or_ts,
3090                            "rpc duplex server fold: duplicate REQUEST for in-flight call_id; refusing",
3091                        );
3092                        let resp = RpcResponsePayload {
3093                            status: RpcStatus::Internal,
3094                            headers: vec![(
3095                                HEADER_NRPC_STREAMING.to_string(),
3096                                HEADER_NRPC_STREAMING_END.to_vec(),
3097                            )],
3098                            body: Bytes::from_static(
3099                                b"duplicate REQUEST for already-in-flight call_id",
3100                            ),
3101                        };
3102                        let emit = self.emit.clone();
3103                        let caller_origin = meta.origin_hash;
3104                        let call_id = meta.seq_or_ts;
3105                        tokio::spawn(async move {
3106                            emit(caller_origin, call_id, resp).await;
3107                        });
3108                        return Ok(());
3109                    }
3110                }
3111                let cancellation = RpcCancellationToken::new();
3112                self.in_flight.lock().insert(key, cancellation.clone());
3113
3114                // Build per-call request-side mpsc (Phase B
3115                // pattern).
3116                let (req_tx, req_rx) =
3117                    tokio::sync::mpsc::channel::<bytes::Bytes>(STREAMING_REQUEST_PUMP_CAPACITY);
3118                let end_on_initial = payload.flags & FLAG_RPC_REQUEST_END != 0;
3119                let is_pure_terminator = end_on_initial && payload.body.is_empty();
3120                if !is_pure_terminator {
3121                    // Same invariant as the client-streaming fold:
3122                    // fresh bounded mpsc with a live receiver cannot
3123                    // reject the first send.
3124                    if req_tx.try_send(payload.body).is_err() {
3125                        debug_assert!(false, "fresh duplex request mpsc rejected initial body");
3126                        tracing::error!(
3127                            caller_origin = format!("{:#x}", meta.origin_hash),
3128                            call_id = meta.seq_or_ts,
3129                            "rpc duplex server fold: fresh mpsc rejected initial REQUEST body (invariant break)",
3130                        );
3131                    }
3132                }
3133                if !end_on_initial {
3134                    self.senders.lock().insert(key, req_tx);
3135                }
3136                // Hand the handler an auto-granting RequestStream
3137                // when the caller opted into request-direction
3138                // flow control AND the fold was wired with a
3139                // grant emitter.
3140                let grant_emitter = if parse_request_window_initial(&payload.headers).is_some() {
3141                    self.grant_emit.clone()
3142                } else {
3143                    None
3144                };
3145                let request_stream =
3146                    RequestStream::new(req_rx, grant_emitter, meta.origin_hash, meta.seq_or_ts);
3147
3148                // Build the per-call response-side mpsc (existing
3149                // server-streaming-response pattern). The handler
3150                // writes chunks to the sink; the pump task drains
3151                // the receiver and publishes RESPONSE events.
3152                let (resp_tx, mut resp_rx) =
3153                    tokio::sync::mpsc::channel::<bytes::Bytes>(STREAMING_PUMP_CAPACITY);
3154                let response_sink = RpcResponseSink {
3155                    inner: resp_tx,
3156                    metrics: self.metrics.clone(),
3157                };
3158
3159                let trace_context = if payload.flags & FLAG_RPC_PROPAGATE_TRACE != 0 {
3160                    extract_trace_context(&payload.headers)
3161                } else {
3162                    None
3163                };
3164                let deadline_ns = payload.deadline_ns;
3165                let ctx = RpcStreamingContext {
3166                    caller_origin: meta.origin_hash,
3167                    call_id: meta.seq_or_ts,
3168                    deadline_ns,
3169                    headers: payload.headers,
3170                    cancellation: cancellation.clone(),
3171                    trace_context,
3172                };
3173                let handler = self.handler.clone();
3174                let emit = self.emit.clone();
3175                let in_flight = self.in_flight.clone();
3176                let senders = self.senders.clone();
3177                let caller_origin = meta.origin_hash;
3178                let call_id = meta.seq_or_ts;
3179                let cancel_probe = cancellation.clone();
3180                let cancel_for_deadline = cancellation.clone();
3181                let metrics = self.metrics.clone();
3182
3183                // Pump: drains resp_rx, emits per-chunk RESPONSE
3184                // events with `nrpc-streaming: continue`.
3185                let pump_emit = emit.clone();
3186                let pump_metrics = metrics.clone();
3187                let pump = tokio::spawn(async move {
3188                    while let Some(chunk) = resp_rx.recv().await {
3189                        if let Some(m) = pump_metrics.as_ref() {
3190                            m.streaming_chunks_emitted_total
3191                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3192                        }
3193                        let resp = RpcResponsePayload {
3194                            status: RpcStatus::Ok,
3195                            headers: vec![(
3196                                HEADER_NRPC_STREAMING.to_string(),
3197                                HEADER_NRPC_STREAMING_CONTINUE.to_vec(),
3198                            )],
3199                            body: chunk.clone(),
3200                        };
3201                        pump_emit(caller_origin, call_id, resp).await;
3202                    }
3203                });
3204
3205                tokio::spawn(async move {
3206                    if let Some(m) = metrics.as_ref() {
3207                        m.handler_invocations_total
3208                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3209                        m.handler_in_flight
3210                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3211                    }
3212                    let handler_started = std::time::Instant::now();
3213                    // Same deadline guard as the client-streaming
3214                    // fold: force-drop the handler future at
3215                    // deadline_ns so an orphaned request stream
3216                    // can't hang the call. `0` means no deadline.
3217                    let call_fut = futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(
3218                        handler.call(ctx, request_stream, response_sink),
3219                    ));
3220                    let outcome = if deadline_ns > 0 {
3221                        let now_ns = std::time::SystemTime::now()
3222                            .duration_since(std::time::UNIX_EPOCH)
3223                            .map(|d| d.as_nanos() as u64)
3224                            .unwrap_or(0);
3225                        let remaining = deadline_ns.saturating_sub(now_ns);
3226                        if remaining == 0 {
3227                            cancel_for_deadline.cancel();
3228                            Ok(Err(RpcHandlerError::Internal(
3229                                "duplex handler deadline_ns already expired at spawn".to_string(),
3230                            )))
3231                        } else {
3232                            match tokio::time::timeout(
3233                                std::time::Duration::from_nanos(remaining),
3234                                call_fut,
3235                            )
3236                            .await
3237                            {
3238                                Ok(o) => o,
3239                                Err(_) => {
3240                                    cancel_for_deadline.cancel();
3241                                    Ok(Err(RpcHandlerError::Internal(
3242                                        "duplex handler deadline_ns exceeded".to_string(),
3243                                    )))
3244                                }
3245                            }
3246                        }
3247                    } else {
3248                        call_fut.await
3249                    };
3250                    // Handler dropped the sink — let the pump
3251                    // drain any final in-flight chunks before we
3252                    // emit the terminal frame.
3253                    let _ = pump.await;
3254                    if let Some(m) = metrics.as_ref() {
3255                        m.handler_in_flight
3256                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
3257                        m.record_handler_duration(handler_started.elapsed());
3258                        if outcome.is_err() {
3259                            m.handler_panics_total
3260                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3261                        }
3262                    }
3263                    let terminal = if cancel_probe.is_cancelled() {
3264                        RpcResponsePayload {
3265                            status: RpcStatus::Cancelled,
3266                            headers: vec![],
3267                            body: Bytes::from_static(
3268                                b"server observed CANCEL during duplex handler execution",
3269                            ),
3270                        }
3271                    } else {
3272                        match outcome {
3273                            Ok(Ok(())) => RpcResponsePayload {
3274                                status: RpcStatus::Ok,
3275                                headers: vec![(
3276                                    HEADER_NRPC_STREAMING.to_string(),
3277                                    HEADER_NRPC_STREAMING_END.to_vec(),
3278                                )],
3279                                body: Bytes::new(),
3280                            },
3281                            Ok(Err(RpcHandlerError::Application { code, message })) => {
3282                                RpcResponsePayload {
3283                                    status: RpcStatus::Application(code),
3284                                    headers: vec![],
3285                                    body: Bytes::from(message),
3286                                }
3287                            }
3288                            Ok(Err(RpcHandlerError::Internal(message))) => RpcResponsePayload {
3289                                status: RpcStatus::Internal,
3290                                headers: vec![],
3291                                body: Bytes::from(message),
3292                            },
3293                            Err(panic) => {
3294                                let panic_msg = panic
3295                                    .downcast_ref::<&'static str>()
3296                                    .map(|s| s.to_string())
3297                                    .or_else(|| panic.downcast_ref::<String>().cloned())
3298                                    .unwrap_or_else(|| "<non-string panic>".into());
3299                                tracing::error!(
3300                                    caller_origin = format!("{:#x}", caller_origin),
3301                                    call_id,
3302                                    panic = %panic_msg,
3303                                    "rpc duplex server handler panicked",
3304                                );
3305                                RpcResponsePayload {
3306                                    status: RpcStatus::Internal,
3307                                    headers: vec![],
3308                                    body: Bytes::from(format!("handler panicked: {panic_msg}")),
3309                                }
3310                            }
3311                        }
3312                    };
3313                    in_flight.lock().remove(&key);
3314                    senders.lock().remove(&key);
3315                    emit(caller_origin, call_id, terminal).await;
3316                });
3317            }
3318            DISPATCH_RPC_REQUEST_CHUNK => {
3319                apply_request_chunk_to_senders(
3320                    ev.payload.slice(EVENT_META_SIZE..),
3321                    &meta,
3322                    &self.senders,
3323                    "duplex",
3324                );
3325            }
3326            DISPATCH_RPC_CANCEL => {
3327                if let Some(token) = self.in_flight.lock().remove(&key) {
3328                    token.cancel();
3329                }
3330                self.senders.lock().remove(&key);
3331            }
3332            _ => {}
3333        }
3334        Ok(())
3335    }
3336}
3337
3338// ============================================================================
3339// Client-side fold.
3340//
3341// `RpcClientFold` is the symmetric companion of `RpcServerFold`.
3342// It sees RESPONSE events on the caller's reply channel
3343// (`<service>.replies.<self_origin>`) and routes each one to the
3344// matching call's awaiting `oneshot::Receiver` keyed on `call_id`
3345// (the `EventMeta::seq_or_ts`).
3346//
3347// The fold's mutable state (the pending-senders map) is shared
3348// with the `Mesh::call` API via a clone of the same Arc — so the
3349// publisher side can `register(call_id)` to stage a receiver
3350// before publishing the REQUEST, and the fold side can `deliver`
3351// when the matching RESPONSE arrives.
3352// ============================================================================
3353
3354/// One pending entry — unary oneshot, server-streaming mpsc, or
3355/// client-streaming (one terminal oneshot + a separate grant
3356/// mpsc). The fold dispatches to the right variant based on
3357/// what's registered for the `call_id`.
3358enum PendingEntry {
3359    /// Unary call — exactly one RESPONSE expected. Completes the
3360    /// oneshot with the decoded payload.
3361    Unary(tokio::sync::oneshot::Sender<RpcResponsePayload>),
3362    /// Server-streaming call — multiple non-terminal `Continue`
3363    /// chunks followed by one terminal frame. Each non-terminal
3364    /// chunk pushes a `StreamItem::Chunk(body)` onto the mpsc;
3365    /// the terminal frame pushes `StreamItem::End` (Ok) or
3366    /// `StreamItem::Error(payload)` (non-Ok status) and the
3367    /// pending entry is removed.
3368    Streaming(tokio::sync::mpsc::UnboundedSender<StreamItem>),
3369    /// Client-streaming or duplex call. Two sender halves:
3370    ///
3371    /// - `terminal_tx`: oneshot that completes when the server's
3372    ///   single terminal RESPONSE arrives. Response shape and
3373    ///   delivery semantics are identical to the unary variant —
3374    ///   the caller awaits one payload, success or failure status.
3375    /// - `grant_tx`: mpsc that ferries REQUEST_GRANT credit values
3376    ///   from the client fold to the caller's send sink. Each
3377    ///   `DISPATCH_RPC_REQUEST_GRANT` event for this call_id
3378    ///   pushes one `u32` credit onto the mpsc; the caller's send
3379    ///   sink consumes credits to gate `send(...).await`.
3380    ///
3381    /// Bidi streaming plan (Phase C). Used for pure client-
3382    /// streaming (one terminal RESPONSE closes the call). Duplex
3383    /// calls use the [`PendingEntry::Duplex`] variant instead,
3384    /// since they receive many response chunks rather than one
3385    /// terminal payload.
3386    ClientStreaming {
3387        terminal_tx: tokio::sync::oneshot::Sender<RpcResponsePayload>,
3388        grant_tx: tokio::sync::mpsc::UnboundedSender<u32>,
3389    },
3390    /// Duplex call — many request chunks out, many response
3391    /// chunks in. Two senders, same shape as `ClientStreaming`
3392    /// except the terminal slot is an mpsc instead of a oneshot
3393    /// because the response side is multi-chunk (terminator is
3394    /// implicit in `StreamItem::End` / `StreamItem::Error` on
3395    /// the chunks_tx mpsc, same as `PendingEntry::Streaming`).
3396    ///
3397    /// - `chunks_tx`: response-chunk mpsc — fed by `deliver`
3398    ///   when RESPONSE events arrive on the reply channel.
3399    ///   `StreamItem::Chunk` for non-terminal, `StreamItem::End`
3400    ///   / `StreamItem::Error` terminates and removes the entry.
3401    /// - `grant_tx`: request-direction credit mpsc — fed by
3402    ///   `deliver_grant` when REQUEST_GRANT events arrive.
3403    ///
3404    /// Bidi streaming plan (Phase D).
3405    Duplex {
3406        chunks_tx: tokio::sync::mpsc::UnboundedSender<StreamItem>,
3407        grant_tx: tokio::sync::mpsc::UnboundedSender<u32>,
3408    },
3409}
3410
3411/// One item delivered to a streaming caller. The caller's
3412/// `RpcStream` translates these into `Stream::Item =
3413/// Result<Bytes, RpcError>` plus stream termination.
3414#[derive(Debug, Clone)]
3415pub enum StreamItem {
3416    /// Non-terminal chunk — a body slice from the server.
3417    Chunk(bytes::Bytes),
3418    /// Terminal frame, server signaled clean stream end.
3419    End,
3420    /// Terminal frame with a non-`Ok` status. Body is the
3421    /// server's diagnostic; status is the wire `RpcStatus` value.
3422    Error(RpcResponsePayload),
3423}
3424
3425/// Shared pending-call state. Held by both the `RpcClientFold`
3426/// (writer side: completes oneshot senders / pushes streaming
3427/// chunks on RESPONSE arrival) and the `Mesh::call*` APIs (reader
3428/// side: registers entries before publishing the REQUEST).
3429/// Concurrent access is mediated by `DashMap`.
3430///
3431/// Multiplexes unary AND streaming calls in a single map keyed
3432/// on `call_id` — the entry's enum variant tells the fold how
3433/// to dispatch incoming RESPONSE events.
3434pub struct RpcClientPending {
3435    /// Map keyed on `call_id`, value carries `(expected_target,
3436    /// PendingEntry)`. `expected_target` is the `NodeId` of the
3437    /// peer the request was dispatched to; `deliver` rejects
3438    /// frames whose wire `from_node` doesn't match. A
3439    /// `expected_target == 0` entry opts out of the binding
3440    /// (loopback tests + paths with no session).
3441    senders: dashmap::DashMap<u64, (super::super::behavior::placement::NodeId, PendingEntry)>,
3442}
3443
3444impl RpcClientPending {
3445    /// Construct an empty pending-call store.
3446    pub fn new() -> Self {
3447        Self {
3448            senders: dashmap::DashMap::new(),
3449        }
3450    }
3451
3452    /// Register a oneshot for a unary `call_id`. Returns the
3453    /// receiver the caller awaits. The caller MUST publish the
3454    /// REQUEST after registration (and not before) so the
3455    /// matching RESPONSE can't arrive while the pending entry is
3456    /// missing.
3457    ///
3458    /// `target_node` is the wire-session peer the request will
3459    /// be sent to; `deliver` rejects RESPONSE frames whose
3460    /// `from_node` doesn't match. Pass `0` for loopback / no-
3461    /// session test paths to opt out of the binding gate.
3462    ///
3463    /// If a sender already exists for `call_id` (improperly reused
3464    /// id), it is replaced and the old receiver gets a
3465    /// `RecvError::Closed` — surfacing the misuse as a hard error
3466    /// at the caller rather than silently delivering the response
3467    /// to the wrong waiter.
3468    pub fn register(
3469        &self,
3470        call_id: u64,
3471        target_node: super::super::behavior::placement::NodeId,
3472    ) -> tokio::sync::oneshot::Receiver<RpcResponsePayload> {
3473        let (tx, rx) = tokio::sync::oneshot::channel();
3474        self.senders
3475            .insert(call_id, (target_node, PendingEntry::Unary(tx)));
3476        rx
3477    }
3478
3479    /// Register a streaming entry for `call_id`. Returns the
3480    /// receive end of an mpsc the fold will push chunks onto.
3481    /// Same registration ordering rules as `register` —
3482    /// publisher must call this BEFORE publishing the REQUEST.
3483    pub fn register_streaming(
3484        &self,
3485        call_id: u64,
3486        target_node: super::super::behavior::placement::NodeId,
3487    ) -> tokio::sync::mpsc::UnboundedReceiver<StreamItem> {
3488        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
3489        self.senders
3490            .insert(call_id, (target_node, PendingEntry::Streaming(tx)));
3491        rx
3492    }
3493
3494    /// Register a client-streaming (or duplex) entry for
3495    /// `call_id`. Returns BOTH the terminal-response receiver
3496    /// (the caller awaits on this for the single terminal
3497    /// RESPONSE that ends the call) AND a grant receiver (the
3498    /// caller's send sink consumes this to gate `send().await`
3499    /// when the caller opted into request-direction flow
3500    /// control).
3501    ///
3502    /// Same registration ordering rules as `register` /
3503    /// `register_streaming` — publisher must call this BEFORE
3504    /// publishing the REQUEST so a fast server's RESPONSE /
3505    /// REQUEST_GRANT can't arrive while no pending entry exists.
3506    ///
3507    /// Bidi streaming plan (Phase C).
3508    pub fn register_client_streaming(
3509        &self,
3510        call_id: u64,
3511        target_node: super::super::behavior::placement::NodeId,
3512    ) -> (
3513        tokio::sync::oneshot::Receiver<RpcResponsePayload>,
3514        tokio::sync::mpsc::UnboundedReceiver<u32>,
3515    ) {
3516        let (terminal_tx, terminal_rx) = tokio::sync::oneshot::channel();
3517        let (grant_tx, grant_rx) = tokio::sync::mpsc::unbounded_channel();
3518        self.senders.insert(
3519            call_id,
3520            (
3521                target_node,
3522                PendingEntry::ClientStreaming {
3523                    terminal_tx,
3524                    grant_tx,
3525                },
3526            ),
3527        );
3528        (terminal_rx, grant_rx)
3529    }
3530
3531    /// Register a duplex entry for `call_id`. Returns BOTH a
3532    /// response-chunk receiver (yields `StreamItem` per inbound
3533    /// RESPONSE chunk; terminator is `End` / `Error`) AND a
3534    /// grant receiver (yields `u32` credits per inbound
3535    /// REQUEST_GRANT).
3536    ///
3537    /// Same registration ordering rules as the other `register_*`
3538    /// methods: publisher must call this BEFORE publishing the
3539    /// REQUEST so the server's response chunks / grants can't
3540    /// arrive while no pending entry exists.
3541    ///
3542    /// Bidi streaming plan (Phase D).
3543    pub fn register_duplex(
3544        &self,
3545        call_id: u64,
3546        target_node: super::super::behavior::placement::NodeId,
3547    ) -> (
3548        tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
3549        tokio::sync::mpsc::UnboundedReceiver<u32>,
3550    ) {
3551        let (chunks_tx, chunks_rx) = tokio::sync::mpsc::unbounded_channel();
3552        let (grant_tx, grant_rx) = tokio::sync::mpsc::unbounded_channel();
3553        self.senders.insert(
3554            call_id,
3555            (
3556                target_node,
3557                PendingEntry::Duplex {
3558                    chunks_tx,
3559                    grant_tx,
3560                },
3561            ),
3562        );
3563        (chunks_rx, grant_rx)
3564    }
3565
3566    /// Drop the pending entry for `call_id`. Called by the
3567    /// caller-side cancellation path (e.g. `Mesh::call`'s future
3568    /// being dropped, the stream being dropped, or a deadline
3569    /// timer firing). The matching RESPONSE(s) that may still
3570    /// arrive afterwards are silently discarded by `deliver`.
3571    pub fn cancel(&self, call_id: u64) {
3572        self.senders.remove(&call_id);
3573    }
3574
3575    /// Deliver `resp` to the waiter for `call_id`, if any.
3576    ///
3577    /// `from_node` is the wire-session peer of the inbound
3578    /// RESPONSE. If the pending entry's recorded `target_node`
3579    /// is non-zero and does not match `from_node`, the frame is
3580    /// dropped with a trace log and the pending entry stays
3581    /// intact — a forged response on a shared reply channel
3582    /// can't resolve a victim's call. A recorded `target_node
3583    /// == 0` opts the call out of the binding (loopback paths).
3584    ///
3585    /// For a unary entry: completes the oneshot and removes the
3586    /// entry.
3587    ///
3588    /// For a streaming entry: examines the response's headers to
3589    /// decide whether it's a non-terminal chunk (`Continue` —
3590    /// push `StreamItem::Chunk`, keep the entry) or terminal
3591    /// (`End` / non-`Ok` — push `StreamItem::End` or `Error`,
3592    /// remove the entry).
3593    ///
3594    /// Idempotent on subsequent deliveries to a removed entry.
3595    fn deliver(
3596        &self,
3597        call_id: u64,
3598        from_node: super::super::behavior::placement::NodeId,
3599        resp: RpcResponsePayload,
3600    ) {
3601        // Look up the entry — but DON'T remove it yet, because for
3602        // streaming we may want to keep it for non-terminal chunks.
3603        // The remove decision is per-variant.
3604        let entry = self.senders.get(&call_id);
3605        let Some(entry) = entry else { return };
3606        // S-4 part 2 gate. The pending registry binds each call
3607        // to the AEAD-verified `target_node` the request was
3608        // dispatched to; any other session peer publishing on the
3609        // shared reply channel with a guessed call_id is dropped
3610        // here without touching the waiter. `0` opts out — used
3611        // by loopback paths that have no session peer.
3612        let (target_node, _entry_value) = entry.value();
3613        if *target_node != 0 && *target_node != from_node {
3614            tracing::trace!(
3615                call_id,
3616                from_node,
3617                expected = *target_node,
3618                "rpc client: dropping RESPONSE from non-target session peer"
3619            );
3620            return;
3621        }
3622        match entry.value() {
3623            (_, PendingEntry::Unary(_)) => {
3624                drop(entry);
3625                if let Some((_, (_, PendingEntry::Unary(tx)))) = self.senders.remove(&call_id) {
3626                    let _ = tx.send(resp);
3627                }
3628            }
3629            (_, PendingEntry::ClientStreaming { .. }) => {
3630                // Terminal RESPONSE for a client-streaming /
3631                // duplex call. Same delivery shape as Unary —
3632                // complete the oneshot, remove the entry. The
3633                // grant_tx half drops with the entry, which is
3634                // fine (no more grants will arrive after the
3635                // terminal frame).
3636                drop(entry);
3637                if let Some((
3638                    _,
3639                    (
3640                        _,
3641                        PendingEntry::ClientStreaming {
3642                            terminal_tx,
3643                            grant_tx: _,
3644                        },
3645                    ),
3646                )) = self.senders.remove(&call_id)
3647                {
3648                    let _ = terminal_tx.send(resp);
3649                }
3650            }
3651            (_, PendingEntry::Streaming(tx)) => {
3652                let tx = tx.clone();
3653                drop(entry);
3654                self.dispatch_streaming_chunk(&tx, resp, call_id);
3655            }
3656            (_, PendingEntry::Duplex { chunks_tx, .. }) => {
3657                // Same dispatch logic as Streaming — duplex
3658                // response side IS a multi-chunk stream.
3659                let tx = chunks_tx.clone();
3660                drop(entry);
3661                self.dispatch_streaming_chunk(&tx, resp, call_id);
3662            }
3663        }
3664    }
3665
3666    /// Shared response-chunk dispatch used by both
3667    /// `PendingEntry::Streaming` and `PendingEntry::Duplex`. The
3668    /// caller has already verified the target-binding gate and
3669    /// dropped its `entry` ref; this helper does the classify-
3670    /// and-push and removes the entry from the senders map on
3671    /// terminal frames.
3672    fn dispatch_streaming_chunk(
3673        &self,
3674        tx: &tokio::sync::mpsc::UnboundedSender<StreamItem>,
3675        resp: RpcResponsePayload,
3676        call_id: u64,
3677    ) {
3678        let kind = classify_streaming_chunk(&resp);
3679        match kind {
3680            StreamingChunkKind::Continue => {
3681                let _ = tx.send(StreamItem::Chunk(resp.body));
3682            }
3683            StreamingChunkKind::Terminal => {
3684                let item = if resp.status.is_ok() {
3685                    if !resp.body.is_empty() {
3686                        let _ = tx.send(StreamItem::Chunk(resp.body));
3687                    }
3688                    StreamItem::End
3689                } else {
3690                    StreamItem::Error(resp)
3691                };
3692                let _ = tx.send(item);
3693                self.senders.remove(&call_id);
3694            }
3695            StreamingChunkKind::Unary => {
3696                tracing::warn!(
3697                    call_id,
3698                    body_len = resp.body.len(),
3699                    "rpc client: streaming / duplex consumer received unary-shaped \
3700                     response (no nrpc-streaming header); server may have bridged a \
3701                     unary path. Bridging to single-chunk + EOF.",
3702                );
3703                if !resp.body.is_empty() {
3704                    let _ = tx.send(StreamItem::Chunk(resp.body));
3705                }
3706                let _ = tx.send(StreamItem::End);
3707                self.senders.remove(&call_id);
3708            }
3709        }
3710    }
3711
3712    /// Deliver a request-direction grant credit to the waiter
3713    /// for `call_id`, if it's a client-streaming / duplex entry.
3714    /// Silently no-op for unknown call_ids, for unary entries
3715    /// (caller bug — grant for a unary call makes no sense),
3716    /// and for server-streaming entries (grants apply only to
3717    /// the upload direction).
3718    ///
3719    /// `from_node` is gated by the same target-binding check
3720    /// as `deliver`: a grant from a non-target session peer is
3721    /// dropped (a forged grant on a shared reply channel can't
3722    /// inject credit into a victim's call).
3723    ///
3724    /// Bidi streaming plan (Phase C).
3725    fn deliver_grant(
3726        &self,
3727        call_id: u64,
3728        from_node: super::super::behavior::placement::NodeId,
3729        credits: u32,
3730    ) {
3731        let entry = self.senders.get(&call_id);
3732        let Some(entry) = entry else { return };
3733        let (target_node, _entry_value) = entry.value();
3734        if *target_node != 0 && *target_node != from_node {
3735            tracing::trace!(
3736                call_id,
3737                from_node,
3738                expected = *target_node,
3739                "rpc client: dropping REQUEST_GRANT from non-target session peer"
3740            );
3741            return;
3742        }
3743        match entry.value() {
3744            (_, PendingEntry::ClientStreaming { grant_tx, .. })
3745            | (_, PendingEntry::Duplex { grant_tx, .. }) => {
3746                let _ = grant_tx.send(credits);
3747            }
3748            // Unary / Streaming entries silently ignore — see
3749            // method docs for the rationale.
3750            _ => {}
3751        }
3752    }
3753
3754    /// Test-only: how many pending calls are registered. Used by
3755    /// integration tests to confirm cleanup after happy-path / cancel.
3756    #[cfg(test)]
3757    pub fn pending_count(&self) -> usize {
3758        self.senders.len()
3759    }
3760}
3761
3762impl Default for RpcClientPending {
3763    fn default() -> Self {
3764        Self::new()
3765    }
3766}
3767
3768/// Client-side fold. Decodes RESPONSE events and routes them to
3769/// awaiting oneshots in the shared [`RpcClientPending`].
3770///
3771/// `Mesh::call` clones the same `Arc<RpcClientPending>` to register
3772/// oneshots before publishing REQUESTs.
3773pub struct RpcClientFold {
3774    pending: Arc<RpcClientPending>,
3775}
3776
3777impl RpcClientFold {
3778    /// Construct a client fold that delivers responses through
3779    /// `pending`. Typical pattern:
3780    ///
3781    /// ```ignore
3782    /// let pending = Arc::new(RpcClientPending::new());
3783    /// let fold = RpcClientFold::new(pending.clone());
3784    /// let adapter = CortexAdapter::open(..., fold, ())?;
3785    /// // `pending` is still usable for register / cancel.
3786    /// ```
3787    pub fn new(pending: Arc<RpcClientPending>) -> Self {
3788        Self { pending }
3789    }
3790
3791    /// Production-path entry point. Mesh dispatch calls this with
3792    /// the AEAD-verified session peer's `NodeId` in
3793    /// `ev.from_node`; the pending registry's S-4 binding gate
3794    /// uses it to reject responses from the wrong target.
3795    pub fn apply_inbound(&mut self, ev: &RpcInboundEvent) {
3796        let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
3797            EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
3798        } else {
3799            None
3800        }) else {
3801            tracing::warn!(
3802                payload_len = ev.payload.len(),
3803                "rpc client fold: event payload too short for EventMeta; skipping",
3804            );
3805            return;
3806        };
3807        match meta.dispatch {
3808            DISPATCH_RPC_RESPONSE => {
3809                match RpcResponsePayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
3810                    Ok(resp) => self.pending.deliver(meta.seq_or_ts, ev.from_node, resp),
3811                    Err(e) => {
3812                        tracing::warn!(
3813                            error = %e,
3814                            call_id = meta.seq_or_ts,
3815                            "rpc client fold: malformed response payload",
3816                        );
3817                    }
3818                }
3819            }
3820            DISPATCH_RPC_REQUEST_GRANT => {
3821                // Server granted upload credit for a
3822                // client-streaming / duplex call. Route it to the
3823                // matching pending entry's grant mpsc; non-client-
3824                // streaming entries silently ignore (see
3825                // RpcClientPending::deliver_grant docs).
3826                match decode_request_grant(&ev.payload[EVENT_META_SIZE..]) {
3827                    Some(grant) => {
3828                        // The payload's `call_id` MUST agree with
3829                        // the EventMeta's `seq_or_ts`: producer
3830                        // encodes both to the same value (see
3831                        // `RpcRequestGrantPayload::call_id` docs).
3832                        // If they disagree, the frame is malformed
3833                        // or forged — drop it. Otherwise a peer
3834                        // could publish a GRANT whose meta names
3835                        // one call but whose payload credits a
3836                        // different in-flight call_id.
3837                        if grant.call_id != meta.seq_or_ts {
3838                            tracing::debug!(
3839                                meta_call_id = meta.seq_or_ts,
3840                                payload_call_id = grant.call_id,
3841                                "rpc client fold: REQUEST_GRANT meta/payload call_id mismatch; dropping",
3842                            );
3843                            return;
3844                        }
3845                        if grant.credits == 0 {
3846                            return;
3847                        }
3848                        self.pending
3849                            .deliver_grant(grant.call_id, ev.from_node, grant.credits);
3850                    }
3851                    None => {
3852                        tracing::debug!(
3853                            call_id = meta.seq_or_ts,
3854                            "rpc client fold: malformed REQUEST_GRANT payload"
3855                        );
3856                    }
3857                }
3858            }
3859            _ => {
3860                // Unknown / unexpected dispatch on the reply
3861                // channel — ignore (a misconfigured publisher
3862                // shouldn't take down the fold).
3863            }
3864        }
3865    }
3866}
3867
3868impl RedexFold<()> for RpcClientFold {
3869    /// Legacy entry point used by loopback / test paths that
3870    /// don't have a session peer to resolve. Calls `deliver`
3871    /// with `from_node = 0`, which the pending registry treats
3872    /// as "no binding" — callers that registered with
3873    /// `target_node = 0` accept it, callers that registered
3874    /// with a real target reject it.
3875    fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
3876        let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
3877            EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
3878        } else {
3879            None
3880        }) else {
3881            tracing::warn!(
3882                payload_len = ev.payload.len(),
3883                "rpc client fold: event payload too short for EventMeta; skipping",
3884            );
3885            return Ok(());
3886        };
3887        // Route RESPONSE and REQUEST_GRANT events; ignore other
3888        // dispatches a misconfigured publisher might send. The
3889        // loopback path uses `from_node = 0` which the pending
3890        // registry treats as "no binding" — see the apply_inbound
3891        // production-path counterpart above for the AEAD-verified
3892        // peer routing.
3893        match meta.dispatch {
3894            DISPATCH_RPC_RESPONSE => {
3895                match RpcResponsePayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
3896                    Ok(resp) => self.pending.deliver(meta.seq_or_ts, 0, resp),
3897                    Err(e) => {
3898                        // Malformed RESPONSE on the reply channel.
3899                        // We can't fabricate a synthetic response
3900                        // (the call_id might be valid; we just
3901                        // can't tell what it was supposed to
3902                        // mean). Log and leave the pending entry
3903                        // intact — the caller's deadline /
3904                        // cancellation path will eventually clean
3905                        // it up.
3906                        tracing::warn!(
3907                            error = %e,
3908                            call_id = meta.seq_or_ts,
3909                            "rpc client fold: malformed response payload",
3910                        );
3911                    }
3912                }
3913            }
3914            DISPATCH_RPC_REQUEST_GRANT => {
3915                match decode_request_grant(&ev.payload[EVENT_META_SIZE..]) {
3916                    Some(grant) => {
3917                        // See `apply_inbound` REQUEST_GRANT arm for
3918                        // the meta/payload call_id invariant.
3919                        if grant.call_id != meta.seq_or_ts {
3920                            tracing::debug!(
3921                                meta_call_id = meta.seq_or_ts,
3922                                payload_call_id = grant.call_id,
3923                                "rpc client fold: REQUEST_GRANT meta/payload call_id mismatch; dropping",
3924                            );
3925                            return Ok(());
3926                        }
3927                        if grant.credits == 0 {
3928                            return Ok(());
3929                        }
3930                        self.pending.deliver_grant(grant.call_id, 0, grant.credits);
3931                    }
3932                    None => {
3933                        tracing::debug!(
3934                            call_id = meta.seq_or_ts,
3935                            "rpc client fold: malformed REQUEST_GRANT payload"
3936                        );
3937                    }
3938                }
3939            }
3940            _ => {}
3941        }
3942        Ok(())
3943    }
3944}
3945
3946#[cfg(test)]
3947mod tests {
3948    use super::*;
3949
3950    fn header(name: &str, value: &[u8]) -> RpcHeader {
3951        (name.to_string(), value.to_vec())
3952    }
3953
3954    // --------------------------------------------------------------------
3955    // Status code numbering.
3956    // --------------------------------------------------------------------
3957
3958    /// Status codes have stable wire numbers. A regression that
3959    /// renumbered any of the canonical statuses would break
3960    /// every cross-version caller / server pair on the wire — pin
3961    /// the numbers explicitly so the test catches it before the
3962    /// bug ships.
3963    #[test]
3964    fn status_wire_numbers_are_stable() {
3965        for (status, expected) in [
3966            (RpcStatus::Ok, 0x0000u16),
3967            (RpcStatus::NotFound, 0x0001),
3968            (RpcStatus::Unauthorized, 0x0002),
3969            (RpcStatus::Timeout, 0x0003),
3970            (RpcStatus::Backpressure, 0x0004),
3971            (RpcStatus::Cancelled, 0x0005),
3972            (RpcStatus::Internal, 0x0006),
3973            (RpcStatus::UnknownVersion, 0x0007),
3974            (RpcStatus::CapabilityDenied, 0x0008),
3975        ] {
3976            assert_eq!(status.to_wire(), expected, "{status:?}");
3977            assert_eq!(RpcStatus::from_wire(expected), status);
3978        }
3979    }
3980
3981    /// Reserved numeric range (`0x0009..=0x7FFF`) decodes as
3982    /// `Application(v)` for forward-compat with future canonical
3983    /// assignments. A future status numbered `0x0009` would round-
3984    /// trip via `from_wire(0x0009)` until that variant is added,
3985    /// at which point the variant takes precedence.
3986    #[test]
3987    fn reserved_status_range_decodes_as_application_for_forward_compat() {
3988        let decoded = RpcStatus::from_wire(0x0009);
3989        assert_eq!(decoded, RpcStatus::Application(0x0009));
3990        assert_eq!(decoded.to_wire(), 0x0009);
3991    }
3992
3993    /// Application range (`0x8000..=0xFFFF`) encodes / decodes
3994    /// transparently as `Application(v)`.
3995    #[test]
3996    fn application_status_range_roundtrips() {
3997        for v in [0x8000u16, 0x8001, 0xCAFE, 0xFFFF] {
3998            let s = RpcStatus::from_wire(v);
3999            assert_eq!(s, RpcStatus::Application(v));
4000            assert_eq!(s.to_wire(), v);
4001        }
4002    }
4003
4004    // --------------------------------------------------------------------
4005    // Dispatch byte assignments.
4006    // --------------------------------------------------------------------
4007
4008    /// Pin the `dispatch` byte assignments so a renumber surfaces
4009    /// here before it ships on the wire. These also live in the
4010    /// design doc; this test is the source-of-truth check.
4011    #[test]
4012    fn dispatch_byte_assignments_are_stable() {
4013        assert_eq!(DISPATCH_RPC_REQUEST, 0x10);
4014        assert_eq!(DISPATCH_RPC_RESPONSE, 0x11);
4015        assert_eq!(DISPATCH_RPC_CANCEL, 0x12);
4016        assert_eq!(DISPATCH_RPC_DEADLINE_EXCEEDED, 0x13);
4017        assert_eq!(DISPATCH_RPC_STREAM_GRANT, 0x14);
4018        assert_eq!(DISPATCH_RPC_REQUEST_CHUNK, 0x15);
4019        assert_eq!(DISPATCH_RPC_REQUEST_GRANT, 0x16);
4020    }
4021
4022    /// Regression: encoder bounds. Encoding a service name longer
4023    /// than `MAX_RPC_SERVICE_NAME_LEN` panics in debug, catching
4024    /// the programmer error in tests rather than silently writing
4025    /// a truncated `as u8` length that the receiver decodes as
4026    /// garbage. The matching debug_asserts guard body length,
4027    /// header count, header name length, and header value length.
4028    #[cfg(debug_assertions)]
4029    #[test]
4030    #[should_panic(expected = "service name")]
4031    fn request_encode_panics_on_oversize_service_name() {
4032        let p = RpcRequestPayload {
4033            service: "x".repeat(MAX_RPC_SERVICE_NAME_LEN + 1),
4034            deadline_ns: 0,
4035            flags: 0,
4036            headers: vec![],
4037            body: Bytes::new(),
4038        };
4039        let _ = p.encode();
4040    }
4041
4042    #[cfg(debug_assertions)]
4043    #[test]
4044    #[should_panic(expected = "body length")]
4045    fn request_encode_panics_on_oversize_body() {
4046        let p = RpcRequestPayload {
4047            service: "x".to_string(),
4048            deadline_ns: 0,
4049            flags: 0,
4050            headers: vec![],
4051            body: Bytes::from(vec![0; MAX_RPC_BODY_LEN + 1]),
4052        };
4053        let _ = p.encode();
4054    }
4055
4056    #[cfg(debug_assertions)]
4057    #[test]
4058    #[should_panic(expected = "header name")]
4059    fn request_encode_panics_on_oversize_header_name() {
4060        let p = RpcRequestPayload {
4061            service: "x".to_string(),
4062            deadline_ns: 0,
4063            flags: 0,
4064            headers: vec![("a".repeat(MAX_RPC_HEADER_NAME_LEN + 1), vec![])],
4065            body: Bytes::new(),
4066        };
4067        let _ = p.encode();
4068    }
4069
4070    /// `encoded_len()` must agree with `encode().len()` for every
4071    /// payload shape — pin this so a future codec change can't
4072    /// silently desynchronize the size-budgeting helper from the
4073    /// actual wire size.
4074    #[test]
4075    fn encoded_len_matches_encode_len_for_request_and_response() {
4076        let req = RpcRequestPayload {
4077            service: "echo.v1".to_string(),
4078            deadline_ns: 1_700_000_000_000_000_000,
4079            flags: FLAG_RPC_PROPAGATE_TRACE,
4080            headers: vec![
4081                header("traceparent", b"00-aabb"),
4082                header("idempotency-key", &7u64.to_le_bytes()),
4083            ],
4084            body: Bytes::from_static(b"{\"hello\":\"world\"}"),
4085        };
4086        assert_eq!(req.encoded_len(), req.encode().len());
4087
4088        let resp = RpcResponsePayload {
4089            status: RpcStatus::Application(0x8001),
4090            headers: vec![header("content-type", b"application/json")],
4091            body: Bytes::from_static(b"ok"),
4092        };
4093        assert_eq!(resp.encoded_len(), resp.encode().len());
4094
4095        // Empty edge cases.
4096        let empty_req = RpcRequestPayload {
4097            service: "x".to_string(),
4098            deadline_ns: 0,
4099            flags: 0,
4100            headers: vec![],
4101            body: Bytes::new(),
4102        };
4103        assert_eq!(empty_req.encoded_len(), empty_req.encode().len());
4104        let empty_resp = RpcResponsePayload {
4105            status: RpcStatus::Ok,
4106            headers: vec![],
4107            body: Bytes::new(),
4108        };
4109        assert_eq!(empty_resp.encoded_len(), empty_resp.encode().len());
4110    }
4111
4112    /// Bit 0 of `RpcRequestPayload::flags` is reserved (was the
4113    /// removed `FLAG_RPC_IDEMPOTENT`). Pin: live flag constants
4114    /// must NOT collide with bit 0, so a future re-add can safely
4115    /// reuse it without breaking existing senders.
4116    #[test]
4117    fn flag_bit_assignments_leave_idempotent_slot_reserved() {
4118        // Bit 0 (1 << 0) and bit 3 (1 << 3) are reserved; live flags
4119        // occupy other bits. Pinning the exact assignments here so
4120        // a renumber that collides with bit 0 (future `IDEMPOTENT`
4121        // re-add) or bit 3 (held in reserve for a future protocol
4122        // flag) surfaces in the test suite before it ships.
4123        assert_eq!(FLAG_RPC_STREAMING_RESPONSE, 1 << 1);
4124        assert_eq!(FLAG_RPC_PROPAGATE_TRACE, 1 << 2);
4125        assert_eq!(FLAG_RPC_CLIENT_STREAMING_REQUEST, 1 << 4);
4126        assert_eq!(FLAG_RPC_REQUEST_END, 1 << 5);
4127        for flag in [
4128            FLAG_RPC_STREAMING_RESPONSE,
4129            FLAG_RPC_PROPAGATE_TRACE,
4130            FLAG_RPC_CLIENT_STREAMING_REQUEST,
4131            FLAG_RPC_REQUEST_END,
4132        ] {
4133            assert_eq!(
4134                flag & (1 << 0),
4135                0,
4136                "flag {flag:#06x} collides with reserved bit 0"
4137            );
4138            assert_eq!(
4139                flag & (1 << 3),
4140                0,
4141                "flag {flag:#06x} collides with reserved bit 3"
4142            );
4143        }
4144    }
4145
4146    // --------------------------------------------------------------------
4147    // Bidi streaming (Phase A) — RpcRequestChunkPayload and
4148    // RpcRequestGrantPayload wire-stability tests.
4149    // --------------------------------------------------------------------
4150
4151    /// 1/5 — RequestChunk round-trip with realistic header set and
4152    /// 1 KiB body. Pins the encode/decode loop on the full shape.
4153    #[test]
4154    fn request_chunk_roundtrip_with_headers_and_body() {
4155        let mut headers = Vec::new();
4156        for i in 0..10u8 {
4157            headers.push(header(&format!("x-chunk-meta-{i}"), &[0xAA, 0xBB, i, !i]));
4158        }
4159        let body: Vec<u8> = (0..1024u32).map(|n| (n & 0xFF) as u8).collect();
4160        let p = RpcRequestChunkPayload {
4161            call_id: 0xCAFE_F00D_DEAD_BEEF,
4162            flags: FLAG_RPC_REQUEST_END | FLAG_RPC_PROPAGATE_TRACE,
4163            headers,
4164            body: Bytes::from(body),
4165        };
4166        let bytes = p.encode();
4167        assert_eq!(
4168            p.encoded_len(),
4169            bytes.len(),
4170            "encoded_len must agree with encode().len()"
4171        );
4172        let decoded = RpcRequestChunkPayload::decode(Bytes::from(bytes)).expect("decode");
4173        assert_eq!(decoded, p);
4174    }
4175
4176    /// 2/5 — truncation rejection at every field boundary. The
4177    /// codec must error rather than panic / allocate-unbounded on
4178    /// any short slice.
4179    #[test]
4180    fn request_chunk_decode_rejects_truncation_at_every_boundary() {
4181        let p = RpcRequestChunkPayload {
4182            call_id: 0x1234,
4183            flags: 0,
4184            headers: vec![header("x", b"y")],
4185            body: Bytes::from_static(b"hello"),
4186        };
4187        let full = p.encode();
4188        // Walk every prefix shorter than the full encoding; every
4189        // one must produce a Truncated / TooLarge / InvalidUtf8
4190        // error, not panic.
4191        for n in 0..full.len() {
4192            let prefix = &full[..n];
4193            let result = RpcRequestChunkPayload::decode(Bytes::copy_from_slice(prefix));
4194            assert!(result.is_err(), "n={n}: expected Err, got Ok({:?})", result);
4195        }
4196        // Full length must decode cleanly.
4197        assert!(RpcRequestChunkPayload::decode(Bytes::from(full)).is_ok());
4198    }
4199
4200    /// 3/5 — body length cap rejection. A wire-claimed body length
4201    /// over `MAX_RPC_BODY_LEN` must error rather than try to
4202    /// allocate 4+ MiB of garbage.
4203    #[test]
4204    fn request_chunk_decode_rejects_oversized_body_length() {
4205        // Build a synthetic encoding by hand: small valid prefix
4206        // up to body_len, then claim body_len = MAX_RPC_BODY_LEN + 1.
4207        let mut buf = Vec::new();
4208        buf.put_u64_le(0x42); // call_id
4209        buf.put_u16_le(0); // flags
4210        buf.put_u8(0); // headers count = 0
4211        buf.put_u32_le((MAX_RPC_BODY_LEN + 1) as u32);
4212        // (no body bytes follow — we want the decoder to reject at
4213        // the length check before it even tries to read body bytes)
4214        let err = RpcRequestChunkPayload::decode(Bytes::from(buf))
4215            .expect_err("oversized body length must reject");
4216        match err {
4217            RpcCodecError::TooLarge {
4218                field,
4219                actual,
4220                limit,
4221            } => {
4222                assert_eq!(field, "body");
4223                assert_eq!(actual, MAX_RPC_BODY_LEN + 1);
4224                assert_eq!(limit, MAX_RPC_BODY_LEN);
4225            }
4226            other => panic!("expected TooLarge {{ field=body }}, got {other:?}"),
4227        }
4228    }
4229
4230    /// 4/5 — header count cap rejection. A header count over
4231    /// `MAX_RPC_HEADERS` must error before the per-header decode
4232    /// loop even starts.
4233    #[test]
4234    fn request_chunk_decode_rejects_oversized_header_count() {
4235        let mut buf = Vec::new();
4236        buf.put_u64_le(0x42); // call_id
4237        buf.put_u16_le(0); // flags
4238        buf.put_u8((MAX_RPC_HEADERS + 1) as u8); // over the cap
4239        let err = RpcRequestChunkPayload::decode(Bytes::from(buf))
4240            .expect_err("oversized header count must reject");
4241        match err {
4242            RpcCodecError::TooLarge {
4243                field,
4244                actual,
4245                limit,
4246            } => {
4247                // The shared `decode_headers` helper reports this
4248                // field as "headers".
4249                assert_eq!(field, "headers");
4250                assert_eq!(actual, MAX_RPC_HEADERS + 1);
4251                assert_eq!(limit, MAX_RPC_HEADERS);
4252            }
4253            other => panic!("expected TooLarge {{ field=headers }}, got {other:?}"),
4254        }
4255    }
4256
4257    /// 5/5 — RequestGrant round-trip + truncation rejection. The
4258    /// payload is fixed-size (12 bytes), so the test surface is
4259    /// "exactly 12 bytes decodes" + "any other length errors".
4260    #[test]
4261    fn request_grant_roundtrip_and_truncation_rejection() {
4262        // Round-trip across the full u32 range corners + an
4263        // arbitrary mid-value.
4264        for (call_id, credits) in [
4265            (0u64, 0u32),
4266            (1, 1),
4267            (0xFFFF_FFFF_FFFF_FFFF, 0xFFFF_FFFF),
4268            (0xCAFE_F00D, 0x10203040),
4269        ] {
4270            let bytes = encode_request_grant(call_id, credits);
4271            assert_eq!(bytes.len(), 12, "request grant is always 12 bytes");
4272            let decoded = decode_request_grant(&bytes).expect("decode");
4273            assert_eq!(decoded.call_id, call_id);
4274            assert_eq!(decoded.credits, credits);
4275        }
4276        // Wrong-length payloads must reject (return None), not
4277        // panic. Empty, short, long, off-by-one each get covered.
4278        assert!(decode_request_grant(&[]).is_none());
4279        assert!(decode_request_grant(&[0u8; 11]).is_none());
4280        assert!(decode_request_grant(&[0u8; 13]).is_none());
4281    }
4282
4283    /// Bonus pin: `parse_request_window_initial` extracts a valid
4284    /// u32 ASCII-decimal header and rejects everything else.
4285    /// Same coverage shape as `parse_stream_window_initial`'s
4286    /// implicit contract, made explicit here so the request-side
4287    /// helper doesn't drift away from the response-side one.
4288    #[test]
4289    fn parse_request_window_initial_matches_response_side_semantics() {
4290        // Happy path.
4291        let headers = vec![header(HEADER_NRPC_REQUEST_WINDOW_INITIAL, b"32")];
4292        assert_eq!(parse_request_window_initial(&headers), Some(32));
4293        // Case-insensitive on header name.
4294        let headers = vec![header("Nrpc-Request-Window-Initial", b"7")];
4295        assert_eq!(parse_request_window_initial(&headers), Some(7));
4296        // Absent.
4297        assert_eq!(parse_request_window_initial(&[]), None);
4298        // Malformed value (non-numeric).
4299        let headers = vec![header(HEADER_NRPC_REQUEST_WINDOW_INITIAL, b"twelve")];
4300        assert_eq!(parse_request_window_initial(&headers), None);
4301        // Malformed value (non-utf8 bytes).
4302        let headers = vec![header(HEADER_NRPC_REQUEST_WINDOW_INITIAL, &[0xFF, 0xFE])];
4303        assert_eq!(parse_request_window_initial(&headers), None);
4304        // Empty value.
4305        let headers = vec![header(HEADER_NRPC_REQUEST_WINDOW_INITIAL, b"")];
4306        assert_eq!(parse_request_window_initial(&headers), None);
4307    }
4308
4309    // --------------------------------------------------------------------
4310    // RpcRequestPayload codec.
4311    // --------------------------------------------------------------------
4312
4313    #[test]
4314    fn request_roundtrip_minimal() {
4315        let p = RpcRequestPayload {
4316            service: "hello".to_string(),
4317            deadline_ns: 0,
4318            flags: 0,
4319            headers: vec![],
4320            body: Bytes::new(),
4321        };
4322        let bytes = p.encode();
4323        let decoded = RpcRequestPayload::decode(Bytes::from(bytes)).unwrap();
4324        assert_eq!(decoded, p);
4325    }
4326
4327    #[test]
4328    fn request_roundtrip_full() {
4329        let p = RpcRequestPayload {
4330            service: "echo.v1".to_string(),
4331            deadline_ns: 1_700_000_000_000_000_000,
4332            flags: FLAG_RPC_PROPAGATE_TRACE,
4333            headers: vec![
4334                header("traceparent", b"00-aabb..."),
4335                header("idempotency-key", &7u64.to_le_bytes()),
4336                header("content-type", b"application/json"),
4337            ],
4338            body: Bytes::from_static(b"{\"hello\":\"world\"}"),
4339        };
4340        let bytes = p.encode();
4341        let decoded = RpcRequestPayload::decode(Bytes::from(bytes)).unwrap();
4342        assert_eq!(decoded, p);
4343    }
4344
4345    #[test]
4346    fn request_decode_rejects_empty_service() {
4347        let bytes = vec![0x00];
4348        let err = RpcRequestPayload::decode(Bytes::from(bytes)).unwrap_err();
4349        assert!(matches!(err, RpcCodecError::Truncated(_)));
4350    }
4351
4352    #[test]
4353    fn request_decode_rejects_oversize_body_length() {
4354        // Forge: service "x", deadline 0, flags 0, no headers,
4355        // body length = MAX_RPC_BODY_LEN + 1 (no body bytes).
4356        let mut bytes = vec![1u8, b'x'];
4357        bytes.extend_from_slice(&0u64.to_le_bytes()); // deadline
4358        bytes.extend_from_slice(&0u16.to_le_bytes()); // flags
4359        bytes.push(0); // 0 headers
4360        bytes.extend_from_slice(&((MAX_RPC_BODY_LEN as u32) + 1).to_le_bytes());
4361        let err = RpcRequestPayload::decode(Bytes::from(bytes)).unwrap_err();
4362        assert!(
4363            matches!(err, RpcCodecError::TooLarge { field, .. } if field == "body"),
4364            "got {err:?}",
4365        );
4366    }
4367
4368    #[test]
4369    fn request_decode_rejects_oversize_headers_count() {
4370        // Forge: service "x", deadline 0, flags 0, headers count =
4371        // MAX_RPC_HEADERS + 1 (no header bytes).
4372        let mut bytes = vec![1u8, b'x'];
4373        bytes.extend_from_slice(&0u64.to_le_bytes());
4374        bytes.extend_from_slice(&0u16.to_le_bytes());
4375        bytes.push((MAX_RPC_HEADERS as u8).wrapping_add(1));
4376        let err = RpcRequestPayload::decode(Bytes::from(bytes)).unwrap_err();
4377        assert!(
4378            matches!(err, RpcCodecError::TooLarge { field, .. } if field == "headers"),
4379            "got {err:?}",
4380        );
4381    }
4382
4383    #[test]
4384    fn request_decode_rejects_truncated_at_each_field() {
4385        // Build a valid payload then truncate at each field
4386        // boundary; every truncation must error rather than silently
4387        // accept partial state.
4388        let p = RpcRequestPayload {
4389            service: "svc".to_string(),
4390            deadline_ns: 1,
4391            flags: 0,
4392            headers: vec![header("h", b"v")],
4393            body: Bytes::from_static(b"body"),
4394        };
4395        let bytes = p.encode();
4396        // Try each prefix length up to but not including the full
4397        // length — every one must be a decode error.
4398        for trim_to in 0..bytes.len() {
4399            let truncated = &bytes[..trim_to];
4400            let result = RpcRequestPayload::decode(Bytes::copy_from_slice(truncated));
4401            assert!(
4402                result.is_err(),
4403                "trim_to={trim_to} of {} must error, got {:?}",
4404                bytes.len(),
4405                result,
4406            );
4407        }
4408        // Full length must succeed.
4409        assert!(RpcRequestPayload::decode(Bytes::from(bytes)).is_ok());
4410    }
4411
4412    // --------------------------------------------------------------------
4413    // RpcResponsePayload codec.
4414    // --------------------------------------------------------------------
4415
4416    #[test]
4417    fn response_roundtrip_ok_with_body() {
4418        let p = RpcResponsePayload {
4419            status: RpcStatus::Ok,
4420            headers: vec![header("content-type", b"application/json")],
4421            body: Bytes::from_static(b"{\"answer\":42}"),
4422        };
4423        let bytes = p.encode();
4424        let decoded = RpcResponsePayload::decode(Bytes::from(bytes)).unwrap();
4425        assert_eq!(decoded, p);
4426    }
4427
4428    #[test]
4429    fn response_roundtrip_application_status() {
4430        let p = RpcResponsePayload {
4431            status: RpcStatus::Application(0xBEEF),
4432            headers: vec![],
4433            body: Bytes::from_static(b"app-specific diagnostic"),
4434        };
4435        let bytes = p.encode();
4436        let decoded = RpcResponsePayload::decode(Bytes::from(bytes)).unwrap();
4437        assert_eq!(decoded.status, RpcStatus::Application(0xBEEF));
4438        assert_eq!(decoded.body, p.body);
4439    }
4440
4441    #[test]
4442    fn response_decode_rejects_empty_buffer() {
4443        let err = RpcResponsePayload::decode(Bytes::new()).unwrap_err();
4444        assert!(matches!(err, RpcCodecError::Truncated(_)));
4445    }
4446
4447    // --------------------------------------------------------------------
4448    // Invariant: encoded sizes are reasonable.
4449    // --------------------------------------------------------------------
4450
4451    /// Wire-size budget regression: a tiny request encodes in a
4452    /// small constant number of bytes plus body. Pre-fix the headers
4453    /// or service-length encoding could have grown unbounded; pin
4454    /// the small-case so a regression in either inflates the
4455    /// minimum.
4456    #[test]
4457    fn request_minimum_wire_size_is_bounded() {
4458        let p = RpcRequestPayload {
4459            service: "x".to_string(),
4460            deadline_ns: 0,
4461            flags: 0,
4462            headers: vec![],
4463            body: Bytes::new(),
4464        };
4465        let size = p.encode().len();
4466        // 1 (svc len) + 1 (svc bytes) + 8 (deadline) + 2 (flags) + 1 (headers count) + 4 (body len) = 17
4467        assert_eq!(size, 17, "minimum request encodes in 17 bytes");
4468        assert_eq!(request_wire_size(&p), EVENT_META_SIZE + 17);
4469    }
4470
4471    #[test]
4472    fn response_minimum_wire_size_is_bounded() {
4473        let p = RpcResponsePayload {
4474            status: RpcStatus::Ok,
4475            headers: vec![],
4476            body: Bytes::new(),
4477        };
4478        let size = p.encode().len();
4479        // 2 (status) + 1 (headers count) + 4 (body len) = 7
4480        assert_eq!(size, 7, "minimum response encodes in 7 bytes");
4481        assert_eq!(response_wire_size(&p), EVENT_META_SIZE + 7);
4482    }
4483
4484    // ====================================================================
4485    // RpcServerFold — server-side dispatch behavior.
4486    //
4487    // These tests drive the fold directly with synthetic events
4488    // and observe the emitter callback. The end-to-end story
4489    // (Mesh::serve_rpc + bus + cortex adapter) is integration-
4490    // tested separately once the glue layer lands.
4491    // ====================================================================
4492
4493    use super::super::super::redex::{RedexEntry, RedexEvent};
4494    use std::sync::atomic::AtomicUsize;
4495    use std::time::Duration;
4496
4497    /// Captured-response store. Test-local typedef so the
4498    /// `capturing_emitter` signature stays under the `clippy::
4499    /// type_complexity` lint.
4500    type CapturedResponses = Arc<Mutex<Vec<(u64, u64, RpcResponsePayload)>>>;
4501
4502    /// Build a synthetic RedexEvent carrying an RPC request payload.
4503    /// Tests use this to drive the fold without going through the
4504    /// real ingest/cortex pipeline.
4505    fn rpc_request_event(
4506        caller_origin: u64,
4507        call_id: u64,
4508        payload: RpcRequestPayload,
4509    ) -> RedexEvent {
4510        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, caller_origin, call_id, 0);
4511        let mut buf = Vec::new();
4512        buf.extend_from_slice(&meta.to_bytes());
4513        buf.extend_from_slice(&payload.encode());
4514        RedexEvent {
4515            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
4516            payload: bytes::Bytes::from(buf),
4517        }
4518    }
4519
4520    fn rpc_cancel_event(caller_origin: u64, call_id: u64) -> RedexEvent {
4521        let meta = EventMeta::new(DISPATCH_RPC_CANCEL, 0, caller_origin, call_id, 0);
4522        let buf = meta.to_bytes().to_vec();
4523        RedexEvent {
4524            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
4525            payload: bytes::Bytes::from(buf),
4526        }
4527    }
4528
4529    /// Captures responses emitted by the fold for assertion in tests.
4530    fn capturing_emitter() -> (RpcResponseEmitter, CapturedResponses) {
4531        let captured: CapturedResponses = Arc::new(Mutex::new(Vec::new()));
4532        let captured_clone = captured.clone();
4533        let emit: RpcResponseEmitter = Arc::new(move |origin, call_id, resp| {
4534            captured_clone.lock().push((origin, call_id, resp));
4535        });
4536        (emit, captured)
4537    }
4538
4539    /// A handler that just echoes the request body back as the
4540    /// response body, with `RpcStatus::Ok`.
4541    struct EchoHandler;
4542    #[async_trait::async_trait]
4543    impl RpcHandler for EchoHandler {
4544        async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4545            Ok(RpcResponsePayload {
4546                status: RpcStatus::Ok,
4547                headers: vec![],
4548                body: ctx.payload.body,
4549            })
4550        }
4551    }
4552
4553    /// Wait until `pred` is true, polling at 10ms intervals up to
4554    /// `timeout`. Used to await spawned-handler completion in tests
4555    /// without a sleep-and-pray.
4556    async fn wait_until<F: Fn() -> bool>(pred: F, timeout: Duration) -> bool {
4557        let start = std::time::Instant::now();
4558        while start.elapsed() < timeout {
4559            if pred() {
4560                return true;
4561            }
4562            tokio::time::sleep(Duration::from_millis(10)).await;
4563        }
4564        pred()
4565    }
4566
4567    /// Happy path: a REQUEST event triggers the handler; the fold
4568    /// emits a RESPONSE with the handler's payload.
4569    #[tokio::test]
4570    async fn server_fold_request_invokes_handler_and_emits_response() {
4571        let (emit, captured) = capturing_emitter();
4572        let mut fold = RpcServerFold::new(Arc::new(EchoHandler), emit);
4573        let req = RpcRequestPayload {
4574            service: "echo".to_string(),
4575            deadline_ns: 0,
4576            flags: 0,
4577            headers: vec![],
4578            body: Bytes::from_static(b"hello"),
4579        };
4580        let ev = rpc_request_event(0xCAFE, 7, req);
4581        fold.apply(&ev, &mut ()).unwrap();
4582
4583        // Handler runs in tokio::spawn; wait for the emit.
4584        assert!(
4585            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
4586            "expected one emitted response"
4587        );
4588        let captured = captured.lock();
4589        assert_eq!(captured.len(), 1);
4590        let (origin, call_id, resp) = &captured[0];
4591        assert_eq!(*origin, 0xCAFE);
4592        assert_eq!(*call_id, 7);
4593        assert_eq!(resp.status, RpcStatus::Ok);
4594        assert_eq!(resp.body.as_ref(), b"hello");
4595        // In-flight set is cleaned up after the handler completes.
4596        assert!(fold.in_flight_keys().is_empty());
4597    }
4598
4599    /// Application error: handler returns
4600    /// `RpcHandlerError::Application` → fold emits a response with
4601    /// `RpcStatus::Application(code)` and the message as body.
4602    #[tokio::test]
4603    async fn server_fold_application_error_maps_to_application_status() {
4604        struct AppErrHandler;
4605        #[async_trait::async_trait]
4606        impl RpcHandler for AppErrHandler {
4607            async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4608                Err(RpcHandlerError::Application {
4609                    code: 0xBEEF,
4610                    message: "bad input".to_string(),
4611                })
4612            }
4613        }
4614        let (emit, captured) = capturing_emitter();
4615        let mut fold = RpcServerFold::new(Arc::new(AppErrHandler), emit);
4616        let req = RpcRequestPayload {
4617            service: "x".to_string(),
4618            deadline_ns: 0,
4619            flags: 0,
4620            headers: vec![],
4621            body: Bytes::new(),
4622        };
4623        fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4624        assert!(wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await);
4625        let captured = captured.lock();
4626        let (_, _, resp) = &captured[0];
4627        assert_eq!(resp.status, RpcStatus::Application(0xBEEF));
4628        assert_eq!(resp.body.as_ref(), b"bad input");
4629    }
4630
4631    /// Internal error: handler returns `RpcHandlerError::Internal`
4632    /// → fold emits `RpcStatus::Internal` with the message body.
4633    #[tokio::test]
4634    async fn server_fold_internal_error_maps_to_internal_status() {
4635        struct IntErrHandler;
4636        #[async_trait::async_trait]
4637        impl RpcHandler for IntErrHandler {
4638            async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4639                Err(RpcHandlerError::Internal("db timeout".to_string()))
4640            }
4641        }
4642        let (emit, captured) = capturing_emitter();
4643        let mut fold = RpcServerFold::new(Arc::new(IntErrHandler), emit);
4644        let req = RpcRequestPayload {
4645            service: "x".to_string(),
4646            deadline_ns: 0,
4647            flags: 0,
4648            headers: vec![],
4649            body: Bytes::new(),
4650        };
4651        fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4652        assert!(wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await);
4653        let captured = captured.lock();
4654        let (_, _, resp) = &captured[0];
4655        assert_eq!(resp.status, RpcStatus::Internal);
4656        assert_eq!(resp.body.as_ref(), b"db timeout");
4657    }
4658
4659    /// Handler panic: caught by the fold's `catch_unwind`; surfaces
4660    /// as `RpcStatus::Internal` to the caller. Pre-fix the panic
4661    /// would propagate up the spawned task, log a tokio
4662    /// uncaught-panic message, and silently leave the caller
4663    /// waiting forever.
4664    #[tokio::test]
4665    async fn server_fold_handler_panic_surfaces_as_internal_status() {
4666        struct PanicHandler;
4667        #[async_trait::async_trait]
4668        impl RpcHandler for PanicHandler {
4669            async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4670                panic!("kaboom");
4671            }
4672        }
4673        let (emit, captured) = capturing_emitter();
4674        let mut fold = RpcServerFold::new(Arc::new(PanicHandler), emit);
4675        let req = RpcRequestPayload {
4676            service: "x".to_string(),
4677            deadline_ns: 0,
4678            flags: 0,
4679            headers: vec![],
4680            body: Bytes::new(),
4681        };
4682        fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4683        assert!(wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await);
4684        let captured = captured.lock();
4685        let (_, _, resp) = &captured[0];
4686        assert_eq!(resp.status, RpcStatus::Internal);
4687        assert!(
4688            String::from_utf8_lossy(&resp.body).contains("kaboom"),
4689            "panic message must surface in body, got {}",
4690            String::from_utf8_lossy(&resp.body),
4691        );
4692    }
4693
4694    /// Deadline already passed: server short-circuits with
4695    /// `Timeout` without invoking the handler. Pinned via the
4696    /// `with_test_now_ns` clock override so the test doesn't race
4697    /// wall time.
4698    #[tokio::test]
4699    async fn server_fold_deadline_already_passed_short_circuits_to_timeout() {
4700        let invoked = Arc::new(AtomicBool::new(false));
4701        struct CountingHandler {
4702            invoked: Arc<AtomicBool>,
4703        }
4704        #[async_trait::async_trait]
4705        impl RpcHandler for CountingHandler {
4706            async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4707                self.invoked.store(true, Ordering::Release);
4708                Ok(RpcResponsePayload {
4709                    status: RpcStatus::Ok,
4710                    headers: vec![],
4711                    body: Bytes::new(),
4712                })
4713            }
4714        }
4715        let (emit, captured) = capturing_emitter();
4716        let mut fold = RpcServerFold::new(
4717            Arc::new(CountingHandler {
4718                invoked: invoked.clone(),
4719            }),
4720            emit,
4721        )
4722        // Use a clock value > DEADLINE_SKEW_TOLERANCE_NS + 1
4723        // (10s + 1ns) so the deadline-passed check fires past the
4724        // skew tolerance window. With now=20s and deadline=1ns,
4725        // (now - 10s) > 1ns.
4726        .with_test_now_ns(20_000_000_000);
4727        let req = RpcRequestPayload {
4728            service: "x".to_string(),
4729            // Deadline well in the past — past the skew tolerance.
4730            deadline_ns: 1_000,
4731            flags: 0,
4732            headers: vec![],
4733            body: Bytes::new(),
4734        };
4735        fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4736        // Emit happens synchronously in the deadline-passed branch
4737        // (no handler spawn).
4738        let captured = captured.lock();
4739        assert_eq!(captured.len(), 1);
4740        let (_, _, resp) = &captured[0];
4741        assert_eq!(resp.status, RpcStatus::Timeout);
4742        assert!(
4743            !invoked.load(Ordering::Acquire),
4744            "handler must NOT be invoked when deadline already passed",
4745        );
4746    }
4747
4748    /// Regression: a deadline that has elapsed by less than
4749    /// `DEADLINE_SKEW_TOLERANCE_NS` does NOT short-circuit. A
4750    /// peer with a slightly-fast clock would otherwise be
4751    /// prematurely timed out before the handler ever ran.
4752    #[tokio::test]
4753    async fn server_fold_deadline_within_skew_tolerance_invokes_handler() {
4754        let invoked = Arc::new(AtomicBool::new(false));
4755        struct CountingHandler {
4756            invoked: Arc<AtomicBool>,
4757        }
4758        #[async_trait::async_trait]
4759        impl RpcHandler for CountingHandler {
4760            async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4761                self.invoked.store(true, Ordering::Release);
4762                Ok(RpcResponsePayload {
4763                    status: RpcStatus::Ok,
4764                    headers: vec![],
4765                    body: Bytes::new(),
4766                })
4767            }
4768        }
4769        let (emit, captured) = capturing_emitter();
4770        let mut fold = RpcServerFold::new(
4771            Arc::new(CountingHandler {
4772                invoked: invoked.clone(),
4773            }),
4774            emit,
4775        )
4776        // now = 100s, deadline = 95s → elapsed = 5s, within the
4777        // 10s skew tolerance.
4778        .with_test_now_ns(100_000_000_000);
4779        let req = RpcRequestPayload {
4780            service: "x".to_string(),
4781            deadline_ns: 95_000_000_000,
4782            flags: 0,
4783            headers: vec![],
4784            body: Bytes::new(),
4785        };
4786        fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4787        assert!(
4788            wait_until(|| invoked.load(Ordering::Acquire), Duration::from_secs(1)).await,
4789            "handler must run when deadline is within skew tolerance",
4790        );
4791        let captured = captured.lock();
4792        assert_eq!(captured.len(), 1);
4793        assert_eq!(captured[0].2.status, RpcStatus::Ok);
4794    }
4795
4796    /// CANCEL flips the matching in-flight token. The handler that
4797    /// `select!`s on the cancellation observes the signal and can
4798    /// short-circuit. The fold removes the in-flight entry on
4799    /// CANCEL.
4800    #[tokio::test]
4801    async fn server_fold_cancel_flips_token_and_clears_in_flight() {
4802        let resumed_after_cancel = Arc::new(AtomicBool::new(false));
4803        struct CancelObservingHandler {
4804            resumed: Arc<AtomicBool>,
4805        }
4806        #[async_trait::async_trait]
4807        impl RpcHandler for CancelObservingHandler {
4808            async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4809                tokio::select! {
4810                    _ = ctx.cancellation.cancelled() => {
4811                        self.resumed.store(true, Ordering::Release);
4812                        Err(RpcHandlerError::Internal("cancelled by caller".to_string()))
4813                    }
4814                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
4815                        Ok(RpcResponsePayload {
4816                            status: RpcStatus::Ok,
4817                            headers: vec![],
4818                            body: Bytes::from_static(b"slept the full window"),
4819                        })
4820                    }
4821                }
4822            }
4823        }
4824        let (emit, captured) = capturing_emitter();
4825        let mut fold = RpcServerFold::new(
4826            Arc::new(CancelObservingHandler {
4827                resumed: resumed_after_cancel.clone(),
4828            }),
4829            emit,
4830        );
4831        let req = RpcRequestPayload {
4832            service: "x".to_string(),
4833            deadline_ns: 0,
4834            flags: 0,
4835            headers: vec![],
4836            body: Bytes::new(),
4837        };
4838        fold.apply(&rpc_request_event(1, 42, req), &mut ()).unwrap();
4839        // Wait until the handler's `select!` is parked; then send
4840        // CANCEL.
4841        assert!(
4842            wait_until(
4843                || fold.in_flight_keys().contains(&(1, 42)),
4844                Duration::from_secs(1)
4845            )
4846            .await
4847        );
4848        fold.apply(&rpc_cancel_event(1, 42), &mut ()).unwrap();
4849        // The cancellation is observed by the handler. Even though
4850        // the handler returns `Internal("cancelled by caller")`,
4851        // the fold's CANCEL-wins ordering overrides the response
4852        // with `RpcStatus::Cancelled` so the caller sees the
4853        // documented status code rather than the handler's
4854        // accidental Internal payload.
4855        assert!(
4856            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
4857            "handler should observe cancellation and emit response"
4858        );
4859        assert!(
4860            resumed_after_cancel.load(Ordering::Acquire),
4861            "handler must observe cancellation"
4862        );
4863        let captured = captured.lock();
4864        assert_eq!(captured.len(), 1);
4865        let (_, _, resp) = &captured[0];
4866        assert_eq!(
4867            resp.status,
4868            RpcStatus::Cancelled,
4869            "CANCEL must override handler outcome with RpcStatus::Cancelled"
4870        );
4871        // CANCEL also removes the in-flight entry directly.
4872        // Handler completion removes it again (idempotent).
4873        assert!(fold.in_flight_keys().is_empty());
4874    }
4875
4876    /// Regression: a duplicate REQUEST for an already-in-flight
4877    /// `(origin_hash, call_id)` must be refused with a synthetic
4878    /// `Internal` response and must NOT spawn a second handler.
4879    /// Without the refusal, two handlers race under the same key
4880    /// and CANCEL handling is broken (CANCEL removes the entry
4881    /// the first handler reinserts, etc.).
4882    #[tokio::test]
4883    async fn server_fold_duplicate_request_refuses_without_double_dispatch() {
4884        let invocations = Arc::new(AtomicUsize::new(0));
4885        struct CountingHandler {
4886            invocations: Arc<AtomicUsize>,
4887        }
4888        #[async_trait::async_trait]
4889        impl RpcHandler for CountingHandler {
4890            async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4891                self.invocations.fetch_add(1, Ordering::SeqCst);
4892                tokio::time::sleep(Duration::from_millis(80)).await;
4893                Ok(RpcResponsePayload {
4894                    status: RpcStatus::Ok,
4895                    headers: vec![],
4896                    body: Bytes::from_static(b"done"),
4897                })
4898            }
4899        }
4900        let (emit, captured) = capturing_emitter();
4901        let mut fold = RpcServerFold::new(
4902            Arc::new(CountingHandler {
4903                invocations: invocations.clone(),
4904            }),
4905            emit,
4906        );
4907        let req = RpcRequestPayload {
4908            service: "x".to_string(),
4909            deadline_ns: 0,
4910            flags: 0,
4911            headers: vec![],
4912            body: Bytes::new(),
4913        };
4914        // First REQUEST — handler spawns and parks in sleep.
4915        fold.apply(&rpc_request_event(1, 99, req.clone()), &mut ())
4916            .unwrap();
4917        assert!(
4918            wait_until(
4919                || fold.in_flight_keys().contains(&(1, 99)),
4920                Duration::from_secs(1)
4921            )
4922            .await
4923        );
4924        // Second REQUEST with same key — must be refused
4925        // synchronously with a synthetic Internal response.
4926        fold.apply(&rpc_request_event(1, 99, req), &mut ()).unwrap();
4927        // The refusal emit happens synchronously in the fold's
4928        // sync emitter path.
4929        let after_dup = captured.lock().clone();
4930        assert_eq!(
4931            after_dup.len(),
4932            1,
4933            "duplicate REQUEST must emit exactly one synthetic refusal",
4934        );
4935        assert_eq!(after_dup[0].2.status, RpcStatus::Internal);
4936        assert!(String::from_utf8_lossy(&after_dup[0].2.body).contains("duplicate"));
4937        // Wait for the first handler to complete.
4938        assert!(
4939            wait_until(|| captured.lock().len() == 2, Duration::from_secs(2)).await,
4940            "first handler should still complete normally"
4941        );
4942        let captured = captured.lock();
4943        assert_eq!(captured.len(), 2);
4944        // The first handler's response is the second emit (Ok).
4945        assert_eq!(captured[1].2.status, RpcStatus::Ok);
4946        assert_eq!(
4947            invocations.load(Ordering::SeqCst),
4948            1,
4949            "duplicate REQUEST must NOT spawn a second handler",
4950        );
4951    }
4952
4953    /// Regression: a CANCEL that fires while the handler is mid-
4954    /// flight must override the handler's outcome with
4955    /// `RpcStatus::Cancelled` even when the handler ignores
4956    /// cancellation and returns `Ok(...)`. Without this, a caller
4957    /// who cancelled would see the handler's accidental success
4958    /// payload and could not tell whether their CANCEL won.
4959    #[tokio::test]
4960    async fn server_fold_cancel_overrides_handler_ok_with_cancelled_status() {
4961        struct IgnoresCancellation;
4962        #[async_trait::async_trait]
4963        impl RpcHandler for IgnoresCancellation {
4964            async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4965                // Sleep long enough for the test to send CANCEL,
4966                // then return Ok regardless. This models a handler
4967                // that doesn't `select!` on `ctx.cancellation`.
4968                tokio::time::sleep(Duration::from_millis(80)).await;
4969                Ok(RpcResponsePayload {
4970                    status: RpcStatus::Ok,
4971                    headers: vec![],
4972                    body: Bytes::from_static(b"finished despite cancellation"),
4973                })
4974            }
4975        }
4976        let (emit, captured) = capturing_emitter();
4977        let mut fold = RpcServerFold::new(Arc::new(IgnoresCancellation), emit);
4978        let req = RpcRequestPayload {
4979            service: "x".to_string(),
4980            deadline_ns: 0,
4981            flags: 0,
4982            headers: vec![],
4983            body: Bytes::new(),
4984        };
4985        fold.apply(&rpc_request_event(7, 11, req), &mut ()).unwrap();
4986        // Wait until the handler is parked, then send CANCEL well
4987        // before the handler's sleep elapses.
4988        assert!(
4989            wait_until(
4990                || fold.in_flight_keys().contains(&(7, 11)),
4991                Duration::from_secs(1)
4992            )
4993            .await
4994        );
4995        fold.apply(&rpc_cancel_event(7, 11), &mut ()).unwrap();
4996        assert!(
4997            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
4998            "handler should complete and emit response"
4999        );
5000        let captured = captured.lock();
5001        assert_eq!(captured.len(), 1);
5002        let (_, _, resp) = &captured[0];
5003        assert_eq!(
5004            resp.status,
5005            RpcStatus::Cancelled,
5006            "handler that returned Ok despite CANCEL must surface as Cancelled"
5007        );
5008        assert!(fold.in_flight_keys().is_empty());
5009    }
5010
5011    /// CANCEL for an unknown call_id is a no-op (no panic, no
5012    /// stray emission). This is the case where a CANCEL races a
5013    /// handler completion or a duplicate CANCEL arrives.
5014    #[tokio::test]
5015    async fn server_fold_cancel_for_unknown_call_id_is_no_op() {
5016        let (emit, captured) = capturing_emitter();
5017        let mut fold = RpcServerFold::new(Arc::new(EchoHandler), emit);
5018        // CANCEL with no matching REQUEST.
5019        fold.apply(&rpc_cancel_event(1, 999), &mut ()).unwrap();
5020        assert!(captured.lock().is_empty());
5021        assert!(fold.in_flight_keys().is_empty());
5022    }
5023
5024    /// Malformed request payload: fold emits a
5025    /// `RpcStatus::UnknownVersion` response and continues. A
5026    /// regression that returned `Err` here would kill the cortex
5027    /// adapter's tail-and-fold task on the first malformed event,
5028    /// which is the wrong behavior for an RPC server that needs
5029    /// to keep serving past garbage.
5030    #[tokio::test]
5031    async fn server_fold_malformed_payload_emits_unknown_version_and_keeps_going() {
5032        let (emit, captured) = capturing_emitter();
5033        let mut fold = RpcServerFold::new(Arc::new(EchoHandler), emit);
5034        // Build an event with valid meta but a garbage tail (just
5035        // a single 0x00 byte, which fails the service-len check).
5036        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, 7, 1, 0);
5037        let mut buf = Vec::new();
5038        buf.extend_from_slice(&meta.to_bytes());
5039        buf.push(0x00); // svc_len = 0 → empty service → Truncated
5040        let ev = RedexEvent {
5041            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5042            payload: bytes::Bytes::from(buf),
5043        };
5044        let result = fold.apply(&ev, &mut ());
5045        assert!(
5046            result.is_ok(),
5047            "fold must NOT return Err on malformed payload (would kill the adapter); got {result:?}"
5048        );
5049        let captured = captured.lock();
5050        assert_eq!(captured.len(), 1);
5051        let (_, _, resp) = &captured[0];
5052        assert_eq!(resp.status, RpcStatus::UnknownVersion);
5053    }
5054
5055    /// Cancellation token roundtrip: `cancel()` sets `is_cancelled`
5056    /// and wakes a parked `cancelled().await`.
5057    #[tokio::test]
5058    async fn cancellation_token_signals_waiters() {
5059        let token = RpcCancellationToken::new();
5060        assert!(!token.is_cancelled());
5061        let token2 = token.clone();
5062        let waiter = tokio::spawn(async move {
5063            token2.cancelled().await;
5064        });
5065        // Give the waiter a chance to park.
5066        tokio::time::sleep(Duration::from_millis(10)).await;
5067        token.cancel();
5068        // Waiter wakes.
5069        tokio::time::timeout(Duration::from_secs(1), waiter)
5070            .await
5071            .expect("waiter must wake within 1s")
5072            .expect("waiter task must not panic");
5073        assert!(token.is_cancelled());
5074    }
5075
5076    // ====================================================================
5077    // W3C Trace Context propagation.
5078    // ====================================================================
5079
5080    /// `build_trace_headers` + `extract_trace_context` round-trip
5081    /// a typical W3C trace context through the request headers.
5082    #[test]
5083    fn trace_context_round_trips_through_headers() {
5084        let tc = TraceContext {
5085            traceparent: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".to_string(),
5086            tracestate: "vendor1=opaque-value,vendor2=other".to_string(),
5087        };
5088        let headers = build_trace_headers(&tc);
5089        assert_eq!(headers.len(), 2, "non-empty tracestate emits both headers");
5090        let extracted = extract_trace_context(&headers).expect("must extract");
5091        assert_eq!(extracted, tc);
5092    }
5093
5094    /// Regression for M21: `extract_trace_context` does
5095    /// case-INsensitive matching on the header names, matching the
5096    /// W3C and HTTP conventions. A peer that emits capitalized
5097    /// `Traceparent` or `TRACESTATE` must still be picked up — the
5098    /// previous implementation used `name.as_str() == "traceparent"`
5099    /// and silently dropped any non-lowercase variant.
5100    #[test]
5101    fn extract_trace_context_is_case_insensitive_on_header_names() {
5102        // Capital-T traceparent + uppercase TRACESTATE — both must
5103        // be picked up by the extractor.
5104        let headers = vec![
5105            (
5106                "Traceparent".to_string(),
5107                b"00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".to_vec(),
5108            ),
5109            ("TRACESTATE".to_string(), b"vendor=value".to_vec()),
5110        ];
5111        let extracted =
5112            extract_trace_context(&headers).expect("capital-T traceparent must be recognized");
5113        assert_eq!(
5114            extracted.traceparent,
5115            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
5116        );
5117        assert_eq!(extracted.tracestate, "vendor=value");
5118
5119        // Mixed-case still works.
5120        let headers = vec![
5121            ("traceParent".to_string(), b"00-aa-bb-01".to_vec()),
5122            ("TraceState".to_string(), b"v=1".to_vec()),
5123        ];
5124        let extracted =
5125            extract_trace_context(&headers).expect("mixed-case traceparent must be recognized");
5126        assert_eq!(extracted.traceparent, "00-aa-bb-01");
5127        assert_eq!(extracted.tracestate, "v=1");
5128    }
5129
5130    /// Empty `tracestate` is omitted on the wire (W3C convention)
5131    /// but extracted as empty on the receive side.
5132    #[test]
5133    fn trace_context_empty_tracestate_omitted_from_wire() {
5134        let tc = TraceContext {
5135            traceparent: "00-aa-bb-01".to_string(),
5136            tracestate: String::new(),
5137        };
5138        let headers = build_trace_headers(&tc);
5139        assert_eq!(
5140            headers.len(),
5141            1,
5142            "empty tracestate must NOT be emitted on the wire",
5143        );
5144        assert_eq!(headers[0].0, "traceparent");
5145        let extracted = extract_trace_context(&headers).expect("must extract");
5146        assert_eq!(extracted.traceparent, "00-aa-bb-01");
5147        assert_eq!(extracted.tracestate, "");
5148    }
5149
5150    /// Headers without `traceparent` decode as `None`. Useful for
5151    /// the FLAG_RPC_PROPAGATE_TRACE-set-but-no-headers misuse
5152    /// case — the server gets `None` rather than a bogus context.
5153    #[test]
5154    fn trace_context_missing_traceparent_returns_none() {
5155        let headers = vec![
5156            ("content-type".to_string(), b"application/json".to_vec()),
5157            ("idempotency-key".to_string(), b"abc".to_vec()),
5158        ];
5159        assert!(extract_trace_context(&headers).is_none());
5160    }
5161
5162    /// Server fold populates `RpcContext::trace_context` only when
5163    /// the caller signals `FLAG_RPC_PROPAGATE_TRACE`. End-to-end
5164    /// through the fold's apply path.
5165    #[tokio::test]
5166    async fn server_fold_propagates_trace_context_via_flag() {
5167        struct CapturingHandler {
5168            captured: Arc<Mutex<Option<Option<TraceContext>>>>,
5169        }
5170        #[async_trait::async_trait]
5171        impl RpcHandler for CapturingHandler {
5172            async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
5173                *self.captured.lock() = Some(ctx.trace_context.clone());
5174                Ok(RpcResponsePayload {
5175                    status: RpcStatus::Ok,
5176                    headers: vec![],
5177                    body: Bytes::new(),
5178                })
5179            }
5180        }
5181
5182        // Helper: run one request through a fresh fold and return
5183        // what the handler captured for trace_context.
5184        async fn run(req: RpcRequestPayload) -> Option<TraceContext> {
5185            let captured: Arc<Mutex<Option<Option<TraceContext>>>> = Arc::new(Mutex::new(None));
5186            let (emit, _captured_responses) = capturing_emitter();
5187            let handler = Arc::new(CapturingHandler {
5188                captured: captured.clone(),
5189            });
5190            let mut fold = RpcServerFold::new(handler, emit);
5191            fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
5192            // Wait for the spawned handler to finish.
5193            assert!(
5194                wait_until(|| captured.lock().is_some(), Duration::from_secs(2)).await,
5195                "handler must run"
5196            );
5197            let observed = captured.lock().take().unwrap();
5198            observed
5199        }
5200
5201        // Case 1: FLAG_RPC_PROPAGATE_TRACE NOT set → trace_context is None.
5202        let req_no_flag = RpcRequestPayload {
5203            service: "x".to_string(),
5204            deadline_ns: 0,
5205            flags: 0,
5206            headers: vec![("traceparent".to_string(), b"00-aa-bb-01".to_vec())],
5207            body: Bytes::new(),
5208        };
5209        assert!(
5210            run(req_no_flag).await.is_none(),
5211            "without the flag, server must NOT extract trace_context"
5212        );
5213
5214        // Case 2: FLAG set + headers present → server gets the context.
5215        let tc = TraceContext {
5216            traceparent: "00-trace-span-01".to_string(),
5217            tracestate: "vendor=value".to_string(),
5218        };
5219        let req_with_flag = RpcRequestPayload {
5220            service: "x".to_string(),
5221            deadline_ns: 0,
5222            flags: FLAG_RPC_PROPAGATE_TRACE,
5223            headers: build_trace_headers(&tc),
5224            body: Bytes::new(),
5225        };
5226        let observed = run(req_with_flag).await.expect("flag set → should be Some");
5227        assert_eq!(observed, tc);
5228
5229        // Case 3: FLAG set but headers missing → None (defensive).
5230        let req_flag_no_headers = RpcRequestPayload {
5231            service: "x".to_string(),
5232            deadline_ns: 0,
5233            flags: FLAG_RPC_PROPAGATE_TRACE,
5234            headers: vec![],
5235            body: Bytes::new(),
5236        };
5237        assert!(
5238            run(req_flag_no_headers).await.is_none(),
5239            "flag set but no headers → server gets None (no synthesis)"
5240        );
5241    }
5242
5243    /// Race: cancel fires AFTER the future is registered but
5244    /// BEFORE the await actually parks. The token's
5245    /// `notified()`-then-check ordering must catch this case
5246    /// without sleeping past the cancellation.
5247    #[tokio::test]
5248    async fn cancellation_token_does_not_miss_cancel_racing_register() {
5249        for _ in 0..50 {
5250            let token = RpcCancellationToken::new();
5251            let token2 = token.clone();
5252            let waiter = tokio::spawn(async move {
5253                token2.cancelled().await;
5254            });
5255            // No sleep — fire cancel as fast as possible against
5256            // the just-spawned waiter. In the worst case the
5257            // waiter has not yet reached `notified()`; it will see
5258            // `is_cancelled() == true` on its first check and
5259            // return immediately. In the other case it parks and
5260            // gets woken by `notify_waiters`.
5261            token.cancel();
5262            tokio::time::timeout(Duration::from_secs(1), waiter)
5263                .await
5264                .expect("waiter must complete within 1s")
5265                .expect("waiter task must not panic");
5266        }
5267    }
5268
5269    // ====================================================================
5270    // RpcClientFold — caller-side response routing.
5271    // ====================================================================
5272
5273    fn rpc_response_event(
5274        caller_origin: u64,
5275        call_id: u64,
5276        payload: RpcResponsePayload,
5277    ) -> RedexEvent {
5278        let meta = EventMeta::new(DISPATCH_RPC_RESPONSE, 0, caller_origin, call_id, 0);
5279        let mut buf = Vec::new();
5280        buf.extend_from_slice(&meta.to_bytes());
5281        buf.extend_from_slice(&payload.encode());
5282        RedexEvent {
5283            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5284            payload: bytes::Bytes::from(buf),
5285        }
5286    }
5287
5288    /// Happy path: register a call, drive the matching RESPONSE
5289    /// through the fold, the awaiting receiver gets the payload.
5290    #[tokio::test]
5291    async fn client_fold_routes_response_to_registered_waiter() {
5292        let pending = Arc::new(RpcClientPending::new());
5293        let mut fold = RpcClientFold::new(pending.clone());
5294        let rx = pending.register(42, 0);
5295        assert_eq!(pending.pending_count(), 1);
5296
5297        let resp = RpcResponsePayload {
5298            status: RpcStatus::Ok,
5299            headers: vec![],
5300            body: Bytes::from_static(b"hello back"),
5301        };
5302        fold.apply(&rpc_response_event(0xCAFE, 42, resp.clone()), &mut ())
5303            .unwrap();
5304
5305        // Receiver is completed.
5306        let got = tokio::time::timeout(Duration::from_secs(1), rx)
5307            .await
5308            .expect("receiver must complete within 1s")
5309            .expect("sender must not be dropped");
5310        assert_eq!(got, resp);
5311        // Pending entry cleared after delivery.
5312        assert_eq!(pending.pending_count(), 0);
5313    }
5314
5315    /// RESPONSE for an unknown call_id is a no-op (no panic, no
5316    /// stray side effect). This is the case where a stale RESPONSE
5317    /// arrives after the caller has cancelled or timed out.
5318    #[tokio::test]
5319    async fn client_fold_response_for_unknown_call_id_is_no_op() {
5320        let pending = Arc::new(RpcClientPending::new());
5321        let mut fold = RpcClientFold::new(pending.clone());
5322        let resp = RpcResponsePayload {
5323            status: RpcStatus::Ok,
5324            headers: vec![],
5325            body: Bytes::new(),
5326        };
5327        fold.apply(&rpc_response_event(1, 999, resp), &mut ())
5328            .unwrap();
5329        assert_eq!(pending.pending_count(), 0);
5330    }
5331
5332    /// REQUEST / CANCEL events on the reply channel are ignored
5333    /// rather than producing a stray decode-error or affecting
5334    /// pending state. The reply channel shouldn't carry these in
5335    /// practice (they belong on `<service>.requests`), but a
5336    /// misconfigured publisher must not break the fold.
5337    #[tokio::test]
5338    async fn client_fold_ignores_non_response_dispatches() {
5339        let pending = Arc::new(RpcClientPending::new());
5340        let mut fold = RpcClientFold::new(pending.clone());
5341        let _rx = pending.register(7, 0);
5342
5343        // REQUEST event landing on the caller's reply channel is
5344        // ignored.
5345        let req = RpcRequestPayload {
5346            service: "stray".to_string(),
5347            deadline_ns: 0,
5348            flags: 0,
5349            headers: vec![],
5350            body: Bytes::new(),
5351        };
5352        fold.apply(&rpc_request_event(1, 7, req), &mut ()).unwrap();
5353        // Pending entry untouched.
5354        assert_eq!(pending.pending_count(), 1);
5355
5356        // CANCEL on the reply channel: also ignored.
5357        fold.apply(&rpc_cancel_event(1, 7), &mut ()).unwrap();
5358        assert_eq!(pending.pending_count(), 1);
5359    }
5360
5361    /// `cancel(call_id)` removes the pending entry; a subsequent
5362    /// RESPONSE for that call_id is dropped silently.
5363    #[tokio::test]
5364    async fn client_pending_cancel_drops_subsequent_response() {
5365        let pending = Arc::new(RpcClientPending::new());
5366        let mut fold = RpcClientFold::new(pending.clone());
5367        let rx = pending.register(5, 0);
5368        pending.cancel(5);
5369        assert_eq!(pending.pending_count(), 0);
5370
5371        let resp = RpcResponsePayload {
5372            status: RpcStatus::Ok,
5373            headers: vec![],
5374            body: Bytes::new(),
5375        };
5376        fold.apply(&rpc_response_event(1, 5, resp), &mut ())
5377            .unwrap();
5378
5379        // Receiver was dropped along with the cancel. The previously-
5380        // returned `rx` errors with `Closed`.
5381        let result = tokio::time::timeout(Duration::from_secs(1), rx).await;
5382        let inner = result.expect("must complete within 1s");
5383        assert!(
5384            inner.is_err(),
5385            "receiver after cancel must error (sender dropped)",
5386        );
5387    }
5388
5389    /// Malformed RESPONSE payload: fold returns Ok (does not kill
5390    /// the cortex adapter) and leaves the pending entry intact for
5391    /// the caller's deadline / cancellation path to clean up. Pre-
5392    /// fix a bad payload could either kill the fold or fabricate a
5393    /// synthetic response — both wrong.
5394    #[tokio::test]
5395    async fn client_fold_malformed_response_is_logged_not_fatal() {
5396        let pending = Arc::new(RpcClientPending::new());
5397        let mut fold = RpcClientFold::new(pending.clone());
5398        let rx = pending.register(11, 0);
5399
5400        // Build a malformed RESPONSE: valid meta, garbage tail
5401        // (just `[0xFF]`, which is shorter than the required 2-byte
5402        // status + 1-byte headers count + 4-byte body length).
5403        let meta = EventMeta::new(DISPATCH_RPC_RESPONSE, 0, 1, 11, 0);
5404        let mut buf = Vec::new();
5405        buf.extend_from_slice(&meta.to_bytes());
5406        buf.push(0xFF);
5407        let ev = RedexEvent {
5408            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5409            payload: bytes::Bytes::from(buf),
5410        };
5411        let result = fold.apply(&ev, &mut ());
5412        assert!(
5413            result.is_ok(),
5414            "fold must not return Err on malformed response"
5415        );
5416        // Pending entry NOT cleared — the caller's cancellation
5417        // path will eventually clean it up via `cancel(call_id)`.
5418        assert_eq!(pending.pending_count(), 1);
5419        // Receiver is still pending (not delivered, not closed).
5420        assert!(
5421            tokio::time::timeout(Duration::from_millis(50), rx)
5422                .await
5423                .is_err(),
5424            "receiver should still be parked (no delivery, no drop)",
5425        );
5426    }
5427
5428    /// Re-registering the same call_id replaces the prior sender;
5429    /// the prior `Receiver` errors with `RecvError::Closed`. This
5430    /// is the misuse-detection path — call_ids should be unique
5431    /// per (caller, target) for the lifetime of the call, and a
5432    /// clash surfaces as a hard error rather than silently
5433    /// delivering the response to the wrong waiter.
5434    #[tokio::test]
5435    async fn client_pending_re_register_closes_prior_receiver() {
5436        let pending = Arc::new(RpcClientPending::new());
5437        let rx_a = pending.register(99, 0);
5438        let _rx_b = pending.register(99, 0);
5439        // The first receiver is now closed (sender dropped on
5440        // re-insert).
5441        let result = tokio::time::timeout(Duration::from_secs(1), rx_a).await;
5442        let inner = result.expect("must complete within 1s");
5443        assert!(inner.is_err(), "re-register must close prior receiver");
5444        assert_eq!(pending.pending_count(), 1);
5445    }
5446
5447    /// S-4 part 2 regression: a RESPONSE whose wire `from_node`
5448    /// doesn't match the recorded `target_node` must not resolve
5449    /// the call. Without the gate, any peer with publish access
5450    /// to the caller's reply channel could ship a spoofed
5451    /// response (random call_ids from S-4 part 1 narrow the
5452    /// attack surface, but this gate closes the residual case
5453    /// of an attacker who has observed the victim's call_id via
5454    /// some side channel).
5455    #[tokio::test]
5456    async fn client_pending_drops_response_from_wrong_target() {
5457        let pending = Arc::new(RpcClientPending::new());
5458        let rx = pending.register(0xDEAD_BEEF, 0x42);
5459        let resp = RpcResponsePayload {
5460            status: RpcStatus::Ok,
5461            headers: Vec::new(),
5462            body: Bytes::from_static(b"forged"),
5463        };
5464        // Forged from a different session peer — must drop.
5465        pending.deliver(0xDEAD_BEEF, 0x99, resp.clone());
5466        // Receiver is still parked; pending entry is intact.
5467        let parked = tokio::time::timeout(Duration::from_millis(50), rx).await;
5468        assert!(
5469            parked.is_err(),
5470            "forged RESPONSE from wrong target must not resolve the call"
5471        );
5472        assert_eq!(pending.pending_count(), 1);
5473
5474        // Legitimate RESPONSE from the recorded target resolves.
5475        let rx2 = pending.register(0xCAFE, 0x42);
5476        let ok_resp = RpcResponsePayload {
5477            status: RpcStatus::Ok,
5478            headers: Vec::new(),
5479            body: Bytes::from_static(b"ok"),
5480        };
5481        pending.deliver(0xCAFE, 0x42, ok_resp);
5482        let delivered = tokio::time::timeout(Duration::from_millis(50), rx2)
5483            .await
5484            .expect("must complete")
5485            .expect("must receive");
5486        assert_eq!(delivered.body.as_ref(), b"ok");
5487    }
5488
5489    // ====================================================================
5490    // Phase C — RpcClientPending + RpcClientFold for client-streaming.
5491    // ====================================================================
5492
5493    /// Build a REQUEST_GRANT event for tests. Mirror of
5494    /// `rpc_stream_grant_event` for the request direction.
5495    fn rpc_request_grant_event(caller_origin: u64, call_id: u64, credits: u32) -> RedexEvent {
5496        let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, caller_origin, call_id, 0);
5497        let mut buf = Vec::with_capacity(EVENT_META_SIZE + 12);
5498        buf.extend_from_slice(&meta.to_bytes());
5499        buf.extend_from_slice(&encode_request_grant(call_id, credits));
5500        RedexEvent {
5501            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5502            payload: bytes::Bytes::from(buf),
5503        }
5504    }
5505
5506    /// `register_client_streaming` returns two halves: a terminal
5507    /// oneshot and a grant mpsc. A terminal RESPONSE resolves the
5508    /// oneshot (same shape as unary delivery); a REQUEST_GRANT
5509    /// for the same call_id pushes its credit onto the mpsc.
5510    #[tokio::test]
5511    async fn client_pending_client_streaming_routes_terminal_and_grants() {
5512        let pending = Arc::new(RpcClientPending::new());
5513        let (terminal_rx, mut grant_rx) = pending.register_client_streaming(0xCAFE_F00D, 0);
5514        // Push two grants — both should land on the mpsc.
5515        pending.deliver_grant(0xCAFE_F00D, 0, 3);
5516        pending.deliver_grant(0xCAFE_F00D, 0, 7);
5517        assert_eq!(grant_rx.recv().await, Some(3));
5518        assert_eq!(grant_rx.recv().await, Some(7));
5519        // Terminal RESPONSE resolves the oneshot and removes the
5520        // entry. Grant mpsc closes too (its sender drops with
5521        // the entry).
5522        let resp = RpcResponsePayload {
5523            status: RpcStatus::Ok,
5524            headers: vec![],
5525            body: Bytes::from_static(b"done"),
5526        };
5527        pending.deliver(0xCAFE_F00D, 0, resp.clone());
5528        let delivered = tokio::time::timeout(Duration::from_millis(50), terminal_rx)
5529            .await
5530            .expect("terminal must complete")
5531            .expect("terminal must receive");
5532        assert_eq!(delivered.body.as_ref(), b"done");
5533        // Grant mpsc now closed.
5534        assert_eq!(grant_rx.recv().await, None);
5535        assert_eq!(pending.pending_count(), 0);
5536    }
5537
5538    /// REQUEST_GRANT from a non-target session peer is dropped
5539    /// without injecting credit. Same S-4-style binding gate as
5540    /// the RESPONSE delivery path — a forged grant on a shared
5541    /// reply channel can't inflate a victim's credit budget.
5542    #[tokio::test]
5543    async fn client_pending_grant_from_wrong_target_is_dropped() {
5544        let pending = Arc::new(RpcClientPending::new());
5545        let (_terminal_rx, mut grant_rx) = pending.register_client_streaming(0xCAFE_F00D, 0x42);
5546        // Forged grant from a different session peer — must drop.
5547        pending.deliver_grant(0xCAFE_F00D, 0x99, 100);
5548        let parked = tokio::time::timeout(Duration::from_millis(50), grant_rx.recv()).await;
5549        assert!(
5550            parked.is_err(),
5551            "forged REQUEST_GRANT from wrong target must not inject credit"
5552        );
5553        // Legitimate grant from the recorded target lands.
5554        pending.deliver_grant(0xCAFE_F00D, 0x42, 5);
5555        let delivered = tokio::time::timeout(Duration::from_millis(50), grant_rx.recv())
5556            .await
5557            .expect("must complete")
5558            .expect("must receive");
5559        assert_eq!(delivered, 5);
5560    }
5561
5562    /// `deliver_grant` for an unknown call_id is a silent no-op.
5563    /// Same harmless-drop semantics as a STREAM_GRANT for an
5564    /// unknown / non-flow-controlled call (CANCEL/GRANT race is
5565    /// always possible).
5566    #[tokio::test]
5567    async fn client_pending_grant_for_unknown_call_id_is_no_op() {
5568        let pending = Arc::new(RpcClientPending::new());
5569        // No entry registered for this call_id.
5570        pending.deliver_grant(0xDEAD, 0, 42);
5571        // No panics, no entries created.
5572        assert_eq!(pending.pending_count(), 0);
5573    }
5574
5575    /// `deliver_grant` for a unary entry is silently dropped
5576    /// (grants only apply to client-streaming / duplex calls).
5577    #[tokio::test]
5578    async fn client_pending_grant_for_unary_entry_is_no_op() {
5579        let pending = Arc::new(RpcClientPending::new());
5580        let _rx = pending.register(0xDEAD, 0);
5581        pending.deliver_grant(0xDEAD, 0, 42);
5582        // No state changes — entry still pending, no leak.
5583        assert_eq!(pending.pending_count(), 1);
5584    }
5585
5586    /// `RpcClientFold::apply` (legacy / loopback path) routes
5587    /// DISPATCH_RPC_REQUEST_GRANT events through to the matching
5588    /// ClientStreaming entry's grant mpsc. Pins the second
5589    /// dispatch arm the fold gained for Phase C.
5590    #[tokio::test]
5591    async fn client_fold_routes_request_grant_to_registered_waiter() {
5592        let pending = Arc::new(RpcClientPending::new());
5593        let mut fold = RpcClientFold::new(pending.clone());
5594        let (_terminal_rx, mut grant_rx) = pending.register_client_streaming(0xC0DE, 0);
5595        let ev = rpc_request_grant_event(0xCAFE, 0xC0DE, 9);
5596        fold.apply(&ev, &mut ()).unwrap();
5597        let delivered = tokio::time::timeout(Duration::from_millis(50), grant_rx.recv())
5598            .await
5599            .expect("must complete")
5600            .expect("must receive");
5601        assert_eq!(delivered, 9);
5602    }
5603
5604    /// `RpcClientFold::apply` ignores REQUEST_GRANT events whose
5605    /// payload is malformed (wrong length): no panic, no entry
5606    /// state change, fold returns Ok and keeps going. Mirror of
5607    /// the response-side malformed-payload regression.
5608    #[tokio::test]
5609    async fn client_fold_malformed_request_grant_is_logged_not_fatal() {
5610        let pending = Arc::new(RpcClientPending::new());
5611        let mut fold = RpcClientFold::new(pending.clone());
5612        let (_terminal_rx, mut grant_rx) = pending.register_client_streaming(0xC0DE, 0);
5613        // Build a GRANT event whose payload is only 4 bytes
5614        // (truncated — codec needs 12).
5615        let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, 0xCAFE, 0xC0DE, 0);
5616        let mut buf = Vec::new();
5617        buf.extend_from_slice(&meta.to_bytes());
5618        buf.extend_from_slice(&[0xAA, 0xBB, 0xCC, 0xDD]);
5619        let ev = RedexEvent {
5620            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5621            payload: bytes::Bytes::from(buf),
5622        };
5623        let result = fold.apply(&ev, &mut ());
5624        assert!(
5625            result.is_ok(),
5626            "malformed REQUEST_GRANT must NOT kill the fold"
5627        );
5628        // No credit landed on the mpsc.
5629        let parked = tokio::time::timeout(Duration::from_millis(30), grant_rx.recv()).await;
5630        assert!(
5631            parked.is_err(),
5632            "malformed REQUEST_GRANT must not inject credit"
5633        );
5634    }
5635
5636    /// REQUEST_GRANT frames where the payload `call_id` does NOT
5637    /// agree with `EventMeta::seq_or_ts` must be dropped: the
5638    /// producer is contracted to encode both fields to the same
5639    /// value (see `RpcRequestGrantPayload::call_id` doc), so a
5640    /// mismatch is either a malformed frame or an attempted
5641    /// cross-call credit-injection. Without this check, a peer
5642    /// could publish a GRANT whose meta names one call but whose
5643    /// payload credits a different in-flight call_id.
5644    ///
5645    /// Regression: cubic-dev-ai bot P2 review comment on the
5646    /// `nrpc-streaming` branch.
5647    #[tokio::test]
5648    async fn client_fold_drops_request_grant_with_mismatched_call_ids() {
5649        let pending = Arc::new(RpcClientPending::new());
5650        let mut fold = RpcClientFold::new(pending.clone());
5651        let (_terminal_rx_victim, mut grant_rx_victim) =
5652            pending.register_client_streaming(0xC0DE, 0);
5653        let (_terminal_rx_other, mut grant_rx_other) = pending.register_client_streaming(0xBEEF, 0);
5654
5655        // Build a hand-rolled frame: meta names call 0xC0DE,
5656        // payload encodes credit for call 0xBEEF. Either the
5657        // producer is broken or this is a forged frame; the
5658        // consumer must drop, not deliver.
5659        let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, 0xCAFE, 0xC0DE, 0);
5660        let mut buf = Vec::with_capacity(EVENT_META_SIZE + 12);
5661        buf.extend_from_slice(&meta.to_bytes());
5662        buf.extend_from_slice(&encode_request_grant(0xBEEF, 5));
5663        let ev = RedexEvent {
5664            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5665            payload: bytes::Bytes::from(buf),
5666        };
5667        fold.apply(&ev, &mut ()).unwrap();
5668
5669        let parked_victim =
5670            tokio::time::timeout(Duration::from_millis(30), grant_rx_victim.recv()).await;
5671        assert!(
5672            parked_victim.is_err(),
5673            "mismatched REQUEST_GRANT must not credit the call named in meta",
5674        );
5675        let parked_other =
5676            tokio::time::timeout(Duration::from_millis(30), grant_rx_other.recv()).await;
5677        assert!(
5678            parked_other.is_err(),
5679            "mismatched REQUEST_GRANT must not credit the call named in payload either",
5680        );
5681    }
5682
5683    // ====================================================================
5684    // RpcServerStreamingFold — coverage for the multi-fire emit path.
5685    //
5686    // The streaming fold is the most complex code in this file:
5687    //   - Per-call cancellation token (same as unary)
5688    //   - Pump task that drains an mpsc and awaits each emit to
5689    //     enforce per-call ordering
5690    //   - Optional flow-control semaphore (caller-set window +
5691    //     STREAM_GRANT credit refills)
5692    //   - Terminal-frame emission with CANCEL-wins override
5693    //
5694    // These tests pin each branch: ordered chunks + clean EOF;
5695    // application error after partial stream; panic surfacing as
5696    // Internal; CANCEL flipping the cancellation token AND being
5697    // surfaced as the terminal status; STREAM_GRANT permits;
5698    // duplicate-REQUEST refusal.
5699    // ====================================================================
5700
5701    /// Build an async `RpcAsyncResponseEmitter` that captures every
5702    /// emit into a shared Vec. Streaming fold tests use this to
5703    /// inspect the multi-frame emit pattern.
5704    fn capturing_async_emitter() -> (RpcAsyncResponseEmitter, CapturedResponses) {
5705        let captured: CapturedResponses = Arc::new(Mutex::new(Vec::new()));
5706        let captured_clone = captured.clone();
5707        let emit: RpcAsyncResponseEmitter = Arc::new(move |origin, call_id, resp| {
5708            let captured_clone = captured_clone.clone();
5709            Box::pin(async move {
5710                captured_clone.lock().push((origin, call_id, resp));
5711            })
5712        });
5713        (emit, captured)
5714    }
5715
5716    /// Synthesize a STREAM_GRANT event for a `(caller_origin, call_id)`
5717    /// asking for `n` additional credits.
5718    fn rpc_stream_grant_event(caller_origin: u64, call_id: u64, n: u32) -> RedexEvent {
5719        let meta = EventMeta::new(DISPATCH_RPC_STREAM_GRANT, 0, caller_origin, call_id, 0);
5720        let mut buf = Vec::with_capacity(EVENT_META_SIZE + 4);
5721        buf.extend_from_slice(&meta.to_bytes());
5722        buf.extend_from_slice(&encode_stream_grant(n));
5723        RedexEvent {
5724            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5725            payload: bytes::Bytes::from(buf),
5726        }
5727    }
5728
5729    /// Streaming handler that emits N chunks and returns Ok. The
5730    /// caller-side test asserts (a) all N chunks arrive in order
5731    /// with the `nrpc-streaming: continue` header, (b) a final
5732    /// terminal frame with `nrpc-streaming: end` follows.
5733    #[tokio::test]
5734    async fn streaming_fold_emits_chunks_in_order_and_clean_terminal() {
5735        struct CountingHandler {
5736            n: usize,
5737        }
5738        #[async_trait::async_trait]
5739        impl RpcStreamingHandler for CountingHandler {
5740            async fn call(
5741                &self,
5742                _ctx: RpcContext,
5743                sink: RpcResponseSink,
5744            ) -> Result<(), RpcHandlerError> {
5745                for i in 0..self.n {
5746                    sink.send(format!("chunk-{i}").into_bytes());
5747                }
5748                Ok(())
5749            }
5750        }
5751        let (emit, captured) = capturing_async_emitter();
5752        let mut fold = RpcServerStreamingFold::new(Arc::new(CountingHandler { n: 5 }), emit);
5753        let req = RpcRequestPayload {
5754            service: "stream".to_string(),
5755            deadline_ns: 0,
5756            flags: FLAG_RPC_STREAMING_RESPONSE,
5757            headers: vec![],
5758            body: Bytes::new(),
5759        };
5760        fold.apply(&rpc_request_event(11, 22, req), &mut ())
5761            .unwrap();
5762        // 5 continue chunks + 1 terminal end frame.
5763        assert!(
5764            wait_until(|| captured.lock().len() == 6, Duration::from_secs(2)).await,
5765            "expected 6 frames (5 chunks + terminal end), got {}",
5766            captured.lock().len(),
5767        );
5768        let captured = captured.lock();
5769        for (i, (_, _, resp)) in captured.iter().take(5).enumerate() {
5770            assert_eq!(resp.status, RpcStatus::Ok);
5771            // continue header on every non-terminal chunk
5772            let hdr = resp
5773                .headers
5774                .iter()
5775                .find(|(n, _)| n == HEADER_NRPC_STREAMING)
5776                .expect("streaming header present");
5777            assert_eq!(hdr.1.as_slice(), HEADER_NRPC_STREAMING_CONTINUE);
5778            assert_eq!(resp.body, format!("chunk-{i}").into_bytes());
5779        }
5780        // Terminal frame
5781        let (_, _, term) = captured.last().unwrap();
5782        assert_eq!(term.status, RpcStatus::Ok);
5783        let hdr = term
5784            .headers
5785            .iter()
5786            .find(|(n, _)| n == HEADER_NRPC_STREAMING)
5787            .expect("terminal must have streaming header");
5788        assert_eq!(hdr.1.as_slice(), HEADER_NRPC_STREAMING_END);
5789        assert!(term.body.is_empty());
5790    }
5791
5792    /// Handler returns `Err(Internal)` after sending 2 chunks. Caller
5793    /// must see (a) both chunks with the continue header, (b) a
5794    /// terminal frame carrying `RpcStatus::Internal` (NOT the end
5795    /// marker — the terminal-error path drops the header).
5796    #[tokio::test]
5797    async fn streaming_fold_terminal_error_after_partial_stream() {
5798        struct PartialErrHandler;
5799        #[async_trait::async_trait]
5800        impl RpcStreamingHandler for PartialErrHandler {
5801            async fn call(
5802                &self,
5803                _ctx: RpcContext,
5804                sink: RpcResponseSink,
5805            ) -> Result<(), RpcHandlerError> {
5806                sink.send(b"first".to_vec());
5807                sink.send(b"second".to_vec());
5808                Err(RpcHandlerError::Internal("ran out of fuel".into()))
5809            }
5810        }
5811        let (emit, captured) = capturing_async_emitter();
5812        let mut fold = RpcServerStreamingFold::new(Arc::new(PartialErrHandler), emit);
5813        let req = RpcRequestPayload {
5814            service: "x".to_string(),
5815            deadline_ns: 0,
5816            flags: FLAG_RPC_STREAMING_RESPONSE,
5817            headers: vec![],
5818            body: Bytes::new(),
5819        };
5820        fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
5821        assert!(
5822            wait_until(|| captured.lock().len() == 3, Duration::from_secs(2)).await,
5823            "expected 2 chunks + 1 terminal error",
5824        );
5825        let captured = captured.lock();
5826        assert_eq!(captured[0].2.body.as_ref(), b"first");
5827        assert_eq!(captured[1].2.body.as_ref(), b"second");
5828        let (_, _, term) = &captured[2];
5829        assert_eq!(term.status, RpcStatus::Internal);
5830        assert!(
5831            String::from_utf8_lossy(&term.body).contains("ran out of fuel"),
5832            "diagnostic must round-trip, got {:?}",
5833            String::from_utf8_lossy(&term.body),
5834        );
5835    }
5836
5837    /// Handler panics. The fold's `catch_unwind` surfaces it as a
5838    /// terminal `RpcStatus::Internal` rather than killing the
5839    /// runtime.
5840    #[tokio::test]
5841    async fn streaming_fold_handler_panic_surfaces_as_internal_terminal() {
5842        struct PanicHandler;
5843        #[async_trait::async_trait]
5844        impl RpcStreamingHandler for PanicHandler {
5845            async fn call(
5846                &self,
5847                _ctx: RpcContext,
5848                _sink: RpcResponseSink,
5849            ) -> Result<(), RpcHandlerError> {
5850                panic!("kaboom in streaming handler");
5851            }
5852        }
5853        let (emit, captured) = capturing_async_emitter();
5854        let mut fold = RpcServerStreamingFold::new(Arc::new(PanicHandler), emit);
5855        let req = RpcRequestPayload {
5856            service: "x".to_string(),
5857            deadline_ns: 0,
5858            flags: FLAG_RPC_STREAMING_RESPONSE,
5859            headers: vec![],
5860            body: Bytes::new(),
5861        };
5862        fold.apply(&rpc_request_event(1, 2, req), &mut ()).unwrap();
5863        assert!(
5864            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
5865            "panic must surface as a terminal frame",
5866        );
5867        let captured = captured.lock();
5868        assert_eq!(captured.len(), 1);
5869        let (_, _, resp) = &captured[0];
5870        assert_eq!(resp.status, RpcStatus::Internal);
5871        assert!(
5872            String::from_utf8_lossy(&resp.body).contains("kaboom"),
5873            "panic message must surface, got {:?}",
5874            String::from_utf8_lossy(&resp.body),
5875        );
5876    }
5877
5878    /// CANCEL during a streaming call overrides the terminal frame
5879    /// with `RpcStatus::Cancelled` — same CANCEL-wins ordering as
5880    /// the unary fold.
5881    #[tokio::test]
5882    async fn streaming_fold_cancel_overrides_terminal_with_cancelled() {
5883        struct CooperativeHandler;
5884        #[async_trait::async_trait]
5885        impl RpcStreamingHandler for CooperativeHandler {
5886            async fn call(
5887                &self,
5888                ctx: RpcContext,
5889                sink: RpcResponseSink,
5890            ) -> Result<(), RpcHandlerError> {
5891                sink.send(b"chunk-0".to_vec());
5892                tokio::select! {
5893                    _ = ctx.cancellation.cancelled() => Ok(()),
5894                    _ = tokio::time::sleep(Duration::from_secs(5)) => Ok(()),
5895                }
5896            }
5897        }
5898        let (emit, captured) = capturing_async_emitter();
5899        let mut fold = RpcServerStreamingFold::new(Arc::new(CooperativeHandler), emit);
5900        let req = RpcRequestPayload {
5901            service: "x".to_string(),
5902            deadline_ns: 0,
5903            flags: FLAG_RPC_STREAMING_RESPONSE,
5904            headers: vec![],
5905            body: Bytes::new(),
5906        };
5907        fold.apply(&rpc_request_event(7, 13, req), &mut ()).unwrap();
5908        // Wait until at least the first chunk is captured AND the
5909        // handler is parked (in_flight key present), then CANCEL.
5910        assert!(
5911            wait_until(
5912                || !captured.lock().is_empty() && fold.in_flight_keys().contains(&(7, 13)),
5913                Duration::from_secs(2)
5914            )
5915            .await
5916        );
5917        fold.apply(&rpc_cancel_event(7, 13), &mut ()).unwrap();
5918        // Wait for the terminal frame.
5919        assert!(
5920            wait_until(|| captured.lock().len() >= 2, Duration::from_secs(2)).await,
5921            "expected first chunk + terminal frame",
5922        );
5923        let captured = captured.lock();
5924        // First emit was the chunk; the LAST should be the
5925        // Cancelled terminal.
5926        assert_eq!(
5927            captured.last().unwrap().2.status,
5928            RpcStatus::Cancelled,
5929            "CANCEL must override terminal status",
5930        );
5931    }
5932
5933    /// Duplicate REQUEST with the same `(origin, call_id)` is
5934    /// refused with a synthetic Internal terminal frame and does
5935    /// NOT spawn a second handler. Mirror of the unary fold's
5936    /// regression at server_fold_duplicate_request_refuses_*.
5937    #[tokio::test]
5938    async fn streaming_fold_duplicate_request_refuses_without_double_dispatch() {
5939        let invocations = Arc::new(AtomicUsize::new(0));
5940        struct CountingHandler {
5941            invocations: Arc<AtomicUsize>,
5942        }
5943        #[async_trait::async_trait]
5944        impl RpcStreamingHandler for CountingHandler {
5945            async fn call(
5946                &self,
5947                _ctx: RpcContext,
5948                sink: RpcResponseSink,
5949            ) -> Result<(), RpcHandlerError> {
5950                self.invocations.fetch_add(1, Ordering::SeqCst);
5951                tokio::time::sleep(Duration::from_millis(80)).await;
5952                sink.send(b"chunk".to_vec());
5953                Ok(())
5954            }
5955        }
5956        let (emit, captured) = capturing_async_emitter();
5957        let mut fold = RpcServerStreamingFold::new(
5958            Arc::new(CountingHandler {
5959                invocations: invocations.clone(),
5960            }),
5961            emit,
5962        );
5963        let req = RpcRequestPayload {
5964            service: "x".to_string(),
5965            deadline_ns: 0,
5966            flags: FLAG_RPC_STREAMING_RESPONSE,
5967            headers: vec![],
5968            body: Bytes::new(),
5969        };
5970        fold.apply(&rpc_request_event(1, 99, req.clone()), &mut ())
5971            .unwrap();
5972        assert!(
5973            wait_until(
5974                || fold.in_flight_keys().contains(&(1, 99)),
5975                Duration::from_secs(1)
5976            )
5977            .await
5978        );
5979        // Duplicate REQUEST — must emit a synthetic Internal
5980        // terminal and not invoke the handler a second time.
5981        fold.apply(&rpc_request_event(1, 99, req), &mut ()).unwrap();
5982        assert!(
5983            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(1)).await,
5984            "synthetic refusal should be emitted",
5985        );
5986        // First emit (chronologically) is the synthetic refusal.
5987        let refusal = captured.lock()[0].clone();
5988        assert_eq!(refusal.2.status, RpcStatus::Internal);
5989        assert!(String::from_utf8_lossy(&refusal.2.body).contains("duplicate"));
5990        // Wait for the original handler to complete (chunk + terminal).
5991        assert!(
5992            wait_until(|| captured.lock().len() >= 3, Duration::from_secs(2)).await,
5993            "first handler should still complete normally",
5994        );
5995        assert_eq!(
5996            invocations.load(Ordering::SeqCst),
5997            1,
5998            "duplicate REQUEST must NOT spawn a second handler",
5999        );
6000    }
6001
6002    /// STREAM_GRANT for an unknown call_id is silently dropped
6003    /// (no panic, no tracing event escalation). Pin the
6004    /// always-safe behavior so a misbehaving caller (or a CANCEL/
6005    /// GRANT race) can't crash the fold.
6006    #[tokio::test]
6007    async fn streaming_fold_grant_for_unknown_call_id_is_no_op() {
6008        struct NoopHandler;
6009        #[async_trait::async_trait]
6010        impl RpcStreamingHandler for NoopHandler {
6011            async fn call(
6012                &self,
6013                _ctx: RpcContext,
6014                _sink: RpcResponseSink,
6015            ) -> Result<(), RpcHandlerError> {
6016                Ok(())
6017            }
6018        }
6019        let (emit, captured) = capturing_async_emitter();
6020        let mut fold = RpcServerStreamingFold::new(Arc::new(NoopHandler), emit);
6021        let result = fold.apply(&rpc_stream_grant_event(99, 42, 5), &mut ());
6022        assert!(result.is_ok(), "GRANT for unknown call_id must be Ok");
6023        assert!(captured.lock().is_empty(), "no emit for unknown GRANT");
6024    }
6025
6026    /// Regression for M20: the streaming pump's mpsc is bounded
6027    /// at `STREAMING_PUMP_CAPACITY`. A handler that produces
6028    /// chunks faster than the pump drains gets its excess
6029    /// `sink.send(...)` calls silently dropped (matching the
6030    /// "caller cancelled" semantic) — and the metric counter
6031    /// `streaming_chunks_dropped_total` increments.
6032    ///
6033    /// We construct the sink directly with a tiny bounded mpsc
6034    /// (capacity 2) and a metrics handle, then call `send` 5
6035    /// times without a receiver. The first 2 fit in the channel;
6036    /// the next 3 are dropped and counted.
6037    #[tokio::test]
6038    async fn streaming_sink_drops_on_full_and_increments_metric() {
6039        use crate::adapter::net::mesh_rpc_metrics::{RpcMetricsRegistry, ServiceMetricsAtomic};
6040        // Tiny channel to make overflow easy to observe.
6041        let (tx, _rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(2);
6042        let registry = RpcMetricsRegistry::new();
6043        let metrics: Arc<ServiceMetricsAtomic> = registry.for_service("drop_test");
6044        let sink = RpcResponseSink {
6045            inner: tx,
6046            metrics: Some(metrics.clone()),
6047        };
6048        // 5 sends; first 2 buffer, next 3 drop.
6049        for i in 0..5u8 {
6050            sink.send(vec![i]);
6051        }
6052        assert_eq!(
6053            metrics
6054                .streaming_chunks_dropped_total
6055                .load(Ordering::Relaxed),
6056            3,
6057            "expected 3 dropped chunks (capacity=2, sent 5)",
6058        );
6059    }
6060
6061    /// Malformed REQUEST payload on the streaming fold: emits one
6062    /// terminal `UnknownVersion` frame and continues — same
6063    /// keep-the-adapter-alive contract as the unary fold.
6064    #[tokio::test]
6065    async fn streaming_fold_malformed_payload_emits_unknown_version_terminal() {
6066        struct NoopHandler;
6067        #[async_trait::async_trait]
6068        impl RpcStreamingHandler for NoopHandler {
6069            async fn call(
6070                &self,
6071                _ctx: RpcContext,
6072                _sink: RpcResponseSink,
6073            ) -> Result<(), RpcHandlerError> {
6074                Ok(())
6075            }
6076        }
6077        let (emit, captured) = capturing_async_emitter();
6078        let mut fold = RpcServerStreamingFold::new(Arc::new(NoopHandler), emit);
6079        // Garbage tail: valid meta + 0x00 svc_len → Truncated.
6080        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, 1, 1, 0);
6081        let mut buf = Vec::new();
6082        buf.extend_from_slice(&meta.to_bytes());
6083        buf.push(0x00);
6084        let ev = RedexEvent {
6085            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
6086            payload: bytes::Bytes::from(buf),
6087        };
6088        let result = fold.apply(&ev, &mut ());
6089        assert!(
6090            result.is_ok(),
6091            "malformed payload must NOT kill the adapter",
6092        );
6093        assert!(
6094            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6095            "synthetic UnknownVersion terminal must arrive",
6096        );
6097        let captured = captured.lock();
6098        assert_eq!(captured[0].2.status, RpcStatus::UnknownVersion);
6099        let hdr = captured[0]
6100            .2
6101            .headers
6102            .iter()
6103            .find(|(n, _)| n == HEADER_NRPC_STREAMING);
6104        assert!(hdr.is_some(), "malformed terminal must include end marker");
6105    }
6106
6107    // ====================================================================
6108    // Phase B — RpcStreamingRequestFold (server-side client-streaming)
6109    // ====================================================================
6110
6111    /// Build a REQUEST_CHUNK event for tests. Mirrors
6112    /// `rpc_request_event` / `rpc_stream_grant_event` shape.
6113    fn rpc_request_chunk_event(
6114        caller_origin: u64,
6115        call_id: u64,
6116        flags: u16,
6117        body: Vec<u8>,
6118    ) -> RedexEvent {
6119        let meta = EventMeta::new(DISPATCH_RPC_REQUEST_CHUNK, 0, caller_origin, call_id, 0);
6120        let payload = RpcRequestChunkPayload {
6121            call_id,
6122            flags,
6123            headers: vec![],
6124            body: body.into(),
6125        };
6126        let mut buf = Vec::new();
6127        buf.extend_from_slice(&meta.to_bytes());
6128        buf.extend_from_slice(&payload.encode());
6129        RedexEvent {
6130            entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
6131            payload: bytes::Bytes::from(buf),
6132        }
6133    }
6134
6135    /// Collecting client-streaming handler: drains the stream into
6136    /// a Vec, returns an Ok response whose body is the count of
6137    /// chunks seen (8-byte LE). Captured chunk bodies are exposed
6138    /// via the `Arc<Mutex<Vec<Bytes>>>` so tests can assert
6139    /// ordering and content.
6140    struct CollectingClientStreamHandler {
6141        seen: Arc<Mutex<Vec<bytes::Bytes>>>,
6142        observed_cancel: Arc<AtomicBool>,
6143    }
6144    #[async_trait::async_trait]
6145    impl RpcClientStreamingHandler for CollectingClientStreamHandler {
6146        async fn call(
6147            &self,
6148            ctx: RpcStreamingContext,
6149            mut requests: RequestStream,
6150        ) -> Result<RpcResponsePayload, RpcHandlerError> {
6151            use futures::StreamExt;
6152            while let Some(chunk) = requests.next().await {
6153                self.seen.lock().push(chunk);
6154            }
6155            // Re-check cancellation after EOF so the test can
6156            // distinguish "clean REQUEST_END" from "CANCEL closed
6157            // the stream early".
6158            if ctx.cancellation.is_cancelled() {
6159                self.observed_cancel
6160                    .store(true, std::sync::atomic::Ordering::SeqCst);
6161            }
6162            let count = self.seen.lock().len() as u64;
6163            Ok(RpcResponsePayload {
6164                status: RpcStatus::Ok,
6165                headers: vec![],
6166                body: Bytes::copy_from_slice(&count.to_le_bytes()),
6167            })
6168        }
6169    }
6170
6171    /// 1/6 — happy path: REQUEST + 3 REQUEST_CHUNKs (last has
6172    /// FLAG_END) delivers 4 bodies to the handler in order; the
6173    /// fold emits exactly one terminal RESPONSE carrying the
6174    /// handler's reply.
6175    #[tokio::test]
6176    async fn streaming_request_fold_collects_all_chunks_and_emits_terminal_response() {
6177        let seen = Arc::new(Mutex::new(Vec::new()));
6178        let observed_cancel = Arc::new(AtomicBool::new(false));
6179        let (emit, captured) = capturing_emitter();
6180        let mut fold = RpcStreamingRequestFold::new(
6181            Arc::new(CollectingClientStreamHandler {
6182                seen: seen.clone(),
6183                observed_cancel: observed_cancel.clone(),
6184            }),
6185            emit,
6186        );
6187        // REQUEST with the client-streaming flag, body = "a".
6188        let req = RpcRequestPayload {
6189            service: "agg".to_string(),
6190            deadline_ns: 0,
6191            flags: FLAG_RPC_CLIENT_STREAMING_REQUEST,
6192            headers: vec![],
6193            body: Bytes::from_static(b"a"),
6194        };
6195        fold.apply(&rpc_request_event(0xCAFE, 7, req), &mut ())
6196            .unwrap();
6197        // Wait until the sender is registered (handler task has
6198        // picked up the request and the apply path completed).
6199        assert!(
6200            wait_until(
6201                || fold.sender_keys().contains(&(0xCAFE, 7)),
6202                Duration::from_secs(1)
6203            )
6204            .await
6205        );
6206        // Three more chunks; last sets FLAG_REQUEST_END.
6207        fold.apply(
6208            &rpc_request_chunk_event(0xCAFE, 7, 0, b"b".to_vec()),
6209            &mut (),
6210        )
6211        .unwrap();
6212        fold.apply(
6213            &rpc_request_chunk_event(0xCAFE, 7, 0, b"c".to_vec()),
6214            &mut (),
6215        )
6216        .unwrap();
6217        fold.apply(
6218            &rpc_request_chunk_event(0xCAFE, 7, FLAG_RPC_REQUEST_END, b"d".to_vec()),
6219            &mut (),
6220        )
6221        .unwrap();
6222        // Handler should observe 4 bodies in order and emit one
6223        // terminal RESPONSE whose body encodes the count.
6224        assert!(
6225            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6226            "expected terminal RESPONSE"
6227        );
6228        let captured = captured.lock();
6229        assert_eq!(captured.len(), 1, "exactly one terminal RESPONSE");
6230        let (origin, call_id, resp) = &captured[0];
6231        assert_eq!(*origin, 0xCAFE);
6232        assert_eq!(*call_id, 7);
6233        assert_eq!(resp.status, RpcStatus::Ok);
6234        assert_eq!(resp.body.as_ref(), 4u64.to_le_bytes());
6235        // And the chunks landed in order.
6236        let seen = seen.lock();
6237        let collected: Vec<&[u8]> = seen.iter().map(|b| b.as_ref()).collect();
6238        assert_eq!(collected, vec![b"a", b"b", b"c", b"d"]);
6239        assert!(
6240            !observed_cancel.load(std::sync::atomic::Ordering::SeqCst),
6241            "clean REQUEST_END must NOT register as a cancellation"
6242        );
6243    }
6244
6245    /// 2/6 — degenerate case: initial REQUEST with both the
6246    /// client-streaming AND request-end flags set. Handler sees
6247    /// exactly one body (the REQUEST's own body) and EOF — the
6248    /// "one-item upload" fast path that saves a trailing CHUNK
6249    /// event.
6250    #[tokio::test]
6251    async fn streaming_request_fold_initial_request_with_end_flag_yields_single_item() {
6252        let seen = Arc::new(Mutex::new(Vec::new()));
6253        let observed_cancel = Arc::new(AtomicBool::new(false));
6254        let (emit, captured) = capturing_emitter();
6255        let mut fold = RpcStreamingRequestFold::new(
6256            Arc::new(CollectingClientStreamHandler {
6257                seen: seen.clone(),
6258                observed_cancel,
6259            }),
6260            emit,
6261        );
6262        let req = RpcRequestPayload {
6263            service: "agg".to_string(),
6264            deadline_ns: 0,
6265            flags: FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_REQUEST_END,
6266            headers: vec![],
6267            body: Bytes::from_static(b"only"),
6268        };
6269        fold.apply(&rpc_request_event(1, 42, req), &mut ()).unwrap();
6270        assert!(
6271            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6272            "expected terminal RESPONSE"
6273        );
6274        let captured = captured.lock();
6275        assert_eq!(captured.len(), 1);
6276        assert_eq!(captured[0].2.status, RpcStatus::Ok);
6277        assert_eq!(captured[0].2.body.as_ref(), 1u64.to_le_bytes());
6278        assert_eq!(
6279            seen.lock()
6280                .iter()
6281                .map(|b| b.as_ref())
6282                .collect::<Vec<&[u8]>>(),
6283            vec![b"only" as &[u8]]
6284        );
6285        // Sender must NOT have been registered (initial-REQUEST-
6286        // with-END skips the map insert).
6287        assert!(fold.sender_keys().is_empty());
6288    }
6289
6290    /// 3/6 — CANCEL closes the request stream early, flips the
6291    /// cancellation token, and the spawned task overrides the
6292    /// handler's terminal with `RpcStatus::Cancelled`.
6293    #[tokio::test]
6294    async fn streaming_request_fold_cancel_closes_stream_and_overrides_terminal() {
6295        let seen = Arc::new(Mutex::new(Vec::new()));
6296        let observed_cancel = Arc::new(AtomicBool::new(false));
6297        let (emit, captured) = capturing_emitter();
6298        let mut fold = RpcStreamingRequestFold::new(
6299            Arc::new(CollectingClientStreamHandler {
6300                seen: seen.clone(),
6301                observed_cancel: observed_cancel.clone(),
6302            }),
6303            emit,
6304        );
6305        let req = RpcRequestPayload {
6306            service: "agg".to_string(),
6307            deadline_ns: 0,
6308            flags: FLAG_RPC_CLIENT_STREAMING_REQUEST,
6309            headers: vec![],
6310            body: Bytes::from_static(b"first"),
6311        };
6312        fold.apply(&rpc_request_event(2, 17, req), &mut ()).unwrap();
6313        // Wait for the handler to register, then one in-flight
6314        // CHUNK, then CANCEL before the handler ever finishes
6315        // draining.
6316        assert!(
6317            wait_until(
6318                || fold.sender_keys().contains(&(2, 17)),
6319                Duration::from_secs(1)
6320            )
6321            .await
6322        );
6323        fold.apply(
6324            &rpc_request_chunk_event(2, 17, 0, b"second".to_vec()),
6325            &mut (),
6326        )
6327        .unwrap();
6328        fold.apply(&rpc_cancel_event(2, 17), &mut ()).unwrap();
6329        // Terminal must arrive and must be Cancelled (CANCEL-wins
6330        // ordering, same as the response-side fold).
6331        assert!(
6332            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6333            "expected terminal RESPONSE"
6334        );
6335        let captured = captured.lock();
6336        assert_eq!(captured.len(), 1);
6337        assert_eq!(
6338            captured[0].2.status,
6339            RpcStatus::Cancelled,
6340            "CANCEL must override terminal status"
6341        );
6342        assert!(
6343            observed_cancel.load(std::sync::atomic::Ordering::SeqCst),
6344            "handler must observe cancellation token after stream EOF"
6345        );
6346        // Both maps must be clean post-cancel.
6347        assert!(fold.in_flight_keys().is_empty());
6348        assert!(fold.sender_keys().is_empty());
6349    }
6350
6351    /// 4/6 — handler returns `Err(RpcHandlerError::Application)`
6352    /// → terminal RESPONSE carries the application status code +
6353    /// message body.
6354    #[tokio::test]
6355    async fn streaming_request_fold_application_error_round_trips() {
6356        struct AppErrHandler;
6357        #[async_trait::async_trait]
6358        impl RpcClientStreamingHandler for AppErrHandler {
6359            async fn call(
6360                &self,
6361                _ctx: RpcStreamingContext,
6362                mut requests: RequestStream,
6363            ) -> Result<RpcResponsePayload, RpcHandlerError> {
6364                use futures::StreamExt;
6365                // Drain so the stream's EOF doesn't race the
6366                // error return.
6367                while requests.next().await.is_some() {}
6368                Err(RpcHandlerError::Application {
6369                    code: 0xBEEF,
6370                    message: "bad input".to_string(),
6371                })
6372            }
6373        }
6374        let (emit, captured) = capturing_emitter();
6375        let mut fold = RpcStreamingRequestFold::new(Arc::new(AppErrHandler), emit);
6376        let req = RpcRequestPayload {
6377            service: "agg".to_string(),
6378            deadline_ns: 0,
6379            flags: FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_REQUEST_END,
6380            headers: vec![],
6381            body: Bytes::new(),
6382        };
6383        fold.apply(&rpc_request_event(3, 100, req), &mut ())
6384            .unwrap();
6385        assert!(
6386            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6387            "expected terminal RESPONSE"
6388        );
6389        let captured = captured.lock();
6390        assert_eq!(captured.len(), 1);
6391        assert_eq!(captured[0].2.status, RpcStatus::Application(0xBEEF));
6392        assert_eq!(captured[0].2.body.as_ref(), b"bad input");
6393    }
6394
6395    /// 5/6 — handler panic is caught by `catch_unwind`; terminal
6396    /// surfaces as `Internal` carrying the panic message. Same
6397    /// contract as the existing folds — a misbehaving handler
6398    /// can't take down the cortex adapter.
6399    #[tokio::test]
6400    async fn streaming_request_fold_handler_panic_surfaces_as_internal() {
6401        struct PanickyHandler;
6402        #[async_trait::async_trait]
6403        impl RpcClientStreamingHandler for PanickyHandler {
6404            async fn call(
6405                &self,
6406                _ctx: RpcStreamingContext,
6407                _requests: RequestStream,
6408            ) -> Result<RpcResponsePayload, RpcHandlerError> {
6409                panic!("intentional test panic");
6410            }
6411        }
6412        let (emit, captured) = capturing_emitter();
6413        let mut fold = RpcStreamingRequestFold::new(Arc::new(PanickyHandler), emit);
6414        let req = RpcRequestPayload {
6415            service: "agg".to_string(),
6416            deadline_ns: 0,
6417            flags: FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_REQUEST_END,
6418            headers: vec![],
6419            body: Bytes::new(),
6420        };
6421        fold.apply(&rpc_request_event(4, 200, req), &mut ())
6422            .unwrap();
6423        assert!(
6424            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6425            "expected terminal RESPONSE"
6426        );
6427        let captured = captured.lock();
6428        assert_eq!(captured.len(), 1);
6429        assert_eq!(captured[0].2.status, RpcStatus::Internal);
6430        assert!(
6431            String::from_utf8_lossy(&captured[0].2.body).contains("intentional test panic"),
6432            "panic body should carry the panic message"
6433        );
6434    }
6435
6436    /// 6/6 — duplicate REQUEST with the same `(origin, call_id)`
6437    /// is refused with a synthetic `Internal` terminal frame and
6438    /// does NOT spawn a second handler. Mirror of the regression
6439    /// pinned in the unary + response-streaming folds.
6440    #[tokio::test]
6441    async fn streaming_request_fold_duplicate_request_refuses_without_double_dispatch() {
6442        let invocations = Arc::new(AtomicUsize::new(0));
6443        struct CountingHandler {
6444            invocations: Arc<AtomicUsize>,
6445        }
6446        #[async_trait::async_trait]
6447        impl RpcClientStreamingHandler for CountingHandler {
6448            async fn call(
6449                &self,
6450                _ctx: RpcStreamingContext,
6451                mut requests: RequestStream,
6452            ) -> Result<RpcResponsePayload, RpcHandlerError> {
6453                use futures::StreamExt;
6454                self.invocations.fetch_add(1, Ordering::SeqCst);
6455                // Slow handler to keep the call in-flight while
6456                // the duplicate REQUEST arrives.
6457                tokio::time::sleep(Duration::from_millis(80)).await;
6458                while requests.next().await.is_some() {}
6459                Ok(RpcResponsePayload {
6460                    status: RpcStatus::Ok,
6461                    headers: vec![],
6462                    body: Bytes::new(),
6463                })
6464            }
6465        }
6466        let (emit, captured) = capturing_emitter();
6467        let mut fold = RpcStreamingRequestFold::new(
6468            Arc::new(CountingHandler {
6469                invocations: invocations.clone(),
6470            }),
6471            emit,
6472        );
6473        let req = RpcRequestPayload {
6474            service: "agg".to_string(),
6475            deadline_ns: 0,
6476            flags: FLAG_RPC_CLIENT_STREAMING_REQUEST,
6477            headers: vec![],
6478            body: Bytes::new(),
6479        };
6480        fold.apply(&rpc_request_event(5, 99, req.clone()), &mut ())
6481            .unwrap();
6482        assert!(
6483            wait_until(
6484                || fold.in_flight_keys().contains(&(5, 99)),
6485                Duration::from_secs(1)
6486            )
6487            .await
6488        );
6489        // Duplicate REQUEST: synthetic Internal terminal emitted,
6490        // handler invocation count must stay at 1.
6491        fold.apply(&rpc_request_event(5, 99, req), &mut ()).unwrap();
6492        assert!(
6493            wait_until(|| !captured.lock().is_empty(), Duration::from_secs(1)).await,
6494            "synthetic refusal terminal expected"
6495        );
6496        let refusal = captured.lock()[0].clone();
6497        assert_eq!(refusal.2.status, RpcStatus::Internal);
6498        assert!(String::from_utf8_lossy(&refusal.2.body).contains("duplicate"));
6499        // Finish the first handler so its terminal lands too.
6500        fold.apply(
6501            &rpc_request_chunk_event(5, 99, FLAG_RPC_REQUEST_END, vec![]),
6502            &mut (),
6503        )
6504        .unwrap();
6505        assert!(
6506            wait_until(|| captured.lock().len() >= 2, Duration::from_secs(2)).await,
6507            "first handler should still complete normally"
6508        );
6509        assert_eq!(
6510            invocations.load(Ordering::SeqCst),
6511            1,
6512            "duplicate REQUEST must NOT spawn a second handler",
6513        );
6514    }
6515}