net/adapter/net/cortex/rpc.rs
1//! nRPC — request/response on top of CortEX folds.
2//!
3//! See `docs/misc/NRPC_DESIGN.md` for the full architectural framing.
4//! In short: an RPC server is a `RedexFold` whose state is the
5//! in-flight call set, whose events are typed `(REQUEST, RESPONSE,
6//! CANCEL, DEADLINE_EXCEEDED)`, whose `EventMeta::seq_or_ts` is the
7//! correlation id, and whose `EventMeta::origin_hash` is the
8//! AEAD-verified caller. The mesh-channel layer's queue-group
9//! subscription mode (see `channel::SubscriptionMode`) does the
10//! one-of-N work distribution across replica servers.
11//!
12//! This module is the **wire codec layer**: dispatch constants for
13//! `EventMeta::dispatch`, payload structs for `RpcRequestPayload` /
14//! `RpcResponsePayload`, and the `RpcStatus` enumeration. The fold
15//! types and the `Mesh::serve_rpc` / `Mesh::call` glue layer build
16//! on top.
17
18use bytes::{Buf, BufMut, Bytes};
19use parking_lot::Mutex;
20use std::collections::HashMap;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use tokio::sync::Notify;
24
25use super::super::redex::{RedexError, RedexEvent, RedexFold};
26use super::meta::{EventMeta, EVENT_META_SIZE};
27
// ============================================================================
// `EventMeta::dispatch` byte assignments for nRPC.
//
// Every value defined below (`0x10..=0x16`) lives in the cortex-internal
// range (`0x00..0x7F`). Application/vendor dispatches stay in
// `0x80..0xFF`. Adapters that don't care about RPC ignore these
// dispatches the same way they ignore any other unknown dispatch.
// ============================================================================
36
/// Caller → server. The first frame of an RPC call. `EventMeta::seq_or_ts`
/// is the caller-generated `call_id`; `EventMeta::origin_hash` is the
/// AEAD-verified caller. Payload is an [`RpcRequestPayload`].
pub const DISPATCH_RPC_REQUEST: u8 = 0x10;

/// Server → caller. The (terminal, for unary) frame of an RPC call.
/// `EventMeta::seq_or_ts` matches the request's `call_id`. Payload is
/// an [`RpcResponsePayload`].
pub const DISPATCH_RPC_RESPONSE: u8 = 0x11;

/// Caller → server. Cancellation signal. `EventMeta::seq_or_ts` matches
/// the request's `call_id`. Empty payload — the dispatch byte plus
/// the matching `call_id` is the whole signal. Server's fold removes
/// the in-flight entry and (if cooperative) flips the handler's
/// `CancellationToken`.
pub const DISPATCH_RPC_CANCEL: u8 = 0x12;

/// Server → caller. Deadline-exceeded signal. Emitted when the
/// server's fold sees `now_ns() > request.deadline_ns` before
/// starting the handler (or, optionally, when a long-running handler
/// is aborted by the deadline timer). `EventMeta::seq_or_ts` matches
/// the request's `call_id`. Empty payload.
pub const DISPATCH_RPC_DEADLINE_EXCEEDED: u8 = 0x13;

/// Caller → server. Stream credit grant. Carries a 4-byte
/// big-endian `u32` in the payload after `EventMeta`: the number
/// of additional response chunks the caller is willing to accept
/// for the streaming call identified by `EventMeta::seq_or_ts`.
///
/// Only meaningful when the caller opted into flow control via
/// the `nrpc-stream-window-initial` request header
/// ([`HEADER_NRPC_STREAM_WINDOW_INITIAL`]). On a flow-controlled
/// stream the server's pump task awaits one credit per chunk; on
/// a non-flow-controlled stream (no header) the server ignores
/// every GRANT.
///
/// Phase 3.
// NOTE(review): the codec section of this file states that all wire
// integers are little-endian, while the doc above says this credit
// count is big-endian. The grant encoder/decoder is not visible from
// this part of the file — confirm which byte order is actually on the
// wire and align the two comments.
pub const DISPATCH_RPC_STREAM_GRANT: u8 = 0x14;

/// Caller → server. Continuation chunk of a client-streaming or
/// duplex REQUEST. Carries an [`RpcRequestChunkPayload`] after the
/// `EventMeta` prefix. `EventMeta::seq_or_ts` matches the initial
/// REQUEST's `call_id`. Non-terminal chunks have
/// `flags & FLAG_RPC_REQUEST_END == 0`; the terminal upload chunk
/// sets [`FLAG_RPC_REQUEST_END`].
///
/// Only meaningful for calls whose initial REQUEST set
/// [`FLAG_RPC_CLIENT_STREAMING_REQUEST`]; otherwise the server
/// silently drops the chunk (caller bug; no observable effect).
pub const DISPATCH_RPC_REQUEST_CHUNK: u8 = 0x15;

/// Server → caller. Request-direction stream-credit grant. Mirror
/// of [`DISPATCH_RPC_STREAM_GRANT`] for the upload direction.
/// Carries an [`RpcRequestGrantPayload`] after `EventMeta`: a
/// `call_id` plus a `u32` credit count. `EventMeta::seq_or_ts`
/// matches the call's `call_id` (redundant with the payload, but
/// kept symmetric with the rest of the dispatch family).
///
/// Only meaningful when the caller opted into request-direction
/// flow control via the `nrpc-request-window-initial` header
/// ([`HEADER_NRPC_REQUEST_WINDOW_INITIAL`]). Caller's sink
/// awaits one credit per `REQUEST_CHUNK`; absent header →
/// unbounded credit (sink emits as fast as the publish path can
/// take it).
pub const DISPATCH_RPC_REQUEST_GRANT: u8 = 0x16;
102
103// ============================================================================
104// `RpcRequestPayload::flags` bit assignments.
105// ============================================================================
106
107// Bit 0 (`1 << 0`) is RESERVED — was previously documented as
108// `FLAG_RPC_IDEMPOTENT`, but the server-side replay-cache (LRU of
109// `(origin_hash, call_id) -> RpcResponsePayload`) was never landed,
110// so the flag silently no-op'd despite a load-bearing contract in
111// its doc-string. Removed to avoid shipping a documented behavior
112// the runtime doesn't implement; reservation kept so a future
113// re-add (with the LRU) preserves wire compatibility.
114
/// Set if the server may emit multiple `DISPATCH_RPC_RESPONSE` events
/// for this call. Without it, the first response terminates the
/// call. With it, each response except the terminal one carries
/// `headers["nrpc-streaming"] = b"continue"`; the terminal response
/// has either `b"end"` (success) or a non-`Ok` status.
///
/// Occupies bit 1 of the `u16` flags field.
pub const FLAG_RPC_STREAMING_RESPONSE: u16 = 1 << 1;

/// Set if the request carries W3C Trace Context headers
/// (`traceparent`, `tracestate`). Server propagates them to its own
/// span emission. Phase 3.
///
/// Occupies bit 2 of the `u16` flags field.
pub const FLAG_RPC_PROPAGATE_TRACE: u16 = 1 << 2;

// Bit `1 << 3` reserved — symmetric to the reserved bit 0 above,
// kept as breathing room for a future protocol-level flag without
// pushing every existing live bit.

/// Set on the initial REQUEST if the caller will follow up with
/// one or more [`DISPATCH_RPC_REQUEST_CHUNK`] events. Distinguishes
/// client-streaming / duplex calls from unary at the very first
/// frame so the server's fold knows to open a request-side stream
/// instead of treating the REQUEST as complete.
///
/// Combined with [`FLAG_RPC_STREAMING_RESPONSE`] on the same
/// REQUEST: full duplex.
///
/// Occupies bit 4 of the `u16` flags field.
///
/// Bidi streaming plan (Phase A).
pub const FLAG_RPC_CLIENT_STREAMING_REQUEST: u16 = 1 << 4;

/// Set on a [`DISPATCH_RPC_REQUEST_CHUNK`] (or on the initial
/// REQUEST itself) to signal the terminal upload frame for a
/// client-streaming or duplex call. After receiving this, the
/// server's request-side stream yields `None` and the handler
/// proceeds to its terminal response.
///
/// Setting this on the initial REQUEST is the degenerate "client-
/// streaming with exactly one item" path — saves a round-trip
/// for the trivial case.
///
/// Occupies bit 5 of the `u16` flags field. The payload decoders in
/// this file read the flags word verbatim and do not inspect
/// individual bits, so any interpretation of this flag happens
/// outside the codec.
///
/// Bidi streaming plan (Phase A).
pub const FLAG_RPC_REQUEST_END: u16 = 1 << 5;
155
156// Bits `6..=15` reserved; producers MUST write zero, consumers MUST
157// ignore unknown bits (forward-compat with future flags).
158
159// ============================================================================
160// `RpcResponsePayload::status` enumeration.
161// ============================================================================
162
/// Result code of an nRPC call as carried in
/// `RpcResponsePayload::status`.
///
/// The numbering is Net-native; each variant notes its closest gRPC
/// counterpart. Codes `0x0000..=0x7FFF` are protocol-owned and stable
/// across versions; `0x8000..=0xFFFF` is the application-defined
/// range.
///
/// Wire conversion goes through [`RpcStatus::to_wire`] and
/// [`RpcStatus::from_wire`]. Decoding is canonicalising: a raw value
/// that matches a named variant always decodes to that variant, so
/// `Application(v)` with `v <= 0x0008` does not survive a round trip
/// (for example `Application(0)` encodes as `0` and decodes as `Ok`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum RpcStatus {
    /// The call succeeded and the payload holds the application
    /// response. For streaming responses this may be one of several
    /// frames (see the streaming flag).
    /// gRPC equivalent: `OK` (0).
    Ok = 0x0000,
    /// The server has no service registered under the requested name.
    /// gRPC equivalent: `NOT_FOUND` (5).
    NotFound = 0x0001,
    /// The caller's token does not have the requested service in
    /// scope, or the channel-level capability check failed.
    /// gRPC equivalent: `PERMISSION_DENIED` (7).
    Unauthorized = 0x0002,
    /// The server saw `now_ns() > deadline_ns` before it started
    /// work. (Once the handler is already running, the signal is
    /// [`DISPATCH_RPC_DEADLINE_EXCEEDED`] instead.)
    /// gRPC equivalent: `DEADLINE_EXCEEDED` (4).
    Timeout = 0x0003,
    /// The per-service queue on the server is at its `max_in_flight`
    /// capacity.
    /// gRPC equivalent: `RESOURCE_EXHAUSTED` (8).
    Backpressure = 0x0004,
    /// The caller sent `DISPATCH_RPC_CANCEL` before the server
    /// finished.
    /// gRPC equivalent: `CANCELLED` (1).
    Cancelled = 0x0005,
    /// The handler panicked or failed with an error that fits none
    /// of the other codes. The payload carries a UTF-8 diagnostic.
    /// gRPC equivalent: `INTERNAL` (13).
    Internal = 0x0006,
    /// The server does not support the request payload version.
    /// Subprotocol-version negotiation should normally catch this
    /// earlier; the in-payload guard is the floor.
    /// gRPC equivalent: `UNIMPLEMENTED` (12).
    UnknownVersion = 0x0007,
    /// v0.4 capability-auth: the target's `CapabilityAnnouncement`
    /// either omits the requested `nrpc:<service>` tag or lists it
    /// with allow-lists the caller does not match. See
    /// `docs/plans/CAPABILITY_AUTH_PLAN.md` §3. Kept separate from
    /// `Unauthorized` (channel-auth / token-scope failures) so the
    /// two enforcement surfaces are distinguishable in audit logs.
    /// gRPC equivalent: `PERMISSION_DENIED` (7) — same outward shape
    /// as `Unauthorized`, but a separate substrate code.
    CapabilityDenied = 0x0008,
    /// Application-defined status. The raw `u16` goes on the wire
    /// unchanged; caller and server agree on its meaning out of band.
    Application(u16),
}

impl RpcStatus {
    /// Convert to the `u16` that goes on the wire.
    ///
    /// Named variants map to their fixed codes; `Application(v)`
    /// yields `v` as-is, with no range check.
    pub fn to_wire(self) -> u16 {
        match self {
            Self::Application(raw) => raw,
            Self::Ok => 0x0000,
            Self::NotFound => 0x0001,
            Self::Unauthorized => 0x0002,
            Self::Timeout => 0x0003,
            Self::Backpressure => 0x0004,
            Self::Cancelled => 0x0005,
            Self::Internal => 0x0006,
            Self::UnknownVersion => 0x0007,
            Self::CapabilityDenied => 0x0008,
        }
    }

    /// Convert from the wire `u16`. Never fails: values without a
    /// named variant — including the reserved `0x0009..=0x7FFF`
    /// range — come back as `Application(v)`, which keeps decoding
    /// forward-compatible with future status assignments.
    pub fn from_wire(v: u16) -> Self {
        // Named variants, indexed by their wire code (0x0000..=0x0008).
        const NAMED: [RpcStatus; 9] = [
            RpcStatus::Ok,
            RpcStatus::NotFound,
            RpcStatus::Unauthorized,
            RpcStatus::Timeout,
            RpcStatus::Backpressure,
            RpcStatus::Cancelled,
            RpcStatus::Internal,
            RpcStatus::UnknownVersion,
            RpcStatus::CapabilityDenied,
        ];
        NAMED
            .get(usize::from(v))
            .copied()
            .unwrap_or(Self::Application(v))
    }

    /// `true` only for [`RpcStatus::Ok`]; every other variant,
    /// `Application(_)` included, is `false`. Convenience for the
    /// hot caller-side success-or-error branch.
    #[inline]
    pub fn is_ok(self) -> bool {
        self == Self::Ok
    }
}
260
261// ============================================================================
262// Request / response payloads.
263//
264// These ride in the bytes AFTER the 24-byte `EventMeta` prefix on a
265// CortEX-adapted event. The cortex adapter handles meta + tail
266// concatenation; this codec produces only the tail.
267// ============================================================================
268
/// Header name + value pair. Used for trace-context propagation,
/// idempotency-key carriage, content-type hints. Names are UTF-8 and
/// the codec carries them byte-for-byte (no case folding on encode or
/// decode); values are opaque bytes.
///
/// Note that helpers which look up well-known headers may still
/// compare names ASCII-case-insensitively — `extract_trace_context`
/// in this file does — so "case-sensitive" describes the wire
/// representation, not every consumer.
pub type RpcHeader = (String, Vec<u8>);

/// Maximum service-name length on the wire (matches
/// `MAX_CHANNEL_NAME_LEN` upstream; reasonable upper bound for a
/// human-readable identifier).
///
/// The service length prefix is a single `u8`, so 255 is also the
/// largest value the wire format can express.
pub const MAX_RPC_SERVICE_NAME_LEN: usize = 255;

/// Maximum number of headers in a single request or response.
/// Prevents pathological `headers.len()` reads from a malformed
/// peer; legitimate callers stay well below this.
///
/// The header count is a `u8` on the wire; this cap is stricter than
/// what that prefix could express.
pub const MAX_RPC_HEADERS: usize = 32;

/// Maximum length of a single header name (UTF-8 bytes). The name
/// length prefix is a `u8` on the wire.
pub const MAX_RPC_HEADER_NAME_LEN: usize = 64;

/// Maximum length of a single header value (bytes). The value length
/// prefix is a little-endian `u16` on the wire.
pub const MAX_RPC_HEADER_VALUE_LEN: usize = 4096;

/// Maximum length of a request or response body. Larger payloads
/// must use streaming responses (Phase 3) or chunk at the
/// application layer. Comparable to gRPC's default `max_message_size`
/// of 4 MiB; tuned downward to match RedEX's
/// `MAX_REDEX_HEAP_PAYLOAD` ceiling.
///
/// The body length prefix is a little-endian `u32` on the wire.
pub const MAX_RPC_BODY_LEN: usize = 4 * 1024 * 1024;
296
/// nRPC request payload. Lives after the 24-byte `EventMeta` prefix
/// in a `DISPATCH_RPC_REQUEST` event.
///
/// Wire layout written by `encode` (integers little-endian):
///
/// ```text
/// u8 service_len | service bytes
/// u64 deadline_ns
/// u16 flags
/// u8 header_count | per header: u8 name_len | name | u16 value_len | value
/// u32 body_len | body bytes
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcRequestPayload {
    /// Service-name dispatch key. The server's fold looks this up
    /// in its `serve_rpc` registry and routes to the registered
    /// handler.
    ///
    /// Must be non-empty and at most `MAX_RPC_SERVICE_NAME_LEN`
    /// bytes: `decode` rejects a zero-length service name.
    pub service: String,
    /// Absolute deadline (unix nanos). `0` means no deadline; the
    /// caller will cancel via `DISPATCH_RPC_CANCEL` if it changes
    /// its mind.
    pub deadline_ns: u64,
    /// Bitfield of `FLAG_RPC_*` constants. The codec stores and
    /// returns the raw `u16` without validating individual bits.
    pub flags: u16,
    /// Headers (trace context, idempotency key, content-type, etc.).
    /// Capped at `MAX_RPC_HEADERS` entries, name <= `MAX_RPC_HEADER_NAME_LEN`,
    /// value <= `MAX_RPC_HEADER_VALUE_LEN`.
    pub headers: Vec<RpcHeader>,
    /// Application-defined request body. Caller and server agree on
    /// the codec out-of-band; nRPC doesn't interpret these bytes.
    ///
    /// Held as [`Bytes`] so [`Self::decode`] can take a zero-copy
    /// slice (`Bytes::slice`) of the inbound event's `Bytes` payload
    /// instead of copying — pre-fix perf #84 in
    /// `docs/performance/net-perf-analysis.md` this was `Vec<u8>` and
    /// every decode did a `data[body_start..body_end].to_vec()`
    /// (a memcpy per frame). For high-RPS systems doing 100K+ RPCs/sec
    /// with 1 KB+ bodies that was 100+ MB/sec of pure memcpy.
    pub body: Bytes,
}
326
/// Continuation chunk for a client-streaming or duplex REQUEST.
/// Lives after the 24-byte `EventMeta` prefix in a
/// [`DISPATCH_RPC_REQUEST_CHUNK`] event.
///
/// Unlike the initial [`RpcRequestPayload`] there is no
/// `service` field (server already routed by service at the
/// initial REQUEST) and no `deadline_ns` (the initial REQUEST's
/// deadline applies to the whole call). The `call_id` field is
/// redundant with `EventMeta::seq_or_ts` but kept on the
/// payload so the codec is self-contained — a reader handed a
/// chunk's bytes without the meta header can still recover its
/// correlation id.
///
/// Wire layout written by `encode` (integers little-endian):
///
/// ```text
/// u64 call_id
/// u16 flags
/// u8 header_count | per header: u8 name_len | name | u16 value_len | value
/// u32 body_len | body bytes
/// ```
///
/// Bidi streaming plan (Phase A).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcRequestChunkPayload {
    /// Matches `EventMeta::seq_or_ts` and the original REQUEST's
    /// `call_id`. Kept on the payload so the codec round-trips
    /// in isolation. `decode` only sees the payload bytes, so it
    /// cannot (and does not) cross-check this against the meta
    /// header.
    pub call_id: u64,
    /// Bitfield of `FLAG_RPC_*` constants. The only flag that
    /// makes sense on a chunk today is [`FLAG_RPC_REQUEST_END`];
    /// other flags MUST be zero on the wire so future protocol
    /// extensions can claim them without colliding with
    /// existing chunks. The codec itself does not enforce this:
    /// `decode` returns whatever `u16` was on the wire.
    pub flags: u16,
    /// Per-chunk metadata. Typically empty; reserved for
    /// trace-span continuity across long uploads, content-type
    /// changes mid-stream, or other rare per-chunk concerns.
    /// Capped at `MAX_RPC_HEADERS` entries with the same
    /// per-field caps as `RpcRequestPayload::headers`.
    pub headers: Vec<RpcHeader>,
    /// Application-defined chunk body. Cap is `MAX_RPC_BODY_LEN`
    /// (4 MiB), same as the initial REQUEST body — clients that
    /// need >4 MiB total payload chunk their upload across
    /// multiple `REQUEST_CHUNK` events.
    ///
    /// See [`RpcRequestPayload::body`] for the `Bytes`-vs-`Vec<u8>`
    /// rationale.
    pub body: Bytes,
}
368
/// Request-direction credit grant. Lives after the 24-byte
/// `EventMeta` prefix in a [`DISPATCH_RPC_REQUEST_GRANT`] event.
/// Mirror of the response-direction [`encode_stream_grant`] /
/// [`decode_stream_grant`] pair, but with an explicit `call_id`
/// in the payload (instead of relying solely on
/// `EventMeta::seq_or_ts`) so the codec is self-contained — same
/// rationale as [`RpcRequestChunkPayload::call_id`].
///
/// NOTE(review): the encoder/decoder for this payload is not visible
/// in this part of the file, so its exact byte layout is not
/// restated here — check the codec before relying on field order or
/// endianness.
///
/// Bidi streaming plan (Phase A).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RpcRequestGrantPayload {
    /// Matches the call's `call_id`.
    pub call_id: u64,
    /// Additional REQUEST_CHUNK frames the server is willing to
    /// admit beyond the current credit. Server's incoming-credit
    /// counter is capped defensively (see PHASE-B server fold)
    /// so a misbehaving grant can't overflow.
    ///
    /// NOTE(review): this grant travels server → caller, so the
    /// counter that accumulates these credits is presumably on the
    /// caller's sink side — confirm which side the "capped counter"
    /// sentence above refers to.
    pub credits: u32,
}
388
/// nRPC response payload. Lives after the 24-byte `EventMeta`
/// prefix in a `DISPATCH_RPC_RESPONSE` event.
///
/// Wire layout written by `encode` (integers little-endian):
///
/// ```text
/// u16 status
/// u8 header_count | per header: u8 name_len | name | u16 value_len | value
/// u32 body_len | body bytes
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcResponsePayload {
    /// Outcome of the call. Decoded on the caller side via
    /// [`RpcStatus::from_wire`], which never fails: unknown codes
    /// surface as `RpcStatus::Application(raw)`.
    pub status: RpcStatus,
    /// Headers (trace context, content-type, content-encoding,
    /// etc.). Same caps as `RpcRequestPayload::headers`.
    pub headers: Vec<RpcHeader>,
    /// For `status == Ok`: the application response body.
    /// For non-`Ok` statuses: a UTF-8 diagnostic string (callers
    /// `String::from_utf8_lossy` for display; the bytes are not
    /// guaranteed to be valid UTF-8 against a malicious server).
    /// The codec does not validate UTF-8 here in either case.
    ///
    /// See [`RpcRequestPayload::body`] for the `Bytes`-vs-`Vec<u8>`
    /// rationale.
    pub body: Bytes,
}
408
409// ============================================================================
410// Codec.
411//
412// All wire integers are little-endian. Lengths are u32_le where the
413// upper bound exceeds u16, u16_le where it fits, u8 where it fits.
414// ============================================================================
415
/// Errors from the request / response codecs.
///
/// Every variant carries a `&'static str` label naming the field the
/// decoder was working on when it gave up.
#[derive(Debug, thiserror::Error)]
pub enum RpcCodecError {
    /// Buffer ended mid-field.
    ///
    /// The decoders in this file also return this variant for a
    /// zero-length service name (label `"empty service name"`) and a
    /// zero-length header name (label `"empty header name"`), even
    /// though the buffer is not actually short in those cases.
    #[error("truncated payload at {0}")]
    Truncated(&'static str),
    /// Length prefix exceeds the configured maximum.
    #[error("length {actual} exceeds limit {limit} for {field}")]
    TooLarge {
        /// Field name whose declared length exceeded the cap. Labels
        /// used by the decoders in this file: `"service"`,
        /// `"headers"`, `"header name"`, `"header value"`, `"body"`.
        /// Stable strings — callers may match on them for
        /// diagnostics.
        field: &'static str,
        /// The length the wire claimed for the field.
        actual: usize,
        /// The maximum the codec accepts (one of the `MAX_RPC_*`
        /// constants).
        limit: usize,
    },
    /// String field contains non-UTF-8 bytes. Raised for the service
    /// name (`"service"`) and for header names (`"header name"`);
    /// header values and bodies are opaque bytes and never checked.
    #[error("non-utf8 string in {0}")]
    InvalidUtf8(&'static str),
}
439
440impl RpcRequestPayload {
441 /// Compute the encoded byte length WITHOUT actually encoding.
442 /// Used by [`request_wire_size`] and any caller that needs to
443 /// budget event size at the bus layer (e.g., to refuse a
444 /// request that wouldn't fit in the configured packet budget)
445 /// without paying the encode cost.
446 pub fn encoded_len(&self) -> usize {
447 // service: u8 length + bytes
448 1 + self.service.len()
449 // deadline_ns: u64
450 + 8
451 // flags: u16
452 + 2
453 // headers: u8 count + per-header (u8 name_len + name + u16 value_len + value)
454 + 1
455 + self
456 .headers
457 .iter()
458 .map(|(n, v)| 1 + n.len() + 2 + v.len())
459 .sum::<usize>()
460 // body: u32 length + bytes
461 + 4
462 + self.body.len()
463 }
464
465 /// Encode to the wire format. The result is the bytes that
466 /// follow the 24-byte `EventMeta` prefix in the RedEX payload.
467 ///
468 /// **Encoder bounds:** every field that has a `MAX_RPC_*` cap
469 /// is asserted against that cap. In debug builds an oversize
470 /// field panics with a useful diagnostic so the programmer
471 /// notices in tests; in release builds the assert is dropped
472 /// (the decoder side still enforces the cap, so a malformed
473 /// frame would be rejected by the receiver — but constructing
474 /// one is always a caller bug).
475 pub fn encode(&self) -> Vec<u8> {
476 let mut buf = Vec::with_capacity(self.encoded_len());
477 // service
478 let svc = self.service.as_bytes();
479 debug_assert!(
480 svc.len() <= MAX_RPC_SERVICE_NAME_LEN,
481 "service name {} exceeds MAX_RPC_SERVICE_NAME_LEN ({})",
482 svc.len(),
483 MAX_RPC_SERVICE_NAME_LEN,
484 );
485 buf.put_u8(svc.len() as u8);
486 buf.extend_from_slice(svc);
487 // deadline_ns
488 buf.put_u64_le(self.deadline_ns);
489 // flags
490 buf.put_u16_le(self.flags);
491 // headers
492 encode_headers(&self.headers, &mut buf);
493 // body
494 debug_assert!(
495 self.body.len() <= MAX_RPC_BODY_LEN,
496 "body length {} exceeds MAX_RPC_BODY_LEN ({})",
497 self.body.len(),
498 MAX_RPC_BODY_LEN,
499 );
500 buf.put_u32_le(self.body.len() as u32);
501 buf.extend_from_slice(&self.body);
502 buf
503 }
504
505 /// Decode from the wire bytes following the `EventMeta` prefix.
506 /// All length fields are bounded by the `MAX_RPC_*` constants;
507 /// over-cap inputs error rather than allocate unbounded
508 /// buffers.
509 ///
510 /// Takes [`Bytes`] (not `&[u8]`) so the decoded `body` field
511 /// can be a zero-copy `data.slice(..)` instead of an owned
512 /// `to_vec` — see perf #84.
513 pub fn decode(data: Bytes) -> Result<Self, RpcCodecError> {
514 let mut cur = std::io::Cursor::new(data.as_ref());
515 // service
516 if cur.remaining() < 1 {
517 return Err(RpcCodecError::Truncated("service length"));
518 }
519 let svc_len = cur.get_u8() as usize;
520 if svc_len == 0 {
521 return Err(RpcCodecError::Truncated("empty service name"));
522 }
523 if svc_len > MAX_RPC_SERVICE_NAME_LEN {
524 return Err(RpcCodecError::TooLarge {
525 field: "service",
526 actual: svc_len,
527 limit: MAX_RPC_SERVICE_NAME_LEN,
528 });
529 }
530 if cur.remaining() < svc_len {
531 return Err(RpcCodecError::Truncated("service bytes"));
532 }
533 let svc_start = cur.position() as usize;
534 let svc_end = svc_start + svc_len;
535 let service = std::str::from_utf8(&data[svc_start..svc_end])
536 .map_err(|_| RpcCodecError::InvalidUtf8("service"))?
537 .to_string();
538 cur.set_position(svc_end as u64);
539 // deadline_ns
540 if cur.remaining() < 8 {
541 return Err(RpcCodecError::Truncated("deadline_ns"));
542 }
543 let deadline_ns = cur.get_u64_le();
544 // flags
545 if cur.remaining() < 2 {
546 return Err(RpcCodecError::Truncated("flags"));
547 }
548 let flags = cur.get_u16_le();
549 // headers
550 let headers = decode_headers(&mut cur, &data)?;
551 // body
552 if cur.remaining() < 4 {
553 return Err(RpcCodecError::Truncated("body length"));
554 }
555 let body_len = cur.get_u32_le() as usize;
556 if body_len > MAX_RPC_BODY_LEN {
557 return Err(RpcCodecError::TooLarge {
558 field: "body",
559 actual: body_len,
560 limit: MAX_RPC_BODY_LEN,
561 });
562 }
563 if cur.remaining() < body_len {
564 return Err(RpcCodecError::Truncated("body bytes"));
565 }
566 let body_start = cur.position() as usize;
567 let body_end = body_start + body_len;
568 // Zero-copy slice over the input — refcount bump only.
569 let body = data.slice(body_start..body_end);
570 Ok(Self {
571 service,
572 deadline_ns,
573 flags,
574 headers,
575 body,
576 })
577 }
578}
579
580impl RpcRequestChunkPayload {
581 /// Compute the encoded byte length WITHOUT actually encoding.
582 /// See [`RpcRequestPayload::encoded_len`] for the rationale.
583 pub fn encoded_len(&self) -> usize {
584 // call_id: u64
585 8
586 // flags: u16
587 + 2
588 // headers: u8 count + per-header (u8 name_len + name + u16 value_len + value)
589 + 1
590 + self
591 .headers
592 .iter()
593 .map(|(n, v)| 1 + n.len() + 2 + v.len())
594 .sum::<usize>()
595 // body: u32 length + bytes
596 + 4
597 + self.body.len()
598 }
599
600 /// Encode to the wire bytes that follow the 24-byte `EventMeta`
601 /// prefix in a [`DISPATCH_RPC_REQUEST_CHUNK`] event. Same
602 /// encoder-bounds policy as [`RpcRequestPayload::encode`]:
603 /// oversize fields panic in debug, the decoder enforces in
604 /// release.
605 pub fn encode(&self) -> Vec<u8> {
606 let mut buf = Vec::with_capacity(self.encoded_len());
607 // call_id
608 buf.put_u64_le(self.call_id);
609 // flags
610 buf.put_u16_le(self.flags);
611 // headers
612 encode_headers(&self.headers, &mut buf);
613 // body
614 debug_assert!(
615 self.body.len() <= MAX_RPC_BODY_LEN,
616 "body length {} exceeds MAX_RPC_BODY_LEN ({})",
617 self.body.len(),
618 MAX_RPC_BODY_LEN,
619 );
620 buf.put_u32_le(self.body.len() as u32);
621 buf.extend_from_slice(&self.body);
622 buf
623 }
624
625 /// Decode from the wire bytes following the `EventMeta` prefix.
626 /// Bounded by the same `MAX_RPC_*` caps as the initial REQUEST.
627 /// Takes [`Bytes`] for zero-copy `body` slicing — see perf #84.
628 pub fn decode(data: Bytes) -> Result<Self, RpcCodecError> {
629 let mut cur = std::io::Cursor::new(data.as_ref());
630 // call_id
631 if cur.remaining() < 8 {
632 return Err(RpcCodecError::Truncated("call_id"));
633 }
634 let call_id = cur.get_u64_le();
635 // flags
636 if cur.remaining() < 2 {
637 return Err(RpcCodecError::Truncated("flags"));
638 }
639 let flags = cur.get_u16_le();
640 // headers
641 let headers = decode_headers(&mut cur, &data)?;
642 // body
643 if cur.remaining() < 4 {
644 return Err(RpcCodecError::Truncated("body length"));
645 }
646 let body_len = cur.get_u32_le() as usize;
647 if body_len > MAX_RPC_BODY_LEN {
648 return Err(RpcCodecError::TooLarge {
649 field: "body",
650 actual: body_len,
651 limit: MAX_RPC_BODY_LEN,
652 });
653 }
654 if cur.remaining() < body_len {
655 return Err(RpcCodecError::Truncated("body bytes"));
656 }
657 let body_start = cur.position() as usize;
658 let body_end = body_start + body_len;
659 let body = data.slice(body_start..body_end);
660 Ok(Self {
661 call_id,
662 flags,
663 headers,
664 body,
665 })
666 }
667}
668
669impl RpcResponsePayload {
670 /// Compute the encoded byte length WITHOUT actually encoding.
671 /// See [`RpcRequestPayload::encoded_len`].
672 pub fn encoded_len(&self) -> usize {
673 // status: u16
674 2
675 // headers: u8 count + per-header
676 + 1
677 + self
678 .headers
679 .iter()
680 .map(|(n, v)| 1 + n.len() + 2 + v.len())
681 .sum::<usize>()
682 // body: u32 length + bytes
683 + 4
684 + self.body.len()
685 }
686
687 /// Encode to the wire format. The result is the bytes that
688 /// follow the 24-byte `EventMeta` prefix in the RedEX payload.
689 /// Same encoder-bounds policy as
690 /// [`RpcRequestPayload::encode`] — see that method's doc.
691 pub fn encode(&self) -> Vec<u8> {
692 let mut buf = Vec::with_capacity(self.encoded_len());
693 buf.put_u16_le(self.status.to_wire());
694 encode_headers(&self.headers, &mut buf);
695 debug_assert!(
696 self.body.len() <= MAX_RPC_BODY_LEN,
697 "body length {} exceeds MAX_RPC_BODY_LEN ({})",
698 self.body.len(),
699 MAX_RPC_BODY_LEN,
700 );
701 buf.put_u32_le(self.body.len() as u32);
702 buf.extend_from_slice(&self.body);
703 buf
704 }
705
706 /// Decode from the wire bytes following the `EventMeta` prefix.
707 /// Takes [`Bytes`] for zero-copy `body` slicing — see perf #84.
708 pub fn decode(data: Bytes) -> Result<Self, RpcCodecError> {
709 let mut cur = std::io::Cursor::new(data.as_ref());
710 if cur.remaining() < 2 {
711 return Err(RpcCodecError::Truncated("status"));
712 }
713 let status = RpcStatus::from_wire(cur.get_u16_le());
714 let headers = decode_headers(&mut cur, &data)?;
715 if cur.remaining() < 4 {
716 return Err(RpcCodecError::Truncated("body length"));
717 }
718 let body_len = cur.get_u32_le() as usize;
719 if body_len > MAX_RPC_BODY_LEN {
720 return Err(RpcCodecError::TooLarge {
721 field: "body",
722 actual: body_len,
723 limit: MAX_RPC_BODY_LEN,
724 });
725 }
726 if cur.remaining() < body_len {
727 return Err(RpcCodecError::Truncated("body bytes"));
728 }
729 let body_start = cur.position() as usize;
730 let body_end = body_start + body_len;
731 let body = data.slice(body_start..body_end);
732 Ok(Self {
733 status,
734 headers,
735 body,
736 })
737 }
738}
739
740/// Pull `traceparent` / `tracestate` out of `headers` if present.
741/// Caller-side helper: callers building an `RpcRequestPayload`
742/// with a `TraceContext` use [`build_trace_headers`] to emit the
743/// matching headers; this is the inverse on the server side.
744///
745/// Returns `Some(TraceContext)` if `traceparent` is present;
746/// `None` otherwise. `tracestate` defaults to empty when absent
747/// — W3C says tracestate is optional even when traceparent is
748/// set.
749pub fn extract_trace_context(headers: &[RpcHeader]) -> Option<TraceContext> {
750 let mut traceparent: Option<String> = None;
751 let mut tracestate = String::new();
752 for (name, value) in headers {
753 // Header names are case-insensitive (matches W3C and HTTP
754 // convention) — same comparison style as `parse_stream_
755 // window_initial` for consistency. The wire spec doesn't
756 // mandate case so a peer that emits `Traceparent` (capital
757 // T) shouldn't be silently ignored.
758 if name.eq_ignore_ascii_case("traceparent") {
759 if let Ok(s) = std::str::from_utf8(value) {
760 traceparent = Some(s.to_string());
761 }
762 } else if name.eq_ignore_ascii_case("tracestate") {
763 if let Ok(s) = std::str::from_utf8(value) {
764 tracestate = s.to_string();
765 }
766 }
767 }
768 traceparent.map(|tp| TraceContext {
769 traceparent: tp,
770 tracestate,
771 })
772}
773
774/// Build the headers a caller appends to its
775/// `RpcRequestPayload::headers` to propagate the trace context
776/// across the call. Set `RpcRequestPayload::flags |= FLAG_RPC_PROPAGATE_TRACE`
777/// alongside this so the server's fold knows to extract them.
778///
779/// Always emits `traceparent`. Emits `tracestate` only when
780/// non-empty (matches the W3C convention of skipping empty
781/// tracestate values on the wire).
782pub fn build_trace_headers(ctx: &TraceContext) -> Vec<RpcHeader> {
783 let mut headers = Vec::with_capacity(2);
784 headers.push((
785 "traceparent".to_string(),
786 ctx.traceparent.clone().into_bytes(),
787 ));
788 if !ctx.tracestate.is_empty() {
789 headers.push((
790 "tracestate".to_string(),
791 ctx.tracestate.clone().into_bytes(),
792 ));
793 }
794 headers
795}
796
797fn encode_headers(headers: &[RpcHeader], buf: &mut Vec<u8>) {
798 debug_assert!(
799 headers.len() <= MAX_RPC_HEADERS,
800 "headers count {} exceeds MAX_RPC_HEADERS ({})",
801 headers.len(),
802 MAX_RPC_HEADERS,
803 );
804 buf.put_u8(headers.len() as u8);
805 for (name, value) in headers {
806 let nbytes = name.as_bytes();
807 debug_assert!(
808 nbytes.len() <= MAX_RPC_HEADER_NAME_LEN,
809 "header name {} exceeds MAX_RPC_HEADER_NAME_LEN ({})",
810 nbytes.len(),
811 MAX_RPC_HEADER_NAME_LEN,
812 );
813 debug_assert!(
814 value.len() <= MAX_RPC_HEADER_VALUE_LEN,
815 "header value {} exceeds MAX_RPC_HEADER_VALUE_LEN ({})",
816 value.len(),
817 MAX_RPC_HEADER_VALUE_LEN,
818 );
819 buf.put_u8(nbytes.len() as u8);
820 buf.extend_from_slice(nbytes);
821 buf.put_u16_le(value.len() as u16);
822 buf.extend_from_slice(value);
823 }
824}
825
826fn decode_headers(
827 cur: &mut std::io::Cursor<&[u8]>,
828 data: &[u8],
829) -> Result<Vec<RpcHeader>, RpcCodecError> {
830 if cur.remaining() < 1 {
831 return Err(RpcCodecError::Truncated("headers count"));
832 }
833 let count = cur.get_u8() as usize;
834 if count > MAX_RPC_HEADERS {
835 return Err(RpcCodecError::TooLarge {
836 field: "headers",
837 actual: count,
838 limit: MAX_RPC_HEADERS,
839 });
840 }
841 let mut headers = Vec::with_capacity(count);
842 for _ in 0..count {
843 if cur.remaining() < 1 {
844 return Err(RpcCodecError::Truncated("header name length"));
845 }
846 let name_len = cur.get_u8() as usize;
847 if name_len == 0 {
848 return Err(RpcCodecError::Truncated("empty header name"));
849 }
850 if name_len > MAX_RPC_HEADER_NAME_LEN {
851 return Err(RpcCodecError::TooLarge {
852 field: "header name",
853 actual: name_len,
854 limit: MAX_RPC_HEADER_NAME_LEN,
855 });
856 }
857 if cur.remaining() < name_len {
858 return Err(RpcCodecError::Truncated("header name bytes"));
859 }
860 let nstart = cur.position() as usize;
861 let nend = nstart + name_len;
862 let name = std::str::from_utf8(&data[nstart..nend])
863 .map_err(|_| RpcCodecError::InvalidUtf8("header name"))?
864 .to_string();
865 cur.set_position(nend as u64);
866
867 if cur.remaining() < 2 {
868 return Err(RpcCodecError::Truncated("header value length"));
869 }
870 let value_len = cur.get_u16_le() as usize;
871 if value_len > MAX_RPC_HEADER_VALUE_LEN {
872 return Err(RpcCodecError::TooLarge {
873 field: "header value",
874 actual: value_len,
875 limit: MAX_RPC_HEADER_VALUE_LEN,
876 });
877 }
878 if cur.remaining() < value_len {
879 return Err(RpcCodecError::Truncated("header value bytes"));
880 }
881 let vstart = cur.position() as usize;
882 let vend = vstart + value_len;
883 let value = data[vstart..vend].to_vec();
884 cur.set_position(vend as u64);
885 headers.push((name, value));
886 }
887 Ok(headers)
888}
889
890/// Convenience: the byte layout of an `RpcRequestPayload` that lands
891/// after the `EventMeta` prefix in a `DISPATCH_RPC_REQUEST` event.
892/// Exposed so callers can budget the total event size at the bus
893/// layer without doing the encode first.
894pub fn request_wire_size(payload: &RpcRequestPayload) -> usize {
895 EVENT_META_SIZE + payload.encoded_len()
896}
897
898/// Same for `RpcResponsePayload` after the `EventMeta` prefix in a
899/// `DISPATCH_RPC_RESPONSE` event.
900pub fn response_wire_size(payload: &RpcResponsePayload) -> usize {
901 EVENT_META_SIZE + payload.encoded_len()
902}
903
904// ============================================================================
905// Mesh inbound dispatch hook.
906//
907// `MeshNode::dispatch_packet` normally pushes inbound channel
908// events onto a per-shard `inbound` queue keyed by `shard_id`. The
909// channel name / hash is stripped on the way in — by the time the
910// event lands in the queue, only the payload remains.
911//
912// RPC needs per-channel routing (events for `<service>.requests`
913// drive the server fold; events for `<service>.replies.<origin>`
914// drive the client fold). Without channel info on the queued
915// event, we can't filter from the consumer side.
916//
917// The hook below adds a per-channel-hash dispatcher map that the
918// mesh's inbound dispatch consults BEFORE pushing to the shard
919// queue. If a dispatcher is registered for the event's
920// canonical [`ChannelHash`], the event is routed there directly
921// (bypassing the shard queue); otherwise the existing shard-queue
922// path runs.
923//
924// **Collision posture.** The dispatch event carries the canonical
925// 32-bit [`ChannelHash`] (joint-collision threshold ~65 K
926// channels, well above realistic deployment); the wire
927// `NetHeader::channel_hash` is `u16` and may bucket-collide at
928// scale, so the mesh's inbound dispatch indexes by the wire `u16`
929// and dispatches every canonical entry registered in that bucket
930// (the canonical match resolves on the dispatcher side). At
931// typical sizing this is a single entry per bucket.
932// ============================================================================
933
/// One inbound event delivered to a registered RPC dispatcher.
///
/// Carries the channel the event arrived on, two views of the
/// sender (`origin_hash` for routing, `from_node` for the
/// session-verified peer identity), and the raw event bytes.
#[derive(Debug, Clone)]
pub struct RpcInboundEvent {
    /// Canonical [`ChannelHash`](crate::adapter::net::channel::ChannelHash)
    /// (u32) of the channel this event arrived on — widened from the
    /// per-packet wire `u16` `NetHeader::channel_hash` via the
    /// registered-dispatcher table at receive time.
    /// Collision-resistant at realistic scale; the wire `u16` may
    /// bucket-collide but the canonical hash uniquely identifies the
    /// registered dispatcher target.
    pub channel_hash: super::super::channel::ChannelHash,
    /// Caller's `origin_hash` from the packet header — the full
    /// 64-bit `EntityKeypair::origin_hash()` mirroring the wire
    /// field's width post-`WIRE_ORIGIN_HASH_64BIT`. The dispatcher
    /// should treat this as routing metadata, not identity
    /// authentication (the AEAD-verified `from_node` field
    /// below carries that).
    pub origin_hash: u64,
    /// Wire-session peer's `NodeId` resolved at packet receive
    /// time from the AEAD-verified session_id. Distinct from
    /// `origin_hash`: this is the full 64-bit network identity
    /// of the peer that delivered the packet. Used by
    /// `RpcClientPending::deliver`
    /// to reject spoofed RESPONSE frames whose call_id happens
    /// to match an in-flight request but whose session peer
    /// isn't the recorded target.
    ///
    /// Set to `0` on test / loopback paths that don't have a
    /// session to resolve against; callers that register
    /// pending entries with `target_node = 0` opt out of the
    /// binding gate (and trust the call_id randomness alone).
    ///
    /// **Production wire-path invariant**: real over-the-wire
    /// inbound delivery MUST NOT produce `from_node = 0`. The
    /// dispatcher in `mesh.rs` (`handle_inbound_user_payload`)
    /// drops the event when the wire session has no resolvable
    /// `NodeId`, rather than forwarding under the sentinel — see
    /// the explicit drop + warn at the
    /// `dropping cortex-RPC event: wire session has no resolvable NodeId`
    /// log site. The v0.4 capability-auth callee-side gate in
    /// `MeshNode::serve_rpc`'s bridge relies on this: it skips
    /// permissively when `from_node == 0` (loopback compat), so
    /// a wire-path leak of the sentinel would silently re-open
    /// the gate. If you change the dispatcher to fall back to 0
    /// instead of dropping, you ALSO have to teach the bridge
    /// to deny on the sentinel.
    pub from_node: super::super::behavior::placement::NodeId,
    /// Event payload bytes — the same bytes that would have been
    /// pushed onto the shard inbound queue. For RPC events these
    /// start with the `EventMeta` prefix (`EVENT_META_SIZE` bytes)
    /// followed by the `RpcRequestPayload` / `RpcResponsePayload`
    /// encoding.
    pub payload: bytes::Bytes,
}
987
/// Type-erased callback fired by the mesh's inbound dispatch
/// when an event arrives for a registered `channel_hash`. The
/// callback runs on the mesh's dispatch task, so the body should
/// be quick (push the event onto an mpsc / fold consumer rather
/// than do real work).
///
/// The callback receives the [`RpcInboundEvent`] by value and
/// returns nothing. It is held behind an `Arc` and bounded by
/// `Send + Sync + 'static`.
pub type RpcInboundDispatcher = Arc<dyn Fn(RpcInboundEvent) + Send + Sync + 'static>;
994
995// ============================================================================
996// Streaming-response protocol markers.
997//
998// When a caller sets `FLAG_RPC_STREAMING_RESPONSE` on the request,
999// the server emits multiple `DISPATCH_RPC_RESPONSE` events for the
1000// same `call_id`. Non-terminal chunks carry the
1001// `nrpc-streaming = continue` header; the terminal chunk carries
1002// `nrpc-streaming = end` (or any non-`Ok` status, which is also
1003// terminal). The client-side stream collects chunks until it sees
1004// a terminal marker.
1005// ============================================================================
1006
/// Header name nRPC uses to mark streaming-response chunks.
/// Present on every chunk of a streaming response, with one of the
/// two values defined below. A response with `Ok` status and no
/// such header is treated as unary by
/// [`classify_streaming_chunk`].
pub const HEADER_NRPC_STREAMING: &str = "nrpc-streaming";

/// `nrpc-streaming` value on a non-terminal chunk. The client-side
/// stream yields the chunk's body and continues waiting for more.
pub const HEADER_NRPC_STREAMING_CONTINUE: &[u8] = b"continue";

/// `nrpc-streaming` value on the terminal chunk. The client-side
/// stream yields the chunk's body (if non-empty) and then closes.
/// A non-`Ok` status is also terminal, regardless of header — the
/// stream yields the error and closes. Any other marker value is
/// likewise classified as terminal by
/// [`classify_streaming_chunk`].
pub const HEADER_NRPC_STREAMING_END: &[u8] = b"end";
1021
/// Header on a streaming REQUEST that opts into flow control with
/// the given initial credit window. Value is the ASCII decimal
/// representation of a `u32` (e.g. `"32"`). When present, the
/// server's streaming fold creates a per-call semaphore initialized
/// to that count and the pump awaits one credit per emitted chunk.
/// The caller refills via [`DISPATCH_RPC_STREAM_GRANT`] events.
///
/// Absent → unbounded credit (the pump emits chunks as fast as
/// the publish path can take them). Long-running streams that
/// could outpace a slow consumer SHOULD opt into flow control —
/// without it, the server's sink mpsc grows unbounded under a
/// stalled caller.
///
/// Read with [`parse_stream_window_initial`], which matches the
/// header name ASCII-case-insensitively and treats a malformed
/// value the same as an absent header.
pub const HEADER_NRPC_STREAM_WINDOW_INITIAL: &str = "nrpc-stream-window-initial";
1035
/// Serialize a stream-grant: the additional credit `amount` as a
/// 4-byte big-endian `u32`. The server side reads it back with
/// [`decode_stream_grant`].
pub fn encode_stream_grant(amount: u32) -> Vec<u8> {
    let wire: [u8; 4] = amount.to_be_bytes();
    Vec::from(wire)
}
1042
/// Parse a stream-grant payload produced by
/// [`encode_stream_grant`].
///
/// Returns the big-endian `u32` credit when `payload` is exactly
/// 4 bytes long, and `None` for any other length — so a malformed
/// grant is rejected without killing the cortex adapter.
pub fn decode_stream_grant(payload: &[u8]) -> Option<u32> {
    match *payload {
        [b0, b1, b2, b3] => Some(u32::from_be_bytes([b0, b1, b2, b3])),
        _ => None,
    }
}
1054
1055/// Parse the `nrpc-stream-window-initial` header from a request's
1056/// header list. Returns `Some(window)` if a valid u32 ASCII-decimal
1057/// value is present, else `None` (no header / malformed value /
1058/// non-utf8 — all treated as "no flow control").
1059pub fn parse_stream_window_initial(headers: &[RpcHeader]) -> Option<u32> {
1060 for (name, value) in headers {
1061 if name.eq_ignore_ascii_case(HEADER_NRPC_STREAM_WINDOW_INITIAL) {
1062 return std::str::from_utf8(value).ok()?.parse::<u32>().ok();
1063 }
1064 }
1065 None
1066}
1067
/// Header on the initial REQUEST of a client-streaming or duplex
/// call that opts the upload direction into flow control with the
/// given initial credit window. Value is the ASCII decimal
/// representation of a `u32`. When present, the server's
/// streaming-request fold creates a per-call semaphore and the
/// caller's sink awaits one credit per `REQUEST_CHUNK`. The server
/// refills via [`DISPATCH_RPC_REQUEST_GRANT`] events.
///
/// Absent → unbounded credit (caller's sink emits as fast as the
/// publish path can take it). Long client-streaming calls that
/// could outpace a slow handler SHOULD opt into flow control —
/// without it, the server's chunk mpsc grows unbounded under a
/// stalled handler.
///
/// Read with [`parse_request_window_initial`].
///
/// Part of the bidi streaming plan (Phase A).
pub const HEADER_NRPC_REQUEST_WINDOW_INITIAL: &str = "nrpc-request-window-initial";
1084
/// Serialize a request-grant: 12 bytes made of `call_id` as a
/// little-endian `u64` followed by the additional `credits` as a
/// big-endian `u32`.
///
/// The mixed endianness is deliberate: the credit field matches
/// [`encode_stream_grant`], while `call_id` matches the rest of the
/// RPC codec's u64 fields.
///
/// The caller side reads it back with [`decode_request_grant`].
pub fn encode_request_grant(call_id: u64, credits: u32) -> Vec<u8> {
    let mut wire = Vec::with_capacity(12);
    wire.extend_from_slice(&call_id.to_le_bytes());
    wire.extend_from_slice(&credits.to_be_bytes());
    wire
}
1097
1098/// Decode a request-grant payload. Returns `None` if the slice is
1099/// not exactly 12 bytes — defends the caller's fold against
1100/// malformed grants without killing the cortex adapter.
1101pub fn decode_request_grant(payload: &[u8]) -> Option<RpcRequestGrantPayload> {
1102 if payload.len() != 12 {
1103 return None;
1104 }
1105 let mut cid = [0u8; 8];
1106 cid.copy_from_slice(&payload[..8]);
1107 let call_id = u64::from_le_bytes(cid);
1108 let mut credits = [0u8; 4];
1109 credits.copy_from_slice(&payload[8..]);
1110 Some(RpcRequestGrantPayload {
1111 call_id,
1112 credits: u32::from_be_bytes(credits),
1113 })
1114}
1115
1116/// Parse the `nrpc-request-window-initial` header from a request's
1117/// header list. Same semantics as [`parse_stream_window_initial`]
1118/// but for the upload direction.
1119pub fn parse_request_window_initial(headers: &[RpcHeader]) -> Option<u32> {
1120 for (name, value) in headers {
1121 if name.eq_ignore_ascii_case(HEADER_NRPC_REQUEST_WINDOW_INITIAL) {
1122 return std::str::from_utf8(value).ok()?.parse::<u32>().ok();
1123 }
1124 }
1125 None
1126}
1127
/// Result of classifying a `RpcResponsePayload` against the
/// streaming-protocol markers: a non-terminal streaming chunk
/// (`continue`), a terminal streaming chunk (`end` OR non-`Ok`
/// status), OR a unary response (no streaming header at all).
/// Produced by [`classify_streaming_chunk`] and used by the
/// client-side fold to demux streaming vs unary responses without
/// needing a separate flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingChunkKind {
    /// Non-terminal chunk — yield body, continue waiting.
    Continue,
    /// Terminal chunk — yield body (if any), close stream.
    Terminal,
    /// Not a streaming response — unary semantics apply.
    Unary,
}
1143
1144/// Classify a response per the streaming-protocol markers.
1145pub fn classify_streaming_chunk(resp: &RpcResponsePayload) -> StreamingChunkKind {
1146 // Non-Ok status is always terminal regardless of header — the
1147 // stream surfaces the error and closes.
1148 if !resp.status.is_ok() {
1149 return StreamingChunkKind::Terminal;
1150 }
1151 // Walk headers for the streaming marker. Absence = unary
1152 // semantics (caller used `call`, not `call_streaming`).
1153 for (name, value) in &resp.headers {
1154 if name == HEADER_NRPC_STREAMING {
1155 return if value.as_slice() == HEADER_NRPC_STREAMING_END {
1156 StreamingChunkKind::Terminal
1157 } else if value.as_slice() == HEADER_NRPC_STREAMING_CONTINUE {
1158 StreamingChunkKind::Continue
1159 } else {
1160 // Unknown marker value — be defensive, treat as
1161 // terminal so a misbehaving server doesn't keep
1162 // a stream open forever.
1163 StreamingChunkKind::Terminal
1164 };
1165 }
1166 }
1167 StreamingChunkKind::Unary
1168}
1169
1170// ============================================================================
1171// Server-side fold.
1172//
1173// `RpcServerFold` is the `RedexFold` half of the server. It sees
1174// REQUEST events on the channel the cortex adapter is opened against,
1175// spawns the user handler in a tokio task, and emits the RESPONSE
1176// via a callback the `Mesh::serve_rpc` glue layer wires up. The
1177// fold itself is small and pure — all I/O happens in the spawned
1178// task and the emitter callback.
1179//
1180// Cancellation: each in-flight call gets an `RpcCancellationToken`
1181// that the handler can `select!` on. CANCEL events flip the
1182// matching token; the handler observes `cancellation.cancelled()`
1183// firing and aborts cooperatively.
1184// ============================================================================
1185
/// Cancellation signal for an in-flight RPC handler.
///
/// Created when the fold dispatches a REQUEST; cloned into the
/// handler's `RpcContext` and held in the fold's in-flight map. A
/// matching CANCEL event flips the token; handlers observe via
/// either [`Self::is_cancelled`] (synchronous probe) or
/// [`Self::cancelled`] (await for the signal).
///
/// Clones share one underlying state through the inner `Arc`, so
/// cancelling any clone is visible to all of them.
#[derive(Clone, Default)]
pub struct RpcCancellationToken {
    // Shared flag + wakeup primitive; see `RpcCancellationInner`.
    inner: Arc<RpcCancellationInner>,
}
1197
/// Shared state behind every clone of an [`RpcCancellationToken`].
#[derive(Default)]
struct RpcCancellationInner {
    /// Set to `true` (with `Release` ordering) by
    /// `RpcCancellationToken::cancel`; read by `is_cancelled`.
    fired: AtomicBool,
    /// Wakes tasks parked in `RpcCancellationToken::cancelled` when
    /// `cancel` calls `notify_waiters`.
    notify: Notify,
}
1203
1204impl RpcCancellationToken {
1205 /// Construct a fresh, un-fired token.
1206 pub fn new() -> Self {
1207 Self::default()
1208 }
1209
1210 /// Flip the token. Idempotent — repeated calls are no-ops.
1211 /// Wakes any task currently in [`Self::cancelled`].
1212 pub fn cancel(&self) {
1213 // Release pairs with the Acquire load in `is_cancelled`
1214 // so a handler that observes `is_cancelled() == true` is
1215 // guaranteed to see every prior write the canceller did.
1216 self.inner.fired.store(true, Ordering::Release);
1217 self.inner.notify.notify_waiters();
1218 }
1219
1220 /// Synchronous probe. `true` once `cancel()` has been called.
1221 #[inline]
1222 pub fn is_cancelled(&self) -> bool {
1223 self.inner.fired.load(Ordering::Acquire)
1224 }
1225
1226 /// Await the cancellation. Returns immediately if already
1227 /// cancelled. Otherwise registers as a waiter and returns when
1228 /// `cancel()` is called.
1229 ///
1230 /// Race-safe: registering the `notified()` future BEFORE the
1231 /// `is_cancelled` check means a `cancel()` racing this method
1232 /// either (a) is observed by the post-register check and we
1233 /// return immediately, OR (b) lands after we register and wakes
1234 /// our future. Either way we don't sleep past a cancellation.
1235 pub async fn cancelled(&self) {
1236 let notified = self.inner.notify.notified();
1237 if self.is_cancelled() {
1238 return;
1239 }
1240 notified.await;
1241 }
1242}
1243
/// W3C Trace Context — `traceparent` and `tracestate` headers
/// propagated through nRPC for distributed-tracing systems.
///
/// `traceparent` carries the trace id, parent span id, and flags;
/// `tracestate` carries vendor-specific tracing extensions. nRPC
/// is **transport-only** for these — it doesn't parse or generate
/// IDs, doesn't emit spans, doesn't talk to any tracing backend.
/// Application code (typically via `tracing-opentelemetry` or a
/// Datadog client) reads these on the server side and continues
/// the trace.
///
/// Callers turn a `TraceContext` into request headers with
/// [`build_trace_headers`].
///
/// See <https://www.w3.org/TR/trace-context/> for the wire format
/// of each field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TraceContext {
    /// `traceparent` header value (e.g.
    /// `"00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"`).
    /// Required by the W3C spec; nRPC treats it as opaque text and
    /// does not interpret its contents.
    pub traceparent: String,
    /// `tracestate` header value — vendor-specific extensions.
    /// Optional in W3C; empty string when absent (an empty value is
    /// not emitted on the wire by [`build_trace_headers`]).
    pub tracestate: String,
}
1267
/// Context handed to a `RpcHandler::call`. Carries everything the
/// handler needs to fulfill the request: the AEAD-verified caller
/// identity, the request payload, a cancellation token, and the
/// propagated trace context (if any). Passed to the handler by
/// value.
pub struct RpcContext {
    /// AEAD-verified caller `origin_hash`. The bus sets this from
    /// the verified peer; not self-claimable from the request body.
    pub caller_origin: u64,
    /// Caller-generated correlation id. Same value on the matching
    /// CANCEL or RESPONSE.
    pub call_id: u64,
    /// Decoded request payload.
    pub payload: RpcRequestPayload,
    /// Cancellation signal. Handlers should `select!` on
    /// `cancellation.cancelled()` if their work is async-cancellable;
    /// long-running synchronous handlers should periodically check
    /// `cancellation.is_cancelled()`.
    pub cancellation: RpcCancellationToken,
    /// W3C Trace Context propagated from the caller, if the
    /// caller set `FLAG_RPC_PROPAGATE_TRACE` and supplied
    /// `traceparent` / `tracestate` headers in the request. The
    /// server's handler reads this to continue the distributed
    /// trace. `None` for calls that didn't propagate trace
    /// context.
    pub trace_context: Option<TraceContext>,
}
1293
/// Handler-side error that doesn't fit the application's normal
/// `Ok(RpcResponsePayload)` channel. The fold maps these onto a
/// failure-status `RpcResponsePayload` for the caller.
///
/// The `Display` text comes from the `#[error(...)]` attributes
/// below.
#[derive(Debug, thiserror::Error)]
pub enum RpcHandlerError {
    /// Application-defined error. The fold encodes this as
    /// `RpcStatus::Application(code)` with `message` as the body.
    /// Displayed with `code` in `0x`-prefixed hex.
    #[error("application error {code:#06x}: {message}")]
    Application {
        /// Application error code; surfaces as `RpcStatus::Application(code)`
        /// to the caller. Use `0x8000..=0xFFFF` to avoid the
        /// reserved canonical range.
        code: u16,
        /// Diagnostic. Becomes the response body (UTF-8 bytes).
        message: String,
    },
    /// Catch-all for handler-internal failures. The fold encodes this
    /// as `RpcStatus::Internal` with `message` as the body.
    #[error("internal: {0}")]
    Internal(String),
}
1315
/// User-supplied handler. Implementors typically wrap their state
/// (or an `Arc<Mutex<State>>`) and route to the appropriate logic
/// based on `ctx.payload.service` or per-handler dispatch.
///
/// Multiple `Mesh::serve_rpc` registrations on different services
/// each install their own handler; a single handler typically
/// serves one service.
///
/// The `Send + Sync + 'static` bound is needed because the fold
/// holds the handler as `Arc<dyn RpcHandler>` and calls it from a
/// spawned tokio task.
#[async_trait::async_trait]
pub trait RpcHandler: Send + Sync + 'static {
    /// Process one request and return the response payload. The
    /// fold spawns this in a tokio task; the fold itself doesn't
    /// block on it. Handlers should respect `ctx.cancellation` for
    /// cooperative early-abort.
    ///
    /// Return `Err(RpcHandlerError)` for failures that should reach
    /// the caller as a failure-status response.
    async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError>;
}
1331
/// Callback the fold invokes to publish a response back to the
/// caller. Wired up by `Mesh::serve_rpc` to publish on
/// `<service>.replies.<caller_origin>`. Type-erased so the fold
/// doesn't depend on the mesh layer directly.
///
/// Arguments: `(caller_origin, call_id, response_payload)`.
///
/// Synchronous and returns nothing; the fold also calls it directly
/// for its short-circuit replies (malformed request, deadline
/// already passed, duplicate `call_id`).
pub type RpcResponseEmitter = Arc<dyn Fn(u64, u64, RpcResponsePayload) + Send + Sync + 'static>;
1339
/// Async counterpart of [`RpcResponseEmitter`] used by the
/// streaming fold's pump task to serialize per-call publishes.
///
/// Takes the same `(caller_origin, call_id, response_payload)`
/// arguments but returns a boxed `'static` future that the pump
/// awaits.
///
/// The streaming pump awaits each emit before reading the next
/// chunk from the sink — this guarantees that chunks for one
/// `call_id` reach the network publish path in the order the
/// handler emitted them. (The unary fold has no such requirement
/// — it emits exactly one RESPONSE per call — so it sticks with
/// the simpler sync `RpcResponseEmitter`.)
pub type RpcAsyncResponseEmitter = Arc<
    dyn Fn(u64, u64, RpcResponsePayload) -> futures::future::BoxFuture<'static, ()>
        + Send
        + Sync
        + 'static,
>;
1355
/// Server-side fold. Sees REQUEST events on the configured channel,
/// dispatches to the user-supplied handler, emits RESPONSE events
/// via the supplied emitter. CANCEL events flip the matching
/// in-flight token.
///
/// State `()` — the user's state lives on whatever the `RpcHandler`
/// captures (typically `Arc<Mutex<S>>`). The fold's own state (the
/// in-flight map) lives on `&mut self` and is shared with spawned
/// handler tasks via `Arc<Mutex<...>>` so the task can self-clean
/// on completion.
pub struct RpcServerFold {
    /// User handler; cloned into each spawned per-request task.
    handler: Arc<dyn RpcHandler>,
    /// Publishes RESPONSE events; invoked as
    /// `(caller_origin, call_id, response_payload)`.
    emit: RpcResponseEmitter,
    /// (caller_origin, call_id) → cancellation token for the
    /// in-flight handler. Inserted on REQUEST, removed by either
    /// the spawned handler task on completion or by the fold on
    /// CANCEL. Wrapped in `Arc<Mutex<...>>` so spawned tasks can
    /// remove their own entries without going back through the
    /// fold. A REQUEST whose key is already present is refused as
    /// a duplicate.
    in_flight: Arc<Mutex<HashMap<(u64, u64), RpcCancellationToken>>>,
    /// Optional per-service metrics handle. When `Some`, the
    /// spawned handler task bumps `handler_invocations_total` /
    /// `handler_in_flight` / `handler_panics_total` and records
    /// per-task wall-clock durations. `None` → no metrics
    /// (test-only path; production `Mesh::serve_rpc` always
    /// supplies one).
    metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
    /// Optional clock override for tests. `None` → real wall-clock
    /// `unix_nanos`. `Some(...)` → fixed value, lets tests pin
    /// deadline-already-passed behavior without sleeping.
    #[cfg(test)]
    test_now_ns: Option<u64>,
}
1389
1390impl RpcServerFold {
1391 /// Construct a server fold around `handler`. `emit` is the
1392 /// callback that publishes RESPONSE events to the caller's
1393 /// reply channel — `Mesh::serve_rpc` wires this to the
1394 /// publisher for `<service>.replies.<caller_origin>`.
1395 /// Constructed without a metrics handle; production callers
1396 /// chain `.with_metrics(...)` to opt into per-service
1397 /// counters.
1398 pub fn new(handler: Arc<dyn RpcHandler>, emit: RpcResponseEmitter) -> Self {
1399 Self {
1400 handler,
1401 emit,
1402 in_flight: Arc::new(Mutex::new(HashMap::new())),
1403 metrics: None,
1404 #[cfg(test)]
1405 test_now_ns: None,
1406 }
1407 }
1408
1409 /// Attach a per-service metrics handle. Hooks the spawned
1410 /// handler task to bump `handler_invocations_total`, balance
1411 /// `handler_in_flight`, count panics, and record handler
1412 /// duration into the histogram.
1413 pub fn with_metrics(
1414 mut self,
1415 metrics: Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>,
1416 ) -> Self {
1417 self.metrics = Some(metrics);
1418 self
1419 }
1420
1421 /// Test-only: pin the clock the fold uses for deadline
1422 /// short-circuit. Lets a unit test exercise the
1423 /// deadline-already-passed branch without waiting for wall
1424 /// time.
1425 #[cfg(test)]
1426 pub fn with_test_now_ns(mut self, now_ns: u64) -> Self {
1427 self.test_now_ns = Some(now_ns);
1428 self
1429 }
1430
1431 /// Test-only: snapshot of the in-flight call set.
1432 #[cfg(test)]
1433 pub fn in_flight_keys(&self) -> Vec<(u64, u64)> {
1434 self.in_flight.lock().keys().copied().collect()
1435 }
1436
1437 fn now_ns(&self) -> u64 {
1438 #[cfg(test)]
1439 if let Some(t) = self.test_now_ns {
1440 return t;
1441 }
1442 std::time::SystemTime::now()
1443 .duration_since(std::time::UNIX_EPOCH)
1444 .map(|d| d.as_nanos() as u64)
1445 .unwrap_or(0)
1446 }
1447
1448 /// `true` if the request's deadline has already elapsed at
1449 /// the server's current wall-clock — accounting for a small
1450 /// tolerance window that absorbs clock skew between caller
1451 /// and server. Without the tolerance, a request from a peer
1452 /// whose clock is a few hundred ms ahead of the server's
1453 /// would be timed out before the handler even saw it. Matches
1454 /// gRPC's default deadline-clock-skew tolerance shape (gRPC
1455 /// uses ~10 s).
1456 fn deadline_already_passed(&self, deadline_ns: u64) -> bool {
1457 if deadline_ns == 0 {
1458 return false;
1459 }
1460 self.now_ns().saturating_sub(DEADLINE_SKEW_TOLERANCE_NS) > deadline_ns
1461 }
1462}
1463
/// Tolerance for clock skew between caller and server when the
/// server short-circuits a request whose `deadline_ns` looks like
/// it has already elapsed. The check is
/// `now_ns.saturating_sub(SKEW) > deadline_ns`, so a deadline that
/// is at most `SKEW` nanoseconds in the server's past never hits
/// the short-circuit path. 10 s matches gRPC's default and is far
/// larger than the drift expected in an NTP-disciplined cluster.
///
/// NOTE(review): earlier wording said this protects a peer whose
/// clock is *ahead* of ours; the arithmetic suggests the protected
/// case is a peer whose clock *lags* ours — confirm.
pub const DEADLINE_SKEW_TOLERANCE_NS: u64 = 10_000_000_000; // 10 seconds
1472
1473impl RedexFold<()> for RpcServerFold {
1474 fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
1475 // Decode the meta header. A garbled meta means the event
1476 // doesn't even claim to be an RPC packet — log and skip
1477 // rather than killing the fold. Returning `Err(Decode)`
1478 // here would stop the entire cortex adapter for one
1479 // malformed event, which is wrong for an RPC server that
1480 // needs to keep serving.
1481 let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
1482 EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
1483 } else {
1484 None
1485 }) else {
1486 tracing::warn!(
1487 payload_len = ev.payload.len(),
1488 "rpc server fold: event payload too short for EventMeta; skipping",
1489 );
1490 return Ok(());
1491 };
1492 let key = (meta.origin_hash, meta.seq_or_ts);
1493 match meta.dispatch {
1494 DISPATCH_RPC_REQUEST => {
1495 let payload = match RpcRequestPayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
1496 Ok(p) => p,
1497 Err(e) => {
1498 // Malformed request payload. Surface as
1499 // `UnknownVersion` to the caller — they sent
1500 // bytes we couldn't parse, which usually
1501 // means a wire-format mismatch (the most
1502 // common cause). Log so operators can
1503 // diagnose.
1504 tracing::warn!(
1505 error = %e,
1506 caller_origin = format!("{:#x}", meta.origin_hash),
1507 call_id = meta.seq_or_ts,
1508 "rpc server fold: malformed request payload",
1509 );
1510 let resp = RpcResponsePayload {
1511 status: RpcStatus::UnknownVersion,
1512 headers: vec![],
1513 body: Bytes::from(format!("malformed request: {e}")),
1514 };
1515 (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
1516 return Ok(());
1517 }
1518 };
1519 // Fast deadline-already-passed short-circuit.
1520 // Server-side `Timeout` without invoking the
1521 // handler. Includes a clock-skew tolerance window
1522 // so a peer with a slightly-fast clock isn't
1523 // prematurely timed out — see
1524 // `deadline_already_passed`.
1525 if self.deadline_already_passed(payload.deadline_ns) {
1526 let resp = RpcResponsePayload {
1527 status: RpcStatus::Timeout,
1528 headers: vec![],
1529 body: Bytes::from_static(b"deadline already passed when request landed"),
1530 };
1531 (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
1532 return Ok(());
1533 }
1534 // Refuse a duplicate REQUEST with the same
1535 // `(origin_hash, call_id)` — see streaming fold for
1536 // the full rationale. For the unary fold this would
1537 // spawn a second handler under the same key, and
1538 // whichever handler completes first removes the
1539 // in-flight entry — leaving the second handler's
1540 // CANCEL handling broken (CANCEL events look up
1541 // the now-missing key and no-op). Cleaner to refuse.
1542 {
1543 let in_flight = self.in_flight.lock();
1544 if in_flight.contains_key(&key) {
1545 drop(in_flight);
1546 tracing::warn!(
1547 caller_origin = format!("{:#x}", meta.origin_hash),
1548 call_id = meta.seq_or_ts,
1549 "rpc server fold: duplicate REQUEST for in-flight call_id; refusing",
1550 );
1551 let resp = RpcResponsePayload {
1552 status: RpcStatus::Internal,
1553 headers: vec![],
1554 body: Bytes::from_static(
1555 b"duplicate REQUEST for already-in-flight call_id",
1556 ),
1557 };
1558 (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
1559 return Ok(());
1560 }
1561 }
1562 let cancellation = RpcCancellationToken::new();
1563 self.in_flight.lock().insert(key, cancellation.clone());
1564 let handler = self.handler.clone();
1565 let emit = self.emit.clone();
1566 let in_flight = self.in_flight.clone();
1567 let caller_origin = meta.origin_hash;
1568 let call_id = meta.seq_or_ts;
1569 // Decode the W3C Trace Context if the caller
1570 // signaled it via `FLAG_RPC_PROPAGATE_TRACE` and
1571 // included the `traceparent` / `tracestate`
1572 // headers. nRPC is transport-only — application
1573 // code reads `ctx.trace_context` to continue the
1574 // trace via whatever backend it has wired up.
1575 let trace_context = if payload.flags & FLAG_RPC_PROPAGATE_TRACE != 0 {
1576 extract_trace_context(&payload.headers)
1577 } else {
1578 None
1579 };
1580 let metrics = self.metrics.clone();
1581 // Keep a probe handle so the spawned task can detect
1582 // a CANCEL that fired during handler execution and
1583 // override its response with `RpcStatus::Cancelled`.
1584 let cancel_probe = cancellation.clone();
1585 tokio::spawn(async move {
1586 // Server-side metrics: count this invocation;
1587 // bump in_flight; time the handler; tally
1588 // panics. Only fires when a metrics handle was
1589 // attached via `with_metrics(...)` — test-only
1590 // folds construct without one.
1591 if let Some(m) = metrics.as_ref() {
1592 m.handler_invocations_total
1593 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1594 m.handler_in_flight
1595 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1596 }
1597 let handler_started = std::time::Instant::now();
1598 let ctx = RpcContext {
1599 caller_origin,
1600 call_id,
1601 payload,
1602 cancellation,
1603 trace_context,
1604 };
1605 // Catch panics so a misbehaving handler can't
1606 // take down the runtime. `AssertUnwindSafe` is
1607 // load-bearing because `RpcHandler::call`
1608 // returns a future that may borrow non-
1609 // `UnwindSafe` types from the handler; we
1610 // accept the assertion because the handler's
1611 // state is untouched on panic (we just don't
1612 // observe its in-progress mutations).
1613 let outcome = futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(
1614 handler.call(ctx),
1615 ))
1616 .await;
1617 if let Some(m) = metrics.as_ref() {
1618 m.handler_in_flight
1619 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
1620 m.record_handler_duration(handler_started.elapsed());
1621 if outcome.is_err() {
1622 m.handler_panics_total
1623 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1624 }
1625 }
1626 // CANCEL-wins ordering: if the cancellation
1627 // token fired at any point during handler
1628 // execution, override the handler's outcome
1629 // with `RpcStatus::Cancelled` so the caller
1630 // (or hedge primary, retry layer, etc.) sees
1631 // the documented `Cancelled` status code rather
1632 // than whatever the handler happened to return
1633 // before / despite cancellation. A cooperative
1634 // handler that observes the token and bails
1635 // early gets the same Cancelled framing as a
1636 // handler that ignored cancellation and ran to
1637 // completion — the caller's view is uniform.
1638 let resp = if cancel_probe.is_cancelled() {
1639 RpcResponsePayload {
1640 status: RpcStatus::Cancelled,
1641 headers: vec![],
1642 body: Bytes::from_static(
1643 b"server observed CANCEL during handler execution",
1644 ),
1645 }
1646 } else {
1647 match outcome {
1648 Ok(Ok(payload)) => payload,
1649 Ok(Err(RpcHandlerError::Application { code, message })) => {
1650 RpcResponsePayload {
1651 status: RpcStatus::Application(code),
1652 headers: vec![],
1653 body: Bytes::from(message),
1654 }
1655 }
1656 Ok(Err(RpcHandlerError::Internal(message))) => RpcResponsePayload {
1657 status: RpcStatus::Internal,
1658 headers: vec![],
1659 body: Bytes::from(message),
1660 },
1661 Err(panic) => {
1662 let panic_msg = panic
1663 .downcast_ref::<&'static str>()
1664 .map(|s| s.to_string())
1665 .or_else(|| panic.downcast_ref::<String>().cloned())
1666 .unwrap_or_else(|| "<non-string panic>".into());
1667 tracing::error!(
1668 caller_origin = format!("{:#x}", caller_origin),
1669 call_id,
1670 panic = %panic_msg,
1671 "rpc server handler panicked",
1672 );
1673 RpcResponsePayload {
1674 status: RpcStatus::Internal,
1675 headers: vec![],
1676 body: Bytes::from(format!("handler panicked: {panic_msg}")),
1677 }
1678 }
1679 }
1680 };
1681 in_flight.lock().remove(&key);
1682 emit(caller_origin, call_id, resp);
1683 });
1684 }
1685 DISPATCH_RPC_CANCEL => {
1686 if let Some(token) = self.in_flight.lock().remove(&key) {
1687 token.cancel();
1688 }
1689 // Idempotent — CANCEL for an unknown call_id (e.g.
1690 // a CANCEL that races the handler's completion) is
1691 // a no-op rather than an error. The spawned handler
1692 // task observes `cancel_probe.is_cancelled()` after
1693 // its future resolves and overrides the response
1694 // with `RpcStatus::Cancelled` so the caller sees a
1695 // documented status code rather than the handler's
1696 // accidental Ok / Internal payload.
1697 }
1698 // RESPONSE / DEADLINE_EXCEEDED are server-emitted; if
1699 // the server's own fold sees them (e.g. from a replay)
1700 // there's nothing to do.
1701 _ => {}
1702 }
1703 Ok(())
1704 }
1705}
1706
1707// ============================================================================
1708// Streaming server-side: handler trait + sink + fold.
1709// ============================================================================
1710
/// Sink a streaming handler writes to in order to emit
/// streaming-response chunks. Each accepted `send` is forwarded by
/// the fold's pump task as one non-terminal `RESPONSE` event to the
/// caller.
///
/// The handler never emits the terminal frame itself. In
/// [`RpcServerStreamingFold`] the sink is moved into the handler
/// call, so it is dropped when the handler returns (or panics);
/// that closes the channel, the pump drains whatever is still
/// queued, and only then does the fold publish the terminal frame.
/// Returning `Ok(())` closes the stream cleanly; returning
/// `Err(RpcHandlerError)` emits the error as a terminal non-`Ok`
/// RESPONSE.
///
/// `send` is best-effort and infallible: the underlying mpsc is
/// **bounded** at [`STREAMING_PUMP_CAPACITY`] chunks. If the pump
/// can't keep up (publish path is congested, caller hasn't granted
/// flow-control credits), `send` discards on overflow — from the
/// handler's side this looks the same as a closed receiver. Either
/// kind of drop is counted in `streaming_chunks_dropped_total` so
/// operators can see backpressure occurring. Cooperative
/// cancellation via `ctx.cancellation` is the right way for the
/// handler to notice the consumer is gone; opt-in flow control via
/// `CallOptions::stream_window_initial` is the right way to
/// throttle a fast handler against a slow consumer.
pub struct RpcResponseSink {
    /// Producer half of the per-call chunk channel; `send` pushes
    /// onto it with `try_send`, the pump task holds the receiver.
    inner: tokio::sync::mpsc::Sender<bytes::Bytes>,
    /// Optional metrics handle so a dropped chunk bumps the
    /// `streaming_chunks_dropped_total` counter. `None` for unit-
    /// test folds that construct without metrics.
    metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
}
1737
1738impl RpcResponseSink {
1739 /// Emit one non-terminal chunk. Cheap (`try_send` on a
1740 /// [`STREAMING_PUMP_CAPACITY`]-bounded mpsc); never blocks. On
1741 /// overflow OR receiver-closed, the chunk is dropped and (when
1742 /// metrics are wired) `streaming_chunks_dropped_total` is
1743 /// incremented for the service.
1744 pub fn send(&self, body: impl Into<bytes::Bytes>) {
1745 if self.inner.try_send(body.into()).is_err() {
1746 if let Some(m) = self.metrics.as_ref() {
1747 m.streaming_chunks_dropped_total
1748 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1749 }
1750 }
1751 }
1752}
1753
/// Capacity of the bounded mpsc that sits between a streaming
/// handler's sink and the pump task. Once this many chunks are
/// queued because the publish path is draining more slowly than
/// the handler produces, further chunks are dropped (and counted
/// via `streaming_chunks_dropped_total`) instead of growing the
/// queue. 1024 is generous for typical streaming patterns; opt-in
/// flow control via `CallOptions::stream_window_initial` is the
/// right primitive for strict throttling.
pub const STREAMING_PUMP_CAPACITY: usize = 1 << 10;
1763
/// Capacity of the client-streaming server fold's per-call
/// request mpsc — the upload-direction mirror of
/// [`STREAMING_PUMP_CAPACITY`]. When a caller emits
/// REQUEST_CHUNKs faster than the handler drains them, at most
/// this many chunks are queued; additional chunks are dropped
/// (and counted via `streaming_request_chunks_dropped_total`
/// when metrics are wired). Opt-in flow control via the
/// `nrpc-request-window-initial` header is the right primitive
/// for strict throttling on the upload side.
///
/// Bidi streaming plan (Phase B).
pub const STREAMING_REQUEST_PUMP_CAPACITY: usize = 1 << 10;
1776
1777// ============================================================================
1778// Phase B — server-side primitives for client-streaming.
1779// ============================================================================
1780
/// Context handed to an [`RpcClientStreamingHandler::call`] (and to
/// an [`RpcDuplexHandler::call`]). Same shape as [`RpcContext`]
/// minus the eager `payload` (the request stream delivers chunk
/// bodies on the fly) and plus the per-call `deadline_ns` and
/// `headers` (which would otherwise have ridden on the eager
/// payload).
///
/// Bidi streaming plan (Phase B).
pub struct RpcStreamingContext {
    /// AEAD-verified caller `origin_hash`. Same source as
    /// [`RpcContext::caller_origin`].
    pub caller_origin: u64,
    /// Caller-generated correlation id. Matches the initial
    /// REQUEST's `call_id` and every subsequent REQUEST_CHUNK /
    /// CANCEL / REQUEST_GRANT for this call.
    pub call_id: u64,
    /// Absolute deadline (unix nanos) from the initial REQUEST.
    /// `0` means no deadline; the fold does NOT auto-cancel on
    /// deadline (handlers self-supervise via tokio timers, same
    /// contract as the unary fold).
    pub deadline_ns: u64,
    /// Metadata headers carried by the initial REQUEST only.
    /// Per-REQUEST_CHUNK headers are NOT surfaced at the substrate
    /// layer — the typed SDK veneer (Phase E) is where header
    /// inspection across chunks lives (if it lands at all; the
    /// plan defers per-chunk-headers as opt-in raw-path access).
    pub headers: Vec<RpcHeader>,
    /// Cancellation signal. Flipped by the fold when a
    /// `DISPATCH_RPC_CANCEL` arrives for this call's `call_id`.
    /// Long-running handlers should `select!` on
    /// `cancellation.cancelled()`; the request stream also
    /// terminates on cancellation, but the token is the
    /// authoritative signal (the stream's terminator is shared
    /// with REQUEST_END).
    pub cancellation: RpcCancellationToken,
    /// W3C Trace Context propagated from the caller's initial
    /// REQUEST. Same semantics as [`RpcContext::trace_context`].
    pub trace_context: Option<TraceContext>,
}
1819
/// Callback invoked to publish a [`DISPATCH_RPC_REQUEST_GRANT`]
/// event back to the caller. Wired up by the `Mesh` glue (Phase C)
/// to publish on the caller's reply channel. Type-erased so the
/// fold doesn't depend on the mesh layer directly.
///
/// Arguments, in order: `(caller_origin, call_id, credits)`.
/// The call is synchronous and returns nothing — the publish
/// itself is expected to be non-blocking (the underlying transport
/// has its own internal queueing). Every grant is fire-and-forget,
/// so a dropped grant is at worst a latency wobble, not a
/// correctness issue (the caller's send sink will retry when its
/// credit budget refills via the next grant or via the initial
/// window).
///
/// Bidi streaming plan (Phase B).
pub type RpcRequestGrantEmitter = Arc<dyn Fn(u64, u64, u32) + Send + Sync + 'static>;
1835
/// Server-side stream of inbound request chunk bodies for one
/// client-streaming (or duplex) call. Yields one `Bytes` per
/// `DISPATCH_RPC_REQUEST` / `DISPATCH_RPC_REQUEST_CHUNK` frame
/// (including empty bodies — the semantics of "empty bytes" are
/// the application's concern, not the substrate's). Closes on
/// `FLAG_RPC_REQUEST_END` or on CANCEL.
///
/// **Stream item ordering convention**: the first item this
/// stream yields corresponds to the initial REQUEST body; every
/// subsequent item corresponds to a REQUEST_CHUNK body, in the
/// order the chunks were received from the wire. The substrate
/// does not tag items with their frame kind — the SDK veneer
/// (Phase E) is responsible for the Init / Data classification
/// via its `Chunk<T>` enum.
///
/// **Auto-grant behavior**: when the caller opted into
/// request-direction flow control via
/// [`HEADER_NRPC_REQUEST_WINDOW_INITIAL`], every `poll_next()`
/// that yields an item fires exactly one credit back to the
/// caller via the captured `grant_emitter`. This keeps the
/// in-flight window at the caller's initial value as the handler
/// drains the stream. When the caller did NOT opt in (no header),
/// the `grant_emitter` is `None` and the auto-grant path is a
/// no-op (caller is on the unbounded-credit fast path).
///
/// Bidi streaming plan (Phase B).
pub struct RequestStream {
    /// Receiving half of the per-call request-chunk channel;
    /// `poll_next` delegates to its `poll_recv`.
    inner: tokio::sync::mpsc::Receiver<bytes::Bytes>,
    /// Grant callback, present only when the caller opted into
    /// request-direction flow control.
    grant_emitter: Option<RpcRequestGrantEmitter>,
    /// Caller identity passed as the first argument of each grant.
    caller_origin: u64,
    /// Correlation id passed as the second argument of each grant.
    call_id: u64,
}
1868
1869impl RequestStream {
1870 /// Visible to the fold (and only the fold) for constructing
1871 /// a stream tied to a specific receiver + caller. The
1872 /// `grant_emitter` is `None` when the caller didn't opt into
1873 /// flow control; `Some(...)` when they did.
1874 pub(crate) fn new(
1875 inner: tokio::sync::mpsc::Receiver<bytes::Bytes>,
1876 grant_emitter: Option<RpcRequestGrantEmitter>,
1877 caller_origin: u64,
1878 call_id: u64,
1879 ) -> Self {
1880 Self {
1881 inner,
1882 grant_emitter,
1883 caller_origin,
1884 call_id,
1885 }
1886 }
1887}
1888
1889impl futures::Stream for RequestStream {
1890 type Item = bytes::Bytes;
1891
1892 fn poll_next(
1893 mut self: std::pin::Pin<&mut Self>,
1894 cx: &mut std::task::Context<'_>,
1895 ) -> std::task::Poll<Option<Self::Item>> {
1896 match self.inner.poll_recv(cx) {
1897 std::task::Poll::Ready(Some(bytes)) => {
1898 // Auto-grant fires on every successful pull when
1899 // flow control was opted into. Cheap and
1900 // fire-and-forget; missed grants are recovered
1901 // by subsequent pulls.
1902 if let Some(emit) = self.grant_emitter.as_ref() {
1903 emit(self.caller_origin, self.call_id, 1);
1904 }
1905 std::task::Poll::Ready(Some(bytes))
1906 }
1907 other => other,
1908 }
1909 }
1910}
1911
/// User-supplied handler for a client-streaming RPC. Receives an
/// [`RpcStreamingContext`] (caller identity, deadline, cancellation,
/// trace context, initial REQUEST headers) plus a [`RequestStream`]
/// of chunk bodies. Returns one terminal [`RpcResponsePayload`] —
/// the fold publishes it as the call's single RESPONSE frame.
///
/// **Cancellation contract.** Long-running handlers should
/// `select!` on `ctx.cancellation.cancelled()` so a caller-side
/// drop / deadline correctly stops the handler. The request
/// stream also terminates on cancellation (yields `None`), but
/// the token is the authoritative signal — the stream's `None`
/// is shared with the clean REQUEST_END path, so handlers can't
/// distinguish "caller finished cleanly" from "caller cancelled"
/// without consulting the token.
///
/// **Auto-grant.** When the caller opted into request-direction
/// flow control via [`HEADER_NRPC_REQUEST_WINDOW_INITIAL`], every
/// `stream.next().await` that yields `Some` fires one
/// REQUEST_GRANT back to the caller, maintaining the in-flight
/// window at the caller's initial value. Handlers don't need to
/// think about credit management for the common case.
///
/// Bidi streaming plan (Phase B).
#[async_trait::async_trait]
pub trait RpcClientStreamingHandler: Send + Sync + 'static {
    /// Process a client-streaming call. Drain the request stream,
    /// then return either the single terminal response payload or
    /// an [`RpcHandlerError`] that the fold maps to a failure
    /// status.
    async fn call(
        &self,
        ctx: RpcStreamingContext,
        requests: RequestStream,
    ) -> Result<RpcResponsePayload, RpcHandlerError>;
}
1946
/// User-supplied handler for a duplex RPC — many requests in,
/// many responses out, interleaved. Receives an [`RpcStreamingContext`]
/// plus a [`RequestStream`] of chunk bodies plus an
/// [`RpcResponseSink`] for emitting response chunks. The handler's
/// return value is its terminal status, NOT a final payload:
/// `Ok(())` closes the response stream cleanly with a terminal
/// `Ok` frame, `Err(RpcHandlerError)` closes with the matching
/// error status.
///
/// **Composition.** A duplex handler is a hybrid of an
/// [`RpcClientStreamingHandler`] (drains request chunks) and an
/// [`RpcStreamingHandler`] (emits response chunks). The two
/// directions are independent — a handler can finish emitting
/// responses before reading all requests, or vice versa. The
/// server fold serializes RESPONSE chunk publishes per call_id
/// so wire order matches handler order.
///
/// **Cancellation contract.** Identical to
/// [`RpcClientStreamingHandler`]: long-running work should
/// `select!` on `ctx.cancellation.cancelled()`.
///
/// **Auto-grant.** Identical to [`RpcClientStreamingHandler`]:
/// every `requests.next().await` that yields `Some` emits one
/// REQUEST_GRANT back to the caller (when the caller opted in).
///
/// Bidi streaming plan (Phase D).
#[async_trait::async_trait]
pub trait RpcDuplexHandler: Send + Sync + 'static {
    /// Process one duplex call. Drain inbound chunks via
    /// `requests.next().await`; emit outbound chunks via
    /// `responses.send(...)` (best-effort: chunks are dropped when
    /// the sink's queue is full). Return `Ok(())` for clean close,
    /// `Err(RpcHandlerError)` for failure mapping.
    async fn call(
        &self,
        ctx: RpcStreamingContext,
        requests: RequestStream,
        responses: RpcResponseSink,
    ) -> Result<(), RpcHandlerError>;
}
1986
/// User-supplied streaming handler. Receives the same `RpcContext`
/// as a unary handler plus a `RpcResponseSink` for emitting chunks.
/// Returning `Ok(())` closes the stream cleanly with a terminal
/// `Ok` RESPONSE; `Err(RpcHandlerError)` closes the stream with a
/// terminal non-`Ok` RESPONSE carrying the diagnostic. If the
/// call's cancellation token has fired by the time the handler
/// finishes, the fold reports `RpcStatus::Cancelled` instead of
/// the handler's own result.
///
/// **Cancellation contract.** Long-running streams should
/// `select!` on `ctx.cancellation.cancelled()` so a caller-side
/// drop / deadline correctly stops the handler. Continuing to
/// `send` after cancellation is harmless (sink discards) but
/// wastes work.
#[async_trait::async_trait]
pub trait RpcStreamingHandler: Send + Sync + 'static {
    /// Process one streaming request. Emit chunks via `sink.send(...)`.
    /// Dropping the sink early stops further chunks, but the
    /// terminal frame is only published once this future resolves
    /// and the already-queued chunks have been drained.
    async fn call(&self, ctx: RpcContext, sink: RpcResponseSink) -> Result<(), RpcHandlerError>;
}
2004
/// Per-call flow-control map type. Keyed on
/// `(caller_origin_hash, call_id)`; value is a tokio
/// `Semaphore` shared between the pump task (which acquires and
/// forgets one permit per emitted chunk) and the fold's `apply()`
/// method handling STREAM_GRANT events (which looks the semaphore
/// up by key to add permits). Only calls that opted into flow
/// control have an entry.
type FlowControlMap = Arc<Mutex<HashMap<(u64, u64), Arc<tokio::sync::Semaphore>>>>;
2011
/// Server-side fold for streaming RPC. Parallel to `RpcServerFold`
/// but multi-fire emit: each handler invocation may produce many
/// `RESPONSE` events for the same `call_id`, marked
/// non-terminal/terminal via the `nrpc-streaming` header.
///
/// State `()` — like the unary fold, the handler owns user state
/// via captured `Arc<Mutex<S>>`. The fold's own state (in-flight
/// cancellation tokens, per-call flow-control semaphores) lives on
/// `&mut self`.
pub struct RpcServerStreamingFold {
    /// Handler invoked once per accepted REQUEST, on a spawned task.
    handler: Arc<dyn RpcStreamingHandler>,
    /// Async publisher for response frames; called as
    /// `emit(caller_origin, call_id, payload)` and awaited so frames
    /// for one call are published in order.
    emit: RpcAsyncResponseEmitter,
    /// Cancellation token per in-flight `(origin_hash, call_id)`.
    /// Inserted when a REQUEST is accepted; removed by a CANCEL for
    /// the key or by the spawned task just before it publishes the
    /// terminal frame.
    in_flight: Arc<Mutex<HashMap<(u64, u64), RpcCancellationToken>>>,
    /// Per-call flow-control semaphore (when the caller opted in).
    /// An entry for a `(origin, call_id)` key means "pump must
    /// acquire one permit per chunk before emitting; STREAM_GRANT
    /// events `add_permits(n)`". Absence of an entry means
    /// unbounded credit (no flow control — pump emits as fast as
    /// the publish path can take chunks).
    flow_control: FlowControlMap,
    /// Optional per-service metrics handle. Same shape as
    /// `RpcServerFold::metrics`; the streaming fold ALSO bumps
    /// `streaming_chunks_emitted_total` from the pump task on
    /// every chunk.
    metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
}
2037
2038impl RpcServerStreamingFold {
2039 /// Construct a streaming server fold. `emit` publishes
2040 /// individual chunks (and the terminal frame) on the caller's
2041 /// reply channel.
2042 ///
2043 /// Uses the **async** emitter variant so the pump task can
2044 /// serialize per-call publishes — without that ordering
2045 /// guarantee, two chunks emitted in succession can race into
2046 /// the publish path and arrive at the caller out of order
2047 /// (or be eclipsed by the terminal frame and lost entirely).
2048 pub fn new(handler: Arc<dyn RpcStreamingHandler>, emit: RpcAsyncResponseEmitter) -> Self {
2049 Self {
2050 handler,
2051 emit,
2052 in_flight: Arc::new(Mutex::new(HashMap::new())),
2053 flow_control: Arc::new(Mutex::new(HashMap::new())),
2054 metrics: None,
2055 }
2056 }
2057
2058 /// Attach a per-service metrics handle. Hooks the spawned
2059 /// handler task to bump `handler_invocations_total` /
2060 /// `handler_in_flight` / `handler_panics_total` /
2061 /// `handler_duration_*`, and the pump task to bump
2062 /// `streaming_chunks_emitted_total` per emitted chunk.
2063 pub fn with_metrics(
2064 mut self,
2065 metrics: Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>,
2066 ) -> Self {
2067 self.metrics = Some(metrics);
2068 self
2069 }
2070
2071 /// Test-only: snapshot of the in-flight call set.
2072 #[cfg(test)]
2073 pub fn in_flight_keys(&self) -> Vec<(u64, u64)> {
2074 self.in_flight.lock().keys().copied().collect()
2075 }
2076}
2077
2078impl RedexFold<()> for RpcServerStreamingFold {
2079 fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
2080 let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
2081 EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
2082 } else {
2083 None
2084 }) else {
2085 tracing::warn!(
2086 payload_len = ev.payload.len(),
2087 "rpc streaming server fold: event payload too short for EventMeta",
2088 );
2089 return Ok(());
2090 };
2091 let key = (meta.origin_hash, meta.seq_or_ts);
2092 match meta.dispatch {
2093 DISPATCH_RPC_REQUEST => {
2094 let payload = match RpcRequestPayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
2095 Ok(p) => p,
2096 Err(e) => {
2097 tracing::warn!(
2098 error = %e,
2099 caller_origin = format!("{:#x}", meta.origin_hash),
2100 call_id = meta.seq_or_ts,
2101 "rpc streaming server fold: malformed request payload",
2102 );
2103 // Surface as a terminal error chunk. Spawn
2104 // because the apply method is sync and the
2105 // emit is async; this is a one-shot publish
2106 // so ordering doesn't matter here.
2107 let resp = RpcResponsePayload {
2108 status: RpcStatus::UnknownVersion,
2109 headers: vec![(
2110 HEADER_NRPC_STREAMING.to_string(),
2111 HEADER_NRPC_STREAMING_END.to_vec(),
2112 )],
2113 body: Bytes::from(format!("malformed request: {e}")),
2114 };
2115 let emit = self.emit.clone();
2116 let caller_origin = meta.origin_hash;
2117 let call_id = meta.seq_or_ts;
2118 tokio::spawn(async move {
2119 emit(caller_origin, call_id, resp).await;
2120 });
2121 return Ok(());
2122 }
2123 };
2124 // Refuse a duplicate REQUEST with the same
2125 // `(origin_hash, call_id)`. Without this, a retry
2126 // that arrives while the first attempt's pump is
2127 // still draining will overwrite the prior
2128 // semaphore Arc in `flow_control`, leaving the
2129 // first pump awaiting an orphaned semaphore (the
2130 // terminal cleanup keys on `key` and removes the
2131 // *new* entry, so the orphan never gets dropped
2132 // and the first handler hangs forever).
2133 //
2134 // Idempotent for the caller: we emit a terminal
2135 // `Internal` chunk so the duplicate sender sees a
2136 // clean refusal rather than waiting on a stream
2137 // that will never produce output.
2138 {
2139 let in_flight = self.in_flight.lock();
2140 if in_flight.contains_key(&key) {
2141 drop(in_flight);
2142 tracing::warn!(
2143 caller_origin = format!("{:#x}", meta.origin_hash),
2144 call_id = meta.seq_or_ts,
2145 "rpc streaming server fold: duplicate REQUEST for in-flight call_id; refusing",
2146 );
2147 let resp = RpcResponsePayload {
2148 status: RpcStatus::Internal,
2149 headers: vec![(
2150 HEADER_NRPC_STREAMING.to_string(),
2151 HEADER_NRPC_STREAMING_END.to_vec(),
2152 )],
2153 body: Bytes::from_static(
2154 b"duplicate REQUEST for already-in-flight call_id",
2155 ),
2156 };
2157 let emit = self.emit.clone();
2158 let caller_origin = meta.origin_hash;
2159 let call_id = meta.seq_or_ts;
2160 tokio::spawn(async move {
2161 emit(caller_origin, call_id, resp).await;
2162 });
2163 return Ok(());
2164 }
2165 }
2166 // Cancellation token + in-flight bookkeeping —
2167 // identical to the unary fold's pattern.
2168 let cancellation = RpcCancellationToken::new();
2169 self.in_flight.lock().insert(key, cancellation.clone());
2170 // Flow-control opt-in: parse the
2171 // `nrpc-stream-window-initial` header. When
2172 // present, install a per-call semaphore the pump
2173 // task will await per chunk; subsequent
2174 // STREAM_GRANT events refill it. When absent, no
2175 // entry → pump skips the await (back-compat).
2176 let flow_sem = parse_stream_window_initial(&payload.headers).map(|n| {
2177 let sem = Arc::new(tokio::sync::Semaphore::new(n as usize));
2178 self.flow_control.lock().insert(key, sem.clone());
2179 sem
2180 });
2181 let handler = self.handler.clone();
2182 let emit = self.emit.clone();
2183 let in_flight = self.in_flight.clone();
2184 let flow_control = self.flow_control.clone();
2185 let caller_origin = meta.origin_hash;
2186 let call_id = meta.seq_or_ts;
2187 let trace_context = if payload.flags & FLAG_RPC_PROPAGATE_TRACE != 0 {
2188 extract_trace_context(&payload.headers)
2189 } else {
2190 None
2191 };
2192 let metrics = self.metrics.clone();
2193 // See unary fold for rationale — clone the
2194 // cancellation handle so the spawned task can probe
2195 // it after the handler returns and override the
2196 // terminal frame with `RpcStatus::Cancelled`.
2197 let cancel_probe = cancellation.clone();
2198 tokio::spawn(async move {
2199 if let Some(m) = metrics.as_ref() {
2200 m.handler_invocations_total
2201 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2202 m.handler_in_flight
2203 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2204 }
2205 let handler_started = std::time::Instant::now();
2206 let ctx = RpcContext {
2207 caller_origin,
2208 call_id,
2209 payload,
2210 cancellation,
2211 trace_context,
2212 };
2213 // Build the sink + receive end. Spawn a
2214 // pump that forwards each chunk to the emit
2215 // closure. The handler's `sink.send(...)`
2216 // calls show up here as items on the receiver.
2217 // **Bounded** at STREAMING_PUMP_CAPACITY: a
2218 // runaway handler that produces chunks faster
2219 // than the publish path can drain stops
2220 // blocking the runtime past this many queued
2221 // chunks; additional chunks are dropped and
2222 // counted via streaming_chunks_dropped_total.
2223 let (tx, mut rx) =
2224 tokio::sync::mpsc::channel::<bytes::Bytes>(STREAMING_PUMP_CAPACITY);
2225 let sink = RpcResponseSink {
2226 inner: tx,
2227 metrics: metrics.clone(),
2228 };
2229 let pump_emit = emit.clone();
2230 let pump_metrics = metrics.clone();
2231 let pump_flow = flow_sem.clone();
2232 let pump = tokio::spawn(async move {
2233 while let Some(chunk) = rx.recv().await {
2234 // Flow control: when the caller opted
2235 // in, await one semaphore permit per
2236 // chunk before publishing. The semaphore
2237 // starts at the caller's `initial_window`
2238 // and refills when the caller sends
2239 // STREAM_GRANT events. `forget()`
2240 // consumes the slot — each chunk uses
2241 // exactly one credit, never returned.
2242 // No-op when `pump_flow` is None
2243 // (back-compat path).
2244 if let Some(sem) = pump_flow.as_ref() {
2245 let permit = match sem.clone().acquire_owned().await {
2246 Ok(p) => p,
2247 Err(_) => {
2248 // Semaphore was closed —
2249 // shouldn't happen during
2250 // normal operation; bail.
2251 break;
2252 }
2253 };
2254 permit.forget();
2255 }
2256 if let Some(m) = pump_metrics.as_ref() {
2257 m.streaming_chunks_emitted_total
2258 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2259 }
2260 let resp = RpcResponsePayload {
2261 status: RpcStatus::Ok,
2262 headers: vec![(
2263 HEADER_NRPC_STREAMING.to_string(),
2264 HEADER_NRPC_STREAMING_CONTINUE.to_vec(),
2265 )],
2266 body: chunk.clone(),
2267 };
2268 // Await per-chunk publish so chunks for
2269 // one call_id reach the network in send
2270 // order. Without this, two chunks emitted
2271 // in tight succession can race into the
2272 // publish path and arrive out of order
2273 // (or be eclipsed by the terminal frame
2274 // and lost entirely on the caller side).
2275 pump_emit(caller_origin, call_id, resp).await;
2276 }
2277 });
2278 // Run the handler. Catch panics so a
2279 // misbehaving handler can't take down the
2280 // runtime — same shape as the unary fold.
2281 let outcome = futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(
2282 handler.call(ctx, sink),
2283 ))
2284 .await;
2285 // The handler dropped the sink (either by
2286 // returning or by panicking through the
2287 // catch_unwind). Wait for the pump to drain
2288 // any final in-flight chunks.
2289 let _ = pump.await;
2290 if let Some(m) = metrics.as_ref() {
2291 m.handler_in_flight
2292 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2293 m.record_handler_duration(handler_started.elapsed());
2294 if outcome.is_err() {
2295 m.handler_panics_total
2296 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2297 }
2298 }
2299 // Emit the terminal frame. CANCEL-wins ordering
2300 // matches the unary fold: if the cancellation
2301 // token fired during execution, override the
2302 // handler's terminal with `RpcStatus::Cancelled`.
2303 let terminal = if cancel_probe.is_cancelled() {
2304 RpcResponsePayload {
2305 status: RpcStatus::Cancelled,
2306 headers: vec![],
2307 body: Bytes::from_static(
2308 b"server observed CANCEL during streaming handler execution",
2309 ),
2310 }
2311 } else {
2312 match outcome {
2313 Ok(Ok(())) => RpcResponsePayload {
2314 status: RpcStatus::Ok,
2315 headers: vec![(
2316 HEADER_NRPC_STREAMING.to_string(),
2317 HEADER_NRPC_STREAMING_END.to_vec(),
2318 )],
2319 body: Bytes::new(),
2320 },
2321 Ok(Err(RpcHandlerError::Application { code, message })) => {
2322 RpcResponsePayload {
2323 status: RpcStatus::Application(code),
2324 headers: vec![],
2325 body: Bytes::from(message),
2326 }
2327 }
2328 Ok(Err(RpcHandlerError::Internal(message))) => RpcResponsePayload {
2329 status: RpcStatus::Internal,
2330 headers: vec![],
2331 body: Bytes::from(message),
2332 },
2333 Err(panic) => {
2334 let panic_msg = panic
2335 .downcast_ref::<&'static str>()
2336 .map(|s| s.to_string())
2337 .or_else(|| panic.downcast_ref::<String>().cloned())
2338 .unwrap_or_else(|| "<non-string panic>".into());
2339 tracing::error!(
2340 caller_origin = format!("{:#x}", caller_origin),
2341 call_id,
2342 panic = %panic_msg,
2343 "rpc streaming server handler panicked",
2344 );
2345 RpcResponsePayload {
2346 status: RpcStatus::Internal,
2347 headers: vec![],
2348 body: Bytes::from(format!("handler panicked: {panic_msg}")),
2349 }
2350 }
2351 }
2352 };
2353 in_flight.lock().remove(&key);
2354 // Drop the per-call flow-control semaphore
2355 // (if any) so a stale GRANT arriving after
2356 // termination is silently dropped — the entry
2357 // is gone, lookup misses.
2358 flow_control.lock().remove(&key);
2359 // Await the terminal frame's publish too so it
2360 // arrives strictly AFTER the last chunk on the
2361 // wire (the pump has already drained, but the
2362 // emit itself is still async and we must await
2363 // it before the spawned task ends).
2364 emit(caller_origin, call_id, terminal).await;
2365 });
2366 }
2367 DISPATCH_RPC_CANCEL => {
2368 if let Some(token) = self.in_flight.lock().remove(&key) {
2369 token.cancel();
2370 }
2371 // Also drop the flow-control entry — the spawned
2372 // task's terminal cleanup will run too, but doing
2373 // it here makes the CANCEL path immediately stop
2374 // refilling the pump (the pending `acquire().await`
2375 // will resolve once the semaphore is dropped or
2376 // when the task exits).
2377 self.flow_control.lock().remove(&key);
2378 }
2379 DISPATCH_RPC_STREAM_GRANT => {
2380 // Add credit to the per-call semaphore. Silently
2381 // drop GRANT events for unknown / non-flow-
2382 // controlled calls — server can't tell whether
2383 // the caller is racing a terminal vs. sending a
2384 // grant for a non-flow-controlled stream, and
2385 // both are harmless to ignore.
2386 let amount = match decode_stream_grant(&ev.payload[EVENT_META_SIZE..]) {
2387 Some(n) => n,
2388 None => {
2389 tracing::debug!(
2390 caller_origin = format!("{:#x}", meta.origin_hash),
2391 call_id = meta.seq_or_ts,
2392 "rpc streaming server fold: malformed STREAM_GRANT payload",
2393 );
2394 return Ok(());
2395 }
2396 };
2397 if amount == 0 {
2398 return Ok(());
2399 }
2400 if let Some(sem) = self.flow_control.lock().get(&key).cloned() {
2401 // Tokio's `Semaphore::add_permits` is bounded
2402 // by `MAX_PERMITS = usize::MAX >> 3`. A
2403 // misbehaving caller flooding huge grants
2404 // would eventually saturate; cap defensively.
2405 let safe = (amount as usize).min(usize::MAX >> 4);
2406 sem.add_permits(safe);
2407 }
2408 }
2409 _ => {}
2410 }
2411 Ok(())
2412 }
2413}
2414
2415// ============================================================================
2416// Phase B — server-side fold for client-streaming.
2417//
2418// `RpcStreamingRequestFold` mirrors `RpcServerStreamingFold` but
2419// flipped on the data-direction axis: the SERVER consumes a
2420// stream of REQUEST_CHUNK events and the handler produces ONE
2421// terminal RESPONSE (vs. the response-side fold where one REQUEST
2422// drives many RESPONSE chunks).
2423//
2424// Wire shape it handles:
2425// DISPATCH_RPC_REQUEST (FLAG_RPC_CLIENT_STREAMING_REQUEST)
2426// DISPATCH_RPC_REQUEST_CHUNK (zero or more)
2427// DISPATCH_RPC_REQUEST_CHUNK (FLAG_RPC_REQUEST_END)
2428// DISPATCH_RPC_CANCEL (any time; flips token + closes stream)
2429//
2430// Wire shape it EMITS (via callbacks):
2431// DISPATCH_RPC_RESPONSE (one terminal frame; via RpcResponseEmitter)
2432// DISPATCH_RPC_REQUEST_GRANT (one per consumed chunk when flow
2433// control is opted in; via
2434// RpcRequestGrantEmitter)
2435//
// Each service binds to exactly one fold shape (unary, server-
// streaming, client-streaming, or duplex) at `serve_rpc*` registration.
2438// A REQUEST without FLAG_RPC_CLIENT_STREAMING_REQUEST that lands
2439// on the client-streaming fold is a caller bug — the fold emits a
2440// terminal `Internal` and drops the call.
2441// ============================================================================
2442
2443/// Per-call request-direction sender map type. Keyed on
2444/// `(caller_origin_hash, call_id)`; value is the bounded mpsc
2445/// sender the fold's `apply()` pushes REQUEST_CHUNK bodies into.
2446/// The matching receiver lives inside the handler's
2447/// [`RequestStream`]; dropping the sender (on REQUEST_END or
2448/// CANCEL) closes the stream.
2449type RequestChunkSenders = Arc<Mutex<HashMap<(u64, u64), tokio::sync::mpsc::Sender<bytes::Bytes>>>>;
2450
2451/// Shared REQUEST_CHUNK handling used by both
2452/// [`RpcStreamingRequestFold`] and [`RpcDuplexFold`]. Decodes the
2453/// payload, validates the call_id agreement, looks up the per-call
2454/// sender, pushes the body (skipping the empty-body FLAG_END
2455/// terminator), and removes the sender on FLAG_END so the
2456/// handler's stream observes EOF.
2457///
2458/// `diag_tag` selects the log prefix ("client-streaming" or
2459/// "duplex") so the two call sites surface identically-shaped
2460/// diagnostics with the correct fold name. The behavior is
2461/// otherwise identical — both folds carry the same wire format
2462/// and the same per-call mpsc + sender-map contract.
2463fn apply_request_chunk_to_senders(
2464 payload_bytes: Bytes,
2465 meta: &EventMeta,
2466 senders: &RequestChunkSenders,
2467 diag_tag: &'static str,
2468) {
2469 let payload = match RpcRequestChunkPayload::decode(payload_bytes) {
2470 Ok(p) => p,
2471 Err(e) => {
2472 tracing::warn!(
2473 error = %e,
2474 caller_origin = format!("{:#x}", meta.origin_hash),
2475 call_id = meta.seq_or_ts,
2476 tag = diag_tag,
2477 "rpc server fold: malformed REQUEST_CHUNK payload",
2478 );
2479 return;
2480 }
2481 };
2482 if payload.call_id != meta.seq_or_ts {
2483 tracing::warn!(
2484 caller_origin = format!("{:#x}", meta.origin_hash),
2485 meta_call_id = meta.seq_or_ts,
2486 payload_call_id = payload.call_id,
2487 tag = diag_tag,
2488 "rpc server fold: REQUEST_CHUNK payload call_id does not match EventMeta",
2489 );
2490 return;
2491 }
2492 let key = (meta.origin_hash, meta.seq_or_ts);
2493 let is_end = payload.flags & FLAG_RPC_REQUEST_END != 0;
2494 let sender = senders.lock().get(&key).cloned();
2495 let Some(sender) = sender else {
2496 // Unknown call — either the initial REQUEST hasn't
2497 // arrived yet (out-of-order delivery is possible on the
2498 // bus) or the handler already completed and the entry is
2499 // gone. Drop silently.
2500 tracing::debug!(
2501 caller_origin = format!("{:#x}", meta.origin_hash),
2502 call_id = meta.seq_or_ts,
2503 tag = diag_tag,
2504 "rpc server fold: REQUEST_CHUNK for unknown call_id; dropping",
2505 );
2506 return;
2507 };
2508 let is_pure_terminator = is_end && payload.body.is_empty();
2509 if !is_pure_terminator && sender.try_send(payload.body).is_err() {
2510 tracing::debug!(
2511 caller_origin = format!("{:#x}", meta.origin_hash),
2512 call_id = meta.seq_or_ts,
2513 tag = diag_tag,
2514 "rpc server fold: request-chunk mpsc full or closed; dropping",
2515 );
2516 }
2517 if is_end {
2518 // Drop the sender from the map → its clone here goes out
2519 // of scope at end of function → the receiver in the
2520 // handler's RequestStream sees EOF on the next poll.
2521 senders.lock().remove(&key);
2522 }
2523}
2524
2525/// Server-side fold for client-streaming RPC. Parallel to
2526/// [`RpcServerStreamingFold`] but consumes REQUEST_CHUNK on the
2527/// input side and produces one terminal RESPONSE on the output
2528/// side (vs. one REQUEST in / many RESPONSE chunks out).
2529///
2530/// State `()` — like the other folds, application state lives in
2531/// the handler's captured `Arc<Mutex<S>>`. The fold's own state
2532/// (in-flight cancellation tokens + per-call request-chunk
2533/// senders) lives on `&mut self` via `Arc<Mutex<...>>` so spawned
2534/// handler tasks can self-clean on completion.
2535///
2536/// Bidi streaming plan (Phase B).
2537pub struct RpcStreamingRequestFold {
2538 handler: Arc<dyn RpcClientStreamingHandler>,
2539 emit: RpcResponseEmitter,
2540 /// Optional request-direction grant emitter. `Some(...)`
2541 /// when the surrounding mesh glue is wired to publish
2542 /// REQUEST_GRANT events; `None` in unit tests / contexts
2543 /// without a real publish path. When `None`, the auto-grant
2544 /// path on every `RequestStream::poll_next` becomes a no-op
2545 /// (callers that opted into flow control will see no
2546 /// refill and stall once their initial window is exhausted —
2547 /// honest behavior for a fold not wired up for grants).
2548 grant_emit: Option<RpcRequestGrantEmitter>,
2549 in_flight: Arc<Mutex<HashMap<(u64, u64), RpcCancellationToken>>>,
2550 senders: RequestChunkSenders,
2551 /// Optional per-service metrics handle. Same shape as the
2552 /// other folds. Reuses the response-side counters where they
2553 /// apply (handler_invocations / handler_panics / etc.) and
2554 /// would gain request-side counters (e.g.
2555 /// `streaming_request_chunks_dropped_total`) in a follow-up.
2556 metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
2557}
2558
2559impl RpcStreamingRequestFold {
2560 /// Construct a client-streaming server fold. `emit` publishes
2561 /// the terminal RESPONSE on the caller's reply channel.
2562 ///
2563 /// Use the sync [`RpcResponseEmitter`] here — there's only
2564 /// one RESPONSE per call (the terminal frame), so the
2565 /// per-call serialization the async emitter buys for the
2566 /// response-side fold is not needed here.
2567 pub fn new(handler: Arc<dyn RpcClientStreamingHandler>, emit: RpcResponseEmitter) -> Self {
2568 Self {
2569 handler,
2570 emit,
2571 grant_emit: None,
2572 in_flight: Arc::new(Mutex::new(HashMap::new())),
2573 senders: Arc::new(Mutex::new(HashMap::new())),
2574 metrics: None,
2575 }
2576 }
2577
2578 /// Attach the request-direction grant emitter. Hands every
2579 /// `RequestStream::poll_next` a hook to fire one REQUEST_GRANT
2580 /// back to the caller after a chunk is consumed. Optional —
2581 /// folds constructed without it still work, callers that
2582 /// opted into flow control just won't be refilled.
2583 pub fn with_grant_emitter(mut self, grant_emit: RpcRequestGrantEmitter) -> Self {
2584 self.grant_emit = Some(grant_emit);
2585 self
2586 }
2587
2588 /// Attach a per-service metrics handle. Hooks the spawned
2589 /// handler task to bump `handler_invocations_total` /
2590 /// `handler_in_flight` / `handler_panics_total` /
2591 /// `handler_duration_*`. Symmetric with `RpcServerFold` and
2592 /// `RpcServerStreamingFold`.
2593 pub fn with_metrics(
2594 mut self,
2595 metrics: Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>,
2596 ) -> Self {
2597 self.metrics = Some(metrics);
2598 self
2599 }
2600
2601 /// Test-only: snapshot of the in-flight call set.
2602 #[cfg(test)]
2603 pub fn in_flight_keys(&self) -> Vec<(u64, u64)> {
2604 self.in_flight.lock().keys().copied().collect()
2605 }
2606
2607 /// Test-only: snapshot of the in-flight per-call senders.
2608 /// Useful for tests that need to assert a call's sender has
2609 /// been dropped after REQUEST_END / CANCEL.
2610 #[cfg(test)]
2611 pub fn sender_keys(&self) -> Vec<(u64, u64)> {
2612 self.senders.lock().keys().copied().collect()
2613 }
2614}
2615
2616impl RedexFold<()> for RpcStreamingRequestFold {
2617 fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
2618 let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
2619 EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
2620 } else {
2621 None
2622 }) else {
2623 tracing::warn!(
2624 payload_len = ev.payload.len(),
2625 "rpc client-streaming server fold: event payload too short for EventMeta",
2626 );
2627 return Ok(());
2628 };
2629 let key = (meta.origin_hash, meta.seq_or_ts);
2630 match meta.dispatch {
2631 DISPATCH_RPC_REQUEST => {
2632 let payload = match RpcRequestPayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
2633 Ok(p) => p,
2634 Err(e) => {
2635 tracing::warn!(
2636 error = %e,
2637 caller_origin = format!("{:#x}", meta.origin_hash),
2638 call_id = meta.seq_or_ts,
2639 "rpc client-streaming server fold: malformed request payload",
2640 );
2641 let resp = RpcResponsePayload {
2642 status: RpcStatus::UnknownVersion,
2643 headers: vec![],
2644 body: Bytes::from(format!("malformed request: {e}")),
2645 };
2646 (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
2647 return Ok(());
2648 }
2649 };
2650 // A REQUEST without the client-streaming flag on
2651 // this fold is a caller bug — the service was
2652 // registered as client-streaming. Refuse cleanly.
2653 if payload.flags & FLAG_RPC_CLIENT_STREAMING_REQUEST == 0 {
2654 tracing::warn!(
2655 caller_origin = format!("{:#x}", meta.origin_hash),
2656 call_id = meta.seq_or_ts,
2657 flags = format!("{:#06x}", payload.flags),
2658 "rpc client-streaming server fold: REQUEST missing FLAG_RPC_CLIENT_STREAMING_REQUEST",
2659 );
2660 let resp = RpcResponsePayload {
2661 status: RpcStatus::Internal,
2662 headers: vec![],
2663 body: Bytes::from_static(
2664 b"REQUEST on a client-streaming service must set FLAG_RPC_CLIENT_STREAMING_REQUEST",
2665 ),
2666 };
2667 (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
2668 return Ok(());
2669 }
2670 // Refuse a duplicate REQUEST with the same
2671 // `(origin_hash, call_id)` — same rationale as
2672 // the response-side fold: a retry that arrives
2673 // while the first attempt is still in-flight
2674 // would overwrite the prior sender and orphan the
2675 // existing handler.
2676 {
2677 let in_flight = self.in_flight.lock();
2678 if in_flight.contains_key(&key) {
2679 drop(in_flight);
2680 tracing::warn!(
2681 caller_origin = format!("{:#x}", meta.origin_hash),
2682 call_id = meta.seq_or_ts,
2683 "rpc client-streaming server fold: duplicate REQUEST for in-flight call_id; refusing",
2684 );
2685 let resp = RpcResponsePayload {
2686 status: RpcStatus::Internal,
2687 headers: vec![],
2688 body: Bytes::from_static(
2689 b"duplicate REQUEST for already-in-flight call_id",
2690 ),
2691 };
2692 (self.emit)(meta.origin_hash, meta.seq_or_ts, resp);
2693 return Ok(());
2694 }
2695 }
2696 let cancellation = RpcCancellationToken::new();
2697 self.in_flight.lock().insert(key, cancellation.clone());
2698 // Build the per-call request-chunk mpsc. Bounded
2699 // capacity — overflow on the sender side drops the
2700 // chunk (caller can re-send or, if flow-control is
2701 // wired, will naturally not push past the credit
2702 // window).
2703 let (tx, rx) =
2704 tokio::sync::mpsc::channel::<bytes::Bytes>(STREAMING_REQUEST_PUMP_CAPACITY);
2705 // Terminator-semantics rule: an empty body
2706 // combined with FLAG_REQUEST_END is a pure
2707 // terminator — the caller's `finish()` emits it
2708 // to close the stream without yielding a phantom
2709 // empty item to the handler. A non-empty body on
2710 // a FLAG_END frame IS a final item (used by the
2711 // "single-item degenerate path": initial REQUEST
2712 // with FLAG_END + a real body sends one item +
2713 // closes in a single frame).
2714 let end_on_initial = payload.flags & FLAG_RPC_REQUEST_END != 0;
2715 let is_pure_terminator = end_on_initial && payload.body.is_empty();
2716 if !is_pure_terminator {
2717 // Fresh `mpsc::channel(STREAMING_REQUEST_PUMP_CAPACITY)`
2718 // with a live receiver — try_send cannot fail.
2719 // debug_assert surfaces the invariant break in
2720 // tests; release logs at error level rather than
2721 // silently swallowing the first request body.
2722 if tx.try_send(payload.body).is_err() {
2723 debug_assert!(
2724 false,
2725 "fresh client-streaming request mpsc rejected initial body"
2726 );
2727 tracing::error!(
2728 caller_origin = format!("{:#x}", meta.origin_hash),
2729 call_id = meta.seq_or_ts,
2730 "rpc client-streaming server fold: fresh mpsc rejected initial REQUEST body (invariant break)",
2731 );
2732 }
2733 }
2734 // If the initial REQUEST also set FLAG_REQUEST_END,
2735 // close the stream immediately — degenerate case of
2736 // "one-item upload" where the caller didn't bother
2737 // with a trailing REQUEST_CHUNK. Don't even insert
2738 // the sender into the map; just drop it here.
2739 if !end_on_initial {
2740 self.senders.lock().insert(key, tx);
2741 }
2742 // Build the handler's context + stream. Auto-grant
2743 // is opted into when the caller set the request
2744 // window header AND the fold was wired with a
2745 // grant emitter; both must be present for grants
2746 // to actually fly.
2747 let grant_emitter = if parse_request_window_initial(&payload.headers).is_some() {
2748 self.grant_emit.clone()
2749 } else {
2750 None
2751 };
2752 let request_stream =
2753 RequestStream::new(rx, grant_emitter, meta.origin_hash, meta.seq_or_ts);
2754 let trace_context = if payload.flags & FLAG_RPC_PROPAGATE_TRACE != 0 {
2755 extract_trace_context(&payload.headers)
2756 } else {
2757 None
2758 };
2759 let deadline_ns = payload.deadline_ns;
2760 let ctx = RpcStreamingContext {
2761 caller_origin: meta.origin_hash,
2762 call_id: meta.seq_or_ts,
2763 deadline_ns,
2764 headers: payload.headers,
2765 cancellation: cancellation.clone(),
2766 trace_context,
2767 };
2768 let handler = self.handler.clone();
2769 let emit = self.emit.clone();
2770 let in_flight = self.in_flight.clone();
2771 let senders = self.senders.clone();
2772 let caller_origin = meta.origin_hash;
2773 let call_id = meta.seq_or_ts;
2774 let cancel_probe = cancellation.clone();
2775 let cancel_for_deadline = cancellation.clone();
2776 let metrics = self.metrics.clone();
2777 tokio::spawn(async move {
2778 if let Some(m) = metrics.as_ref() {
2779 m.handler_invocations_total
2780 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2781 m.handler_in_flight
2782 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2783 }
2784 let handler_started = std::time::Instant::now();
2785 // Deadline guard: if the caller declared
2786 // `deadline_ns`, force-drop the handler future
2787 // after it elapses so an orphaned request stream
2788 // (caller-side network partition before
2789 // REQUEST_END arrives) can never hang the call
2790 // indefinitely. `deadline_ns = 0` means "no
2791 // deadline" — caller's responsibility.
2792 let call_fut = futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(
2793 handler.call(ctx, request_stream),
2794 ));
2795 let outcome = if deadline_ns > 0 {
2796 let now_ns = std::time::SystemTime::now()
2797 .duration_since(std::time::UNIX_EPOCH)
2798 .map(|d| d.as_nanos() as u64)
2799 .unwrap_or(0);
2800 let remaining = deadline_ns.saturating_sub(now_ns);
2801 if remaining == 0 {
2802 cancel_for_deadline.cancel();
2803 Ok(Err(RpcHandlerError::Internal(
2804 "handler deadline_ns already expired at spawn".to_string(),
2805 )))
2806 } else {
2807 match tokio::time::timeout(
2808 std::time::Duration::from_nanos(remaining),
2809 call_fut,
2810 )
2811 .await
2812 {
2813 Ok(o) => o,
2814 Err(_) => {
2815 cancel_for_deadline.cancel();
2816 Ok(Err(RpcHandlerError::Internal(
2817 "handler deadline_ns exceeded".to_string(),
2818 )))
2819 }
2820 }
2821 }
2822 } else {
2823 call_fut.await
2824 };
2825 if let Some(m) = metrics.as_ref() {
2826 m.handler_in_flight
2827 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
2828 m.record_handler_duration(handler_started.elapsed());
2829 if outcome.is_err() {
2830 m.handler_panics_total
2831 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2832 }
2833 }
2834 // CANCEL-wins ordering: if the cancellation
2835 // token fired during execution, override the
2836 // handler's terminal with Cancelled.
2837 let terminal = if cancel_probe.is_cancelled() {
2838 RpcResponsePayload {
2839 status: RpcStatus::Cancelled,
2840 headers: vec![],
2841 body: Bytes::from_static(
2842 b"server observed CANCEL during client-streaming handler execution",
2843 ),
2844 }
2845 } else {
2846 match outcome {
2847 Ok(Ok(resp)) => resp,
2848 Ok(Err(RpcHandlerError::Application { code, message })) => {
2849 RpcResponsePayload {
2850 status: RpcStatus::Application(code),
2851 headers: vec![],
2852 body: Bytes::from(message),
2853 }
2854 }
2855 Ok(Err(RpcHandlerError::Internal(message))) => RpcResponsePayload {
2856 status: RpcStatus::Internal,
2857 headers: vec![],
2858 body: Bytes::from(message),
2859 },
2860 Err(panic) => {
2861 let panic_msg = panic
2862 .downcast_ref::<&'static str>()
2863 .map(|s| s.to_string())
2864 .or_else(|| panic.downcast_ref::<String>().cloned())
2865 .unwrap_or_else(|| "<non-string panic>".into());
2866 tracing::error!(
2867 caller_origin = format!("{:#x}", caller_origin),
2868 call_id,
2869 panic = %panic_msg,
2870 "rpc client-streaming server handler panicked",
2871 );
2872 RpcResponsePayload {
2873 status: RpcStatus::Internal,
2874 headers: vec![],
2875 body: Bytes::from(format!("handler panicked: {panic_msg}")),
2876 }
2877 }
2878 }
2879 };
2880 in_flight.lock().remove(&key);
2881 // Drop the per-call request-chunk sender too
2882 // (idempotent — already gone if REQUEST_END
2883 // arrived; defensive otherwise so a handler
2884 // that returned without consuming all chunks
2885 // doesn't leak the entry).
2886 senders.lock().remove(&key);
2887 (emit)(caller_origin, call_id, terminal);
2888 });
2889 }
2890 DISPATCH_RPC_REQUEST_CHUNK => {
2891 apply_request_chunk_to_senders(
2892 ev.payload.slice(EVENT_META_SIZE..),
2893 &meta,
2894 &self.senders,
2895 "client-streaming",
2896 );
2897 }
2898 DISPATCH_RPC_CANCEL => {
2899 if let Some(token) = self.in_flight.lock().remove(&key) {
2900 token.cancel();
2901 }
2902 // Drop the per-call sender so the handler's
2903 // RequestStream yields None on the next poll
2904 // (handler observes cancel via the token OR via
2905 // the stream's EOF; the cancel_probe in the
2906 // spawned task ensures the terminal RESPONSE is
2907 // Cancelled regardless of which the handler
2908 // checks first).
2909 self.senders.lock().remove(&key);
2910 }
2911 _ => {}
2912 }
2913 Ok(())
2914 }
2915}
2916
2917// ============================================================================
2918// Phase D — server-side fold for full duplex.
2919//
2920// `RpcDuplexFold` is the hybrid of `RpcStreamingRequestFold`
2921// (Phase B — request side) and `RpcServerStreamingFold` (existing
2922// — response side). The handler trait takes BOTH a `RequestStream`
2923// AND an `RpcResponseSink`; the fold spawns one handler task per
2924// REQUEST and one pump task per call_id, then emits a terminal
2925// RESPONSE on handler return.
2926//
2927// Wire shape it consumes:
//   DISPATCH_RPC_REQUEST        (FLAG_RPC_CLIENT_STREAMING_REQUEST + FLAG_RPC_STREAMING_RESPONSE)
//   DISPATCH_RPC_REQUEST_CHUNK  (zero or more, with FLAG_RPC_REQUEST_END on the last)
2930// DISPATCH_RPC_CANCEL (flips token + closes both directions)
2931//
2932// Wire shape it produces:
2933// DISPATCH_RPC_RESPONSE (multi-fire; nrpc-streaming: continue / end)
2934// DISPATCH_RPC_REQUEST_GRANT (one per consumed request-chunk when flow
2935// control is opted in)
2936//
2937// Bidi streaming plan (Phase D).
2938// ============================================================================
2939
2940/// Server-side fold for duplex RPC. Composes Phase B's request
2941/// stream + per-call request-chunk senders with the existing
2942/// response-side pump + multi-fire RESPONSE emit.
2943///
2944/// State `()` — same as the sibling folds.
2945///
2946/// Bidi streaming plan (Phase D).
2947pub struct RpcDuplexFold {
2948 handler: Arc<dyn RpcDuplexHandler>,
2949 /// Async emitter for response chunks (per-call ordering via
2950 /// awaited emits — same rationale as `RpcServerStreamingFold`).
2951 emit: RpcAsyncResponseEmitter,
2952 /// Optional request-direction grant emitter.
2953 grant_emit: Option<RpcRequestGrantEmitter>,
2954 in_flight: Arc<Mutex<HashMap<(u64, u64), RpcCancellationToken>>>,
2955 senders: RequestChunkSenders,
2956 metrics: Option<Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>>,
2957}
2958
2959impl RpcDuplexFold {
2960 /// Construct a duplex server fold. `emit` publishes individual
2961 /// response chunks AND the terminal frame on the caller's
2962 /// reply channel (uses the async emitter for per-call
2963 /// ordering, same as `RpcServerStreamingFold`).
2964 pub fn new(handler: Arc<dyn RpcDuplexHandler>, emit: RpcAsyncResponseEmitter) -> Self {
2965 Self {
2966 handler,
2967 emit,
2968 grant_emit: None,
2969 in_flight: Arc::new(Mutex::new(HashMap::new())),
2970 senders: Arc::new(Mutex::new(HashMap::new())),
2971 metrics: None,
2972 }
2973 }
2974
2975 /// Attach the request-direction grant emitter. See
2976 /// [`RpcStreamingRequestFold::with_grant_emitter`] for the
2977 /// auto-grant behavior. When unset, callers that opted into
2978 /// flow control simply won't get refilled.
2979 pub fn with_grant_emitter(mut self, grant_emit: RpcRequestGrantEmitter) -> Self {
2980 self.grant_emit = Some(grant_emit);
2981 self
2982 }
2983
2984 /// Attach a per-service metrics handle. Bumps
2985 /// handler_invocations / handler_in_flight / handler_panics /
2986 /// handler_duration_* + the response pump's
2987 /// streaming_chunks_emitted_total per emitted chunk.
2988 pub fn with_metrics(
2989 mut self,
2990 metrics: Arc<crate::adapter::net::mesh_rpc_metrics::ServiceMetricsAtomic>,
2991 ) -> Self {
2992 self.metrics = Some(metrics);
2993 self
2994 }
2995
2996 /// Test-only: snapshot of the in-flight call set.
2997 #[cfg(test)]
2998 pub fn in_flight_keys(&self) -> Vec<(u64, u64)> {
2999 self.in_flight.lock().keys().copied().collect()
3000 }
3001
3002 /// Test-only: snapshot of the in-flight per-call senders.
3003 #[cfg(test)]
3004 pub fn sender_keys(&self) -> Vec<(u64, u64)> {
3005 self.senders.lock().keys().copied().collect()
3006 }
3007}
3008
3009impl RedexFold<()> for RpcDuplexFold {
3010 fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
3011 let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
3012 EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
3013 } else {
3014 None
3015 }) else {
3016 tracing::warn!(
3017 payload_len = ev.payload.len(),
3018 "rpc duplex server fold: event payload too short for EventMeta",
3019 );
3020 return Ok(());
3021 };
3022 let key = (meta.origin_hash, meta.seq_or_ts);
3023 match meta.dispatch {
3024 DISPATCH_RPC_REQUEST => {
3025 let payload = match RpcRequestPayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
3026 Ok(p) => p,
3027 Err(e) => {
3028 tracing::warn!(
3029 error = %e,
3030 caller_origin = format!("{:#x}", meta.origin_hash),
3031 call_id = meta.seq_or_ts,
3032 "rpc duplex server fold: malformed request payload",
3033 );
3034 let resp = RpcResponsePayload {
3035 status: RpcStatus::UnknownVersion,
3036 headers: vec![(
3037 HEADER_NRPC_STREAMING.to_string(),
3038 HEADER_NRPC_STREAMING_END.to_vec(),
3039 )],
3040 body: Bytes::from(format!("malformed request: {e}")),
3041 };
3042 let emit = self.emit.clone();
3043 let caller_origin = meta.origin_hash;
3044 let call_id = meta.seq_or_ts;
3045 tokio::spawn(async move {
3046 emit(caller_origin, call_id, resp).await;
3047 });
3048 return Ok(());
3049 }
3050 };
3051 // Caller-bug guard: a duplex REQUEST must set
3052 // BOTH the client-streaming flag (we'll receive
3053 // request chunks) AND the streaming-response flag
3054 // (we'll emit response chunks). Missing flags →
3055 // refuse cleanly.
3056 let required = FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_STREAMING_RESPONSE;
3057 if payload.flags & required != required {
3058 tracing::warn!(
3059 caller_origin = format!("{:#x}", meta.origin_hash),
3060 call_id = meta.seq_or_ts,
3061 flags = format!("{:#06x}", payload.flags),
3062 "rpc duplex server fold: REQUEST missing required flags",
3063 );
3064 let resp = RpcResponsePayload {
3065 status: RpcStatus::Internal,
3066 headers: vec![(
3067 HEADER_NRPC_STREAMING.to_string(),
3068 HEADER_NRPC_STREAMING_END.to_vec(),
3069 )],
3070 body: Bytes::from_static(
3071 b"REQUEST on a duplex service must set FLAG_RPC_CLIENT_STREAMING_REQUEST and FLAG_RPC_STREAMING_RESPONSE",
3072 ),
3073 };
3074 let emit = self.emit.clone();
3075 let caller_origin = meta.origin_hash;
3076 let call_id = meta.seq_or_ts;
3077 tokio::spawn(async move {
3078 emit(caller_origin, call_id, resp).await;
3079 });
3080 return Ok(());
3081 }
3082 // Duplicate-REQUEST refusal.
3083 {
3084 let in_flight = self.in_flight.lock();
3085 if in_flight.contains_key(&key) {
3086 drop(in_flight);
3087 tracing::warn!(
3088 caller_origin = format!("{:#x}", meta.origin_hash),
3089 call_id = meta.seq_or_ts,
3090 "rpc duplex server fold: duplicate REQUEST for in-flight call_id; refusing",
3091 );
3092 let resp = RpcResponsePayload {
3093 status: RpcStatus::Internal,
3094 headers: vec![(
3095 HEADER_NRPC_STREAMING.to_string(),
3096 HEADER_NRPC_STREAMING_END.to_vec(),
3097 )],
3098 body: Bytes::from_static(
3099 b"duplicate REQUEST for already-in-flight call_id",
3100 ),
3101 };
3102 let emit = self.emit.clone();
3103 let caller_origin = meta.origin_hash;
3104 let call_id = meta.seq_or_ts;
3105 tokio::spawn(async move {
3106 emit(caller_origin, call_id, resp).await;
3107 });
3108 return Ok(());
3109 }
3110 }
3111 let cancellation = RpcCancellationToken::new();
3112 self.in_flight.lock().insert(key, cancellation.clone());
3113
3114 // Build per-call request-side mpsc (Phase B
3115 // pattern).
3116 let (req_tx, req_rx) =
3117 tokio::sync::mpsc::channel::<bytes::Bytes>(STREAMING_REQUEST_PUMP_CAPACITY);
3118 let end_on_initial = payload.flags & FLAG_RPC_REQUEST_END != 0;
3119 let is_pure_terminator = end_on_initial && payload.body.is_empty();
3120 if !is_pure_terminator {
3121 // Same invariant as the client-streaming fold:
3122 // fresh bounded mpsc with a live receiver cannot
3123 // reject the first send.
3124 if req_tx.try_send(payload.body).is_err() {
3125 debug_assert!(false, "fresh duplex request mpsc rejected initial body");
3126 tracing::error!(
3127 caller_origin = format!("{:#x}", meta.origin_hash),
3128 call_id = meta.seq_or_ts,
3129 "rpc duplex server fold: fresh mpsc rejected initial REQUEST body (invariant break)",
3130 );
3131 }
3132 }
3133 if !end_on_initial {
3134 self.senders.lock().insert(key, req_tx);
3135 }
3136 // Hand the handler an auto-granting RequestStream
3137 // when the caller opted into request-direction
3138 // flow control AND the fold was wired with a
3139 // grant emitter.
3140 let grant_emitter = if parse_request_window_initial(&payload.headers).is_some() {
3141 self.grant_emit.clone()
3142 } else {
3143 None
3144 };
3145 let request_stream =
3146 RequestStream::new(req_rx, grant_emitter, meta.origin_hash, meta.seq_or_ts);
3147
3148 // Build the per-call response-side mpsc (existing
3149 // server-streaming-response pattern). The handler
3150 // writes chunks to the sink; the pump task drains
3151 // the receiver and publishes RESPONSE events.
3152 let (resp_tx, mut resp_rx) =
3153 tokio::sync::mpsc::channel::<bytes::Bytes>(STREAMING_PUMP_CAPACITY);
3154 let response_sink = RpcResponseSink {
3155 inner: resp_tx,
3156 metrics: self.metrics.clone(),
3157 };
3158
3159 let trace_context = if payload.flags & FLAG_RPC_PROPAGATE_TRACE != 0 {
3160 extract_trace_context(&payload.headers)
3161 } else {
3162 None
3163 };
3164 let deadline_ns = payload.deadline_ns;
3165 let ctx = RpcStreamingContext {
3166 caller_origin: meta.origin_hash,
3167 call_id: meta.seq_or_ts,
3168 deadline_ns,
3169 headers: payload.headers,
3170 cancellation: cancellation.clone(),
3171 trace_context,
3172 };
3173 let handler = self.handler.clone();
3174 let emit = self.emit.clone();
3175 let in_flight = self.in_flight.clone();
3176 let senders = self.senders.clone();
3177 let caller_origin = meta.origin_hash;
3178 let call_id = meta.seq_or_ts;
3179 let cancel_probe = cancellation.clone();
3180 let cancel_for_deadline = cancellation.clone();
3181 let metrics = self.metrics.clone();
3182
3183 // Pump: drains resp_rx, emits per-chunk RESPONSE
3184 // events with `nrpc-streaming: continue`.
3185 let pump_emit = emit.clone();
3186 let pump_metrics = metrics.clone();
3187 let pump = tokio::spawn(async move {
3188 while let Some(chunk) = resp_rx.recv().await {
3189 if let Some(m) = pump_metrics.as_ref() {
3190 m.streaming_chunks_emitted_total
3191 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3192 }
3193 let resp = RpcResponsePayload {
3194 status: RpcStatus::Ok,
3195 headers: vec![(
3196 HEADER_NRPC_STREAMING.to_string(),
3197 HEADER_NRPC_STREAMING_CONTINUE.to_vec(),
3198 )],
3199 body: chunk.clone(),
3200 };
3201 pump_emit(caller_origin, call_id, resp).await;
3202 }
3203 });
3204
3205 tokio::spawn(async move {
3206 if let Some(m) = metrics.as_ref() {
3207 m.handler_invocations_total
3208 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3209 m.handler_in_flight
3210 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3211 }
3212 let handler_started = std::time::Instant::now();
3213 // Same deadline guard as the client-streaming
3214 // fold: force-drop the handler future at
3215 // deadline_ns so an orphaned request stream
3216 // can't hang the call. `0` means no deadline.
3217 let call_fut = futures::FutureExt::catch_unwind(std::panic::AssertUnwindSafe(
3218 handler.call(ctx, request_stream, response_sink),
3219 ));
3220 let outcome = if deadline_ns > 0 {
3221 let now_ns = std::time::SystemTime::now()
3222 .duration_since(std::time::UNIX_EPOCH)
3223 .map(|d| d.as_nanos() as u64)
3224 .unwrap_or(0);
3225 let remaining = deadline_ns.saturating_sub(now_ns);
3226 if remaining == 0 {
3227 cancel_for_deadline.cancel();
3228 Ok(Err(RpcHandlerError::Internal(
3229 "duplex handler deadline_ns already expired at spawn".to_string(),
3230 )))
3231 } else {
3232 match tokio::time::timeout(
3233 std::time::Duration::from_nanos(remaining),
3234 call_fut,
3235 )
3236 .await
3237 {
3238 Ok(o) => o,
3239 Err(_) => {
3240 cancel_for_deadline.cancel();
3241 Ok(Err(RpcHandlerError::Internal(
3242 "duplex handler deadline_ns exceeded".to_string(),
3243 )))
3244 }
3245 }
3246 }
3247 } else {
3248 call_fut.await
3249 };
3250 // Handler dropped the sink — let the pump
3251 // drain any final in-flight chunks before we
3252 // emit the terminal frame.
3253 let _ = pump.await;
3254 if let Some(m) = metrics.as_ref() {
3255 m.handler_in_flight
3256 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
3257 m.record_handler_duration(handler_started.elapsed());
3258 if outcome.is_err() {
3259 m.handler_panics_total
3260 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3261 }
3262 }
3263 let terminal = if cancel_probe.is_cancelled() {
3264 RpcResponsePayload {
3265 status: RpcStatus::Cancelled,
3266 headers: vec![],
3267 body: Bytes::from_static(
3268 b"server observed CANCEL during duplex handler execution",
3269 ),
3270 }
3271 } else {
3272 match outcome {
3273 Ok(Ok(())) => RpcResponsePayload {
3274 status: RpcStatus::Ok,
3275 headers: vec![(
3276 HEADER_NRPC_STREAMING.to_string(),
3277 HEADER_NRPC_STREAMING_END.to_vec(),
3278 )],
3279 body: Bytes::new(),
3280 },
3281 Ok(Err(RpcHandlerError::Application { code, message })) => {
3282 RpcResponsePayload {
3283 status: RpcStatus::Application(code),
3284 headers: vec![],
3285 body: Bytes::from(message),
3286 }
3287 }
3288 Ok(Err(RpcHandlerError::Internal(message))) => RpcResponsePayload {
3289 status: RpcStatus::Internal,
3290 headers: vec![],
3291 body: Bytes::from(message),
3292 },
3293 Err(panic) => {
3294 let panic_msg = panic
3295 .downcast_ref::<&'static str>()
3296 .map(|s| s.to_string())
3297 .or_else(|| panic.downcast_ref::<String>().cloned())
3298 .unwrap_or_else(|| "<non-string panic>".into());
3299 tracing::error!(
3300 caller_origin = format!("{:#x}", caller_origin),
3301 call_id,
3302 panic = %panic_msg,
3303 "rpc duplex server handler panicked",
3304 );
3305 RpcResponsePayload {
3306 status: RpcStatus::Internal,
3307 headers: vec![],
3308 body: Bytes::from(format!("handler panicked: {panic_msg}")),
3309 }
3310 }
3311 }
3312 };
3313 in_flight.lock().remove(&key);
3314 senders.lock().remove(&key);
3315 emit(caller_origin, call_id, terminal).await;
3316 });
3317 }
3318 DISPATCH_RPC_REQUEST_CHUNK => {
3319 apply_request_chunk_to_senders(
3320 ev.payload.slice(EVENT_META_SIZE..),
3321 &meta,
3322 &self.senders,
3323 "duplex",
3324 );
3325 }
3326 DISPATCH_RPC_CANCEL => {
3327 if let Some(token) = self.in_flight.lock().remove(&key) {
3328 token.cancel();
3329 }
3330 self.senders.lock().remove(&key);
3331 }
3332 _ => {}
3333 }
3334 Ok(())
3335 }
3336}
3337
3338// ============================================================================
3339// Client-side fold.
3340//
// `RpcClientFold` is the symmetric companion of `RpcServerFold`.
// It sees RESPONSE and REQUEST_GRANT events on the caller's reply
// channel (`<service>.replies.<self_origin>`) and routes each one
// to the matching call's pending entry — a unary / terminal
// oneshot, a response-chunk mpsc, or a grant mpsc — keyed on
// `call_id` (the `EventMeta::seq_or_ts`).
3346//
3347// The fold's mutable state (the pending-senders map) is shared
3348// with the `Mesh::call` API via a clone of the same Arc — so the
3349// publisher side can `register(call_id)` to stage a receiver
3350// before publishing the REQUEST, and the fold side can `deliver`
3351// when the matching RESPONSE arrives.
3352// ============================================================================
3353
/// One pending entry — unary oneshot, server-streaming mpsc,
/// client-streaming (one terminal oneshot + a separate grant
/// mpsc), or duplex (a response-chunk mpsc + a separate grant
/// mpsc). The fold dispatches to the right variant based on
/// what's registered for the `call_id`.
enum PendingEntry {
    /// Unary call — exactly one RESPONSE expected. Completes the
    /// oneshot with the decoded payload.
    Unary(tokio::sync::oneshot::Sender<RpcResponsePayload>),
    /// Server-streaming call — multiple non-terminal `Continue`
    /// chunks followed by one terminal frame. Each non-terminal
    /// chunk pushes a `StreamItem::Chunk(body)` onto the mpsc;
    /// the terminal frame pushes `StreamItem::End` (Ok) or
    /// `StreamItem::Error(payload)` (non-Ok status) and the
    /// pending entry is removed.
    Streaming(tokio::sync::mpsc::UnboundedSender<StreamItem>),
    /// Client-streaming call. Two sender halves:
    ///
    /// - `terminal_tx`: oneshot that completes when the server's
    ///   single terminal RESPONSE arrives. Response shape and
    ///   delivery semantics are identical to the unary variant —
    ///   the caller awaits one payload, success or failure status.
    /// - `grant_tx`: mpsc that ferries REQUEST_GRANT credit values
    ///   from the client fold to the caller's send sink. Each
    ///   `DISPATCH_RPC_REQUEST_GRANT` event for this call_id
    ///   pushes one `u32` credit onto the mpsc; the caller's send
    ///   sink consumes credits to gate `send(...).await`.
    ///
    /// Bidi streaming plan (Phase C). Used for pure client-
    /// streaming only (one terminal RESPONSE closes the call).
    /// Duplex calls use the [`PendingEntry::Duplex`] variant
    /// instead, since they receive many response chunks rather
    /// than one terminal payload.
    ClientStreaming {
        terminal_tx: tokio::sync::oneshot::Sender<RpcResponsePayload>,
        grant_tx: tokio::sync::mpsc::UnboundedSender<u32>,
    },
    /// Duplex call — many request chunks out, many response
    /// chunks in. Two senders, same shape as `ClientStreaming`
    /// except the terminal slot is an mpsc instead of a oneshot
    /// because the response side is multi-chunk (terminator is
    /// implicit in `StreamItem::End` / `StreamItem::Error` on
    /// the chunks_tx mpsc, same as `PendingEntry::Streaming`).
    ///
    /// - `chunks_tx`: response-chunk mpsc — fed by `deliver`
    ///   when RESPONSE events arrive on the reply channel.
    ///   `StreamItem::Chunk` for non-terminal, `StreamItem::End`
    ///   / `StreamItem::Error` terminates and removes the entry.
    /// - `grant_tx`: request-direction credit mpsc — fed by
    ///   `deliver_grant` when REQUEST_GRANT events arrive.
    ///
    /// Bidi streaming plan (Phase D).
    Duplex {
        chunks_tx: tokio::sync::mpsc::UnboundedSender<StreamItem>,
        grant_tx: tokio::sync::mpsc::UnboundedSender<u32>,
    },
}
3410
/// One item delivered to a streaming caller. The caller's
/// `RpcStream` translates these into `Stream::Item =
/// Result<Bytes, RpcError>` plus stream termination.
///
/// Produced by the pending registry for both server-streaming
/// and duplex entries; `End` and `Error` are always the last
/// item pushed for a call, after which the entry is removed.
#[derive(Debug, Clone)]
pub enum StreamItem {
    /// A body slice from the server. Pushed for every non-terminal
    /// chunk, and also for a terminal `Ok` frame (or a unary-shaped
    /// response) that carries a non-empty body, immediately before
    /// the `End` item.
    Chunk(bytes::Bytes),
    /// Terminal frame, server signaled clean stream end.
    End,
    /// Terminal frame with a non-`Ok` status. Body is the
    /// server's diagnostic; status is the wire `RpcStatus` value.
    Error(RpcResponsePayload),
}
3424
/// Shared pending-call state. Held by both the `RpcClientFold`
/// (writer side: completes oneshot senders / pushes streaming
/// chunks on RESPONSE arrival, pushes credits on REQUEST_GRANT
/// arrival) and the `Mesh::call*` APIs (reader side: registers
/// entries before publishing the REQUEST).
/// Concurrent access is mediated by `DashMap`.
///
/// Multiplexes unary AND streaming calls in a single map keyed
/// on `call_id` — the entry's enum variant tells the fold how
/// to dispatch incoming RESPONSE events.
pub struct RpcClientPending {
    /// Map keyed on `call_id`, value carries `(expected_target,
    /// PendingEntry)`. `expected_target` is the `NodeId` of the
    /// peer the request was dispatched to; `deliver` and
    /// `deliver_grant` reject frames whose wire `from_node`
    /// doesn't match. A `expected_target == 0` entry opts out of
    /// the binding (loopback tests + paths with no session).
    senders: dashmap::DashMap<u64, (super::super::behavior::placement::NodeId, PendingEntry)>,
}
3443
3444impl RpcClientPending {
3445 /// Construct an empty pending-call store.
3446 pub fn new() -> Self {
3447 Self {
3448 senders: dashmap::DashMap::new(),
3449 }
3450 }
3451
3452 /// Register a oneshot for a unary `call_id`. Returns the
3453 /// receiver the caller awaits. The caller MUST publish the
3454 /// REQUEST after registration (and not before) so the
3455 /// matching RESPONSE can't arrive while the pending entry is
3456 /// missing.
3457 ///
3458 /// `target_node` is the wire-session peer the request will
3459 /// be sent to; `deliver` rejects RESPONSE frames whose
3460 /// `from_node` doesn't match. Pass `0` for loopback / no-
3461 /// session test paths to opt out of the binding gate.
3462 ///
3463 /// If a sender already exists for `call_id` (improperly reused
3464 /// id), it is replaced and the old receiver gets a
3465 /// `RecvError::Closed` — surfacing the misuse as a hard error
3466 /// at the caller rather than silently delivering the response
3467 /// to the wrong waiter.
3468 pub fn register(
3469 &self,
3470 call_id: u64,
3471 target_node: super::super::behavior::placement::NodeId,
3472 ) -> tokio::sync::oneshot::Receiver<RpcResponsePayload> {
3473 let (tx, rx) = tokio::sync::oneshot::channel();
3474 self.senders
3475 .insert(call_id, (target_node, PendingEntry::Unary(tx)));
3476 rx
3477 }
3478
3479 /// Register a streaming entry for `call_id`. Returns the
3480 /// receive end of an mpsc the fold will push chunks onto.
3481 /// Same registration ordering rules as `register` —
3482 /// publisher must call this BEFORE publishing the REQUEST.
3483 pub fn register_streaming(
3484 &self,
3485 call_id: u64,
3486 target_node: super::super::behavior::placement::NodeId,
3487 ) -> tokio::sync::mpsc::UnboundedReceiver<StreamItem> {
3488 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
3489 self.senders
3490 .insert(call_id, (target_node, PendingEntry::Streaming(tx)));
3491 rx
3492 }
3493
3494 /// Register a client-streaming (or duplex) entry for
3495 /// `call_id`. Returns BOTH the terminal-response receiver
3496 /// (the caller awaits on this for the single terminal
3497 /// RESPONSE that ends the call) AND a grant receiver (the
3498 /// caller's send sink consumes this to gate `send().await`
3499 /// when the caller opted into request-direction flow
3500 /// control).
3501 ///
3502 /// Same registration ordering rules as `register` /
3503 /// `register_streaming` — publisher must call this BEFORE
3504 /// publishing the REQUEST so a fast server's RESPONSE /
3505 /// REQUEST_GRANT can't arrive while no pending entry exists.
3506 ///
3507 /// Bidi streaming plan (Phase C).
3508 pub fn register_client_streaming(
3509 &self,
3510 call_id: u64,
3511 target_node: super::super::behavior::placement::NodeId,
3512 ) -> (
3513 tokio::sync::oneshot::Receiver<RpcResponsePayload>,
3514 tokio::sync::mpsc::UnboundedReceiver<u32>,
3515 ) {
3516 let (terminal_tx, terminal_rx) = tokio::sync::oneshot::channel();
3517 let (grant_tx, grant_rx) = tokio::sync::mpsc::unbounded_channel();
3518 self.senders.insert(
3519 call_id,
3520 (
3521 target_node,
3522 PendingEntry::ClientStreaming {
3523 terminal_tx,
3524 grant_tx,
3525 },
3526 ),
3527 );
3528 (terminal_rx, grant_rx)
3529 }
3530
3531 /// Register a duplex entry for `call_id`. Returns BOTH a
3532 /// response-chunk receiver (yields `StreamItem` per inbound
3533 /// RESPONSE chunk; terminator is `End` / `Error`) AND a
3534 /// grant receiver (yields `u32` credits per inbound
3535 /// REQUEST_GRANT).
3536 ///
3537 /// Same registration ordering rules as the other `register_*`
3538 /// methods: publisher must call this BEFORE publishing the
3539 /// REQUEST so the server's response chunks / grants can't
3540 /// arrive while no pending entry exists.
3541 ///
3542 /// Bidi streaming plan (Phase D).
3543 pub fn register_duplex(
3544 &self,
3545 call_id: u64,
3546 target_node: super::super::behavior::placement::NodeId,
3547 ) -> (
3548 tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
3549 tokio::sync::mpsc::UnboundedReceiver<u32>,
3550 ) {
3551 let (chunks_tx, chunks_rx) = tokio::sync::mpsc::unbounded_channel();
3552 let (grant_tx, grant_rx) = tokio::sync::mpsc::unbounded_channel();
3553 self.senders.insert(
3554 call_id,
3555 (
3556 target_node,
3557 PendingEntry::Duplex {
3558 chunks_tx,
3559 grant_tx,
3560 },
3561 ),
3562 );
3563 (chunks_rx, grant_rx)
3564 }
3565
3566 /// Drop the pending entry for `call_id`. Called by the
3567 /// caller-side cancellation path (e.g. `Mesh::call`'s future
3568 /// being dropped, the stream being dropped, or a deadline
3569 /// timer firing). The matching RESPONSE(s) that may still
3570 /// arrive afterwards are silently discarded by `deliver`.
3571 pub fn cancel(&self, call_id: u64) {
3572 self.senders.remove(&call_id);
3573 }
3574
3575 /// Deliver `resp` to the waiter for `call_id`, if any.
3576 ///
3577 /// `from_node` is the wire-session peer of the inbound
3578 /// RESPONSE. If the pending entry's recorded `target_node`
3579 /// is non-zero and does not match `from_node`, the frame is
3580 /// dropped with a trace log and the pending entry stays
3581 /// intact — a forged response on a shared reply channel
3582 /// can't resolve a victim's call. A recorded `target_node
3583 /// == 0` opts the call out of the binding (loopback paths).
3584 ///
3585 /// For a unary entry: completes the oneshot and removes the
3586 /// entry.
3587 ///
3588 /// For a streaming entry: examines the response's headers to
3589 /// decide whether it's a non-terminal chunk (`Continue` —
3590 /// push `StreamItem::Chunk`, keep the entry) or terminal
3591 /// (`End` / non-`Ok` — push `StreamItem::End` or `Error`,
3592 /// remove the entry).
3593 ///
3594 /// Idempotent on subsequent deliveries to a removed entry.
3595 fn deliver(
3596 &self,
3597 call_id: u64,
3598 from_node: super::super::behavior::placement::NodeId,
3599 resp: RpcResponsePayload,
3600 ) {
3601 // Look up the entry — but DON'T remove it yet, because for
3602 // streaming we may want to keep it for non-terminal chunks.
3603 // The remove decision is per-variant.
3604 let entry = self.senders.get(&call_id);
3605 let Some(entry) = entry else { return };
3606 // S-4 part 2 gate. The pending registry binds each call
3607 // to the AEAD-verified `target_node` the request was
3608 // dispatched to; any other session peer publishing on the
3609 // shared reply channel with a guessed call_id is dropped
3610 // here without touching the waiter. `0` opts out — used
3611 // by loopback paths that have no session peer.
3612 let (target_node, _entry_value) = entry.value();
3613 if *target_node != 0 && *target_node != from_node {
3614 tracing::trace!(
3615 call_id,
3616 from_node,
3617 expected = *target_node,
3618 "rpc client: dropping RESPONSE from non-target session peer"
3619 );
3620 return;
3621 }
3622 match entry.value() {
3623 (_, PendingEntry::Unary(_)) => {
3624 drop(entry);
3625 if let Some((_, (_, PendingEntry::Unary(tx)))) = self.senders.remove(&call_id) {
3626 let _ = tx.send(resp);
3627 }
3628 }
3629 (_, PendingEntry::ClientStreaming { .. }) => {
3630 // Terminal RESPONSE for a client-streaming /
3631 // duplex call. Same delivery shape as Unary —
3632 // complete the oneshot, remove the entry. The
3633 // grant_tx half drops with the entry, which is
3634 // fine (no more grants will arrive after the
3635 // terminal frame).
3636 drop(entry);
3637 if let Some((
3638 _,
3639 (
3640 _,
3641 PendingEntry::ClientStreaming {
3642 terminal_tx,
3643 grant_tx: _,
3644 },
3645 ),
3646 )) = self.senders.remove(&call_id)
3647 {
3648 let _ = terminal_tx.send(resp);
3649 }
3650 }
3651 (_, PendingEntry::Streaming(tx)) => {
3652 let tx = tx.clone();
3653 drop(entry);
3654 self.dispatch_streaming_chunk(&tx, resp, call_id);
3655 }
3656 (_, PendingEntry::Duplex { chunks_tx, .. }) => {
3657 // Same dispatch logic as Streaming — duplex
3658 // response side IS a multi-chunk stream.
3659 let tx = chunks_tx.clone();
3660 drop(entry);
3661 self.dispatch_streaming_chunk(&tx, resp, call_id);
3662 }
3663 }
3664 }
3665
3666 /// Shared response-chunk dispatch used by both
3667 /// `PendingEntry::Streaming` and `PendingEntry::Duplex`. The
3668 /// caller has already verified the target-binding gate and
3669 /// dropped its `entry` ref; this helper does the classify-
3670 /// and-push and removes the entry from the senders map on
3671 /// terminal frames.
3672 fn dispatch_streaming_chunk(
3673 &self,
3674 tx: &tokio::sync::mpsc::UnboundedSender<StreamItem>,
3675 resp: RpcResponsePayload,
3676 call_id: u64,
3677 ) {
3678 let kind = classify_streaming_chunk(&resp);
3679 match kind {
3680 StreamingChunkKind::Continue => {
3681 let _ = tx.send(StreamItem::Chunk(resp.body));
3682 }
3683 StreamingChunkKind::Terminal => {
3684 let item = if resp.status.is_ok() {
3685 if !resp.body.is_empty() {
3686 let _ = tx.send(StreamItem::Chunk(resp.body));
3687 }
3688 StreamItem::End
3689 } else {
3690 StreamItem::Error(resp)
3691 };
3692 let _ = tx.send(item);
3693 self.senders.remove(&call_id);
3694 }
3695 StreamingChunkKind::Unary => {
3696 tracing::warn!(
3697 call_id,
3698 body_len = resp.body.len(),
3699 "rpc client: streaming / duplex consumer received unary-shaped \
3700 response (no nrpc-streaming header); server may have bridged a \
3701 unary path. Bridging to single-chunk + EOF.",
3702 );
3703 if !resp.body.is_empty() {
3704 let _ = tx.send(StreamItem::Chunk(resp.body));
3705 }
3706 let _ = tx.send(StreamItem::End);
3707 self.senders.remove(&call_id);
3708 }
3709 }
3710 }
3711
3712 /// Deliver a request-direction grant credit to the waiter
3713 /// for `call_id`, if it's a client-streaming / duplex entry.
3714 /// Silently no-op for unknown call_ids, for unary entries
3715 /// (caller bug — grant for a unary call makes no sense),
3716 /// and for server-streaming entries (grants apply only to
3717 /// the upload direction).
3718 ///
3719 /// `from_node` is gated by the same target-binding check
3720 /// as `deliver`: a grant from a non-target session peer is
3721 /// dropped (a forged grant on a shared reply channel can't
3722 /// inject credit into a victim's call).
3723 ///
3724 /// Bidi streaming plan (Phase C).
3725 fn deliver_grant(
3726 &self,
3727 call_id: u64,
3728 from_node: super::super::behavior::placement::NodeId,
3729 credits: u32,
3730 ) {
3731 let entry = self.senders.get(&call_id);
3732 let Some(entry) = entry else { return };
3733 let (target_node, _entry_value) = entry.value();
3734 if *target_node != 0 && *target_node != from_node {
3735 tracing::trace!(
3736 call_id,
3737 from_node,
3738 expected = *target_node,
3739 "rpc client: dropping REQUEST_GRANT from non-target session peer"
3740 );
3741 return;
3742 }
3743 match entry.value() {
3744 (_, PendingEntry::ClientStreaming { grant_tx, .. })
3745 | (_, PendingEntry::Duplex { grant_tx, .. }) => {
3746 let _ = grant_tx.send(credits);
3747 }
3748 // Unary / Streaming entries silently ignore — see
3749 // method docs for the rationale.
3750 _ => {}
3751 }
3752 }
3753
3754 /// Test-only: how many pending calls are registered. Used by
3755 /// integration tests to confirm cleanup after happy-path / cancel.
3756 #[cfg(test)]
3757 pub fn pending_count(&self) -> usize {
3758 self.senders.len()
3759 }
3760}
3761
3762impl Default for RpcClientPending {
3763 fn default() -> Self {
3764 Self::new()
3765 }
3766}
3767
/// Client-side fold. Decodes RESPONSE and REQUEST_GRANT events
/// and routes them to the matching entry in the shared
/// [`RpcClientPending`] — completing a oneshot, pushing a stream
/// item, or pushing a grant credit depending on how the call
/// was registered.
///
/// `Mesh::call` clones the same `Arc<RpcClientPending>` to register
/// entries before publishing REQUESTs.
pub struct RpcClientFold {
    /// Pending-call registry shared with the publishing side.
    pending: Arc<RpcClientPending>,
}
3776
3777impl RpcClientFold {
3778 /// Construct a client fold that delivers responses through
3779 /// `pending`. Typical pattern:
3780 ///
3781 /// ```ignore
3782 /// let pending = Arc::new(RpcClientPending::new());
3783 /// let fold = RpcClientFold::new(pending.clone());
3784 /// let adapter = CortexAdapter::open(..., fold, ())?;
3785 /// // `pending` is still usable for register / cancel.
3786 /// ```
3787 pub fn new(pending: Arc<RpcClientPending>) -> Self {
3788 Self { pending }
3789 }
3790
3791 /// Production-path entry point. Mesh dispatch calls this with
3792 /// the AEAD-verified session peer's `NodeId` in
3793 /// `ev.from_node`; the pending registry's S-4 binding gate
3794 /// uses it to reject responses from the wrong target.
3795 pub fn apply_inbound(&mut self, ev: &RpcInboundEvent) {
3796 let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
3797 EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
3798 } else {
3799 None
3800 }) else {
3801 tracing::warn!(
3802 payload_len = ev.payload.len(),
3803 "rpc client fold: event payload too short for EventMeta; skipping",
3804 );
3805 return;
3806 };
3807 match meta.dispatch {
3808 DISPATCH_RPC_RESPONSE => {
3809 match RpcResponsePayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
3810 Ok(resp) => self.pending.deliver(meta.seq_or_ts, ev.from_node, resp),
3811 Err(e) => {
3812 tracing::warn!(
3813 error = %e,
3814 call_id = meta.seq_or_ts,
3815 "rpc client fold: malformed response payload",
3816 );
3817 }
3818 }
3819 }
3820 DISPATCH_RPC_REQUEST_GRANT => {
3821 // Server granted upload credit for a
3822 // client-streaming / duplex call. Route it to the
3823 // matching pending entry's grant mpsc; non-client-
3824 // streaming entries silently ignore (see
3825 // RpcClientPending::deliver_grant docs).
3826 match decode_request_grant(&ev.payload[EVENT_META_SIZE..]) {
3827 Some(grant) => {
3828 // The payload's `call_id` MUST agree with
3829 // the EventMeta's `seq_or_ts`: producer
3830 // encodes both to the same value (see
3831 // `RpcRequestGrantPayload::call_id` docs).
3832 // If they disagree, the frame is malformed
3833 // or forged — drop it. Otherwise a peer
3834 // could publish a GRANT whose meta names
3835 // one call but whose payload credits a
3836 // different in-flight call_id.
3837 if grant.call_id != meta.seq_or_ts {
3838 tracing::debug!(
3839 meta_call_id = meta.seq_or_ts,
3840 payload_call_id = grant.call_id,
3841 "rpc client fold: REQUEST_GRANT meta/payload call_id mismatch; dropping",
3842 );
3843 return;
3844 }
3845 if grant.credits == 0 {
3846 return;
3847 }
3848 self.pending
3849 .deliver_grant(grant.call_id, ev.from_node, grant.credits);
3850 }
3851 None => {
3852 tracing::debug!(
3853 call_id = meta.seq_or_ts,
3854 "rpc client fold: malformed REQUEST_GRANT payload"
3855 );
3856 }
3857 }
3858 }
3859 _ => {
3860 // Unknown / unexpected dispatch on the reply
3861 // channel — ignore (a misconfigured publisher
3862 // shouldn't take down the fold).
3863 }
3864 }
3865 }
3866}
3867
3868impl RedexFold<()> for RpcClientFold {
3869 /// Legacy entry point used by loopback / test paths that
3870 /// don't have a session peer to resolve. Calls `deliver`
3871 /// with `from_node = 0`, which the pending registry treats
3872 /// as "no binding" — callers that registered with
3873 /// `target_node = 0` accept it, callers that registered
3874 /// with a real target reject it.
3875 fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
3876 let Some(meta) = (if ev.payload.len() >= EVENT_META_SIZE {
3877 EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
3878 } else {
3879 None
3880 }) else {
3881 tracing::warn!(
3882 payload_len = ev.payload.len(),
3883 "rpc client fold: event payload too short for EventMeta; skipping",
3884 );
3885 return Ok(());
3886 };
3887 // Route RESPONSE and REQUEST_GRANT events; ignore other
3888 // dispatches a misconfigured publisher might send. The
3889 // loopback path uses `from_node = 0` which the pending
3890 // registry treats as "no binding" — see the apply_inbound
3891 // production-path counterpart above for the AEAD-verified
3892 // peer routing.
3893 match meta.dispatch {
3894 DISPATCH_RPC_RESPONSE => {
3895 match RpcResponsePayload::decode(ev.payload.slice(EVENT_META_SIZE..)) {
3896 Ok(resp) => self.pending.deliver(meta.seq_or_ts, 0, resp),
3897 Err(e) => {
3898 // Malformed RESPONSE on the reply channel.
3899 // We can't fabricate a synthetic response
3900 // (the call_id might be valid; we just
3901 // can't tell what it was supposed to
3902 // mean). Log and leave the pending entry
3903 // intact — the caller's deadline /
3904 // cancellation path will eventually clean
3905 // it up.
3906 tracing::warn!(
3907 error = %e,
3908 call_id = meta.seq_or_ts,
3909 "rpc client fold: malformed response payload",
3910 );
3911 }
3912 }
3913 }
3914 DISPATCH_RPC_REQUEST_GRANT => {
3915 match decode_request_grant(&ev.payload[EVENT_META_SIZE..]) {
3916 Some(grant) => {
3917 // See `apply_inbound` REQUEST_GRANT arm for
3918 // the meta/payload call_id invariant.
3919 if grant.call_id != meta.seq_or_ts {
3920 tracing::debug!(
3921 meta_call_id = meta.seq_or_ts,
3922 payload_call_id = grant.call_id,
3923 "rpc client fold: REQUEST_GRANT meta/payload call_id mismatch; dropping",
3924 );
3925 return Ok(());
3926 }
3927 if grant.credits == 0 {
3928 return Ok(());
3929 }
3930 self.pending.deliver_grant(grant.call_id, 0, grant.credits);
3931 }
3932 None => {
3933 tracing::debug!(
3934 call_id = meta.seq_or_ts,
3935 "rpc client fold: malformed REQUEST_GRANT payload"
3936 );
3937 }
3938 }
3939 }
3940 _ => {}
3941 }
3942 Ok(())
3943 }
3944}
3945
3946#[cfg(test)]
3947mod tests {
3948 use super::*;
3949
3950 fn header(name: &str, value: &[u8]) -> RpcHeader {
3951 (name.to_string(), value.to_vec())
3952 }
3953
3954 // --------------------------------------------------------------------
3955 // Status code numbering.
3956 // --------------------------------------------------------------------
3957
3958 /// Status codes have stable wire numbers. A regression that
3959 /// renumbered any of the canonical statuses would break
3960 /// every cross-version caller / server pair on the wire — pin
3961 /// the numbers explicitly so the test catches it before the
3962 /// bug ships.
3963 #[test]
3964 fn status_wire_numbers_are_stable() {
3965 for (status, expected) in [
3966 (RpcStatus::Ok, 0x0000u16),
3967 (RpcStatus::NotFound, 0x0001),
3968 (RpcStatus::Unauthorized, 0x0002),
3969 (RpcStatus::Timeout, 0x0003),
3970 (RpcStatus::Backpressure, 0x0004),
3971 (RpcStatus::Cancelled, 0x0005),
3972 (RpcStatus::Internal, 0x0006),
3973 (RpcStatus::UnknownVersion, 0x0007),
3974 (RpcStatus::CapabilityDenied, 0x0008),
3975 ] {
3976 assert_eq!(status.to_wire(), expected, "{status:?}");
3977 assert_eq!(RpcStatus::from_wire(expected), status);
3978 }
3979 }
3980
3981 /// Reserved numeric range (`0x0009..=0x7FFF`) decodes as
3982 /// `Application(v)` for forward-compat with future canonical
3983 /// assignments. A future status numbered `0x0009` would round-
3984 /// trip via `from_wire(0x0009)` until that variant is added,
3985 /// at which point the variant takes precedence.
3986 #[test]
3987 fn reserved_status_range_decodes_as_application_for_forward_compat() {
3988 let decoded = RpcStatus::from_wire(0x0009);
3989 assert_eq!(decoded, RpcStatus::Application(0x0009));
3990 assert_eq!(decoded.to_wire(), 0x0009);
3991 }
3992
3993 /// Application range (`0x8000..=0xFFFF`) encodes / decodes
3994 /// transparently as `Application(v)`.
3995 #[test]
3996 fn application_status_range_roundtrips() {
3997 for v in [0x8000u16, 0x8001, 0xCAFE, 0xFFFF] {
3998 let s = RpcStatus::from_wire(v);
3999 assert_eq!(s, RpcStatus::Application(v));
4000 assert_eq!(s.to_wire(), v);
4001 }
4002 }
4003
4004 // --------------------------------------------------------------------
4005 // Dispatch byte assignments.
4006 // --------------------------------------------------------------------
4007
4008 /// Pin the `dispatch` byte assignments so a renumber surfaces
4009 /// here before it ships on the wire. These also live in the
4010 /// design doc; this test is the source-of-truth check.
4011 #[test]
4012 fn dispatch_byte_assignments_are_stable() {
4013 assert_eq!(DISPATCH_RPC_REQUEST, 0x10);
4014 assert_eq!(DISPATCH_RPC_RESPONSE, 0x11);
4015 assert_eq!(DISPATCH_RPC_CANCEL, 0x12);
4016 assert_eq!(DISPATCH_RPC_DEADLINE_EXCEEDED, 0x13);
4017 assert_eq!(DISPATCH_RPC_STREAM_GRANT, 0x14);
4018 assert_eq!(DISPATCH_RPC_REQUEST_CHUNK, 0x15);
4019 assert_eq!(DISPATCH_RPC_REQUEST_GRANT, 0x16);
4020 }
4021
4022 /// Regression: encoder bounds. Encoding a service name longer
4023 /// than `MAX_RPC_SERVICE_NAME_LEN` panics in debug, catching
4024 /// the programmer error in tests rather than silently writing
4025 /// a truncated `as u8` length that the receiver decodes as
4026 /// garbage. The matching debug_asserts guard body length,
4027 /// header count, header name length, and header value length.
4028 #[cfg(debug_assertions)]
4029 #[test]
4030 #[should_panic(expected = "service name")]
4031 fn request_encode_panics_on_oversize_service_name() {
4032 let p = RpcRequestPayload {
4033 service: "x".repeat(MAX_RPC_SERVICE_NAME_LEN + 1),
4034 deadline_ns: 0,
4035 flags: 0,
4036 headers: vec![],
4037 body: Bytes::new(),
4038 };
4039 let _ = p.encode();
4040 }
4041
4042 #[cfg(debug_assertions)]
4043 #[test]
4044 #[should_panic(expected = "body length")]
4045 fn request_encode_panics_on_oversize_body() {
4046 let p = RpcRequestPayload {
4047 service: "x".to_string(),
4048 deadline_ns: 0,
4049 flags: 0,
4050 headers: vec![],
4051 body: Bytes::from(vec![0; MAX_RPC_BODY_LEN + 1]),
4052 };
4053 let _ = p.encode();
4054 }
4055
4056 #[cfg(debug_assertions)]
4057 #[test]
4058 #[should_panic(expected = "header name")]
4059 fn request_encode_panics_on_oversize_header_name() {
4060 let p = RpcRequestPayload {
4061 service: "x".to_string(),
4062 deadline_ns: 0,
4063 flags: 0,
4064 headers: vec![("a".repeat(MAX_RPC_HEADER_NAME_LEN + 1), vec![])],
4065 body: Bytes::new(),
4066 };
4067 let _ = p.encode();
4068 }
4069
/// Pins `encoded_len()` to `encode().len()` for populated and
/// empty request/response payloads, so the size-budgeting helper
/// cannot drift from the real wire size after a codec change.
#[test]
fn encoded_len_matches_encode_len_for_request_and_response() {
    // Populated request: two headers plus a JSON body.
    let full_request = RpcRequestPayload {
        service: String::from("echo.v1"),
        deadline_ns: 1_700_000_000_000_000_000,
        flags: FLAG_RPC_PROPAGATE_TRACE,
        headers: vec![
            header("traceparent", b"00-aabb"),
            header("idempotency-key", &7u64.to_le_bytes()),
        ],
        body: Bytes::from_static(b"{\"hello\":\"world\"}"),
    };
    assert_eq!(full_request.encoded_len(), full_request.encode().len());

    // Populated response with an application-defined status.
    let full_response = RpcResponsePayload {
        status: RpcStatus::Application(0x8001),
        headers: vec![header("content-type", b"application/json")],
        body: Bytes::from_static(b"ok"),
    };
    assert_eq!(full_response.encoded_len(), full_response.encode().len());

    // Edge case: request with no headers and no body.
    let bare_request = RpcRequestPayload {
        service: String::from("x"),
        deadline_ns: 0,
        flags: 0,
        headers: Vec::new(),
        body: Bytes::new(),
    };
    assert_eq!(bare_request.encoded_len(), bare_request.encode().len());

    // Edge case: response with no headers and no body.
    let bare_response = RpcResponsePayload {
        status: RpcStatus::Ok,
        headers: Vec::new(),
        body: Bytes::new(),
    };
    assert_eq!(bare_response.encoded_len(), bare_response.encode().len());
}
4111
/// Pins the bit positions of the live `RpcRequestPayload::flags`
/// constants and checks that none of them occupies a reserved
/// bit: bit 0 (the slot of the removed `FLAG_RPC_IDEMPOTENT`, so
/// a future re-add can reuse it) or bit 3 (held in reserve for a
/// future protocol flag).
#[test]
fn flag_bit_assignments_leave_idempotent_slot_reserved() {
    // Exact assignments first: a renumbering of any live flag
    // shows up here before it can ship.
    assert_eq!(FLAG_RPC_STREAMING_RESPONSE, 1 << 1);
    assert_eq!(FLAG_RPC_PROPAGATE_TRACE, 1 << 2);
    assert_eq!(FLAG_RPC_CLIENT_STREAMING_REQUEST, 1 << 4);
    assert_eq!(FLAG_RPC_REQUEST_END, 1 << 5);

    // Then the reserved-bit check for every live flag.
    let live_flags = [
        FLAG_RPC_STREAMING_RESPONSE,
        FLAG_RPC_PROPAGATE_TRACE,
        FLAG_RPC_CLIENT_STREAMING_REQUEST,
        FLAG_RPC_REQUEST_END,
    ];
    for flag in live_flags {
        assert_eq!(
            flag & (1 << 0),
            0,
            "flag {flag:#06x} collides with reserved bit 0"
        );
        assert_eq!(
            flag & (1 << 3),
            0,
            "flag {flag:#06x} collides with reserved bit 3"
        );
    }
}
4145
4146 // --------------------------------------------------------------------
4147 // Bidi streaming (Phase A) — RpcRequestChunkPayload and
4148 // RpcRequestGrantPayload wire-stability tests.
4149 // --------------------------------------------------------------------
4150
/// 1/5 — a RequestChunk carrying ten headers and a 1 KiB body
/// must survive encode → decode unchanged, and `encoded_len()`
/// must match the encoded size.
#[test]
fn request_chunk_roundtrip_with_headers_and_body() {
    // Ten distinct headers, each with a 4-byte value derived from
    // its index.
    let headers: Vec<_> = (0..10u8)
        .map(|i| header(&format!("x-chunk-meta-{i}"), &[0xAA, 0xBB, i, !i]))
        .collect();
    // 1 KiB body made of the low byte of each index.
    let mut body_bytes: Vec<u8> = Vec::with_capacity(1024);
    for n in 0..1024u32 {
        body_bytes.push((n & 0xFF) as u8);
    }
    let chunk = RpcRequestChunkPayload {
        call_id: 0xCAFE_F00D_DEAD_BEEF,
        flags: FLAG_RPC_REQUEST_END | FLAG_RPC_PROPAGATE_TRACE,
        headers,
        body: Bytes::from(body_bytes),
    };
    let wire = chunk.encode();
    assert_eq!(
        chunk.encoded_len(),
        wire.len(),
        "encoded_len must agree with encode().len()"
    );
    let roundtripped = RpcRequestChunkPayload::decode(Bytes::from(wire)).expect("decode");
    assert_eq!(roundtripped, chunk);
}
4175
/// 2/5 — every strict prefix of a valid RequestChunk encoding
/// must be rejected with an error (never a panic), while the
/// complete encoding must decode.
#[test]
fn request_chunk_decode_rejects_truncation_at_every_boundary() {
    let chunk = RpcRequestChunkPayload {
        call_id: 0x1234,
        flags: 0,
        headers: vec![header("x", b"y")],
        body: Bytes::from_static(b"hello"),
    };
    let full = chunk.encode();
    // Try every length from 0 up to (but excluding) the full
    // encoding; each cut lands inside or before some field.
    for n in 0..full.len() {
        let result = RpcRequestChunkPayload::decode(Bytes::copy_from_slice(&full[..n]));
        assert!(result.is_err(), "n={n}: expected Err, got Ok({:?})", result);
    }
    // Sanity check: the untruncated encoding is accepted.
    let complete = RpcRequestChunkPayload::decode(Bytes::from(full));
    assert!(complete.is_ok());
}
4199
/// 3/5 — a RequestChunk whose wire-claimed body length is
/// `MAX_RPC_BODY_LEN + 1` must be rejected with
/// `TooLarge { field: "body", .. }`.
#[test]
fn request_chunk_decode_rejects_oversized_body_length() {
    // Hand-built encoding: valid fixed fields and an empty header
    // list, followed by a body length one over the cap. No body
    // bytes follow, so the decoder has to reject on the length
    // alone.
    let mut wire: Vec<u8> = Vec::new();
    wire.extend_from_slice(&0x42u64.to_le_bytes()); // call_id
    wire.extend_from_slice(&0u16.to_le_bytes()); // flags
    wire.push(0); // headers count = 0
    wire.extend_from_slice(&((MAX_RPC_BODY_LEN + 1) as u32).to_le_bytes()); // body_len
    let err = RpcRequestChunkPayload::decode(Bytes::from(wire))
        .expect_err("oversized body length must reject");
    match err {
        RpcCodecError::TooLarge {
            limit,
            actual,
            field,
        } => {
            assert_eq!(field, "body");
            assert_eq!(actual, MAX_RPC_BODY_LEN + 1);
            assert_eq!(limit, MAX_RPC_BODY_LEN);
        }
        other => panic!("expected TooLarge {{ field=body }}, got {other:?}"),
    }
}
4229
/// 4/5 — a RequestChunk whose header count byte is
/// `MAX_RPC_HEADERS + 1` must be rejected with
/// `TooLarge { field: "headers", .. }`; no header bytes are
/// supplied, so the count alone has to trigger the rejection.
#[test]
fn request_chunk_decode_rejects_oversized_header_count() {
    let mut wire: Vec<u8> = Vec::new();
    wire.extend_from_slice(&0x42u64.to_le_bytes()); // call_id
    wire.extend_from_slice(&0u16.to_le_bytes()); // flags
    wire.push((MAX_RPC_HEADERS + 1) as u8); // header count, one over the cap
    let err = RpcRequestChunkPayload::decode(Bytes::from(wire))
        .expect_err("oversized header count must reject");
    match err {
        RpcCodecError::TooLarge {
            limit,
            actual,
            field,
        } => {
            // The error names the field "headers".
            assert_eq!(field, "headers");
            assert_eq!(actual, MAX_RPC_HEADERS + 1);
            assert_eq!(limit, MAX_RPC_HEADERS);
        }
        other => panic!("expected TooLarge {{ field=headers }}, got {other:?}"),
    }
}
4256
/// 5/5 — RequestGrant is a fixed 12-byte payload: every
/// `(call_id, credits)` pair must round-trip through exactly 12
/// bytes, and any input of another length must decode to `None`.
#[test]
fn request_grant_roundtrip_and_truncation_rejection() {
    // Minimum, small, maximum, and an arbitrary mid-range pair.
    let cases = [
        (0u64, 0u32),
        (1, 1),
        (u64::MAX, u32::MAX),
        (0xCAFE_F00D, 0x10203040),
    ];
    for (call_id, credits) in cases {
        let encoded = encode_request_grant(call_id, credits);
        assert_eq!(encoded.len(), 12, "request grant is always 12 bytes");
        let grant = decode_request_grant(&encoded).expect("decode");
        assert_eq!(grant.call_id, call_id);
        assert_eq!(grant.credits, credits);
    }
    // Wrong lengths — empty, one byte short, one byte long — must
    // yield `None` rather than panic.
    let zeros = [0u8; 13];
    for bad_len in [0usize, 11, 13] {
        assert!(decode_request_grant(&zeros[..bad_len]).is_none());
    }
}
4282
/// Bonus pin: `parse_request_window_initial` returns the parsed
/// value for a well-formed ASCII-decimal header (matching the
/// header name case-insensitively) and `None` when the header is
/// absent or its value is non-numeric, non-UTF-8, or empty.
#[test]
fn parse_request_window_initial_matches_response_side_semantics() {
    let name = HEADER_NRPC_REQUEST_WINDOW_INITIAL;

    // Well-formed value under the canonical header name.
    assert_eq!(
        parse_request_window_initial(&[header(name, b"32")]),
        Some(32)
    );
    // Header name in a different case still matches.
    assert_eq!(
        parse_request_window_initial(&[header("Nrpc-Request-Window-Initial", b"7")]),
        Some(7)
    );
    // No headers at all.
    assert_eq!(parse_request_window_initial(&[]), None);
    // Value that is not a decimal number.
    assert_eq!(
        parse_request_window_initial(&[header(name, b"twelve")]),
        None
    );
    // Value that is not valid UTF-8.
    assert_eq!(
        parse_request_window_initial(&[header(name, &[0xFF, 0xFE])]),
        None
    );
    // Zero-length value.
    assert_eq!(parse_request_window_initial(&[header(name, b"")]), None);
}
4308
4309 // --------------------------------------------------------------------
4310 // RpcRequestPayload codec.
4311 // --------------------------------------------------------------------
4312
/// A request with no deadline, flags, headers, or body must
/// decode back to an equal value.
#[test]
fn request_roundtrip_minimal() {
    let original = RpcRequestPayload {
        service: String::from("hello"),
        deadline_ns: 0,
        flags: 0,
        headers: Vec::new(),
        body: Bytes::new(),
    };
    let wire = Bytes::from(original.encode());
    let roundtripped = RpcRequestPayload::decode(wire).unwrap();
    assert_eq!(roundtripped, original);
}
4326
/// A request with every field populated (deadline, a flag, three
/// headers, a JSON body) must decode back to an equal value.
#[test]
fn request_roundtrip_full() {
    let headers = vec![
        header("traceparent", b"00-aabb..."),
        header("idempotency-key", &7u64.to_le_bytes()),
        header("content-type", b"application/json"),
    ];
    let original = RpcRequestPayload {
        service: String::from("echo.v1"),
        deadline_ns: 1_700_000_000_000_000_000,
        flags: FLAG_RPC_PROPAGATE_TRACE,
        headers,
        body: Bytes::from_static(b"{\"hello\":\"world\"}"),
    };
    let wire = Bytes::from(original.encode());
    let roundtripped = RpcRequestPayload::decode(wire).unwrap();
    assert_eq!(roundtripped, original);
}
4344
/// A buffer holding only a zero service-length byte must be
/// rejected as `Truncated`.
#[test]
fn request_decode_rejects_empty_service() {
    let wire = Bytes::from_static(&[0x00]);
    let err = RpcRequestPayload::decode(wire).unwrap_err();
    assert!(matches!(err, RpcCodecError::Truncated(_)));
}
4351
/// A request whose wire-claimed body length is
/// `MAX_RPC_BODY_LEN + 1` must be rejected with `TooLarge` on the
/// "body" field.
#[test]
fn request_decode_rejects_oversize_body_length() {
    // Forged frame: 1-byte service "x", zero deadline and flags,
    // no headers, then an over-cap body length with no body bytes.
    let mut forged = vec![1u8, b'x'];
    forged.put_u64_le(0); // deadline
    forged.put_u16_le(0); // flags
    forged.put_u8(0); // 0 headers
    forged.put_u32_le((MAX_RPC_BODY_LEN as u32) + 1);
    let err = RpcRequestPayload::decode(Bytes::from(forged)).unwrap_err();
    assert!(
        matches!(err, RpcCodecError::TooLarge { field, .. } if field == "body"),
        "got {err:?}",
    );
}
4367
/// A request whose header count byte is one more than
/// `MAX_RPC_HEADERS` must be rejected with `TooLarge` on the
/// "headers" field.
#[test]
fn request_decode_rejects_oversize_headers_count() {
    // Forged frame: 1-byte service "x", zero deadline and flags,
    // then an over-cap header count with no header bytes after it.
    let mut forged = vec![1u8, b'x'];
    forged.put_u64_le(0); // deadline
    forged.put_u16_le(0); // flags
    forged.put_u8((MAX_RPC_HEADERS as u8).wrapping_add(1));
    let err = RpcRequestPayload::decode(Bytes::from(forged)).unwrap_err();
    assert!(
        matches!(err, RpcCodecError::TooLarge { field, .. } if field == "headers"),
        "got {err:?}",
    );
}
4382
/// Every strict prefix of a valid request encoding must fail to
/// decode, so no truncation is accepted as partial state; the
/// complete encoding must succeed.
#[test]
fn request_decode_rejects_truncated_at_each_field() {
    let request = RpcRequestPayload {
        service: String::from("svc"),
        deadline_ns: 1,
        flags: 0,
        headers: vec![header("h", b"v")],
        body: Bytes::from_static(b"body"),
    };
    let bytes = request.encode();
    // Every prefix length short of the full encoding is an error.
    for trim_to in 0..bytes.len() {
        let result = RpcRequestPayload::decode(Bytes::copy_from_slice(&bytes[..trim_to]));
        assert!(
            result.is_err(),
            "trim_to={trim_to} of {} must error, got {:?}",
            bytes.len(),
            result,
        );
    }
    // The untruncated encoding is accepted.
    let complete = RpcRequestPayload::decode(Bytes::from(bytes));
    assert!(complete.is_ok());
}
4411
4412 // --------------------------------------------------------------------
4413 // RpcResponsePayload codec.
4414 // --------------------------------------------------------------------
4415
/// An `Ok` response with one header and a JSON body must decode
/// back to an equal value.
#[test]
fn response_roundtrip_ok_with_body() {
    let original = RpcResponsePayload {
        status: RpcStatus::Ok,
        headers: vec![header("content-type", b"application/json")],
        body: Bytes::from_static(b"{\"answer\":42}"),
    };
    let wire = Bytes::from(original.encode());
    let roundtripped = RpcResponsePayload::decode(wire).unwrap();
    assert_eq!(roundtripped, original);
}
4427
/// An application-defined status code and its diagnostic body
/// must both survive an encode → decode round-trip.
#[test]
fn response_roundtrip_application_status() {
    let original = RpcResponsePayload {
        status: RpcStatus::Application(0xBEEF),
        headers: Vec::new(),
        body: Bytes::from_static(b"app-specific diagnostic"),
    };
    let wire = Bytes::from(original.encode());
    let roundtripped = RpcResponsePayload::decode(wire).unwrap();
    assert_eq!(roundtripped.status, RpcStatus::Application(0xBEEF));
    assert_eq!(roundtripped.body, original.body);
}
4440
/// Decoding a zero-length buffer as a response must fail with
/// `Truncated`.
#[test]
fn response_decode_rejects_empty_buffer() {
    let outcome = RpcResponsePayload::decode(Bytes::new());
    let err = outcome.unwrap_err();
    assert!(matches!(err, RpcCodecError::Truncated(_)));
}
4446
4447 // --------------------------------------------------------------------
4448 // Invariant: encoded sizes are reasonable.
4449 // --------------------------------------------------------------------
4450
/// Wire-size budget pin: the smallest request (1-byte service,
/// no headers, no body) encodes in exactly 17 bytes, and
/// `request_wire_size` adds `EVENT_META_SIZE` on top. Any growth
/// in the fixed per-request overhead changes these numbers.
#[test]
fn request_minimum_wire_size_is_bounded() {
    let smallest = RpcRequestPayload {
        service: String::from("x"),
        deadline_ns: 0,
        flags: 0,
        headers: Vec::new(),
        body: Bytes::new(),
    };
    let encoded_size = smallest.encode().len();
    // svc len (1) + svc bytes (1) + deadline (8) + flags (2)
    // + headers count (1) + body len (4) = 17
    assert_eq!(encoded_size, 17, "minimum request encodes in 17 bytes");
    assert_eq!(request_wire_size(&smallest), EVENT_META_SIZE + 17);
}
4470
/// Wire-size budget pin: the smallest response (`Ok`, no headers,
/// no body) encodes in exactly 7 bytes, and `response_wire_size`
/// adds `EVENT_META_SIZE` on top.
#[test]
fn response_minimum_wire_size_is_bounded() {
    let smallest = RpcResponsePayload {
        status: RpcStatus::Ok,
        headers: Vec::new(),
        body: Bytes::new(),
    };
    let encoded_size = smallest.encode().len();
    // status (2) + headers count (1) + body len (4) = 7
    assert_eq!(encoded_size, 7, "minimum response encodes in 7 bytes");
    assert_eq!(response_wire_size(&smallest), EVENT_META_SIZE + 7);
}
4483
4484 // ====================================================================
4485 // RpcServerFold — server-side dispatch behavior.
4486 //
4487 // These tests drive the fold directly with synthetic events
4488 // and observe the emitter callback. The end-to-end story
4489 // (Mesh::serve_rpc + bus + cortex adapter) is integration-
4490 // tested separately once the glue layer lands.
4491 // ====================================================================
4492
4493 use super::super::super::redex::{RedexEntry, RedexEvent};
4494 use std::sync::atomic::AtomicUsize;
4495 use std::time::Duration;
4496
4497 /// Captured-response store. Test-local typedef so the
4498 /// `capturing_emitter` signature stays under the `clippy::
4499 /// type_complexity` lint.
4500 type CapturedResponses = Arc<Mutex<Vec<(u64, u64, RpcResponsePayload)>>>;
4501
4502 /// Build a synthetic RedexEvent carrying an RPC request payload.
4503 /// Tests use this to drive the fold without going through the
4504 /// real ingest/cortex pipeline.
4505 fn rpc_request_event(
4506 caller_origin: u64,
4507 call_id: u64,
4508 payload: RpcRequestPayload,
4509 ) -> RedexEvent {
4510 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, caller_origin, call_id, 0);
4511 let mut buf = Vec::new();
4512 buf.extend_from_slice(&meta.to_bytes());
4513 buf.extend_from_slice(&payload.encode());
4514 RedexEvent {
4515 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
4516 payload: bytes::Bytes::from(buf),
4517 }
4518 }
4519
4520 fn rpc_cancel_event(caller_origin: u64, call_id: u64) -> RedexEvent {
4521 let meta = EventMeta::new(DISPATCH_RPC_CANCEL, 0, caller_origin, call_id, 0);
4522 let buf = meta.to_bytes().to_vec();
4523 RedexEvent {
4524 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
4525 payload: bytes::Bytes::from(buf),
4526 }
4527 }
4528
4529 /// Captures responses emitted by the fold for assertion in tests.
4530 fn capturing_emitter() -> (RpcResponseEmitter, CapturedResponses) {
4531 let captured: CapturedResponses = Arc::new(Mutex::new(Vec::new()));
4532 let captured_clone = captured.clone();
4533 let emit: RpcResponseEmitter = Arc::new(move |origin, call_id, resp| {
4534 captured_clone.lock().push((origin, call_id, resp));
4535 });
4536 (emit, captured)
4537 }
4538
4539 /// A handler that just echoes the request body back as the
4540 /// response body, with `RpcStatus::Ok`.
4541 struct EchoHandler;
4542 #[async_trait::async_trait]
4543 impl RpcHandler for EchoHandler {
4544 async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4545 Ok(RpcResponsePayload {
4546 status: RpcStatus::Ok,
4547 headers: vec![],
4548 body: ctx.payload.body,
4549 })
4550 }
4551 }
4552
4553 /// Wait until `pred` is true, polling at 10ms intervals up to
4554 /// `timeout`. Used to await spawned-handler completion in tests
4555 /// without a sleep-and-pray.
4556 async fn wait_until<F: Fn() -> bool>(pred: F, timeout: Duration) -> bool {
4557 let start = std::time::Instant::now();
4558 while start.elapsed() < timeout {
4559 if pred() {
4560 return true;
4561 }
4562 tokio::time::sleep(Duration::from_millis(10)).await;
4563 }
4564 pred()
4565 }
4566
4567 /// Happy path: a REQUEST event triggers the handler; the fold
4568 /// emits a RESPONSE with the handler's payload.
4569 #[tokio::test]
4570 async fn server_fold_request_invokes_handler_and_emits_response() {
4571 let (emit, captured) = capturing_emitter();
4572 let mut fold = RpcServerFold::new(Arc::new(EchoHandler), emit);
4573 let req = RpcRequestPayload {
4574 service: "echo".to_string(),
4575 deadline_ns: 0,
4576 flags: 0,
4577 headers: vec![],
4578 body: Bytes::from_static(b"hello"),
4579 };
4580 let ev = rpc_request_event(0xCAFE, 7, req);
4581 fold.apply(&ev, &mut ()).unwrap();
4582
4583 // Handler runs in tokio::spawn; wait for the emit.
4584 assert!(
4585 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
4586 "expected one emitted response"
4587 );
4588 let captured = captured.lock();
4589 assert_eq!(captured.len(), 1);
4590 let (origin, call_id, resp) = &captured[0];
4591 assert_eq!(*origin, 0xCAFE);
4592 assert_eq!(*call_id, 7);
4593 assert_eq!(resp.status, RpcStatus::Ok);
4594 assert_eq!(resp.body.as_ref(), b"hello");
4595 // In-flight set is cleaned up after the handler completes.
4596 assert!(fold.in_flight_keys().is_empty());
4597 }
4598
4599 /// Application error: handler returns
4600 /// `RpcHandlerError::Application` → fold emits a response with
4601 /// `RpcStatus::Application(code)` and the message as body.
4602 #[tokio::test]
4603 async fn server_fold_application_error_maps_to_application_status() {
4604 struct AppErrHandler;
4605 #[async_trait::async_trait]
4606 impl RpcHandler for AppErrHandler {
4607 async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4608 Err(RpcHandlerError::Application {
4609 code: 0xBEEF,
4610 message: "bad input".to_string(),
4611 })
4612 }
4613 }
4614 let (emit, captured) = capturing_emitter();
4615 let mut fold = RpcServerFold::new(Arc::new(AppErrHandler), emit);
4616 let req = RpcRequestPayload {
4617 service: "x".to_string(),
4618 deadline_ns: 0,
4619 flags: 0,
4620 headers: vec![],
4621 body: Bytes::new(),
4622 };
4623 fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4624 assert!(wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await);
4625 let captured = captured.lock();
4626 let (_, _, resp) = &captured[0];
4627 assert_eq!(resp.status, RpcStatus::Application(0xBEEF));
4628 assert_eq!(resp.body.as_ref(), b"bad input");
4629 }
4630
4631 /// Internal error: handler returns `RpcHandlerError::Internal`
4632 /// → fold emits `RpcStatus::Internal` with the message body.
4633 #[tokio::test]
4634 async fn server_fold_internal_error_maps_to_internal_status() {
4635 struct IntErrHandler;
4636 #[async_trait::async_trait]
4637 impl RpcHandler for IntErrHandler {
4638 async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4639 Err(RpcHandlerError::Internal("db timeout".to_string()))
4640 }
4641 }
4642 let (emit, captured) = capturing_emitter();
4643 let mut fold = RpcServerFold::new(Arc::new(IntErrHandler), emit);
4644 let req = RpcRequestPayload {
4645 service: "x".to_string(),
4646 deadline_ns: 0,
4647 flags: 0,
4648 headers: vec![],
4649 body: Bytes::new(),
4650 };
4651 fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4652 assert!(wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await);
4653 let captured = captured.lock();
4654 let (_, _, resp) = &captured[0];
4655 assert_eq!(resp.status, RpcStatus::Internal);
4656 assert_eq!(resp.body.as_ref(), b"db timeout");
4657 }
4658
4659 /// Handler panic: caught by the fold's `catch_unwind`; surfaces
4660 /// as `RpcStatus::Internal` to the caller. Pre-fix the panic
4661 /// would propagate up the spawned task, log a tokio
4662 /// uncaught-panic message, and silently leave the caller
4663 /// waiting forever.
4664 #[tokio::test]
4665 async fn server_fold_handler_panic_surfaces_as_internal_status() {
4666 struct PanicHandler;
4667 #[async_trait::async_trait]
4668 impl RpcHandler for PanicHandler {
4669 async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4670 panic!("kaboom");
4671 }
4672 }
4673 let (emit, captured) = capturing_emitter();
4674 let mut fold = RpcServerFold::new(Arc::new(PanicHandler), emit);
4675 let req = RpcRequestPayload {
4676 service: "x".to_string(),
4677 deadline_ns: 0,
4678 flags: 0,
4679 headers: vec![],
4680 body: Bytes::new(),
4681 };
4682 fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4683 assert!(wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await);
4684 let captured = captured.lock();
4685 let (_, _, resp) = &captured[0];
4686 assert_eq!(resp.status, RpcStatus::Internal);
4687 assert!(
4688 String::from_utf8_lossy(&resp.body).contains("kaboom"),
4689 "panic message must surface in body, got {}",
4690 String::from_utf8_lossy(&resp.body),
4691 );
4692 }
4693
4694 /// Deadline already passed: server short-circuits with
4695 /// `Timeout` without invoking the handler. Pinned via the
4696 /// `with_test_now_ns` clock override so the test doesn't race
4697 /// wall time.
4698 #[tokio::test]
4699 async fn server_fold_deadline_already_passed_short_circuits_to_timeout() {
4700 let invoked = Arc::new(AtomicBool::new(false));
4701 struct CountingHandler {
4702 invoked: Arc<AtomicBool>,
4703 }
4704 #[async_trait::async_trait]
4705 impl RpcHandler for CountingHandler {
4706 async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4707 self.invoked.store(true, Ordering::Release);
4708 Ok(RpcResponsePayload {
4709 status: RpcStatus::Ok,
4710 headers: vec![],
4711 body: Bytes::new(),
4712 })
4713 }
4714 }
4715 let (emit, captured) = capturing_emitter();
4716 let mut fold = RpcServerFold::new(
4717 Arc::new(CountingHandler {
4718 invoked: invoked.clone(),
4719 }),
4720 emit,
4721 )
4722 // Use a clock value > DEADLINE_SKEW_TOLERANCE_NS + 1
4723 // (10s + 1ns) so the deadline-passed check fires past the
4724 // skew tolerance window. With now=20s and deadline=1ns,
4725 // (now - 10s) > 1ns.
4726 .with_test_now_ns(20_000_000_000);
4727 let req = RpcRequestPayload {
4728 service: "x".to_string(),
4729 // Deadline well in the past — past the skew tolerance.
4730 deadline_ns: 1_000,
4731 flags: 0,
4732 headers: vec![],
4733 body: Bytes::new(),
4734 };
4735 fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4736 // Emit happens synchronously in the deadline-passed branch
4737 // (no handler spawn).
4738 let captured = captured.lock();
4739 assert_eq!(captured.len(), 1);
4740 let (_, _, resp) = &captured[0];
4741 assert_eq!(resp.status, RpcStatus::Timeout);
4742 assert!(
4743 !invoked.load(Ordering::Acquire),
4744 "handler must NOT be invoked when deadline already passed",
4745 );
4746 }
4747
4748 /// Regression: a deadline that has elapsed by less than
4749 /// `DEADLINE_SKEW_TOLERANCE_NS` does NOT short-circuit. A
4750 /// peer with a slightly-fast clock would otherwise be
4751 /// prematurely timed out before the handler ever ran.
4752 #[tokio::test]
4753 async fn server_fold_deadline_within_skew_tolerance_invokes_handler() {
4754 let invoked = Arc::new(AtomicBool::new(false));
4755 struct CountingHandler {
4756 invoked: Arc<AtomicBool>,
4757 }
4758 #[async_trait::async_trait]
4759 impl RpcHandler for CountingHandler {
4760 async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4761 self.invoked.store(true, Ordering::Release);
4762 Ok(RpcResponsePayload {
4763 status: RpcStatus::Ok,
4764 headers: vec![],
4765 body: Bytes::new(),
4766 })
4767 }
4768 }
4769 let (emit, captured) = capturing_emitter();
4770 let mut fold = RpcServerFold::new(
4771 Arc::new(CountingHandler {
4772 invoked: invoked.clone(),
4773 }),
4774 emit,
4775 )
4776 // now = 100s, deadline = 95s → elapsed = 5s, within the
4777 // 10s skew tolerance.
4778 .with_test_now_ns(100_000_000_000);
4779 let req = RpcRequestPayload {
4780 service: "x".to_string(),
4781 deadline_ns: 95_000_000_000,
4782 flags: 0,
4783 headers: vec![],
4784 body: Bytes::new(),
4785 };
4786 fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
4787 assert!(
4788 wait_until(|| invoked.load(Ordering::Acquire), Duration::from_secs(1)).await,
4789 "handler must run when deadline is within skew tolerance",
4790 );
4791 let captured = captured.lock();
4792 assert_eq!(captured.len(), 1);
4793 assert_eq!(captured[0].2.status, RpcStatus::Ok);
4794 }
4795
4796 /// CANCEL flips the matching in-flight token. The handler that
4797 /// `select!`s on the cancellation observes the signal and can
4798 /// short-circuit. The fold removes the in-flight entry on
4799 /// CANCEL.
4800 #[tokio::test]
4801 async fn server_fold_cancel_flips_token_and_clears_in_flight() {
4802 let resumed_after_cancel = Arc::new(AtomicBool::new(false));
4803 struct CancelObservingHandler {
4804 resumed: Arc<AtomicBool>,
4805 }
4806 #[async_trait::async_trait]
4807 impl RpcHandler for CancelObservingHandler {
4808 async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4809 tokio::select! {
4810 _ = ctx.cancellation.cancelled() => {
4811 self.resumed.store(true, Ordering::Release);
4812 Err(RpcHandlerError::Internal("cancelled by caller".to_string()))
4813 }
4814 _ = tokio::time::sleep(Duration::from_secs(5)) => {
4815 Ok(RpcResponsePayload {
4816 status: RpcStatus::Ok,
4817 headers: vec![],
4818 body: Bytes::from_static(b"slept the full window"),
4819 })
4820 }
4821 }
4822 }
4823 }
4824 let (emit, captured) = capturing_emitter();
4825 let mut fold = RpcServerFold::new(
4826 Arc::new(CancelObservingHandler {
4827 resumed: resumed_after_cancel.clone(),
4828 }),
4829 emit,
4830 );
4831 let req = RpcRequestPayload {
4832 service: "x".to_string(),
4833 deadline_ns: 0,
4834 flags: 0,
4835 headers: vec![],
4836 body: Bytes::new(),
4837 };
4838 fold.apply(&rpc_request_event(1, 42, req), &mut ()).unwrap();
4839 // Wait until the handler's `select!` is parked; then send
4840 // CANCEL.
4841 assert!(
4842 wait_until(
4843 || fold.in_flight_keys().contains(&(1, 42)),
4844 Duration::from_secs(1)
4845 )
4846 .await
4847 );
4848 fold.apply(&rpc_cancel_event(1, 42), &mut ()).unwrap();
4849 // The cancellation is observed by the handler. Even though
4850 // the handler returns `Internal("cancelled by caller")`,
4851 // the fold's CANCEL-wins ordering overrides the response
4852 // with `RpcStatus::Cancelled` so the caller sees the
4853 // documented status code rather than the handler's
4854 // accidental Internal payload.
4855 assert!(
4856 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
4857 "handler should observe cancellation and emit response"
4858 );
4859 assert!(
4860 resumed_after_cancel.load(Ordering::Acquire),
4861 "handler must observe cancellation"
4862 );
4863 let captured = captured.lock();
4864 assert_eq!(captured.len(), 1);
4865 let (_, _, resp) = &captured[0];
4866 assert_eq!(
4867 resp.status,
4868 RpcStatus::Cancelled,
4869 "CANCEL must override handler outcome with RpcStatus::Cancelled"
4870 );
4871 // CANCEL also removes the in-flight entry directly.
4872 // Handler completion removes it again (idempotent).
4873 assert!(fold.in_flight_keys().is_empty());
4874 }
4875
4876 /// Regression: a duplicate REQUEST for an already-in-flight
4877 /// `(origin_hash, call_id)` must be refused with a synthetic
4878 /// `Internal` response and must NOT spawn a second handler.
4879 /// Without the refusal, two handlers race under the same key
4880 /// and CANCEL handling is broken (CANCEL removes the entry
4881 /// the first handler reinserts, etc.).
4882 #[tokio::test]
4883 async fn server_fold_duplicate_request_refuses_without_double_dispatch() {
4884 let invocations = Arc::new(AtomicUsize::new(0));
4885 struct CountingHandler {
4886 invocations: Arc<AtomicUsize>,
4887 }
4888 #[async_trait::async_trait]
4889 impl RpcHandler for CountingHandler {
4890 async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4891 self.invocations.fetch_add(1, Ordering::SeqCst);
4892 tokio::time::sleep(Duration::from_millis(80)).await;
4893 Ok(RpcResponsePayload {
4894 status: RpcStatus::Ok,
4895 headers: vec![],
4896 body: Bytes::from_static(b"done"),
4897 })
4898 }
4899 }
4900 let (emit, captured) = capturing_emitter();
4901 let mut fold = RpcServerFold::new(
4902 Arc::new(CountingHandler {
4903 invocations: invocations.clone(),
4904 }),
4905 emit,
4906 );
4907 let req = RpcRequestPayload {
4908 service: "x".to_string(),
4909 deadline_ns: 0,
4910 flags: 0,
4911 headers: vec![],
4912 body: Bytes::new(),
4913 };
4914 // First REQUEST — handler spawns and parks in sleep.
4915 fold.apply(&rpc_request_event(1, 99, req.clone()), &mut ())
4916 .unwrap();
4917 assert!(
4918 wait_until(
4919 || fold.in_flight_keys().contains(&(1, 99)),
4920 Duration::from_secs(1)
4921 )
4922 .await
4923 );
4924 // Second REQUEST with same key — must be refused
4925 // synchronously with a synthetic Internal response.
4926 fold.apply(&rpc_request_event(1, 99, req), &mut ()).unwrap();
4927 // The refusal emit happens synchronously in the fold's
4928 // sync emitter path.
4929 let after_dup = captured.lock().clone();
4930 assert_eq!(
4931 after_dup.len(),
4932 1,
4933 "duplicate REQUEST must emit exactly one synthetic refusal",
4934 );
4935 assert_eq!(after_dup[0].2.status, RpcStatus::Internal);
4936 assert!(String::from_utf8_lossy(&after_dup[0].2.body).contains("duplicate"));
4937 // Wait for the first handler to complete.
4938 assert!(
4939 wait_until(|| captured.lock().len() == 2, Duration::from_secs(2)).await,
4940 "first handler should still complete normally"
4941 );
4942 let captured = captured.lock();
4943 assert_eq!(captured.len(), 2);
4944 // The first handler's response is the second emit (Ok).
4945 assert_eq!(captured[1].2.status, RpcStatus::Ok);
4946 assert_eq!(
4947 invocations.load(Ordering::SeqCst),
4948 1,
4949 "duplicate REQUEST must NOT spawn a second handler",
4950 );
4951 }
4952
4953 /// Regression: a CANCEL that fires while the handler is mid-
4954 /// flight must override the handler's outcome with
4955 /// `RpcStatus::Cancelled` even when the handler ignores
4956 /// cancellation and returns `Ok(...)`. Without this, a caller
4957 /// who cancelled would see the handler's accidental success
4958 /// payload and could not tell whether their CANCEL won.
4959 #[tokio::test]
4960 async fn server_fold_cancel_overrides_handler_ok_with_cancelled_status() {
4961 struct IgnoresCancellation;
4962 #[async_trait::async_trait]
4963 impl RpcHandler for IgnoresCancellation {
4964 async fn call(&self, _ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
4965 // Sleep long enough for the test to send CANCEL,
4966 // then return Ok regardless. This models a handler
4967 // that doesn't `select!` on `ctx.cancellation`.
4968 tokio::time::sleep(Duration::from_millis(80)).await;
4969 Ok(RpcResponsePayload {
4970 status: RpcStatus::Ok,
4971 headers: vec![],
4972 body: Bytes::from_static(b"finished despite cancellation"),
4973 })
4974 }
4975 }
4976 let (emit, captured) = capturing_emitter();
4977 let mut fold = RpcServerFold::new(Arc::new(IgnoresCancellation), emit);
4978 let req = RpcRequestPayload {
4979 service: "x".to_string(),
4980 deadline_ns: 0,
4981 flags: 0,
4982 headers: vec![],
4983 body: Bytes::new(),
4984 };
4985 fold.apply(&rpc_request_event(7, 11, req), &mut ()).unwrap();
4986 // Wait until the handler is parked, then send CANCEL well
4987 // before the handler's sleep elapses.
4988 assert!(
4989 wait_until(
4990 || fold.in_flight_keys().contains(&(7, 11)),
4991 Duration::from_secs(1)
4992 )
4993 .await
4994 );
4995 fold.apply(&rpc_cancel_event(7, 11), &mut ()).unwrap();
4996 assert!(
4997 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
4998 "handler should complete and emit response"
4999 );
5000 let captured = captured.lock();
5001 assert_eq!(captured.len(), 1);
5002 let (_, _, resp) = &captured[0];
5003 assert_eq!(
5004 resp.status,
5005 RpcStatus::Cancelled,
5006 "handler that returned Ok despite CANCEL must surface as Cancelled"
5007 );
5008 assert!(fold.in_flight_keys().is_empty());
5009 }
5010
5011 /// CANCEL for an unknown call_id is a no-op (no panic, no
5012 /// stray emission). This is the case where a CANCEL races a
5013 /// handler completion or a duplicate CANCEL arrives.
5014 #[tokio::test]
5015 async fn server_fold_cancel_for_unknown_call_id_is_no_op() {
5016 let (emit, captured) = capturing_emitter();
5017 let mut fold = RpcServerFold::new(Arc::new(EchoHandler), emit);
5018 // CANCEL with no matching REQUEST.
5019 fold.apply(&rpc_cancel_event(1, 999), &mut ()).unwrap();
5020 assert!(captured.lock().is_empty());
5021 assert!(fold.in_flight_keys().is_empty());
5022 }
5023
5024 /// Malformed request payload: fold emits a
5025 /// `RpcStatus::UnknownVersion` response and continues. A
5026 /// regression that returned `Err` here would kill the cortex
5027 /// adapter's tail-and-fold task on the first malformed event,
5028 /// which is the wrong behavior for an RPC server that needs
5029 /// to keep serving past garbage.
5030 #[tokio::test]
5031 async fn server_fold_malformed_payload_emits_unknown_version_and_keeps_going() {
5032 let (emit, captured) = capturing_emitter();
5033 let mut fold = RpcServerFold::new(Arc::new(EchoHandler), emit);
5034 // Build an event with valid meta but a garbage tail (just
5035 // a single 0x00 byte, which fails the service-len check).
5036 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, 7, 1, 0);
5037 let mut buf = Vec::new();
5038 buf.extend_from_slice(&meta.to_bytes());
5039 buf.push(0x00); // svc_len = 0 → empty service → Truncated
5040 let ev = RedexEvent {
5041 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5042 payload: bytes::Bytes::from(buf),
5043 };
5044 let result = fold.apply(&ev, &mut ());
5045 assert!(
5046 result.is_ok(),
5047 "fold must NOT return Err on malformed payload (would kill the adapter); got {result:?}"
5048 );
5049 let captured = captured.lock();
5050 assert_eq!(captured.len(), 1);
5051 let (_, _, resp) = &captured[0];
5052 assert_eq!(resp.status, RpcStatus::UnknownVersion);
5053 }
5054
5055 /// Cancellation token roundtrip: `cancel()` sets `is_cancelled`
5056 /// and wakes a parked `cancelled().await`.
5057 #[tokio::test]
5058 async fn cancellation_token_signals_waiters() {
5059 let token = RpcCancellationToken::new();
5060 assert!(!token.is_cancelled());
5061 let token2 = token.clone();
5062 let waiter = tokio::spawn(async move {
5063 token2.cancelled().await;
5064 });
5065 // Give the waiter a chance to park.
5066 tokio::time::sleep(Duration::from_millis(10)).await;
5067 token.cancel();
5068 // Waiter wakes.
5069 tokio::time::timeout(Duration::from_secs(1), waiter)
5070 .await
5071 .expect("waiter must wake within 1s")
5072 .expect("waiter task must not panic");
5073 assert!(token.is_cancelled());
5074 }
5075
5076 // ====================================================================
5077 // W3C Trace Context propagation.
5078 // ====================================================================
5079
/// A typical W3C trace context survives `build_trace_headers`
/// followed by `extract_trace_context` unchanged.
#[test]
fn trace_context_round_trips_through_headers() {
    let traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".to_string();
    let tracestate = "vendor1=opaque-value,vendor2=other".to_string();
    let tc = TraceContext {
        traceparent,
        tracestate,
    };

    let headers = build_trace_headers(&tc);
    assert_eq!(headers.len(), 2, "non-empty tracestate emits both headers");

    let extracted = extract_trace_context(&headers).expect("must extract");
    assert_eq!(extracted, tc);
}
5093
/// Regression for M21: header-name matching in
/// `extract_trace_context` is case-INsensitive, in line with the
/// W3C and HTTP conventions. A peer emitting `Traceparent` or
/// `TRACESTATE` must still be recognized; the earlier
/// implementation compared with `name.as_str() == "traceparent"`
/// and silently dropped every non-lowercase variant.
#[test]
fn extract_trace_context_is_case_insensitive_on_header_names() {
    /// Builds one owned `(name, value)` header pair.
    fn header(name: &str, value: &[u8]) -> (String, Vec<u8>) {
        (name.to_string(), value.to_vec())
    }

    // Capitalized `Traceparent` together with all-caps `TRACESTATE`.
    let headers = vec![
        header(
            "Traceparent",
            b"00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
        ),
        header("TRACESTATE", b"vendor=value"),
    ];
    let extracted =
        extract_trace_context(&headers).expect("capital-T traceparent must be recognized");
    assert_eq!(
        extracted.traceparent,
        "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
    );
    assert_eq!(extracted.tracestate, "vendor=value");

    // Mixed-case names are accepted as well.
    let headers = vec![
        header("traceParent", b"00-aa-bb-01"),
        header("TraceState", b"v=1"),
    ];
    let extracted =
        extract_trace_context(&headers).expect("mixed-case traceparent must be recognized");
    assert_eq!(extracted.traceparent, "00-aa-bb-01");
    assert_eq!(extracted.tracestate, "v=1");
}
5129
/// An empty `tracestate` is left off the wire (W3C convention) yet
/// comes back as an empty string on the receive side.
#[test]
fn trace_context_empty_tracestate_omitted_from_wire() {
    let tc = TraceContext {
        traceparent: "00-aa-bb-01".to_string(),
        tracestate: String::new(),
    };

    let headers = build_trace_headers(&tc);
    let emitted = headers.len();
    assert_eq!(
        emitted,
        1,
        "empty tracestate must NOT be emitted on the wire",
    );
    assert_eq!(headers[0].0, "traceparent");

    let extracted = extract_trace_context(&headers).expect("must extract");
    assert_eq!(extracted.traceparent, "00-aa-bb-01");
    assert_eq!(extracted.tracestate, "");
}
5149
/// Without a `traceparent` header the extractor yields `None`.
/// This is the FLAG_RPC_PROPAGATE_TRACE-set-but-no-headers misuse
/// case: the server sees `None` instead of a bogus context.
#[test]
fn trace_context_missing_traceparent_returns_none() {
    let unrelated: Vec<(String, Vec<u8>)> = vec![
        ("content-type".to_string(), b"application/json".to_vec()),
        ("idempotency-key".to_string(), b"abc".to_vec()),
    ];
    let extracted = extract_trace_context(&unrelated);
    assert!(extracted.is_none());
}
5161
5162 /// Server fold populates `RpcContext::trace_context` only when
5163 /// the caller signals `FLAG_RPC_PROPAGATE_TRACE`. End-to-end
5164 /// through the fold's apply path.
5165 #[tokio::test]
5166 async fn server_fold_propagates_trace_context_via_flag() {
5167 struct CapturingHandler {
5168 captured: Arc<Mutex<Option<Option<TraceContext>>>>,
5169 }
5170 #[async_trait::async_trait]
5171 impl RpcHandler for CapturingHandler {
5172 async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
5173 *self.captured.lock() = Some(ctx.trace_context.clone());
5174 Ok(RpcResponsePayload {
5175 status: RpcStatus::Ok,
5176 headers: vec![],
5177 body: Bytes::new(),
5178 })
5179 }
5180 }
5181
5182 // Helper: run one request through a fresh fold and return
5183 // what the handler captured for trace_context.
5184 async fn run(req: RpcRequestPayload) -> Option<TraceContext> {
5185 let captured: Arc<Mutex<Option<Option<TraceContext>>>> = Arc::new(Mutex::new(None));
5186 let (emit, _captured_responses) = capturing_emitter();
5187 let handler = Arc::new(CapturingHandler {
5188 captured: captured.clone(),
5189 });
5190 let mut fold = RpcServerFold::new(handler, emit);
5191 fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
5192 // Wait for the spawned handler to finish.
5193 assert!(
5194 wait_until(|| captured.lock().is_some(), Duration::from_secs(2)).await,
5195 "handler must run"
5196 );
5197 let observed = captured.lock().take().unwrap();
5198 observed
5199 }
5200
5201 // Case 1: FLAG_RPC_PROPAGATE_TRACE NOT set → trace_context is None.
5202 let req_no_flag = RpcRequestPayload {
5203 service: "x".to_string(),
5204 deadline_ns: 0,
5205 flags: 0,
5206 headers: vec![("traceparent".to_string(), b"00-aa-bb-01".to_vec())],
5207 body: Bytes::new(),
5208 };
5209 assert!(
5210 run(req_no_flag).await.is_none(),
5211 "without the flag, server must NOT extract trace_context"
5212 );
5213
5214 // Case 2: FLAG set + headers present → server gets the context.
5215 let tc = TraceContext {
5216 traceparent: "00-trace-span-01".to_string(),
5217 tracestate: "vendor=value".to_string(),
5218 };
5219 let req_with_flag = RpcRequestPayload {
5220 service: "x".to_string(),
5221 deadline_ns: 0,
5222 flags: FLAG_RPC_PROPAGATE_TRACE,
5223 headers: build_trace_headers(&tc),
5224 body: Bytes::new(),
5225 };
5226 let observed = run(req_with_flag).await.expect("flag set → should be Some");
5227 assert_eq!(observed, tc);
5228
5229 // Case 3: FLAG set but headers missing → None (defensive).
5230 let req_flag_no_headers = RpcRequestPayload {
5231 service: "x".to_string(),
5232 deadline_ns: 0,
5233 flags: FLAG_RPC_PROPAGATE_TRACE,
5234 headers: vec![],
5235 body: Bytes::new(),
5236 };
5237 assert!(
5238 run(req_flag_no_headers).await.is_none(),
5239 "flag set but no headers → server gets None (no synthesis)"
5240 );
5241 }
5242
5243 /// Race: cancel fires AFTER the future is registered but
5244 /// BEFORE the await actually parks. The token's
5245 /// `notified()`-then-check ordering must catch this case
5246 /// without sleeping past the cancellation.
5247 #[tokio::test]
5248 async fn cancellation_token_does_not_miss_cancel_racing_register() {
5249 for _ in 0..50 {
5250 let token = RpcCancellationToken::new();
5251 let token2 = token.clone();
5252 let waiter = tokio::spawn(async move {
5253 token2.cancelled().await;
5254 });
5255 // No sleep — fire cancel as fast as possible against
5256 // the just-spawned waiter. In the worst case the
5257 // waiter has not yet reached `notified()`; it will see
5258 // `is_cancelled() == true` on its first check and
5259 // return immediately. In the other case it parks and
5260 // gets woken by `notify_waiters`.
5261 token.cancel();
5262 tokio::time::timeout(Duration::from_secs(1), waiter)
5263 .await
5264 .expect("waiter must complete within 1s")
5265 .expect("waiter task must not panic");
5266 }
5267 }
5268
5269 // ====================================================================
5270 // RpcClientFold — caller-side response routing.
5271 // ====================================================================
5272
5273 fn rpc_response_event(
5274 caller_origin: u64,
5275 call_id: u64,
5276 payload: RpcResponsePayload,
5277 ) -> RedexEvent {
5278 let meta = EventMeta::new(DISPATCH_RPC_RESPONSE, 0, caller_origin, call_id, 0);
5279 let mut buf = Vec::new();
5280 buf.extend_from_slice(&meta.to_bytes());
5281 buf.extend_from_slice(&payload.encode());
5282 RedexEvent {
5283 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5284 payload: bytes::Bytes::from(buf),
5285 }
5286 }
5287
5288 /// Happy path: register a call, drive the matching RESPONSE
5289 /// through the fold, the awaiting receiver gets the payload.
5290 #[tokio::test]
5291 async fn client_fold_routes_response_to_registered_waiter() {
5292 let pending = Arc::new(RpcClientPending::new());
5293 let mut fold = RpcClientFold::new(pending.clone());
5294 let rx = pending.register(42, 0);
5295 assert_eq!(pending.pending_count(), 1);
5296
5297 let resp = RpcResponsePayload {
5298 status: RpcStatus::Ok,
5299 headers: vec![],
5300 body: Bytes::from_static(b"hello back"),
5301 };
5302 fold.apply(&rpc_response_event(0xCAFE, 42, resp.clone()), &mut ())
5303 .unwrap();
5304
5305 // Receiver is completed.
5306 let got = tokio::time::timeout(Duration::from_secs(1), rx)
5307 .await
5308 .expect("receiver must complete within 1s")
5309 .expect("sender must not be dropped");
5310 assert_eq!(got, resp);
5311 // Pending entry cleared after delivery.
5312 assert_eq!(pending.pending_count(), 0);
5313 }
5314
5315 /// RESPONSE for an unknown call_id is a no-op (no panic, no
5316 /// stray side effect). This is the case where a stale RESPONSE
5317 /// arrives after the caller has cancelled or timed out.
5318 #[tokio::test]
5319 async fn client_fold_response_for_unknown_call_id_is_no_op() {
5320 let pending = Arc::new(RpcClientPending::new());
5321 let mut fold = RpcClientFold::new(pending.clone());
5322 let resp = RpcResponsePayload {
5323 status: RpcStatus::Ok,
5324 headers: vec![],
5325 body: Bytes::new(),
5326 };
5327 fold.apply(&rpc_response_event(1, 999, resp), &mut ())
5328 .unwrap();
5329 assert_eq!(pending.pending_count(), 0);
5330 }
5331
5332 /// REQUEST / CANCEL events on the reply channel are ignored
5333 /// rather than producing a stray decode-error or affecting
5334 /// pending state. The reply channel shouldn't carry these in
5335 /// practice (they belong on `<service>.requests`), but a
5336 /// misconfigured publisher must not break the fold.
5337 #[tokio::test]
5338 async fn client_fold_ignores_non_response_dispatches() {
5339 let pending = Arc::new(RpcClientPending::new());
5340 let mut fold = RpcClientFold::new(pending.clone());
5341 let _rx = pending.register(7, 0);
5342
5343 // REQUEST event landing on the caller's reply channel is
5344 // ignored.
5345 let req = RpcRequestPayload {
5346 service: "stray".to_string(),
5347 deadline_ns: 0,
5348 flags: 0,
5349 headers: vec![],
5350 body: Bytes::new(),
5351 };
5352 fold.apply(&rpc_request_event(1, 7, req), &mut ()).unwrap();
5353 // Pending entry untouched.
5354 assert_eq!(pending.pending_count(), 1);
5355
5356 // CANCEL on the reply channel: also ignored.
5357 fold.apply(&rpc_cancel_event(1, 7), &mut ()).unwrap();
5358 assert_eq!(pending.pending_count(), 1);
5359 }
5360
5361 /// `cancel(call_id)` removes the pending entry; a subsequent
5362 /// RESPONSE for that call_id is dropped silently.
5363 #[tokio::test]
5364 async fn client_pending_cancel_drops_subsequent_response() {
5365 let pending = Arc::new(RpcClientPending::new());
5366 let mut fold = RpcClientFold::new(pending.clone());
5367 let rx = pending.register(5, 0);
5368 pending.cancel(5);
5369 assert_eq!(pending.pending_count(), 0);
5370
5371 let resp = RpcResponsePayload {
5372 status: RpcStatus::Ok,
5373 headers: vec![],
5374 body: Bytes::new(),
5375 };
5376 fold.apply(&rpc_response_event(1, 5, resp), &mut ())
5377 .unwrap();
5378
5379 // Receiver was dropped along with the cancel. The previously-
5380 // returned `rx` errors with `Closed`.
5381 let result = tokio::time::timeout(Duration::from_secs(1), rx).await;
5382 let inner = result.expect("must complete within 1s");
5383 assert!(
5384 inner.is_err(),
5385 "receiver after cancel must error (sender dropped)",
5386 );
5387 }
5388
5389 /// Malformed RESPONSE payload: fold returns Ok (does not kill
5390 /// the cortex adapter) and leaves the pending entry intact for
5391 /// the caller's deadline / cancellation path to clean up. Pre-
5392 /// fix a bad payload could either kill the fold or fabricate a
5393 /// synthetic response — both wrong.
5394 #[tokio::test]
5395 async fn client_fold_malformed_response_is_logged_not_fatal() {
5396 let pending = Arc::new(RpcClientPending::new());
5397 let mut fold = RpcClientFold::new(pending.clone());
5398 let rx = pending.register(11, 0);
5399
5400 // Build a malformed RESPONSE: valid meta, garbage tail
5401 // (just `[0xFF]`, which is shorter than the required 2-byte
5402 // status + 1-byte headers count + 4-byte body length).
5403 let meta = EventMeta::new(DISPATCH_RPC_RESPONSE, 0, 1, 11, 0);
5404 let mut buf = Vec::new();
5405 buf.extend_from_slice(&meta.to_bytes());
5406 buf.push(0xFF);
5407 let ev = RedexEvent {
5408 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5409 payload: bytes::Bytes::from(buf),
5410 };
5411 let result = fold.apply(&ev, &mut ());
5412 assert!(
5413 result.is_ok(),
5414 "fold must not return Err on malformed response"
5415 );
5416 // Pending entry NOT cleared — the caller's cancellation
5417 // path will eventually clean it up via `cancel(call_id)`.
5418 assert_eq!(pending.pending_count(), 1);
5419 // Receiver is still pending (not delivered, not closed).
5420 assert!(
5421 tokio::time::timeout(Duration::from_millis(50), rx)
5422 .await
5423 .is_err(),
5424 "receiver should still be parked (no delivery, no drop)",
5425 );
5426 }
5427
5428 /// Re-registering the same call_id replaces the prior sender;
5429 /// the prior `Receiver` errors with `RecvError::Closed`. This
5430 /// is the misuse-detection path — call_ids should be unique
5431 /// per (caller, target) for the lifetime of the call, and a
5432 /// clash surfaces as a hard error rather than silently
5433 /// delivering the response to the wrong waiter.
5434 #[tokio::test]
5435 async fn client_pending_re_register_closes_prior_receiver() {
5436 let pending = Arc::new(RpcClientPending::new());
5437 let rx_a = pending.register(99, 0);
5438 let _rx_b = pending.register(99, 0);
5439 // The first receiver is now closed (sender dropped on
5440 // re-insert).
5441 let result = tokio::time::timeout(Duration::from_secs(1), rx_a).await;
5442 let inner = result.expect("must complete within 1s");
5443 assert!(inner.is_err(), "re-register must close prior receiver");
5444 assert_eq!(pending.pending_count(), 1);
5445 }
5446
5447 /// S-4 part 2 regression: a RESPONSE whose wire `from_node`
5448 /// doesn't match the recorded `target_node` must not resolve
5449 /// the call. Without the gate, any peer with publish access
5450 /// to the caller's reply channel could ship a spoofed
5451 /// response (random call_ids from S-4 part 1 narrow the
5452 /// attack surface, but this gate closes the residual case
5453 /// of an attacker who has observed the victim's call_id via
5454 /// some side channel).
5455 #[tokio::test]
5456 async fn client_pending_drops_response_from_wrong_target() {
5457 let pending = Arc::new(RpcClientPending::new());
5458 let rx = pending.register(0xDEAD_BEEF, 0x42);
5459 let resp = RpcResponsePayload {
5460 status: RpcStatus::Ok,
5461 headers: Vec::new(),
5462 body: Bytes::from_static(b"forged"),
5463 };
5464 // Forged from a different session peer — must drop.
5465 pending.deliver(0xDEAD_BEEF, 0x99, resp.clone());
5466 // Receiver is still parked; pending entry is intact.
5467 let parked = tokio::time::timeout(Duration::from_millis(50), rx).await;
5468 assert!(
5469 parked.is_err(),
5470 "forged RESPONSE from wrong target must not resolve the call"
5471 );
5472 assert_eq!(pending.pending_count(), 1);
5473
5474 // Legitimate RESPONSE from the recorded target resolves.
5475 let rx2 = pending.register(0xCAFE, 0x42);
5476 let ok_resp = RpcResponsePayload {
5477 status: RpcStatus::Ok,
5478 headers: Vec::new(),
5479 body: Bytes::from_static(b"ok"),
5480 };
5481 pending.deliver(0xCAFE, 0x42, ok_resp);
5482 let delivered = tokio::time::timeout(Duration::from_millis(50), rx2)
5483 .await
5484 .expect("must complete")
5485 .expect("must receive");
5486 assert_eq!(delivered.body.as_ref(), b"ok");
5487 }
5488
5489 // ====================================================================
5490 // Phase C — RpcClientPending + RpcClientFold for client-streaming.
5491 // ====================================================================
5492
5493 /// Build a REQUEST_GRANT event for tests. Mirror of
5494 /// `rpc_stream_grant_event` for the request direction.
5495 fn rpc_request_grant_event(caller_origin: u64, call_id: u64, credits: u32) -> RedexEvent {
5496 let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, caller_origin, call_id, 0);
5497 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 12);
5498 buf.extend_from_slice(&meta.to_bytes());
5499 buf.extend_from_slice(&encode_request_grant(call_id, credits));
5500 RedexEvent {
5501 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5502 payload: bytes::Bytes::from(buf),
5503 }
5504 }
5505
5506 /// `register_client_streaming` returns two halves: a terminal
5507 /// oneshot and a grant mpsc. A terminal RESPONSE resolves the
5508 /// oneshot (same shape as unary delivery); a REQUEST_GRANT
5509 /// for the same call_id pushes its credit onto the mpsc.
5510 #[tokio::test]
5511 async fn client_pending_client_streaming_routes_terminal_and_grants() {
5512 let pending = Arc::new(RpcClientPending::new());
5513 let (terminal_rx, mut grant_rx) = pending.register_client_streaming(0xCAFE_F00D, 0);
5514 // Push two grants — both should land on the mpsc.
5515 pending.deliver_grant(0xCAFE_F00D, 0, 3);
5516 pending.deliver_grant(0xCAFE_F00D, 0, 7);
5517 assert_eq!(grant_rx.recv().await, Some(3));
5518 assert_eq!(grant_rx.recv().await, Some(7));
5519 // Terminal RESPONSE resolves the oneshot and removes the
5520 // entry. Grant mpsc closes too (its sender drops with
5521 // the entry).
5522 let resp = RpcResponsePayload {
5523 status: RpcStatus::Ok,
5524 headers: vec![],
5525 body: Bytes::from_static(b"done"),
5526 };
5527 pending.deliver(0xCAFE_F00D, 0, resp.clone());
5528 let delivered = tokio::time::timeout(Duration::from_millis(50), terminal_rx)
5529 .await
5530 .expect("terminal must complete")
5531 .expect("terminal must receive");
5532 assert_eq!(delivered.body.as_ref(), b"done");
5533 // Grant mpsc now closed.
5534 assert_eq!(grant_rx.recv().await, None);
5535 assert_eq!(pending.pending_count(), 0);
5536 }
5537
5538 /// REQUEST_GRANT from a non-target session peer is dropped
5539 /// without injecting credit. Same S-4-style binding gate as
5540 /// the RESPONSE delivery path — a forged grant on a shared
5541 /// reply channel can't inflate a victim's credit budget.
5542 #[tokio::test]
5543 async fn client_pending_grant_from_wrong_target_is_dropped() {
5544 let pending = Arc::new(RpcClientPending::new());
5545 let (_terminal_rx, mut grant_rx) = pending.register_client_streaming(0xCAFE_F00D, 0x42);
5546 // Forged grant from a different session peer — must drop.
5547 pending.deliver_grant(0xCAFE_F00D, 0x99, 100);
5548 let parked = tokio::time::timeout(Duration::from_millis(50), grant_rx.recv()).await;
5549 assert!(
5550 parked.is_err(),
5551 "forged REQUEST_GRANT from wrong target must not inject credit"
5552 );
5553 // Legitimate grant from the recorded target lands.
5554 pending.deliver_grant(0xCAFE_F00D, 0x42, 5);
5555 let delivered = tokio::time::timeout(Duration::from_millis(50), grant_rx.recv())
5556 .await
5557 .expect("must complete")
5558 .expect("must receive");
5559 assert_eq!(delivered, 5);
5560 }
5561
5562 /// `deliver_grant` for an unknown call_id is a silent no-op.
5563 /// Same harmless-drop semantics as a STREAM_GRANT for an
5564 /// unknown / non-flow-controlled call (CANCEL/GRANT race is
5565 /// always possible).
5566 #[tokio::test]
5567 async fn client_pending_grant_for_unknown_call_id_is_no_op() {
5568 let pending = Arc::new(RpcClientPending::new());
5569 // No entry registered for this call_id.
5570 pending.deliver_grant(0xDEAD, 0, 42);
5571 // No panics, no entries created.
5572 assert_eq!(pending.pending_count(), 0);
5573 }
5574
5575 /// `deliver_grant` for a unary entry is silently dropped
5576 /// (grants only apply to client-streaming / duplex calls).
5577 #[tokio::test]
5578 async fn client_pending_grant_for_unary_entry_is_no_op() {
5579 let pending = Arc::new(RpcClientPending::new());
5580 let _rx = pending.register(0xDEAD, 0);
5581 pending.deliver_grant(0xDEAD, 0, 42);
5582 // No state changes — entry still pending, no leak.
5583 assert_eq!(pending.pending_count(), 1);
5584 }
5585
5586 /// `RpcClientFold::apply` (legacy / loopback path) routes
5587 /// DISPATCH_RPC_REQUEST_GRANT events through to the matching
5588 /// ClientStreaming entry's grant mpsc. Pins the second
5589 /// dispatch arm the fold gained for Phase C.
5590 #[tokio::test]
5591 async fn client_fold_routes_request_grant_to_registered_waiter() {
5592 let pending = Arc::new(RpcClientPending::new());
5593 let mut fold = RpcClientFold::new(pending.clone());
5594 let (_terminal_rx, mut grant_rx) = pending.register_client_streaming(0xC0DE, 0);
5595 let ev = rpc_request_grant_event(0xCAFE, 0xC0DE, 9);
5596 fold.apply(&ev, &mut ()).unwrap();
5597 let delivered = tokio::time::timeout(Duration::from_millis(50), grant_rx.recv())
5598 .await
5599 .expect("must complete")
5600 .expect("must receive");
5601 assert_eq!(delivered, 9);
5602 }
5603
5604 /// `RpcClientFold::apply` ignores REQUEST_GRANT events whose
5605 /// payload is malformed (wrong length): no panic, no entry
5606 /// state change, fold returns Ok and keeps going. Mirror of
5607 /// the response-side malformed-payload regression.
5608 #[tokio::test]
5609 async fn client_fold_malformed_request_grant_is_logged_not_fatal() {
5610 let pending = Arc::new(RpcClientPending::new());
5611 let mut fold = RpcClientFold::new(pending.clone());
5612 let (_terminal_rx, mut grant_rx) = pending.register_client_streaming(0xC0DE, 0);
5613 // Build a GRANT event whose payload is only 4 bytes
5614 // (truncated — codec needs 12).
5615 let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, 0xCAFE, 0xC0DE, 0);
5616 let mut buf = Vec::new();
5617 buf.extend_from_slice(&meta.to_bytes());
5618 buf.extend_from_slice(&[0xAA, 0xBB, 0xCC, 0xDD]);
5619 let ev = RedexEvent {
5620 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5621 payload: bytes::Bytes::from(buf),
5622 };
5623 let result = fold.apply(&ev, &mut ());
5624 assert!(
5625 result.is_ok(),
5626 "malformed REQUEST_GRANT must NOT kill the fold"
5627 );
5628 // No credit landed on the mpsc.
5629 let parked = tokio::time::timeout(Duration::from_millis(30), grant_rx.recv()).await;
5630 assert!(
5631 parked.is_err(),
5632 "malformed REQUEST_GRANT must not inject credit"
5633 );
5634 }
5635
5636 /// REQUEST_GRANT frames where the payload `call_id` does NOT
5637 /// agree with `EventMeta::seq_or_ts` must be dropped: the
5638 /// producer is contracted to encode both fields to the same
5639 /// value (see `RpcRequestGrantPayload::call_id` doc), so a
5640 /// mismatch is either a malformed frame or an attempted
5641 /// cross-call credit-injection. Without this check, a peer
5642 /// could publish a GRANT whose meta names one call but whose
5643 /// payload credits a different in-flight call_id.
5644 ///
5645 /// Regression: cubic-dev-ai bot P2 review comment on the
5646 /// `nrpc-streaming` branch.
5647 #[tokio::test]
5648 async fn client_fold_drops_request_grant_with_mismatched_call_ids() {
5649 let pending = Arc::new(RpcClientPending::new());
5650 let mut fold = RpcClientFold::new(pending.clone());
5651 let (_terminal_rx_victim, mut grant_rx_victim) =
5652 pending.register_client_streaming(0xC0DE, 0);
5653 let (_terminal_rx_other, mut grant_rx_other) = pending.register_client_streaming(0xBEEF, 0);
5654
5655 // Build a hand-rolled frame: meta names call 0xC0DE,
5656 // payload encodes credit for call 0xBEEF. Either the
5657 // producer is broken or this is a forged frame; the
5658 // consumer must drop, not deliver.
5659 let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, 0xCAFE, 0xC0DE, 0);
5660 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 12);
5661 buf.extend_from_slice(&meta.to_bytes());
5662 buf.extend_from_slice(&encode_request_grant(0xBEEF, 5));
5663 let ev = RedexEvent {
5664 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5665 payload: bytes::Bytes::from(buf),
5666 };
5667 fold.apply(&ev, &mut ()).unwrap();
5668
5669 let parked_victim =
5670 tokio::time::timeout(Duration::from_millis(30), grant_rx_victim.recv()).await;
5671 assert!(
5672 parked_victim.is_err(),
5673 "mismatched REQUEST_GRANT must not credit the call named in meta",
5674 );
5675 let parked_other =
5676 tokio::time::timeout(Duration::from_millis(30), grant_rx_other.recv()).await;
5677 assert!(
5678 parked_other.is_err(),
5679 "mismatched REQUEST_GRANT must not credit the call named in payload either",
5680 );
5681 }
5682
5683 // ====================================================================
5684 // RpcServerStreamingFold — coverage for the multi-fire emit path.
5685 //
5686 // The streaming fold is the most complex code in this file:
5687 // - Per-call cancellation token (same as unary)
5688 // - Pump task that drains an mpsc and awaits each emit to
5689 // enforce per-call ordering
5690 // - Optional flow-control semaphore (caller-set window +
5691 // STREAM_GRANT credit refills)
5692 // - Terminal-frame emission with CANCEL-wins override
5693 //
5694 // These tests pin each branch: ordered chunks + clean EOF;
5695 // application error after partial stream; panic surfacing as
5696 // Internal; CANCEL flipping the cancellation token AND being
5697 // surfaced as the terminal status; STREAM_GRANT permits;
5698 // duplicate-REQUEST refusal.
5699 // ====================================================================
5700
5701 /// Build an async `RpcAsyncResponseEmitter` that captures every
5702 /// emit into a shared Vec. Streaming fold tests use this to
5703 /// inspect the multi-frame emit pattern.
5704 fn capturing_async_emitter() -> (RpcAsyncResponseEmitter, CapturedResponses) {
5705 let captured: CapturedResponses = Arc::new(Mutex::new(Vec::new()));
5706 let captured_clone = captured.clone();
5707 let emit: RpcAsyncResponseEmitter = Arc::new(move |origin, call_id, resp| {
5708 let captured_clone = captured_clone.clone();
5709 Box::pin(async move {
5710 captured_clone.lock().push((origin, call_id, resp));
5711 })
5712 });
5713 (emit, captured)
5714 }
5715
5716 /// Synthesize a STREAM_GRANT event for a `(caller_origin, call_id)`
5717 /// asking for `n` additional credits.
5718 fn rpc_stream_grant_event(caller_origin: u64, call_id: u64, n: u32) -> RedexEvent {
5719 let meta = EventMeta::new(DISPATCH_RPC_STREAM_GRANT, 0, caller_origin, call_id, 0);
5720 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 4);
5721 buf.extend_from_slice(&meta.to_bytes());
5722 buf.extend_from_slice(&encode_stream_grant(n));
5723 RedexEvent {
5724 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
5725 payload: bytes::Bytes::from(buf),
5726 }
5727 }
5728
5729 /// Streaming handler that emits N chunks and returns Ok. The
5730 /// caller-side test asserts (a) all N chunks arrive in order
5731 /// with the `nrpc-streaming: continue` header, (b) a final
5732 /// terminal frame with `nrpc-streaming: end` follows.
5733 #[tokio::test]
5734 async fn streaming_fold_emits_chunks_in_order_and_clean_terminal() {
5735 struct CountingHandler {
5736 n: usize,
5737 }
5738 #[async_trait::async_trait]
5739 impl RpcStreamingHandler for CountingHandler {
5740 async fn call(
5741 &self,
5742 _ctx: RpcContext,
5743 sink: RpcResponseSink,
5744 ) -> Result<(), RpcHandlerError> {
5745 for i in 0..self.n {
5746 sink.send(format!("chunk-{i}").into_bytes());
5747 }
5748 Ok(())
5749 }
5750 }
5751 let (emit, captured) = capturing_async_emitter();
5752 let mut fold = RpcServerStreamingFold::new(Arc::new(CountingHandler { n: 5 }), emit);
5753 let req = RpcRequestPayload {
5754 service: "stream".to_string(),
5755 deadline_ns: 0,
5756 flags: FLAG_RPC_STREAMING_RESPONSE,
5757 headers: vec![],
5758 body: Bytes::new(),
5759 };
5760 fold.apply(&rpc_request_event(11, 22, req), &mut ())
5761 .unwrap();
5762 // 5 continue chunks + 1 terminal end frame.
5763 assert!(
5764 wait_until(|| captured.lock().len() == 6, Duration::from_secs(2)).await,
5765 "expected 6 frames (5 chunks + terminal end), got {}",
5766 captured.lock().len(),
5767 );
5768 let captured = captured.lock();
5769 for (i, (_, _, resp)) in captured.iter().take(5).enumerate() {
5770 assert_eq!(resp.status, RpcStatus::Ok);
5771 // continue header on every non-terminal chunk
5772 let hdr = resp
5773 .headers
5774 .iter()
5775 .find(|(n, _)| n == HEADER_NRPC_STREAMING)
5776 .expect("streaming header present");
5777 assert_eq!(hdr.1.as_slice(), HEADER_NRPC_STREAMING_CONTINUE);
5778 assert_eq!(resp.body, format!("chunk-{i}").into_bytes());
5779 }
5780 // Terminal frame
5781 let (_, _, term) = captured.last().unwrap();
5782 assert_eq!(term.status, RpcStatus::Ok);
5783 let hdr = term
5784 .headers
5785 .iter()
5786 .find(|(n, _)| n == HEADER_NRPC_STREAMING)
5787 .expect("terminal must have streaming header");
5788 assert_eq!(hdr.1.as_slice(), HEADER_NRPC_STREAMING_END);
5789 assert!(term.body.is_empty());
5790 }
5791
5792 /// Handler returns `Err(Internal)` after sending 2 chunks. Caller
5793 /// must see (a) both chunks with the continue header, (b) a
5794 /// terminal frame carrying `RpcStatus::Internal` (NOT the end
5795 /// marker — the terminal-error path drops the header).
5796 #[tokio::test]
5797 async fn streaming_fold_terminal_error_after_partial_stream() {
5798 struct PartialErrHandler;
5799 #[async_trait::async_trait]
5800 impl RpcStreamingHandler for PartialErrHandler {
5801 async fn call(
5802 &self,
5803 _ctx: RpcContext,
5804 sink: RpcResponseSink,
5805 ) -> Result<(), RpcHandlerError> {
5806 sink.send(b"first".to_vec());
5807 sink.send(b"second".to_vec());
5808 Err(RpcHandlerError::Internal("ran out of fuel".into()))
5809 }
5810 }
5811 let (emit, captured) = capturing_async_emitter();
5812 let mut fold = RpcServerStreamingFold::new(Arc::new(PartialErrHandler), emit);
5813 let req = RpcRequestPayload {
5814 service: "x".to_string(),
5815 deadline_ns: 0,
5816 flags: FLAG_RPC_STREAMING_RESPONSE,
5817 headers: vec![],
5818 body: Bytes::new(),
5819 };
5820 fold.apply(&rpc_request_event(1, 1, req), &mut ()).unwrap();
5821 assert!(
5822 wait_until(|| captured.lock().len() == 3, Duration::from_secs(2)).await,
5823 "expected 2 chunks + 1 terminal error",
5824 );
5825 let captured = captured.lock();
5826 assert_eq!(captured[0].2.body.as_ref(), b"first");
5827 assert_eq!(captured[1].2.body.as_ref(), b"second");
5828 let (_, _, term) = &captured[2];
5829 assert_eq!(term.status, RpcStatus::Internal);
5830 assert!(
5831 String::from_utf8_lossy(&term.body).contains("ran out of fuel"),
5832 "diagnostic must round-trip, got {:?}",
5833 String::from_utf8_lossy(&term.body),
5834 );
5835 }
5836
5837 /// Handler panics. The fold's `catch_unwind` surfaces it as a
5838 /// terminal `RpcStatus::Internal` rather than killing the
5839 /// runtime.
5840 #[tokio::test]
5841 async fn streaming_fold_handler_panic_surfaces_as_internal_terminal() {
5842 struct PanicHandler;
5843 #[async_trait::async_trait]
5844 impl RpcStreamingHandler for PanicHandler {
5845 async fn call(
5846 &self,
5847 _ctx: RpcContext,
5848 _sink: RpcResponseSink,
5849 ) -> Result<(), RpcHandlerError> {
5850 panic!("kaboom in streaming handler");
5851 }
5852 }
5853 let (emit, captured) = capturing_async_emitter();
5854 let mut fold = RpcServerStreamingFold::new(Arc::new(PanicHandler), emit);
5855 let req = RpcRequestPayload {
5856 service: "x".to_string(),
5857 deadline_ns: 0,
5858 flags: FLAG_RPC_STREAMING_RESPONSE,
5859 headers: vec![],
5860 body: Bytes::new(),
5861 };
5862 fold.apply(&rpc_request_event(1, 2, req), &mut ()).unwrap();
5863 assert!(
5864 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
5865 "panic must surface as a terminal frame",
5866 );
5867 let captured = captured.lock();
5868 assert_eq!(captured.len(), 1);
5869 let (_, _, resp) = &captured[0];
5870 assert_eq!(resp.status, RpcStatus::Internal);
5871 assert!(
5872 String::from_utf8_lossy(&resp.body).contains("kaboom"),
5873 "panic message must surface, got {:?}",
5874 String::from_utf8_lossy(&resp.body),
5875 );
5876 }
5877
5878 /// CANCEL during a streaming call overrides the terminal frame
5879 /// with `RpcStatus::Cancelled` — same CANCEL-wins ordering as
5880 /// the unary fold.
5881 #[tokio::test]
5882 async fn streaming_fold_cancel_overrides_terminal_with_cancelled() {
5883 struct CooperativeHandler;
5884 #[async_trait::async_trait]
5885 impl RpcStreamingHandler for CooperativeHandler {
5886 async fn call(
5887 &self,
5888 ctx: RpcContext,
5889 sink: RpcResponseSink,
5890 ) -> Result<(), RpcHandlerError> {
5891 sink.send(b"chunk-0".to_vec());
5892 tokio::select! {
5893 _ = ctx.cancellation.cancelled() => Ok(()),
5894 _ = tokio::time::sleep(Duration::from_secs(5)) => Ok(()),
5895 }
5896 }
5897 }
5898 let (emit, captured) = capturing_async_emitter();
5899 let mut fold = RpcServerStreamingFold::new(Arc::new(CooperativeHandler), emit);
5900 let req = RpcRequestPayload {
5901 service: "x".to_string(),
5902 deadline_ns: 0,
5903 flags: FLAG_RPC_STREAMING_RESPONSE,
5904 headers: vec![],
5905 body: Bytes::new(),
5906 };
5907 fold.apply(&rpc_request_event(7, 13, req), &mut ()).unwrap();
5908 // Wait until at least the first chunk is captured AND the
5909 // handler is parked (in_flight key present), then CANCEL.
5910 assert!(
5911 wait_until(
5912 || !captured.lock().is_empty() && fold.in_flight_keys().contains(&(7, 13)),
5913 Duration::from_secs(2)
5914 )
5915 .await
5916 );
5917 fold.apply(&rpc_cancel_event(7, 13), &mut ()).unwrap();
5918 // Wait for the terminal frame.
5919 assert!(
5920 wait_until(|| captured.lock().len() >= 2, Duration::from_secs(2)).await,
5921 "expected first chunk + terminal frame",
5922 );
5923 let captured = captured.lock();
5924 // First emit was the chunk; the LAST should be the
5925 // Cancelled terminal.
5926 assert_eq!(
5927 captured.last().unwrap().2.status,
5928 RpcStatus::Cancelled,
5929 "CANCEL must override terminal status",
5930 );
5931 }
5932
5933 /// Duplicate REQUEST with the same `(origin, call_id)` is
5934 /// refused with a synthetic Internal terminal frame and does
5935 /// NOT spawn a second handler. Mirror of the unary fold's
5936 /// regression at server_fold_duplicate_request_refuses_*.
5937 #[tokio::test]
5938 async fn streaming_fold_duplicate_request_refuses_without_double_dispatch() {
5939 let invocations = Arc::new(AtomicUsize::new(0));
5940 struct CountingHandler {
5941 invocations: Arc<AtomicUsize>,
5942 }
5943 #[async_trait::async_trait]
5944 impl RpcStreamingHandler for CountingHandler {
5945 async fn call(
5946 &self,
5947 _ctx: RpcContext,
5948 sink: RpcResponseSink,
5949 ) -> Result<(), RpcHandlerError> {
5950 self.invocations.fetch_add(1, Ordering::SeqCst);
5951 tokio::time::sleep(Duration::from_millis(80)).await;
5952 sink.send(b"chunk".to_vec());
5953 Ok(())
5954 }
5955 }
5956 let (emit, captured) = capturing_async_emitter();
5957 let mut fold = RpcServerStreamingFold::new(
5958 Arc::new(CountingHandler {
5959 invocations: invocations.clone(),
5960 }),
5961 emit,
5962 );
5963 let req = RpcRequestPayload {
5964 service: "x".to_string(),
5965 deadline_ns: 0,
5966 flags: FLAG_RPC_STREAMING_RESPONSE,
5967 headers: vec![],
5968 body: Bytes::new(),
5969 };
5970 fold.apply(&rpc_request_event(1, 99, req.clone()), &mut ())
5971 .unwrap();
5972 assert!(
5973 wait_until(
5974 || fold.in_flight_keys().contains(&(1, 99)),
5975 Duration::from_secs(1)
5976 )
5977 .await
5978 );
5979 // Duplicate REQUEST — must emit a synthetic Internal
5980 // terminal and not invoke the handler a second time.
5981 fold.apply(&rpc_request_event(1, 99, req), &mut ()).unwrap();
5982 assert!(
5983 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(1)).await,
5984 "synthetic refusal should be emitted",
5985 );
5986 // First emit (chronologically) is the synthetic refusal.
5987 let refusal = captured.lock()[0].clone();
5988 assert_eq!(refusal.2.status, RpcStatus::Internal);
5989 assert!(String::from_utf8_lossy(&refusal.2.body).contains("duplicate"));
5990 // Wait for the original handler to complete (chunk + terminal).
5991 assert!(
5992 wait_until(|| captured.lock().len() >= 3, Duration::from_secs(2)).await,
5993 "first handler should still complete normally",
5994 );
5995 assert_eq!(
5996 invocations.load(Ordering::SeqCst),
5997 1,
5998 "duplicate REQUEST must NOT spawn a second handler",
5999 );
6000 }
6001
6002 /// STREAM_GRANT for an unknown call_id is silently dropped
6003 /// (no panic, no tracing event escalation). Pin the
6004 /// always-safe behavior so a misbehaving caller (or a CANCEL/
6005 /// GRANT race) can't crash the fold.
6006 #[tokio::test]
6007 async fn streaming_fold_grant_for_unknown_call_id_is_no_op() {
6008 struct NoopHandler;
6009 #[async_trait::async_trait]
6010 impl RpcStreamingHandler for NoopHandler {
6011 async fn call(
6012 &self,
6013 _ctx: RpcContext,
6014 _sink: RpcResponseSink,
6015 ) -> Result<(), RpcHandlerError> {
6016 Ok(())
6017 }
6018 }
6019 let (emit, captured) = capturing_async_emitter();
6020 let mut fold = RpcServerStreamingFold::new(Arc::new(NoopHandler), emit);
6021 let result = fold.apply(&rpc_stream_grant_event(99, 42, 5), &mut ());
6022 assert!(result.is_ok(), "GRANT for unknown call_id must be Ok");
6023 assert!(captured.lock().is_empty(), "no emit for unknown GRANT");
6024 }
6025
6026 /// Regression for M20: the streaming pump's mpsc is bounded
6027 /// at `STREAMING_PUMP_CAPACITY`. A handler that produces
6028 /// chunks faster than the pump drains gets its excess
6029 /// `sink.send(...)` calls silently dropped (matching the
6030 /// "caller cancelled" semantic) — and the metric counter
6031 /// `streaming_chunks_dropped_total` increments.
6032 ///
6033 /// We construct the sink directly with a tiny bounded mpsc
6034 /// (capacity 2) and a metrics handle, then call `send` 5
6035 /// times without a receiver. The first 2 fit in the channel;
6036 /// the next 3 are dropped and counted.
6037 #[tokio::test]
6038 async fn streaming_sink_drops_on_full_and_increments_metric() {
6039 use crate::adapter::net::mesh_rpc_metrics::{RpcMetricsRegistry, ServiceMetricsAtomic};
6040 // Tiny channel to make overflow easy to observe.
6041 let (tx, _rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(2);
6042 let registry = RpcMetricsRegistry::new();
6043 let metrics: Arc<ServiceMetricsAtomic> = registry.for_service("drop_test");
6044 let sink = RpcResponseSink {
6045 inner: tx,
6046 metrics: Some(metrics.clone()),
6047 };
6048 // 5 sends; first 2 buffer, next 3 drop.
6049 for i in 0..5u8 {
6050 sink.send(vec![i]);
6051 }
6052 assert_eq!(
6053 metrics
6054 .streaming_chunks_dropped_total
6055 .load(Ordering::Relaxed),
6056 3,
6057 "expected 3 dropped chunks (capacity=2, sent 5)",
6058 );
6059 }
6060
6061 /// Malformed REQUEST payload on the streaming fold: emits one
6062 /// terminal `UnknownVersion` frame and continues — same
6063 /// keep-the-adapter-alive contract as the unary fold.
6064 #[tokio::test]
6065 async fn streaming_fold_malformed_payload_emits_unknown_version_terminal() {
6066 struct NoopHandler;
6067 #[async_trait::async_trait]
6068 impl RpcStreamingHandler for NoopHandler {
6069 async fn call(
6070 &self,
6071 _ctx: RpcContext,
6072 _sink: RpcResponseSink,
6073 ) -> Result<(), RpcHandlerError> {
6074 Ok(())
6075 }
6076 }
6077 let (emit, captured) = capturing_async_emitter();
6078 let mut fold = RpcServerStreamingFold::new(Arc::new(NoopHandler), emit);
6079 // Garbage tail: valid meta + 0x00 svc_len → Truncated.
6080 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, 1, 1, 0);
6081 let mut buf = Vec::new();
6082 buf.extend_from_slice(&meta.to_bytes());
6083 buf.push(0x00);
6084 let ev = RedexEvent {
6085 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
6086 payload: bytes::Bytes::from(buf),
6087 };
6088 let result = fold.apply(&ev, &mut ());
6089 assert!(
6090 result.is_ok(),
6091 "malformed payload must NOT kill the adapter",
6092 );
6093 assert!(
6094 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6095 "synthetic UnknownVersion terminal must arrive",
6096 );
6097 let captured = captured.lock();
6098 assert_eq!(captured[0].2.status, RpcStatus::UnknownVersion);
6099 let hdr = captured[0]
6100 .2
6101 .headers
6102 .iter()
6103 .find(|(n, _)| n == HEADER_NRPC_STREAMING);
6104 assert!(hdr.is_some(), "malformed terminal must include end marker");
6105 }
6106
6107 // ====================================================================
6108 // Phase B — RpcStreamingRequestFold (server-side client-streaming)
6109 // ====================================================================
6110
6111 /// Build a REQUEST_CHUNK event for tests. Mirrors
6112 /// `rpc_request_event` / `rpc_stream_grant_event` shape.
6113 fn rpc_request_chunk_event(
6114 caller_origin: u64,
6115 call_id: u64,
6116 flags: u16,
6117 body: Vec<u8>,
6118 ) -> RedexEvent {
6119 let meta = EventMeta::new(DISPATCH_RPC_REQUEST_CHUNK, 0, caller_origin, call_id, 0);
6120 let payload = RpcRequestChunkPayload {
6121 call_id,
6122 flags,
6123 headers: vec![],
6124 body: body.into(),
6125 };
6126 let mut buf = Vec::new();
6127 buf.extend_from_slice(&meta.to_bytes());
6128 buf.extend_from_slice(&payload.encode());
6129 RedexEvent {
6130 entry: RedexEntry::new_heap(0, 0, buf.len() as u32, 0, 0),
6131 payload: bytes::Bytes::from(buf),
6132 }
6133 }
6134
6135 /// Collecting client-streaming handler: drains the stream into
6136 /// a Vec, returns an Ok response whose body is the count of
6137 /// chunks seen (8-byte LE). Captured chunk bodies are exposed
6138 /// via the `Arc<Mutex<Vec<Bytes>>>` so tests can assert
6139 /// ordering and content.
6140 struct CollectingClientStreamHandler {
6141 seen: Arc<Mutex<Vec<bytes::Bytes>>>,
6142 observed_cancel: Arc<AtomicBool>,
6143 }
6144 #[async_trait::async_trait]
6145 impl RpcClientStreamingHandler for CollectingClientStreamHandler {
6146 async fn call(
6147 &self,
6148 ctx: RpcStreamingContext,
6149 mut requests: RequestStream,
6150 ) -> Result<RpcResponsePayload, RpcHandlerError> {
6151 use futures::StreamExt;
6152 while let Some(chunk) = requests.next().await {
6153 self.seen.lock().push(chunk);
6154 }
6155 // Re-check cancellation after EOF so the test can
6156 // distinguish "clean REQUEST_END" from "CANCEL closed
6157 // the stream early".
6158 if ctx.cancellation.is_cancelled() {
6159 self.observed_cancel
6160 .store(true, std::sync::atomic::Ordering::SeqCst);
6161 }
6162 let count = self.seen.lock().len() as u64;
6163 Ok(RpcResponsePayload {
6164 status: RpcStatus::Ok,
6165 headers: vec![],
6166 body: Bytes::copy_from_slice(&count.to_le_bytes()),
6167 })
6168 }
6169 }
6170
6171 /// 1/6 — happy path: REQUEST + 3 REQUEST_CHUNKs (last has
6172 /// FLAG_END) delivers 4 bodies to the handler in order; the
6173 /// fold emits exactly one terminal RESPONSE carrying the
6174 /// handler's reply.
6175 #[tokio::test]
6176 async fn streaming_request_fold_collects_all_chunks_and_emits_terminal_response() {
6177 let seen = Arc::new(Mutex::new(Vec::new()));
6178 let observed_cancel = Arc::new(AtomicBool::new(false));
6179 let (emit, captured) = capturing_emitter();
6180 let mut fold = RpcStreamingRequestFold::new(
6181 Arc::new(CollectingClientStreamHandler {
6182 seen: seen.clone(),
6183 observed_cancel: observed_cancel.clone(),
6184 }),
6185 emit,
6186 );
6187 // REQUEST with the client-streaming flag, body = "a".
6188 let req = RpcRequestPayload {
6189 service: "agg".to_string(),
6190 deadline_ns: 0,
6191 flags: FLAG_RPC_CLIENT_STREAMING_REQUEST,
6192 headers: vec![],
6193 body: Bytes::from_static(b"a"),
6194 };
6195 fold.apply(&rpc_request_event(0xCAFE, 7, req), &mut ())
6196 .unwrap();
6197 // Wait until the sender is registered (handler task has
6198 // picked up the request and the apply path completed).
6199 assert!(
6200 wait_until(
6201 || fold.sender_keys().contains(&(0xCAFE, 7)),
6202 Duration::from_secs(1)
6203 )
6204 .await
6205 );
6206 // Three more chunks; last sets FLAG_REQUEST_END.
6207 fold.apply(
6208 &rpc_request_chunk_event(0xCAFE, 7, 0, b"b".to_vec()),
6209 &mut (),
6210 )
6211 .unwrap();
6212 fold.apply(
6213 &rpc_request_chunk_event(0xCAFE, 7, 0, b"c".to_vec()),
6214 &mut (),
6215 )
6216 .unwrap();
6217 fold.apply(
6218 &rpc_request_chunk_event(0xCAFE, 7, FLAG_RPC_REQUEST_END, b"d".to_vec()),
6219 &mut (),
6220 )
6221 .unwrap();
6222 // Handler should observe 4 bodies in order and emit one
6223 // terminal RESPONSE whose body encodes the count.
6224 assert!(
6225 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6226 "expected terminal RESPONSE"
6227 );
6228 let captured = captured.lock();
6229 assert_eq!(captured.len(), 1, "exactly one terminal RESPONSE");
6230 let (origin, call_id, resp) = &captured[0];
6231 assert_eq!(*origin, 0xCAFE);
6232 assert_eq!(*call_id, 7);
6233 assert_eq!(resp.status, RpcStatus::Ok);
6234 assert_eq!(resp.body.as_ref(), 4u64.to_le_bytes());
6235 // And the chunks landed in order.
6236 let seen = seen.lock();
6237 let collected: Vec<&[u8]> = seen.iter().map(|b| b.as_ref()).collect();
6238 assert_eq!(collected, vec![b"a", b"b", b"c", b"d"]);
6239 assert!(
6240 !observed_cancel.load(std::sync::atomic::Ordering::SeqCst),
6241 "clean REQUEST_END must NOT register as a cancellation"
6242 );
6243 }
6244
6245 /// 2/6 — degenerate case: initial REQUEST with both the
6246 /// client-streaming AND request-end flags set. Handler sees
6247 /// exactly one body (the REQUEST's own body) and EOF — the
6248 /// "one-item upload" fast path that saves a trailing CHUNK
6249 /// event.
6250 #[tokio::test]
6251 async fn streaming_request_fold_initial_request_with_end_flag_yields_single_item() {
6252 let seen = Arc::new(Mutex::new(Vec::new()));
6253 let observed_cancel = Arc::new(AtomicBool::new(false));
6254 let (emit, captured) = capturing_emitter();
6255 let mut fold = RpcStreamingRequestFold::new(
6256 Arc::new(CollectingClientStreamHandler {
6257 seen: seen.clone(),
6258 observed_cancel,
6259 }),
6260 emit,
6261 );
6262 let req = RpcRequestPayload {
6263 service: "agg".to_string(),
6264 deadline_ns: 0,
6265 flags: FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_REQUEST_END,
6266 headers: vec![],
6267 body: Bytes::from_static(b"only"),
6268 };
6269 fold.apply(&rpc_request_event(1, 42, req), &mut ()).unwrap();
6270 assert!(
6271 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6272 "expected terminal RESPONSE"
6273 );
6274 let captured = captured.lock();
6275 assert_eq!(captured.len(), 1);
6276 assert_eq!(captured[0].2.status, RpcStatus::Ok);
6277 assert_eq!(captured[0].2.body.as_ref(), 1u64.to_le_bytes());
6278 assert_eq!(
6279 seen.lock()
6280 .iter()
6281 .map(|b| b.as_ref())
6282 .collect::<Vec<&[u8]>>(),
6283 vec![b"only" as &[u8]]
6284 );
6285 // Sender must NOT have been registered (initial-REQUEST-
6286 // with-END skips the map insert).
6287 assert!(fold.sender_keys().is_empty());
6288 }
6289
6290 /// 3/6 — CANCEL closes the request stream early, flips the
6291 /// cancellation token, and the spawned task overrides the
6292 /// handler's terminal with `RpcStatus::Cancelled`.
6293 #[tokio::test]
6294 async fn streaming_request_fold_cancel_closes_stream_and_overrides_terminal() {
6295 let seen = Arc::new(Mutex::new(Vec::new()));
6296 let observed_cancel = Arc::new(AtomicBool::new(false));
6297 let (emit, captured) = capturing_emitter();
6298 let mut fold = RpcStreamingRequestFold::new(
6299 Arc::new(CollectingClientStreamHandler {
6300 seen: seen.clone(),
6301 observed_cancel: observed_cancel.clone(),
6302 }),
6303 emit,
6304 );
6305 let req = RpcRequestPayload {
6306 service: "agg".to_string(),
6307 deadline_ns: 0,
6308 flags: FLAG_RPC_CLIENT_STREAMING_REQUEST,
6309 headers: vec![],
6310 body: Bytes::from_static(b"first"),
6311 };
6312 fold.apply(&rpc_request_event(2, 17, req), &mut ()).unwrap();
6313 // Wait for the handler to register, then one in-flight
6314 // CHUNK, then CANCEL before the handler ever finishes
6315 // draining.
6316 assert!(
6317 wait_until(
6318 || fold.sender_keys().contains(&(2, 17)),
6319 Duration::from_secs(1)
6320 )
6321 .await
6322 );
6323 fold.apply(
6324 &rpc_request_chunk_event(2, 17, 0, b"second".to_vec()),
6325 &mut (),
6326 )
6327 .unwrap();
6328 fold.apply(&rpc_cancel_event(2, 17), &mut ()).unwrap();
6329 // Terminal must arrive and must be Cancelled (CANCEL-wins
6330 // ordering, same as the response-side fold).
6331 assert!(
6332 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6333 "expected terminal RESPONSE"
6334 );
6335 let captured = captured.lock();
6336 assert_eq!(captured.len(), 1);
6337 assert_eq!(
6338 captured[0].2.status,
6339 RpcStatus::Cancelled,
6340 "CANCEL must override terminal status"
6341 );
6342 assert!(
6343 observed_cancel.load(std::sync::atomic::Ordering::SeqCst),
6344 "handler must observe cancellation token after stream EOF"
6345 );
6346 // Both maps must be clean post-cancel.
6347 assert!(fold.in_flight_keys().is_empty());
6348 assert!(fold.sender_keys().is_empty());
6349 }
6350
6351 /// 4/6 — handler returns `Err(RpcHandlerError::Application)`
6352 /// → terminal RESPONSE carries the application status code +
6353 /// message body.
6354 #[tokio::test]
6355 async fn streaming_request_fold_application_error_round_trips() {
6356 struct AppErrHandler;
6357 #[async_trait::async_trait]
6358 impl RpcClientStreamingHandler for AppErrHandler {
6359 async fn call(
6360 &self,
6361 _ctx: RpcStreamingContext,
6362 mut requests: RequestStream,
6363 ) -> Result<RpcResponsePayload, RpcHandlerError> {
6364 use futures::StreamExt;
6365 // Drain so the stream's EOF doesn't race the
6366 // error return.
6367 while requests.next().await.is_some() {}
6368 Err(RpcHandlerError::Application {
6369 code: 0xBEEF,
6370 message: "bad input".to_string(),
6371 })
6372 }
6373 }
6374 let (emit, captured) = capturing_emitter();
6375 let mut fold = RpcStreamingRequestFold::new(Arc::new(AppErrHandler), emit);
6376 let req = RpcRequestPayload {
6377 service: "agg".to_string(),
6378 deadline_ns: 0,
6379 flags: FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_REQUEST_END,
6380 headers: vec![],
6381 body: Bytes::new(),
6382 };
6383 fold.apply(&rpc_request_event(3, 100, req), &mut ())
6384 .unwrap();
6385 assert!(
6386 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6387 "expected terminal RESPONSE"
6388 );
6389 let captured = captured.lock();
6390 assert_eq!(captured.len(), 1);
6391 assert_eq!(captured[0].2.status, RpcStatus::Application(0xBEEF));
6392 assert_eq!(captured[0].2.body.as_ref(), b"bad input");
6393 }
6394
6395 /// 5/6 — handler panic is caught by `catch_unwind`; terminal
6396 /// surfaces as `Internal` carrying the panic message. Same
6397 /// contract as the existing folds — a misbehaving handler
6398 /// can't take down the cortex adapter.
6399 #[tokio::test]
6400 async fn streaming_request_fold_handler_panic_surfaces_as_internal() {
6401 struct PanickyHandler;
6402 #[async_trait::async_trait]
6403 impl RpcClientStreamingHandler for PanickyHandler {
6404 async fn call(
6405 &self,
6406 _ctx: RpcStreamingContext,
6407 _requests: RequestStream,
6408 ) -> Result<RpcResponsePayload, RpcHandlerError> {
6409 panic!("intentional test panic");
6410 }
6411 }
6412 let (emit, captured) = capturing_emitter();
6413 let mut fold = RpcStreamingRequestFold::new(Arc::new(PanickyHandler), emit);
6414 let req = RpcRequestPayload {
6415 service: "agg".to_string(),
6416 deadline_ns: 0,
6417 flags: FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_REQUEST_END,
6418 headers: vec![],
6419 body: Bytes::new(),
6420 };
6421 fold.apply(&rpc_request_event(4, 200, req), &mut ())
6422 .unwrap();
6423 assert!(
6424 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(2)).await,
6425 "expected terminal RESPONSE"
6426 );
6427 let captured = captured.lock();
6428 assert_eq!(captured.len(), 1);
6429 assert_eq!(captured[0].2.status, RpcStatus::Internal);
6430 assert!(
6431 String::from_utf8_lossy(&captured[0].2.body).contains("intentional test panic"),
6432 "panic body should carry the panic message"
6433 );
6434 }
6435
6436 /// 6/6 — duplicate REQUEST with the same `(origin, call_id)`
6437 /// is refused with a synthetic `Internal` terminal frame and
6438 /// does NOT spawn a second handler. Mirror of the regression
6439 /// pinned in the unary + response-streaming folds.
6440 #[tokio::test]
6441 async fn streaming_request_fold_duplicate_request_refuses_without_double_dispatch() {
6442 let invocations = Arc::new(AtomicUsize::new(0));
6443 struct CountingHandler {
6444 invocations: Arc<AtomicUsize>,
6445 }
6446 #[async_trait::async_trait]
6447 impl RpcClientStreamingHandler for CountingHandler {
6448 async fn call(
6449 &self,
6450 _ctx: RpcStreamingContext,
6451 mut requests: RequestStream,
6452 ) -> Result<RpcResponsePayload, RpcHandlerError> {
6453 use futures::StreamExt;
6454 self.invocations.fetch_add(1, Ordering::SeqCst);
6455 // Slow handler to keep the call in-flight while
6456 // the duplicate REQUEST arrives.
6457 tokio::time::sleep(Duration::from_millis(80)).await;
6458 while requests.next().await.is_some() {}
6459 Ok(RpcResponsePayload {
6460 status: RpcStatus::Ok,
6461 headers: vec![],
6462 body: Bytes::new(),
6463 })
6464 }
6465 }
6466 let (emit, captured) = capturing_emitter();
6467 let mut fold = RpcStreamingRequestFold::new(
6468 Arc::new(CountingHandler {
6469 invocations: invocations.clone(),
6470 }),
6471 emit,
6472 );
6473 let req = RpcRequestPayload {
6474 service: "agg".to_string(),
6475 deadline_ns: 0,
6476 flags: FLAG_RPC_CLIENT_STREAMING_REQUEST,
6477 headers: vec![],
6478 body: Bytes::new(),
6479 };
6480 fold.apply(&rpc_request_event(5, 99, req.clone()), &mut ())
6481 .unwrap();
6482 assert!(
6483 wait_until(
6484 || fold.in_flight_keys().contains(&(5, 99)),
6485 Duration::from_secs(1)
6486 )
6487 .await
6488 );
6489 // Duplicate REQUEST: synthetic Internal terminal emitted,
6490 // handler invocation count must stay at 1.
6491 fold.apply(&rpc_request_event(5, 99, req), &mut ()).unwrap();
6492 assert!(
6493 wait_until(|| !captured.lock().is_empty(), Duration::from_secs(1)).await,
6494 "synthetic refusal terminal expected"
6495 );
6496 let refusal = captured.lock()[0].clone();
6497 assert_eq!(refusal.2.status, RpcStatus::Internal);
6498 assert!(String::from_utf8_lossy(&refusal.2.body).contains("duplicate"));
6499 // Finish the first handler so its terminal lands too.
6500 fold.apply(
6501 &rpc_request_chunk_event(5, 99, FLAG_RPC_REQUEST_END, vec![]),
6502 &mut (),
6503 )
6504 .unwrap();
6505 assert!(
6506 wait_until(|| captured.lock().len() >= 2, Duration::from_secs(2)).await,
6507 "first handler should still complete normally"
6508 );
6509 assert_eq!(
6510 invocations.load(Ordering::SeqCst),
6511 1,
6512 "duplicate REQUEST must NOT spawn a second handler",
6513 );
6514 }
6515}