net/adapter/net/mesh_rpc.rs
1//! `Mesh::serve_rpc` / `Mesh::call` glue — the wire-up between
2//! `MeshNode`'s pub/sub + per-channel-hash dispatch hook and the
3//! `cortex::rpc` server / client folds.
4//!
5//! See `docs/misc/NRPC_DESIGN.md` for the full architectural framing.
6//! In short:
7//!
8//! - `serve_rpc(service, handler)` registers an inbound dispatcher
9//! for `<service>.requests`'s channel hash. The dispatcher pushes
10//! inbound REQUEST/CANCEL events through the
11//! [`crate::adapter::net::cortex::RpcServerFold`], which spawns
12//! the user handler. The fold's emit closure publishes RESPONSE
13//! events on `<service>.replies.<caller_origin>` via
14//! [`MeshNode::publish`].
15//!
16//! - `call(target, service, payload, opts)` allocates a `call_id`,
17//! registers a oneshot in the per-Mesh `RpcClientPending`,
18//! subscribes to its own reply channel from `target` (lazy,
19//! cached), publishes the REQUEST envelope on `<service>.requests`,
20//! awaits the oneshot. Drop sends a CANCEL.
21//!
//! Addressing: `call(target_node_id, ...)` addresses a node directly;
//! `call_service(name, ...)` picks a target from the nodes advertising
//! the service via the capability-announcement registry, per
//! [`CallOptions::routing_policy`].
26
27use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
28use std::sync::Arc;
29use std::time::Instant;
30
31use bytes::Bytes;
32use parking_lot::Mutex;
33use tokio::sync::mpsc;
34use tokio::task::JoinHandle;
35
36use super::channel::{ChannelHash, ChannelId, ChannelName, ChannelPublisher, PublishConfig};
37use super::cortex::{
38 build_trace_headers, encode_request_grant, encode_stream_grant, EventMeta,
39 RpcAsyncResponseEmitter, RpcCancellationToken, RpcClientFold, RpcClientStreamingHandler,
40 RpcContext, RpcDuplexFold, RpcDuplexHandler, RpcHandler, RpcHandlerError, RpcInboundDispatcher,
41 RpcInboundEvent, RpcRequestChunkPayload, RpcRequestGrantEmitter, RpcRequestPayload,
42 RpcResponseEmitter, RpcResponsePayload, RpcServerFold, RpcServerStreamingFold, RpcStatus,
43 RpcStreamingHandler, RpcStreamingRequestFold, StreamItem, TraceContext, DISPATCH_RPC_CANCEL,
44 DISPATCH_RPC_REQUEST, DISPATCH_RPC_REQUEST_CHUNK, DISPATCH_RPC_REQUEST_GRANT,
45 DISPATCH_RPC_STREAM_GRANT, EVENT_META_SIZE, FLAG_RPC_CLIENT_STREAMING_REQUEST,
46 FLAG_RPC_PROPAGATE_TRACE, FLAG_RPC_REQUEST_END, FLAG_RPC_STREAMING_RESPONSE,
47 HEADER_NRPC_REQUEST_WINDOW_INITIAL, HEADER_NRPC_STREAM_WINDOW_INITIAL,
48};
49use super::mesh_rpc_metrics::{CallMetricsGuard, CallOutcome};
50use crate::error::AdapterError;
51
52use super::mesh::MeshNode;
53use super::redex::{RedexEntry, RedexEvent, RedexFold};
54
55// ============================================================================
56// Public types.
57// ============================================================================
58
/// How `Mesh::call_service` picks a target from the set of nodes
/// advertising the requested service.
///
/// Selected per call through [`CallOptions::routing_policy`]; the
/// direct-addressed `call` (which takes an explicit target) ignores it.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum RoutingPolicy {
    /// Naive round-robin via the per-Mesh `call_id` counter.
    /// Distributes calls evenly across candidates regardless of
    /// load. The default.
    #[default]
    RoundRobin,
    /// Pick a candidate at random per call. Stateless, cheap, and
    /// gives even distribution under independent calls.
    Random,
    /// Consistent-hash to a target by `key`. Same `key` always
    /// hits the same target as long as the candidate set is
    /// stable. Useful for session affinity (route a given
    /// conversation / shard / user to the same backend).
    Sticky {
        /// Caller-supplied identifier — hash maps this to the
        /// target. Use a session id, shard key, or conversation
        /// id depending on the application.
        key: u64,
    },
    /// Pick the candidate with the smallest measured `latency_us`
    /// per the local `ProximityGraph`. Candidates the proximity
    /// graph hasn't observed yet (no entity ↔ node_id mapping or
    /// no pingwave received) sort to the bottom — better to pick
    /// a known-fast node than gamble on an unknown one.
    ///
    /// Falls back deterministically to the first sorted candidate
    /// when no candidates have proximity data, so a freshly-
    /// discovered service still routes consistently.
    LowestLatency,
}
92
93/// Options for [`MeshNode::call`] and [`MeshNode::call_service`].
94#[derive(Debug, Clone)]
95pub struct CallOptions {
96 /// Hard deadline for the call. The future returned by `call`
97 /// races a `tokio::time::sleep_until`; whichever fires first
98 /// wins. On timeout the caller emits a CANCEL event for
99 /// `call_id` so the server can drop the in-flight handler.
100 /// `None` means no deadline; the caller waits indefinitely
101 /// (or until the future is dropped).
102 pub deadline: Option<Instant>,
103 /// How `call_service` picks a target. Ignored by `call`
104 /// (which takes an explicit `target_node_id`). Default:
105 /// `RoundRobin`.
106 pub routing_policy: RoutingPolicy,
107 /// Skip candidates whose `ProximityGraph` entry reports
108 /// `!is_available()` (i.e. `Unhealthy` or `Unknown`).
109 /// Default `true`. Candidates with no proximity entry at all
110 /// are KEPT — absence of evidence is not evidence of
111 /// unhealth, and a freshly-announced service shouldn't be
112 /// filtered just because pingwaves haven't propagated yet.
113 pub filter_unhealthy: bool,
114 /// W3C Trace Context to propagate to the server. When `Some`,
115 /// the call sets `FLAG_RPC_PROPAGATE_TRACE` on the request and
116 /// emits `traceparent` / `tracestate` headers; the server's
117 /// `RpcContext::trace_context` will be populated with the same
118 /// values. nRPC is transport-only — application code on both
119 /// sides reads / writes this via whatever tracing backend it
120 /// has wired up (tracing-opentelemetry, Datadog, etc.).
121 pub trace_context: Option<TraceContext>,
122 /// Per-call concurrency cap. Future Phase 2 work; v1 ignores
123 /// this and the per-Mesh `RpcClientPending` doesn't bound
124 /// in-flight count.
125 pub max_in_flight_per_target: u32,
126 /// **Streaming responses only.** Initial credit window for
127 /// per-streaming-response flow control. When `Some(n)`, the
128 /// caller emits `nrpc-stream-window-initial: n` on the
129 /// REQUEST and the server's pump task awaits one credit per
130 /// emitted chunk. The returned [`RpcStream`] auto-grants 1
131 /// credit per consumed chunk so the in-flight credit holds
132 /// near `n` (or use [`RpcStream::grant`] for batched / custom
133 /// cadence). `None` (the default) → unbounded: server pumps
134 /// chunks as fast as the publish path can take them
135 /// (back-compat / pre-flow-control behavior). Ignored by
136 /// non-streaming `call` / `call_service`.
137 pub stream_window_initial: Option<u32>,
138 /// **Client-streaming / duplex only.** Initial credit window
139 /// for per-call request-direction flow control. Mirror of
140 /// [`Self::stream_window_initial`] for the upload direction. When
141 /// `Some(n)`, the caller emits `nrpc-request-window-initial: n`
142 /// on the REQUEST and its `send().await` sink awaits one
143 /// credit per pushed chunk; the server refills via
144 /// [`DISPATCH_RPC_REQUEST_GRANT`] events. `None` → unbounded:
145 /// caller's send sink doesn't block (legacy / fast-path).
146 /// Ignored by unary `call` / `call_streaming`.
147 ///
148 /// Bidi streaming plan (Phase C).
149 pub request_window_initial: Option<u32>,
150 /// Caller-supplied request headers. Appended to the wire
151 /// `RpcRequestPayload::headers` after any auto-generated
152 /// headers (trace context, stream-window). Useful for
153 /// application-level metadata the server needs at
154 /// dispatch-time — e.g., the `net-where` predicate
155 /// header (Phase 9b of `CAPABILITY_SYSTEM_SDK_PLAN.md`) that
156 /// services consult for predicate-pushdown filtering.
157 ///
158 /// Each entry is `(name, value_bytes)`. Names use the lowercase
159 /// `cyberdeck-*` / `nrpc-*` convention; the substrate doesn't
160 /// validate names beyond the `MAX_RPC_HEADER_NAME_LEN` cap
161 /// enforced at encode time.
162 ///
163 /// Default: empty.
164 pub request_headers: Vec<(String, Vec<u8>)>,
165 /// Caller-side cancel token. Mint via
166 /// [`MeshNode::reserve_cancel_token`]; pair with
167 /// [`MeshNode::cancel`] from any thread to abort the in-flight
168 /// call. `None` (or `Some(0)` — the "no token" sentinel) → no
169 /// cancel slot is reserved and the call has no external abort
170 /// path beyond Drop-on-future-cancellation.
171 ///
172 /// Honored uniformly by every call shape: `call`, `call_service`,
173 /// `call_streaming`, `call_client_stream`, `call_duplex`. The
174 /// substrate registers the token in a per-mesh cancel registry
175 /// at call construction and removes it on resolution (success,
176 /// error, or Drop). A cancel that fires mid-flight surfaces to
177 /// the caller as [`RpcError::Cancelled`] and emits CANCEL on
178 /// the wire via the existing per-call-shape guards (UnaryCallGuard,
179 /// ClientStreamCallRaw::Drop, DuplexCallRaw::Drop).
180 ///
181 /// Cancel-before-register is race-safe: a cancel that arrives
182 /// in the gap between `reserve_cancel_token` and the call's
183 /// internal register step latches a pre-cancel flag on the
184 /// registry's orphan entry; the subsequent register observes
185 /// it and the call short-circuits to [`RpcError::Cancelled`]
186 /// without ever publishing the REQUEST.
187 pub cancel_token: Option<u64>,
188}
189
190impl Default for CallOptions {
191 fn default() -> Self {
192 Self {
193 deadline: None,
194 routing_policy: RoutingPolicy::default(),
195 filter_unhealthy: true,
196 trace_context: None,
197 max_in_flight_per_target: 64,
198 stream_window_initial: None,
199 request_window_initial: None,
200 request_headers: Vec::new(),
201 cancel_token: None,
202 }
203 }
204}
205
/// What [`MeshNode::call`] returns on success.
///
/// Carries the server handler's response body and headers plus the
/// caller-observed latency.
#[derive(Debug, Clone)]
pub struct RpcReply {
    /// Response payload from the server's handler. Caller decodes
    /// according to its application protocol.
    pub body: Bytes,
    /// Headers attached by the server's response, as
    /// `(name, value_bytes)` pairs.
    pub headers: Vec<(String, Vec<u8>)>,
    /// Elapsed time, in nanoseconds, from `call(...)` to RESPONSE
    /// arrival.
    pub latency_ns: u64,
}
217
/// What [`MeshNode::call`] returns on failure.
///
/// Also the error type used by the other call shapes in this module.
/// For example, [`RpcStream`] yields it as its `Err` item, and the
/// client-streaming chunk publish path returns it.
#[derive(Debug, thiserror::Error)]
pub enum RpcError {
    /// No subscription / no route to the target. Either
    /// `target_node_id` is unknown to the local mesh, or the
    /// caller's reply-channel subscription couldn't be set up.
    #[error("no route to target {target:#x}: {reason}")]
    NoRoute {
        /// Target node id the call was directed at.
        target: u64,
        /// Diagnostic — typically the underlying transport error.
        reason: String,
    },
    /// Caller's deadline elapsed before a RESPONSE arrived. The
    /// caller emits a CANCEL on timeout so the server can drop
    /// the in-flight handler; this variant is returned to the
    /// awaiting caller.
    #[error("timeout after {elapsed_ms}ms")]
    Timeout {
        /// Wall-clock milliseconds elapsed before timeout fired.
        elapsed_ms: u64,
    },
    /// Server returned a non-`Ok` status. `message` carries the
    /// server's diagnostic when the body is readable.
    #[error("server returned status {status:#06x}: {message}")]
    ServerError {
        /// Wire-level `RpcStatus` value the server returned.
        status: u16,
        /// The response body decoded as UTF-8 when it is valid
        /// UTF-8. Otherwise a fallback description chosen by the
        /// producing call path; the streaming path, for instance,
        /// reports `<N bytes of non-utf8 body>`.
        message: String,
    },
    /// Underlying transport error (publish failure, encryption,
    /// etc.). `#[from]` derives `From<AdapterError>`, so `?` on an
    /// `AdapterError` result converts into this variant.
    #[error("transport: {0}")]
    Transport(#[from] AdapterError),
    /// Client-local serialization or deserialization failure.
    /// `direction = Encode` means the typed wrapper failed to
    /// encode the request before it ever hit the wire;
    /// `direction = Decode` means the response landed but the
    /// typed wrapper failed to decode it. Either way this is a
    /// caller-fixable bug (wrong codec, schema drift, malformed
    /// `Serialize` impl) — NOT a transient infra failure — so
    /// retry / circuit-breaker predicates skip it by default.
    #[error("codec ({direction:?}): {message}")]
    Codec {
        /// Which side of the call the codec failure happened on.
        direction: CodecDirection,
        /// Decode/encode diagnostic from the underlying serde impl.
        message: String,
    },
    /// v0.4 capability-auth gate denied the call. Either the
    /// target's latest `CapabilityAnnouncement` does not list
    /// the requested `nrpc:<service>` tag, or it lists the tag
    /// with allow-lists the caller does not match. See
    /// `docs/plans/CAPABILITY_AUTH_PLAN.md` §3 for the model.
    ///
    /// Raised by the caller-side gate inside
    /// [`MeshNode::call_service`] BEFORE the request hits the
    /// wire, and surfaced by the caller on receipt of a
    /// `RpcStatus::CapabilityDenied` response (the callee-side
    /// defense-in-depth path).
    #[error("capability denied: target {target:#x} does not authorize nrpc:{capability}")]
    CapabilityDenied {
        /// Target node id the gate denied.
        target: u64,
        /// Service / capability tag (without the `nrpc:` prefix)
        /// the gate denied.
        capability: String,
    },
    /// Caller-side cancellation fired via
    /// [`MeshNode::cancel`] with the call's `cancel_token`.
    /// Triggers a Drop-on-cancel CANCEL frame on the wire so the
    /// server's in-flight handler observes the cancel; the
    /// awaiting caller returns this variant. NOT retried by the
    /// default retry policy — cancellation is caller-driven and
    /// re-issuing the call defeats the point.
    #[error("call cancelled by caller")]
    Cancelled,
}
298
/// Which side of the call surfaced a [`RpcError::Codec`] failure.
///
/// Lets retry / diagnostics code tell "never sent" (`Encode`) apart
/// from "sent and answered, but undecodable" (`Decode`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecDirection {
    /// Encoding the outbound request failed before the call was issued.
    Encode,
    /// Decoding the inbound response failed after the call returned Ok.
    Decode,
}
307
/// RAII handle returned by [`MeshNode::serve_rpc`]. Dropping it
/// unregisters the inbound dispatcher and removes the service
/// from the local-services registry (so subsequent
/// `announce_capabilities` calls stop emitting the
/// `nrpc:<service>` tag). The `Drop` impl does these in that order:
/// dispatcher first, then the service tag.
///
/// **Bridge task lifecycle.** The bridge task that drains the
/// inbound mpsc into the fold is NOT aborted on Drop. The
/// `register_rpc_inbound` dispatcher closure owns the only
/// `mpsc::Sender` clone, so `unregister_rpc_inbound` (which drops
/// the dispatcher) closes the channel; the bridge's `rx.recv()`
/// then yields `None` and the task exits cleanly after draining
/// any queued events. Aborting would race events that are
/// mid-`fold.lock().apply()` — those events would be killed
/// without their RESPONSE being emitted, so the corresponding
/// callers would just time out.
///
/// Outstanding handler executions (already-spawned tokio tasks)
/// continue to completion regardless.
pub struct ServeHandle {
    /// Channel hash of the service's request channel; passed to
    /// `unregister_rpc_inbound` on Drop.
    channel_hash: ChannelHash,
    /// Service name to remove from `rpc_local_services` on Drop.
    service: String,
    /// The bridge task. Held only so callers can introspect /
    /// detach it; Drop does NOT abort it (see struct doc-comment).
    /// Detaches naturally when the handle is dropped — the bridge
    /// exits on its own once the dispatcher's `mpsc::Sender` is
    /// dropped via `unregister_rpc_inbound`.
    _bridge: JoinHandle<()>,
    /// The per-service response drainer task (unary `serve_rpc` only;
    /// `None` for the streaming/duplex variants, which still spawn per
    /// emit). Like `_bridge`, held only to detach — it exits on its own
    /// once the emit closure (the sole `Sender` owner, dropped when the
    /// bridge task ends and the fold drops) is gone. See §8a.
    _response_drain: Option<JoinHandle<()>>,
    /// Hold an Arc back to the mesh so we can unregister on Drop
    /// without the mesh having to track us.
    mesh: Arc<MeshNode>,
}
348
349impl Drop for ServeHandle {
350 fn drop(&mut self) {
351 // Order matters: unregister the dispatcher FIRST so no new
352 // events can land in the bridge's mpsc, THEN drop the
353 // service-tag entry. The bridge task drains any in-flight
354 // events naturally and exits when its `rx.recv()` yields
355 // `None` (which happens as soon as the dispatcher closure
356 // — the sole `tx` owner — is dropped above).
357 self.mesh.unregister_rpc_inbound(self.channel_hash);
358 self.mesh.rpc_local_services_arc().remove(&self.service);
359 }
360}
361
/// A response ready to publish, handed from a (synchronous) `serve_rpc`
/// emit closure to the per-service response drainer task. Replaces the
/// pre-§8a `tokio::spawn`-per-response: the emit closure builds the wire
/// payload (cheap, sync) and `try_send`s this job; one drain task does the
/// `.await` publish. The reply `ChannelName` is `Arc<str>` and `payload` is
/// `Bytes`, so the hand-off is a couple of moves — no copy, no per-response
/// task allocation/scheduling.
struct RpcResponseJob {
    /// Origin id of the caller this response is addressed to (the
    /// reply channel is `<service>.replies.<caller_origin>`).
    caller_origin: u64,
    /// The `call_id` of the request being answered.
    call_id: u64,
    /// Optional routing hint for the drainer's publish.
    /// NOTE(review): semantics are defined by the drain task, which is
    /// not visible here — confirm before relying on it.
    target_hint: Option<u64>,
    /// Per-caller reply channel name.
    reply_channel: ChannelName,
    /// PERF_AUDIT §3.10 — cached
    /// `ChannelId::new(reply_channel).hash()`, populated by the
    /// emit closure's `reply_channel_cache` lookup. Pre-fix the
    /// drainer re-ran xxh3 over the channel name per response;
    /// the same `OriginKeyedLru` now caches the triple so a
    /// cache hit is one Arc bump + two `u64` copies.
    reply_channel_hash: ChannelHash,
    /// PERF_AUDIT §3.10 — cached
    /// `MeshNode::publish_stream_id(&reply_channel_id)`.
    reply_stream_id: u64,
    /// Fully built wire payload, produced by the emit closure.
    payload: Bytes,
}
386
/// Cached triple `(ChannelName, ChannelHash, stream_id)` for the
/// per-caller reply channel. Stored in the per-`serve_rpc`
/// `OriginKeyedLru` so each subsequent response to the same
/// caller is one Arc bump on the name + two `u64` copies — no
/// xxh3, no `publish_stream_id`.
///
/// Per PERF_AUDIT §3.10.
#[derive(Clone)]
struct CachedReplyChannel {
    /// Reply channel name for one caller origin.
    name: ChannelName,
    /// `ChannelId::new(name).hash()`, computed once on cache fill.
    hash: ChannelHash,
    /// `MeshNode::publish_stream_id` for this reply channel,
    /// computed once on cache fill.
    stream_id: u64,
}
400
401// ============================================================================
402// Streaming caller-side: RpcStream.
403// ============================================================================
404
/// An open streaming RPC call. Implements `Stream<Item =
/// Result<Bytes, RpcError>>` — yields chunks as the server emits
/// them, terminates on a clean stream-end frame OR a non-`Ok`
/// status (which is yielded as the last `Err` item before the
/// stream closes).
///
/// Dropping the stream emits a CANCEL to the server (best-effort),
/// even if the stream already finished, and discards the pending
/// entry — any chunks the server emits after the drop are silently
/// discarded by the client fold.
pub struct RpcStream {
    /// Mesh the call was issued on. Used to publish GRANT / CANCEL
    /// frames and to clear the pending entry on Drop.
    mesh: Arc<MeshNode>,
    /// Node the REQUEST was sent to; GRANT frames are unicast to it
    /// via `publish_to_peer`.
    target_node_id: u64,
    /// Channel the REQUEST was published on; handed to the CANCEL
    /// publish on Drop.
    request_channel: ChannelName,
    /// Cached `ChannelId::new(request_channel).hash()`. Pre-fix
    /// `spawn_grant_publish` re-ran xxh3 over the channel name on
    /// every auto/explicit grant — per PERF_AUDIT §3.10 the value
    /// is invariant for the stream's lifetime, so we cache it once
    /// at construction.
    request_channel_hash: ChannelHash,
    /// Cached `MeshNode::publish_stream_id(&request_channel_id)`.
    /// Same reasoning as `request_channel_hash`.
    request_stream_id: u64,
    /// This node's origin id, stamped into the `EventMeta` of every
    /// GRANT / CANCEL frame the stream emits.
    self_origin: u64,
    /// Id of this call. It keys the client-side pending entry and is
    /// carried by every frame the stream emits.
    call_id: u64,
    /// Per-call item receiver. It delivers zero or more
    /// `StreamItem::Chunk`s followed by a terminal `End` or `Error`.
    /// If the channel closes without a terminal item, the stream
    /// simply ends.
    inner: tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
    /// Set true once we've yielded the terminal item (or an
    /// error). Subsequent polls return `None`.
    done: bool,
    /// `Some(_)` if this stream uses flow control (caller set
    /// `CallOptions::stream_window_initial`). Auto-grant
    /// accumulates 1 credit per delivered chunk and fires one
    /// batched `spawn_grant_publish` once the accumulator reaches
    /// `window / 2` (or 1 for tiny windows). Keeps the server's
    /// pump fed at roughly the configured rate without the per-
    /// chunk spawn-storm + AEAD-storm the pre-fix path produced.
    /// `None` → no flow control; `poll_next` does not emit grants.
    /// Per PERF_AUDIT_2026_06_10_FULL_CRATE.md §3.3.
    stream_window: Option<u32>,
    /// Auto-grant accumulator: chunks delivered since the last
    /// emitted grant. Flushed at the `window / 2` threshold (see
    /// the doc on [`Self::stream_window`]).
    grant_pending: u32,
    /// Observer-fire bookkeeping. Latched on terminal observation
    /// in `poll_next`; fired once from `Drop` so the Deck NRPC
    /// tab + every other `RpcObserver` consumer sees one event
    /// per streaming-response call.
    observer: StreamingObserverState,
    /// v3 cancel-watcher keep-alive (C-S1). Dropping this field
    /// (on stream Drop) resolves the matching watcher task's
    /// oneshot receiver with `Err`, telling the watcher to exit
    /// cleanly + release the registry entry. When the call was
    /// opened without `cancel_token`, this is a placeholder sender
    /// with no watcher behind it — drop has no observable effect.
    _cancel_keep_alive: StreamCancelKeepAlive,
}
460
461impl RpcStream {
462 /// Server-assigned `call_id`. Useful for trace correlation /
463 /// custom logging at the call site.
464 pub fn call_id(&self) -> u64 {
465 self.call_id
466 }
467
468 /// Whether this stream is flow-controlled (caller set
469 /// `CallOptions::stream_window_initial`). Useful for tests +
470 /// diagnostics; user code typically doesn't need to inspect
471 /// this.
472 pub fn flow_controlled(&self) -> bool {
473 self.stream_window.is_some()
474 }
475
476 /// Explicitly grant `amount` more credits to the server's
477 /// pump. Spawns a fire-and-forget publish; doesn't await
478 /// acknowledgement. **No-op when flow control was not enabled
479 /// for this stream** — the server would silently drop the
480 /// grant anyway, and emitting wire traffic with no purpose
481 /// would just burn bandwidth.
482 ///
483 /// Auto-grant (1 credit per delivered chunk) covers the
484 /// common case; use this for batched cadence (e.g. grant
485 /// `window/2` after every `window/2` chunks consumed) when
486 /// `auto_grant`-style amortization isn't enough.
487 pub fn grant(&self, amount: u32) {
488 if !self.flow_controlled() || amount == 0 {
489 return;
490 }
491 spawn_grant_publish(
492 Arc::clone(&self.mesh),
493 self.target_node_id,
494 self.request_channel_hash,
495 self.request_stream_id,
496 self.self_origin,
497 self.call_id,
498 amount,
499 );
500 }
501}
502
/// PERF_AUDIT §3.3 — auto-grant coalescing for [`RpcStream::poll_next`].
///
/// Call this once per chunk handed to the consumer. It adds that
/// chunk's credit to `pending` (saturating) and compares the total
/// with the flush threshold `max(window / 2, 1)`:
/// - below the threshold, the credit stays pending and the function
///   returns `None`;
/// - at or above it, `pending` resets to zero and the function
///   returns the whole accumulated amount, to be published as one
///   GRANT.
///
/// Windows of 1–3 therefore flush on every chunk, matching the
/// pre-coalescing per-chunk cadence.
///
/// Why no flush-on-drop or timer backstop is needed:
/// - Credits left pending never exceed `threshold - 1`, which is
///   less than `window`.
/// - The server starts with `window` credits, and at any moment
///   `credits = window - (sent - delivered) - pending`.
/// - The consumer can only be blocked waiting on the server once it
///   has polled everything sent. Then
///   `credits = window - pending >= window - threshold + 1 >= 1`,
///   so the server can always make progress.
/// - A consumer that stops polling stalls the pump on purpose (that
///   is flow control). When it resumes, the chunks already buffered
///   in the stream's mpsc push `pending` over the threshold.
fn accumulate_auto_grant(pending: &mut u32, window: u32) -> Option<u32> {
    let flush_at = std::cmp::max(window / 2, 1);
    let accumulated = pending.saturating_add(1);
    if accumulated < flush_at {
        *pending = accumulated;
        None
    } else {
        *pending = 0;
        Some(accumulated)
    }
}
532
533/// Shared fire-and-forget GRANT-publish helper. Used by
534/// [`RpcStream::grant`] (explicit) and the auto-grant in
535/// [`RpcStream::poll_next`]. Same direct-unicast publish path as
536/// [`spawn_cancel_publish`], just with a different dispatch byte
537/// + a 4-byte u32 payload.
538///
539/// PERF_AUDIT §3.10 — takes `request_channel_hash` and
540/// `request_stream_id` as pre-computed inputs (cached on
541/// `RpcStream`) so the per-chunk grant path doesn't re-run
542/// `ChannelId::new` + xxh3 on every call.
543fn spawn_grant_publish(
544 mesh: Arc<MeshNode>,
545 target: u64,
546 request_channel_hash: ChannelHash,
547 request_stream_id: u64,
548 self_origin: u64,
549 call_id: u64,
550 amount: u32,
551) {
552 tokio::spawn(async move {
553 let meta = EventMeta::new(DISPATCH_RPC_STREAM_GRANT, 0, self_origin, call_id, 0);
554 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 4);
555 buf.extend_from_slice(&meta.to_bytes());
556 buf.extend_from_slice(&encode_stream_grant(amount));
557 let payload = Bytes::from(buf);
558 let _ = mesh
559 .publish_to_peer(
560 target,
561 request_channel_hash,
562 request_stream_id,
563 /* reliable */ true,
564 std::slice::from_ref(&payload),
565 )
566 .await;
567 });
568}
569
570impl futures::Stream for RpcStream {
571 type Item = Result<Bytes, RpcError>;
572
573 fn poll_next(
574 mut self: std::pin::Pin<&mut Self>,
575 cx: &mut std::task::Context<'_>,
576 ) -> std::task::Poll<Option<Self::Item>> {
577 if self.done {
578 return std::task::Poll::Ready(None);
579 }
580 match self.inner.poll_recv(cx) {
581 std::task::Poll::Ready(Some(StreamItem::Chunk(body))) => {
582 // Auto-grant: accumulate 1 credit per delivered
583 // chunk and fire a batched `spawn_grant_publish`
584 // only when the accumulator reaches `window / 2`
585 // (or 1 for tiny windows). Per PERF_AUDIT §3.3 —
586 // pre-fix this spawned one task + one reliable
587 // AEAD packet per chunk, a spawn-storm + AEAD-
588 // storm under bursting; the server side already
589 // fixed the identical shape via
590 // `build_request_grant_emitter` (§3.3 audit text).
591 // Callers needing finer cadence still have
592 // `RpcStream::grant` for explicit batches.
593 if let Some(window) = self.stream_window {
594 let mut pending = self.grant_pending;
595 if let Some(amount) = accumulate_auto_grant(&mut pending, window) {
596 spawn_grant_publish(
597 Arc::clone(&self.mesh),
598 self.target_node_id,
599 self.request_channel_hash,
600 self.request_stream_id,
601 self.self_origin,
602 self.call_id,
603 amount,
604 );
605 }
606 self.grant_pending = pending;
607 }
608 self.observer.add_response_bytes(body.len() as u32);
609 std::task::Poll::Ready(Some(Ok(body)))
610 }
611 std::task::Poll::Ready(Some(StreamItem::End)) => {
612 self.done = true;
613 self.observer.latch_ok();
614 std::task::Poll::Ready(None)
615 }
616 std::task::Poll::Ready(Some(StreamItem::Error(resp))) => {
617 self.done = true;
618 let status = resp.status.to_wire();
619 let message = String::from_utf8(resp.body.to_vec()).unwrap_or_else(|e| {
620 format!("<{} bytes of non-utf8 body>", e.into_bytes().len())
621 });
622 self.observer
623 .latch_error(format!("server returned status {status:#06x}: {message}"));
624 std::task::Poll::Ready(Some(Err(RpcError::ServerError { status, message })))
625 }
626 std::task::Poll::Ready(None) => {
627 self.done = true;
628 std::task::Poll::Ready(None)
629 }
630 std::task::Poll::Pending => std::task::Poll::Pending,
631 }
632 }
633}
634
635impl Drop for RpcStream {
636 fn drop(&mut self) {
637 // Best-effort CANCEL to the server. Spawn a task because
638 // Drop can't be async; the publish happens off-thread.
639 // Also clear our pending entry so any in-flight chunks
640 // are dropped on arrival.
641 self.mesh.rpc_client_pending_arc().cancel(self.call_id);
642 spawn_cancel_publish(
643 Arc::clone(&self.mesh),
644 self.target_node_id,
645 self.request_channel.clone(),
646 self.self_origin,
647 self.call_id,
648 );
649 // Fire the observer with the latched status (Ok / Error /
650 // Canceled). Idempotent — only the first fire emits.
651 self.observer.fire();
652 }
653}
654
655// ============================================================================
656// Phase C — caller-side client-streaming / duplex primitive.
657// ============================================================================
658
659/// Shared REQUEST_CHUNK-publish helper. Builds the wire frame and
660/// fires through `publish_to_peer` direct-unicast (same routing
661/// pattern as the initial REQUEST — caller knows the target).
662/// PERF_AUDIT §3.10 — accepts pre-computed
663/// `request_channel_hash` and `request_stream_id` (cached on
664/// `ClientStreamCallRaw`) so the per-chunk client-stream send path
665/// doesn't re-run `ChannelId::new` + xxh3 on every chunk.
666async fn publish_request_chunk(
667 mesh: &Arc<MeshNode>,
668 target: u64,
669 request_channel_hash: ChannelHash,
670 request_stream_id: u64,
671 self_origin: u64,
672 chunk: &RpcRequestChunkPayload,
673) -> Result<(), RpcError> {
674 let meta = EventMeta::new(DISPATCH_RPC_REQUEST_CHUNK, 0, self_origin, chunk.call_id, 0);
675 let mut buf = Vec::with_capacity(EVENT_META_SIZE + chunk.encoded_len());
676 buf.extend_from_slice(&meta.to_bytes());
677 chunk.encode_into(&mut buf);
678 let payload = Bytes::from(buf);
679 mesh.publish_to_peer(
680 target,
681 request_channel_hash,
682 request_stream_id,
683 /* reliable */ true,
684 std::slice::from_ref(&payload),
685 )
686 .await
687 .map_err(RpcError::Transport)
688}
689
/// Internal state of a [`ClientStreamCallRaw`]. The state machine
/// is small: open the call (initial REQUEST not yet sent), then
/// send N items (the first becomes the initial REQUEST, subsequent
/// become REQUEST_CHUNKs), then finish (terminal REQUEST_END
/// frame). After finish, no further sends are accepted.
///
/// Transitions:
/// - `JustOpened` → `Sending` on the first `send`.
/// - `Sending` → `Finishing` on `finish`.
/// - `JustOpened` → `Finishing` directly on a zero-send `finish`.
/// - `Finishing` → `Done` once the terminal RESPONSE arrives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientStreamState {
    /// Pending entry registered, reply subscription ensured, but
    /// the initial REQUEST has NOT been published to the wire yet.
    /// First `send` flips this to `Sending`.
    JustOpened,
    /// Initial REQUEST has been published; subsequent sends ride
    /// as REQUEST_CHUNKs.
    Sending,
    /// `finish` has been called. The terminal REQUEST_END frame has
    /// been published (or, on the degenerate zero-send path, the
    /// initial REQUEST with its request-end flag set). The terminal
    /// RESPONSE has not necessarily arrived yet; that is awaited on
    /// the caller's terminal_rx.
    Finishing,
    /// Terminal RESPONSE has been delivered. Drop is a no-op.
    Done,
}
713
714/// Caller-side handle for a client-streaming (or duplex Phase D)
715/// RPC. Push N items via [`ClientStreamCallRaw::send`], then
716/// [`ClientStreamCallRaw::finish`] to await the terminal RESPONSE.
717///
718/// **Lazy initial REQUEST.** The initial REQUEST is published on
719/// the FIRST `send()` (or on `finish()` if the caller sends nothing
720/// — that's the "zero-item upload" degenerate path that opens and
721/// closes the call in one frame). Constructing the handle does
722/// NOT yet emit any wire traffic beyond the reply-channel
723/// subscription setup.
724///
725/// **Flow control.** When the caller set
726/// [`CallOptions::request_window_initial`] to `Some(n)`, the
727/// handle holds an `n`-permit `Semaphore` that gates `send`. The
728/// server's [`DISPATCH_RPC_REQUEST_GRANT`] events refill the
729/// semaphore. When `None`, `send` doesn't block (caller is on the
730/// unbounded-credit fast path).
731///
732/// **Cancellation.** Dropping the handle BEFORE `finish` returns
733/// `Ok` fires a best-effort CANCEL to the server and clears the
734/// pending entry. Dropping after a successful `finish` is a no-op
735/// (terminal RESPONSE already delivered + entry removed).
736///
737/// Bidi streaming plan (Phase C).
738pub struct ClientStreamCallRaw {
739 mesh: Arc<MeshNode>,
740 target_node_id: u64,
741 request_channel: ChannelName,
742 /// PERF_AUDIT §3.10 — cached `ChannelId::new(request_channel).hash()`
743 /// so per-chunk REQUEST_CHUNK publishes don't re-run xxh3.
744 request_channel_hash: ChannelHash,
745 /// PERF_AUDIT §3.10 — cached
746 /// `MeshNode::publish_stream_id(&request_channel_id)`.
747 request_stream_id: u64,
748 self_origin: u64,
749 call_id: u64,
750 service: String,
751 /// Header set queued for the initial REQUEST. Drained on the
752 /// first publish (either `send` or `finish`).
753 initial_headers: Vec<(String, Vec<u8>)>,
754 /// Flag bits queued for the initial REQUEST. Always carries
755 /// `FLAG_RPC_CLIENT_STREAMING_REQUEST`; may also carry
756 /// `FLAG_RPC_PROPAGATE_TRACE` when the caller supplied a
757 /// trace context.
758 initial_flags: u16,
759 /// `deadline_ns` from `CallOptions::deadline`. Embedded in the
760 /// initial REQUEST.
761 deadline_ns: u64,
762 /// Per-call semaphore for upload credits. `None` when the
763 /// caller didn't opt into flow control (`request_window_initial`
764 /// was `None` on the `CallOptions`).
765 credit_sem: Option<Arc<tokio::sync::Semaphore>>,
766 /// Background task that drains REQUEST_GRANT credits from the
767 /// pending entry's grant mpsc into `credit_sem`. Aborted on
768 /// Drop. `None` when flow control is off.
769 grant_pump: Option<JoinHandle<()>>,
770 /// Single-shot terminal-RESPONSE receiver. Taken by `finish`;
771 /// after that `Drop` doesn't attempt to await again.
772 terminal_rx: Option<tokio::sync::oneshot::Receiver<RpcResponsePayload>>,
773 /// State machine. See [`ClientStreamState`].
774 state: ClientStreamState,
775 /// Wall-clock start (for `RpcReply::latency_ns` reporting).
776 started: Instant,
777 /// Observer-fire bookkeeping. Latched on terminal observation
778 /// in `finish`; fired once from `Drop` so the Deck NRPC tab +
779 /// every `RpcObserver` consumer sees one event per
780 /// client-streaming call.
781 observer: StreamingObserverState,
782 /// v3 cancel-watcher keep-alive (C-S1). Dropping this field
783 /// (on call Drop) tells the watcher task to exit cleanly and
784 /// release the registry entry. See
785 /// [`spawn_stream_cancel_watcher`] for the lifecycle.
786 _cancel_keep_alive: StreamCancelKeepAlive,
787}
788
789impl ClientStreamCallRaw {
790 /// Server-assigned `call_id`. Useful for trace correlation /
791 /// custom logging.
792 pub fn call_id(&self) -> u64 {
793 self.call_id
794 }
795
796 /// Whether this call is flow-controlled (caller set
797 /// `CallOptions::request_window_initial`).
798 pub fn flow_controlled(&self) -> bool {
799 self.credit_sem.is_some()
800 }
801
802 /// Push one body chunk to the server. Encodes as the initial
803 /// REQUEST (first call) or as a REQUEST_CHUNK (subsequent
804 /// calls). When flow control is opted into, awaits one credit
805 /// before publishing.
806 ///
807 /// Returns `Err(RpcError::Codec)` if called after [`Self::finish`].
808 pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
809 match self.state {
810 ClientStreamState::Finishing | ClientStreamState::Done => {
811 return Err(RpcError::Codec {
812 direction: CodecDirection::Encode,
813 message: "send() called after finish()".to_string(),
814 });
815 }
816 _ => {}
817 }
818 // Gate on credit when flow control is opted into.
819 if let Some(sem) = self.credit_sem.as_ref() {
820 let permit = sem.clone().acquire_owned().await.map_err(|_| {
821 RpcError::Transport(AdapterError::Connection("credit semaphore closed".into()))
822 })?;
823 permit.forget();
824 }
825 self.observer.add_request_bytes(body.len() as u32);
826 match self.state {
827 ClientStreamState::JustOpened => {
828 // First send → initial REQUEST.
829 let req = RpcRequestPayload {
830 service: self.service.clone(),
831 deadline_ns: self.deadline_ns,
832 flags: self.initial_flags,
833 headers: std::mem::take(&mut self.initial_headers),
834 body: body.clone(),
835 };
836 self.publish_initial_request(&req).await?;
837 self.state = ClientStreamState::Sending;
838 }
839 ClientStreamState::Sending => {
840 let chunk = RpcRequestChunkPayload {
841 call_id: self.call_id,
842 flags: 0,
843 headers: vec![],
844 body: body.clone(),
845 };
846 publish_request_chunk(
847 &self.mesh,
848 self.target_node_id,
849 self.request_channel_hash,
850 self.request_stream_id,
851 self.self_origin,
852 &chunk,
853 )
854 .await?;
855 }
856 ClientStreamState::Finishing | ClientStreamState::Done => unreachable!(),
857 }
858 Ok(())
859 }
860
861 /// Close the upload direction and await the server's terminal
862 /// RESPONSE. Emits a REQUEST_CHUNK with `FLAG_RPC_REQUEST_END`
863 /// (empty body) if the call has already published its initial
864 /// REQUEST, or an initial REQUEST with both
865 /// `FLAG_RPC_CLIENT_STREAMING_REQUEST` and
866 /// `FLAG_RPC_REQUEST_END` set (the degenerate "zero-item
867 /// upload" path) if nothing was sent.
868 ///
869 /// Consumes the handle — Drop after `finish` is a no-op.
870 pub async fn finish(mut self) -> Result<RpcReply, RpcError> {
871 match self.state {
872 ClientStreamState::JustOpened => {
873 let req = RpcRequestPayload {
874 service: self.service.clone(),
875 deadline_ns: self.deadline_ns,
876 flags: self.initial_flags | FLAG_RPC_REQUEST_END,
877 headers: std::mem::take(&mut self.initial_headers),
878 body: Bytes::new(),
879 };
880 self.publish_initial_request(&req).await?;
881 }
882 ClientStreamState::Sending => {
883 let chunk = RpcRequestChunkPayload {
884 call_id: self.call_id,
885 flags: FLAG_RPC_REQUEST_END,
886 headers: vec![],
887 body: Bytes::new(),
888 };
889 publish_request_chunk(
890 &self.mesh,
891 self.target_node_id,
892 self.request_channel_hash,
893 self.request_stream_id,
894 self.self_origin,
895 &chunk,
896 )
897 .await?;
898 }
899 ClientStreamState::Finishing | ClientStreamState::Done => {
900 return Err(RpcError::Codec {
901 direction: CodecDirection::Encode,
902 message: "finish() called twice".to_string(),
903 });
904 }
905 }
906 self.state = ClientStreamState::Finishing;
907 let terminal_rx = self.terminal_rx.take().ok_or_else(|| {
908 RpcError::Transport(AdapterError::Connection(
909 "terminal receiver already consumed".into(),
910 ))
911 })?;
912 // Honor the deadline if the caller set one.
913 let resp = if self.deadline_ns > 0 {
914 let now = std::time::SystemTime::now()
915 .duration_since(std::time::UNIX_EPOCH)
916 .map(|d| d.as_nanos() as u64)
917 .unwrap_or(0);
918 let remaining = self.deadline_ns.saturating_sub(now);
919 match tokio::time::timeout(std::time::Duration::from_nanos(remaining), terminal_rx)
920 .await
921 {
922 Ok(Ok(r)) => r,
923 Ok(Err(_)) => {
924 let msg = "terminal sender dropped before response arrived";
925 self.observer.latch_error(msg);
926 return Err(RpcError::Transport(AdapterError::Connection(msg.into())));
927 }
928 Err(_elapsed) => {
929 let elapsed_ms = self.started.elapsed().as_millis() as u64;
930 self.observer.latch_timeout();
931 return Err(RpcError::Timeout { elapsed_ms });
932 }
933 }
934 } else {
935 match terminal_rx.await {
936 Ok(r) => r,
937 Err(_) => {
938 let msg = "terminal sender dropped before response arrived";
939 self.observer.latch_error(msg);
940 return Err(RpcError::Transport(AdapterError::Connection(msg.into())));
941 }
942 }
943 };
944 self.state = ClientStreamState::Done;
945 self.observer.add_response_bytes(resp.body.len() as u32);
946 if !resp.status.is_ok() {
947 // String::from_utf8 takes `Vec<u8>`. `Bytes::to_vec()`
948 // matches the prior `resp.body.clone()` semantics (full
949 // copy of the body for the error-formatting path);
950 // bulk-throughput improvement lives on the decode side,
951 // not here.
952 let message = String::from_utf8(resp.body.to_vec())
953 .unwrap_or_else(|e| format!("<{} bytes of non-utf8 body>", e.into_bytes().len()));
954 self.observer.latch_error(format!(
955 "server returned status {:#06x}: {message}",
956 resp.status.to_wire()
957 ));
958 return Err(RpcError::ServerError {
959 status: resp.status.to_wire(),
960 message,
961 });
962 }
963 self.observer.latch_ok();
964 let latency_ns = self.started.elapsed().as_nanos() as u64;
965 Ok(RpcReply {
966 body: resp.body,
967 headers: resp.headers,
968 latency_ns,
969 })
970 }
971
972 async fn publish_initial_request(&self, req: &RpcRequestPayload) -> Result<(), RpcError> {
973 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self.self_origin, self.call_id, 0);
974 let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.encoded_len());
975 buf.extend_from_slice(&meta.to_bytes());
976 req.encode_into(&mut buf);
977 let payload = Bytes::from(buf);
978 // PERF_AUDIT §3.10 — use the cached hash + stream_id from
979 // construction; no per-publish `ChannelId::new` + xxh3.
980 self.mesh
981 .publish_to_peer(
982 self.target_node_id,
983 self.request_channel_hash,
984 self.request_stream_id,
985 /* reliable */ true,
986 std::slice::from_ref(&payload),
987 )
988 .await
989 .map_err(RpcError::Transport)
990 }
991}
992
993impl Drop for ClientStreamCallRaw {
994 fn drop(&mut self) {
995 if let Some(task) = self.grant_pump.take() {
996 task.abort();
997 }
998 // Fire the observer with whatever status was latched
999 // (Ok / Error / Timeout / Canceled). Idempotent — only
1000 // the first call emits.
1001 self.observer.fire();
1002 if matches!(self.state, ClientStreamState::Done) {
1003 // Successful completion — pending entry already gone,
1004 // no CANCEL needed.
1005 return;
1006 }
1007 self.mesh.rpc_client_pending_arc().cancel(self.call_id);
1008 // Only fire CANCEL on the wire if the server has actually
1009 // seen the initial REQUEST. A `JustOpened` Drop means we
1010 // never published anything; no need to CANCEL a call the
1011 // server doesn't know about.
1012 if !matches!(self.state, ClientStreamState::JustOpened) {
1013 spawn_cancel_publish(
1014 Arc::clone(&self.mesh),
1015 self.target_node_id,
1016 self.request_channel.clone(),
1017 self.self_origin,
1018 self.call_id,
1019 );
1020 }
1021 }
1022}
1023
1024// ============================================================================
1025// Phase D — caller-side duplex primitive.
1026// ============================================================================
1027
1028/// Shared state between a `DuplexSink` and its sibling
1029/// `DuplexStream`. Both halves hold an `Arc<DuplexInner>`; when
1030/// the refcount hits zero (i.e. both halves dropped) the Drop
1031/// fires CANCEL to the server unless the call was cleanly closed
1032/// (`clean_close = true`).
1033struct DuplexInner {
1034 mesh: Arc<MeshNode>,
1035 target_node_id: u64,
1036 request_channel: ChannelName,
1037 /// PERF_AUDIT §3.10 — cached channel-id hash + stream id so
1038 /// per-chunk publishes from the upload side don't re-run
1039 /// `ChannelId::new` + xxh3.
1040 request_channel_hash: ChannelHash,
1041 request_stream_id: u64,
1042 self_origin: u64,
1043 call_id: u64,
1044 /// Whether the initial REQUEST was successfully published.
1045 /// `false` means we never reached the wire — no CANCEL needed
1046 /// (server doesn't know about the call).
1047 initial_sent: std::sync::atomic::AtomicBool,
1048 /// Set true when the call closes cleanly — terminal RESPONSE
1049 /// (or terminal Error) was observed on the response stream.
1050 /// Suppresses CANCEL-on-drop.
1051 clean_close: std::sync::atomic::AtomicBool,
1052 /// Observer-fire bookkeeping. Latched from the various
1053 /// terminal-observation sites (DuplexCall::next /
1054 /// DuplexStream::poll_next yielding End or Error); fired
1055 /// once on Drop. The DuplexCall / DuplexSink / DuplexStream
1056 /// each share access via the surrounding Arc<DuplexInner>.
1057 observer: StreamingObserverState,
1058 /// v3 cancel-watcher keep-alive (C-S1). Lives on
1059 /// `Arc<DuplexInner>` so it survives `into_split` — both
1060 /// halves of the duplex hold the same Arc, so the watcher
1061 /// task exits only when BOTH halves drop (matching the
1062 /// Drop-fires-CANCEL semantics above). Wrapped in `Option`
1063 /// for `mem::take`-style construction patterns; populated
1064 /// once at `call_duplex` time and never cleared.
1065 _cancel_keep_alive: Option<StreamCancelKeepAlive>,
1066}
1067
1068impl Drop for DuplexInner {
1069 fn drop(&mut self) {
1070 self.mesh.rpc_client_pending_arc().cancel(self.call_id);
1071 // Fire the observer with the latched status (Ok / Error /
1072 // Canceled). Idempotent — only the first call emits.
1073 self.observer.fire();
1074 if self.clean_close.load(Ordering::SeqCst) {
1075 return;
1076 }
1077 if !self.initial_sent.load(Ordering::SeqCst) {
1078 return;
1079 }
1080 spawn_cancel_publish(
1081 Arc::clone(&self.mesh),
1082 self.target_node_id,
1083 self.request_channel.clone(),
1084 self.self_origin,
1085 self.call_id,
1086 );
1087 }
1088}
1089
1090/// Send half of a duplex call. Push items via `send`; emit the
1091/// terminal REQUEST_END frame via `finish_sending`. After
1092/// `finish_sending` the upload side is closed but the sibling
1093/// `DuplexStream` continues yielding response chunks until the
1094/// server's terminal frame arrives.
1095///
1096/// Bidi streaming plan (Phase D).
1097pub struct DuplexSink {
1098 inner: Arc<DuplexInner>,
1099 service: String,
1100 initial_headers: Vec<(String, Vec<u8>)>,
1101 initial_flags: u16,
1102 deadline_ns: u64,
1103 credit_sem: Option<Arc<tokio::sync::Semaphore>>,
1104 grant_pump: Option<JoinHandle<()>>,
1105 state: ClientStreamState,
1106}
1107
1108impl DuplexSink {
1109 /// Push one body chunk to the server. Same semantics as
1110 /// [`ClientStreamCallRaw::send`].
1111 pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
1112 match self.state {
1113 ClientStreamState::Finishing | ClientStreamState::Done => {
1114 return Err(RpcError::Codec {
1115 direction: CodecDirection::Encode,
1116 message: "send() called after finish_sending()".to_string(),
1117 });
1118 }
1119 _ => {}
1120 }
1121 if let Some(sem) = self.credit_sem.as_ref() {
1122 let permit = sem.clone().acquire_owned().await.map_err(|_| {
1123 RpcError::Transport(AdapterError::Connection("credit semaphore closed".into()))
1124 })?;
1125 permit.forget();
1126 }
1127 self.inner.observer.add_request_bytes(body.len() as u32);
1128 match self.state {
1129 ClientStreamState::JustOpened => {
1130 let req = RpcRequestPayload {
1131 service: self.service.clone(),
1132 deadline_ns: self.deadline_ns,
1133 flags: self.initial_flags,
1134 headers: std::mem::take(&mut self.initial_headers),
1135 body: body.clone(),
1136 };
1137 self.publish_initial_request(&req).await?;
1138 self.inner.initial_sent.store(true, Ordering::SeqCst);
1139 self.state = ClientStreamState::Sending;
1140 }
1141 ClientStreamState::Sending => {
1142 let chunk = RpcRequestChunkPayload {
1143 call_id: self.inner.call_id,
1144 flags: 0,
1145 headers: vec![],
1146 body: body.clone(),
1147 };
1148 publish_request_chunk(
1149 &self.inner.mesh,
1150 self.inner.target_node_id,
1151 self.inner.request_channel_hash,
1152 self.inner.request_stream_id,
1153 self.inner.self_origin,
1154 &chunk,
1155 )
1156 .await?;
1157 }
1158 ClientStreamState::Finishing | ClientStreamState::Done => unreachable!(),
1159 }
1160 Ok(())
1161 }
1162
1163 /// Close the upload direction. Emits the terminal REQUEST_END
1164 /// frame. The response stream continues until the server's
1165 /// terminal RESPONSE arrives (use the sibling `DuplexStream`).
1166 pub async fn finish_sending(mut self) -> Result<(), RpcError> {
1167 match self.state {
1168 ClientStreamState::JustOpened => {
1169 let req = RpcRequestPayload {
1170 service: self.service.clone(),
1171 deadline_ns: self.deadline_ns,
1172 flags: self.initial_flags | FLAG_RPC_REQUEST_END,
1173 headers: std::mem::take(&mut self.initial_headers),
1174 body: Bytes::new(),
1175 };
1176 self.publish_initial_request(&req).await?;
1177 self.inner.initial_sent.store(true, Ordering::SeqCst);
1178 }
1179 ClientStreamState::Sending => {
1180 let chunk = RpcRequestChunkPayload {
1181 call_id: self.inner.call_id,
1182 flags: FLAG_RPC_REQUEST_END,
1183 headers: vec![],
1184 body: Bytes::new(),
1185 };
1186 publish_request_chunk(
1187 &self.inner.mesh,
1188 self.inner.target_node_id,
1189 self.inner.request_channel_hash,
1190 self.inner.request_stream_id,
1191 self.inner.self_origin,
1192 &chunk,
1193 )
1194 .await?;
1195 }
1196 ClientStreamState::Finishing | ClientStreamState::Done => {
1197 return Err(RpcError::Codec {
1198 direction: CodecDirection::Encode,
1199 message: "finish_sending() called twice".to_string(),
1200 });
1201 }
1202 }
1203 self.state = ClientStreamState::Finishing;
1204 Ok(())
1205 }
1206
1207 /// Server-assigned `call_id`. Same value on the sibling
1208 /// `DuplexStream`.
1209 pub fn call_id(&self) -> u64 {
1210 self.inner.call_id
1211 }
1212
1213 /// Whether this call is flow-controlled on the upload side.
1214 pub fn flow_controlled(&self) -> bool {
1215 self.credit_sem.is_some()
1216 }
1217
1218 async fn publish_initial_request(&self, req: &RpcRequestPayload) -> Result<(), RpcError> {
1219 let meta = EventMeta::new(
1220 DISPATCH_RPC_REQUEST,
1221 0,
1222 self.inner.self_origin,
1223 self.inner.call_id,
1224 0,
1225 );
1226 let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.encoded_len());
1227 buf.extend_from_slice(&meta.to_bytes());
1228 req.encode_into(&mut buf);
1229 let payload = Bytes::from(buf);
1230 // PERF_AUDIT §3.10 — cached hash + stream_id from the
1231 // inner `ClientStreamCallRaw`.
1232 self.inner
1233 .mesh
1234 .publish_to_peer(
1235 self.inner.target_node_id,
1236 self.inner.request_channel_hash,
1237 self.inner.request_stream_id,
1238 /* reliable */ true,
1239 std::slice::from_ref(&payload),
1240 )
1241 .await
1242 .map_err(RpcError::Transport)
1243 }
1244}
1245
1246impl Drop for DuplexSink {
1247 fn drop(&mut self) {
1248 if let Some(task) = self.grant_pump.take() {
1249 task.abort();
1250 }
1251 // The shared DuplexInner's Drop (when refcount hits 0)
1252 // does the CANCEL — nothing to do here beyond aborting
1253 // the grant pump.
1254 }
1255}
1256
1257/// Receive half of a duplex call. Implements `futures::Stream`
1258/// yielding `Result<Bytes, RpcError>` per inbound RESPONSE chunk.
1259/// EOF on terminal Ok; one final `Err(RpcError::ServerError)` on
1260/// terminal non-Ok.
1261///
1262/// Bidi streaming plan (Phase D).
1263pub struct DuplexStream {
1264 inner: Arc<DuplexInner>,
1265 chunks_rx: tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
1266 done: bool,
1267}
1268
1269impl DuplexStream {
1270 /// Server-assigned `call_id`. Same value on the sibling
1271 /// `DuplexSink`.
1272 pub fn call_id(&self) -> u64 {
1273 self.inner.call_id
1274 }
1275}
1276
1277impl futures::Stream for DuplexStream {
1278 type Item = Result<Bytes, RpcError>;
1279
1280 fn poll_next(
1281 mut self: std::pin::Pin<&mut Self>,
1282 cx: &mut std::task::Context<'_>,
1283 ) -> std::task::Poll<Option<Self::Item>> {
1284 if self.done {
1285 return std::task::Poll::Ready(None);
1286 }
1287 match self.chunks_rx.poll_recv(cx) {
1288 std::task::Poll::Ready(Some(StreamItem::Chunk(body))) => {
1289 self.inner.observer.add_response_bytes(body.len() as u32);
1290 std::task::Poll::Ready(Some(Ok(body)))
1291 }
1292 std::task::Poll::Ready(Some(StreamItem::End)) => {
1293 self.done = true;
1294 self.inner.clean_close.store(true, Ordering::SeqCst);
1295 self.inner.observer.latch_ok();
1296 std::task::Poll::Ready(None)
1297 }
1298 std::task::Poll::Ready(Some(StreamItem::Error(resp))) => {
1299 self.done = true;
1300 self.inner.clean_close.store(true, Ordering::SeqCst);
1301 let status = resp.status.to_wire();
1302 let message = String::from_utf8(resp.body.to_vec()).unwrap_or_else(|e| {
1303 format!("<{} bytes of non-utf8 body>", e.into_bytes().len())
1304 });
1305 self.inner
1306 .observer
1307 .latch_error(format!("server returned status {status:#06x}: {message}"));
1308 std::task::Poll::Ready(Some(Err(RpcError::ServerError { status, message })))
1309 }
1310 std::task::Poll::Ready(None) => {
1311 self.done = true;
1312 std::task::Poll::Ready(None)
1313 }
1314 std::task::Poll::Pending => std::task::Poll::Pending,
1315 }
1316 }
1317}
1318
1319/// Caller-side handle for a duplex RPC. Combines a `DuplexSink`
1320/// (upload) and `DuplexStream` (download). For application code
1321/// that wants to encode requests in one task and decode responses
1322/// in another, use [`Self::into_split`] to peel off the two halves.
1323///
1324/// Bidi streaming plan (Phase D).
1325pub struct DuplexCallRaw {
1326 sink: DuplexSink,
1327 stream: DuplexStream,
1328}
1329
1330impl DuplexCallRaw {
1331 /// Server-assigned `call_id`.
1332 pub fn call_id(&self) -> u64 {
1333 self.sink.call_id()
1334 }
1335
1336 /// Whether the upload side is flow-controlled.
1337 pub fn flow_controlled(&self) -> bool {
1338 self.sink.flow_controlled()
1339 }
1340
1341 /// Push one body chunk to the server. Delegates to the inner
1342 /// `DuplexSink::send`.
1343 pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
1344 self.sink.send(body).await
1345 }
1346
1347 /// Close the upload direction. Delegates to the inner
1348 /// `DuplexSink::finish_sending` but keeps the receive side
1349 /// alive so the caller can keep polling response chunks.
1350 ///
1351 /// NOTE: consumes the sink half but not the stream half.
1352 /// Internally, we replace `self.sink` with a no-op
1353 /// placeholder so subsequent send() / finish_sending()
1354 /// surface a clear error (`send() after finish_sending()`).
1355 pub async fn finish_sending(&mut self) -> Result<(), RpcError> {
1356 // Take the sink out by swapping in a placeholder whose
1357 // state is `Done` so subsequent sends error cleanly.
1358 let placeholder = DuplexSink {
1359 inner: Arc::clone(&self.sink.inner),
1360 service: String::new(),
1361 initial_headers: Vec::new(),
1362 initial_flags: 0,
1363 deadline_ns: 0,
1364 credit_sem: None,
1365 grant_pump: None,
1366 state: ClientStreamState::Done,
1367 };
1368 let sink = std::mem::replace(&mut self.sink, placeholder);
1369 sink.finish_sending().await
1370 }
1371
1372 /// Pull the next response chunk. `None` on terminal Ok;
1373 /// `Some(Err)` then `None` on terminal non-Ok. Same shape as
1374 /// `futures::StreamExt::next`.
1375 pub async fn next(&mut self) -> Option<Result<Bytes, RpcError>> {
1376 use futures::StreamExt;
1377 self.stream.next().await
1378 }
1379
1380 /// Split into independent send / receive halves. Both halves
1381 /// hold an `Arc<DuplexInner>`; CANCEL fires only when BOTH
1382 /// halves drop without a clean close.
1383 pub fn into_split(self) -> (DuplexSink, DuplexStream) {
1384 (self.sink, self.stream)
1385 }
1386}
1387
1388impl futures::Stream for DuplexCallRaw {
1389 type Item = Result<Bytes, RpcError>;
1390
1391 fn poll_next(
1392 mut self: std::pin::Pin<&mut Self>,
1393 cx: &mut std::task::Context<'_>,
1394 ) -> std::task::Poll<Option<Self::Item>> {
1395 std::pin::Pin::new(&mut self.stream).poll_next(cx)
1396 }
1397}
1398
1399// ============================================================================
1400// Unary call: CANCEL-on-drop guard.
1401// ============================================================================
1402
1403/// RAII guard that fires CANCEL to the server if the unary call
1404/// future is dropped before a response arrives. Without this, a
1405/// `select!`-loser future (e.g. hedge runner-up) would leave the
1406/// server-side handler running to completion — wasting CPU on a
1407/// reply nobody will read.
1408///
1409/// The guard is built *after* the REQUEST has been successfully
1410/// published — if the publish fails, no guard is constructed and
1411/// no CANCEL is sent. On the success path the call function flips
1412/// `completed = true` so Drop becomes a no-op (the server already
1413/// finished and removed its in-flight entry).
1414struct UnaryCallGuard {
1415 pending: Arc<super::cortex::RpcClientPending>,
1416 mesh: Arc<MeshNode>,
1417 target_node_id: u64,
1418 request_channel: ChannelName,
1419 self_origin: u64,
1420 call_id: u64,
1421 /// True after the call resolved Ok or got a definitive
1422 /// non-cancellable Err. Drop checks this — `false` fires
1423 /// CANCEL, `true` is a no-op (still removes the pending
1424 /// entry).
1425 completed: bool,
1426}
1427
1428impl Drop for UnaryCallGuard {
1429 fn drop(&mut self) {
1430 self.pending.cancel(self.call_id);
1431 if !self.completed {
1432 spawn_cancel_publish(
1433 Arc::clone(&self.mesh),
1434 self.target_node_id,
1435 self.request_channel.clone(),
1436 self.self_origin,
1437 self.call_id,
1438 );
1439 }
1440 }
1441}
1442
1443// ============================================================================
1444// Streaming/duplex observer-fire bookkeeping.
1445//
1446// The unary `MeshNode::call` fires `RpcObserver::on_call` at each
1447// terminal return path (see line ~2306). The streaming /
1448// client-streaming / duplex paths have multiple terminal points
1449// (poll_next sees End / Error; finish() returns; Drop without
1450// terminal observation). To avoid sprinkling `fire_rpc_observer_outbound`
1451// at every terminal site, each handle holds a
1452// `StreamingObserverState` that latches the terminal status on
1453// observation and fires exactly once on Drop. The Deck NRPC tab
1454// + every consumer of `RpcObserver` get one event per streaming
1455// / duplex call, same as for unary today.
1456// ============================================================================
1457
1458/// Per-call observer-fire bookkeeping shared between the
1459/// streaming + client-streaming + duplex caller-side handles.
1460/// Latches terminal status on observation; `fire()` (called from
1461/// the handle's Drop) emits one `RpcCallEvent` with the latched
1462/// status (or `Canceled` if nothing latched — i.e. the handle
1463/// was dropped before observing its terminator).
1464///
1465/// Status discriminator:
1466/// 0 = none latched (Drop → Canceled)
1467/// 1 = Ok
1468/// 2 = Error (message in `observer_msg`)
1469/// 3 = Timeout
1470pub(crate) struct StreamingObserverState {
1471 mesh: Arc<MeshNode>,
1472 target_node_id: u64,
1473 service: String,
1474 started: Instant,
1475 request_bytes: AtomicU32,
1476 response_bytes: AtomicU32,
1477 observer_status: AtomicU8,
1478 observer_msg: parking_lot::Mutex<Option<String>>,
1479 fired: AtomicBool,
1480}
1481
1482impl StreamingObserverState {
1483 pub(crate) fn new(
1484 mesh: Arc<MeshNode>,
1485 target_node_id: u64,
1486 service: impl Into<String>,
1487 request_bytes: u32,
1488 ) -> Self {
1489 Self {
1490 mesh,
1491 target_node_id,
1492 service: service.into(),
1493 started: Instant::now(),
1494 request_bytes: AtomicU32::new(request_bytes),
1495 response_bytes: AtomicU32::new(0),
1496 observer_status: AtomicU8::new(0),
1497 observer_msg: parking_lot::Mutex::new(None),
1498 fired: AtomicBool::new(false),
1499 }
1500 }
1501
1502 pub(crate) fn add_request_bytes(&self, n: u32) {
1503 self.request_bytes.fetch_add(n, Ordering::Relaxed);
1504 }
1505
1506 pub(crate) fn add_response_bytes(&self, n: u32) {
1507 self.response_bytes.fetch_add(n, Ordering::Relaxed);
1508 }
1509
1510 pub(crate) fn latch_ok(&self) {
1511 self.observer_status.store(1, Ordering::Relaxed);
1512 }
1513
1514 pub(crate) fn latch_error(&self, msg: impl Into<String>) {
1515 *self.observer_msg.lock() = Some(msg.into());
1516 self.observer_status.store(2, Ordering::Relaxed);
1517 }
1518
1519 pub(crate) fn latch_timeout(&self) {
1520 self.observer_status.store(3, Ordering::Relaxed);
1521 }
1522
1523 /// Fire the observer event. Idempotent — only the first call
1524 /// actually emits; subsequent are no-ops. Called from each
1525 /// streaming handle's Drop.
1526 pub(crate) fn fire(&self) {
1527 if self.fired.swap(true, Ordering::SeqCst) {
1528 return;
1529 }
1530 let status_code = self.observer_status.load(Ordering::Relaxed);
1531 let status = match status_code {
1532 1 => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Ok,
1533 2 => {
1534 let msg = self.observer_msg.lock().clone().unwrap_or_default();
1535 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(msg)
1536 }
1537 3 => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Timeout,
1538 _ => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Canceled,
1539 };
1540 self.mesh.fire_rpc_observer_outbound(
1541 self.target_node_id,
1542 &self.service,
1543 self.started.elapsed().as_millis() as u32,
1544 status,
1545 self.request_bytes.load(Ordering::Relaxed),
1546 self.response_bytes.load(Ordering::Relaxed),
1547 );
1548 }
1549}
1550
1551/// Per-call cap on in-flight request-direction credits. Tokio's
1552/// `Semaphore::MAX_PERMITS` is `usize::MAX >> 3`; we cap the
1553/// caller-side accumulator at this value so a misbehaving server
1554/// can't make the caller hold an unbounded outstanding window.
1555/// 1M credits is already orders of magnitude beyond any sane
1556/// request burst — a caller sitting on 1M unconsumed credits is
1557/// either misconfigured or under attack.
1558const REQUEST_GRANT_PER_CALL_CAP: usize = 1_000_000;
1559
1560/// Add `credits` to a caller-side request-direction credit
1561/// semaphore, capped so the accumulator never exceeds
1562/// [`REQUEST_GRANT_PER_CALL_CAP`]. Per-frame cap of `usize::MAX >> 4`
1563/// remains as a second line of defense against pathological frame
1564/// values.
1565fn add_request_grant_credits(sem: &tokio::sync::Semaphore, credits: u32) {
1566 if credits == 0 {
1567 return;
1568 }
1569 let current = sem.available_permits();
1570 let remaining = REQUEST_GRANT_PER_CALL_CAP.saturating_sub(current);
1571 let safe = (credits as usize).min(usize::MAX >> 4).min(remaining);
1572 if safe > 0 {
1573 sem.add_permits(safe);
1574 }
1575}
1576
1577/// Build a coalescing REQUEST_GRANT emitter.
1578///
1579/// Naive emitters `tokio::spawn` one publish task per consumed
1580/// chunk, which becomes a spawn-storm + AEAD-storm under bursting.
1581/// This helper hands back an emitter that pushes `(caller_origin,
1582/// call_id, credits)` into an unbounded mpsc; a single dedicated
1583/// drainer task `try_recv`s the queue to drain whatever is
1584/// immediately available, coalesces credits per call_id, and
1585/// publishes ONE batched REQUEST_GRANT per call per drain cycle.
1586///
1587/// Lifecycle: the drainer task lives as long as any clone of the
1588/// returned emitter (mpsc sender count > 0). When the fold and all
1589/// in-flight handlers release the emitter, `rx.recv` returns `None`
1590/// and the drainer exits naturally.
1591fn build_request_grant_emitter(
1592 mesh: Arc<MeshNode>,
1593 service: String,
1594 server_origin: u64,
1595 diag_tag: &'static str,
1596) -> RpcRequestGrantEmitter {
1597 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(u64, u64, u32)>();
1598 tokio::spawn(async move {
1599 while let Some(first) = rx.recv().await {
1600 let mut summed: std::collections::HashMap<(u64, u64), u32> =
1601 std::collections::HashMap::new();
1602 let (caller, call_id, credits) = first;
1603 summed.insert((caller, call_id), credits);
1604 // Coalesce anything immediately queued behind the first
1605 // wake. Bounded by what the substrate has produced so
1606 // far; doesn't add latency since `try_recv` returns
1607 // immediately when the queue is empty.
1608 while let Ok((caller, call_id, credits)) = rx.try_recv() {
1609 let entry = summed.entry((caller, call_id)).or_insert(0);
1610 *entry = entry.saturating_add(credits);
1611 }
1612 for ((caller, call_id), credits) in summed {
1613 let reply_channel_name = format!("{service}.replies.{caller:016x}");
1614 let reply_channel = match ChannelName::new(&reply_channel_name) {
1615 Ok(c) => c,
1616 Err(e) => {
1617 tracing::warn!(
1618 error = %e,
1619 channel = %reply_channel_name,
1620 tag = diag_tag,
1621 "rpc grant drainer: invalid reply channel name");
1622 continue;
1623 }
1624 };
1625 let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, server_origin, call_id, 0);
1626 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 12);
1627 buf.extend_from_slice(&meta.to_bytes());
1628 buf.extend_from_slice(&encode_request_grant(call_id, credits));
1629 let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1630 if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1631 tracing::warn!(
1632 error = %e,
1633 caller_origin = format!("{:#x}", caller),
1634 call_id,
1635 tag = diag_tag,
1636 "rpc grant drainer: REQUEST_GRANT publish failed");
1637 }
1638 }
1639 }
1640 });
1641 Arc::new(move |caller_origin, call_id, credits| {
1642 // Send failure means the drainer has exited (all sender
1643 // clones dropped, then we somehow cloned a stale one).
1644 // Treat as a no-op — the call is tearing down anyway.
1645 let _ = tx.send((caller_origin, call_id, credits));
1646 })
1647}
1648
1649/// Per-service map from a caller's `origin_hash` (wire field) to the
1650/// AEAD-verified `from_node` of the session that delivered their
1651/// inbound REQUEST. Populated by the serve_rpc bridge tasks at
1652/// REQUEST-receipt time; consulted by [`publish_response_to_caller`]
1653/// to skip the roster fan-out on the response leg.
1654///
1655/// Lives per `serve_rpc*` registration rather than mesh-wide because
1656/// the source-of-truth `MeshNode::origin_hash_to_node` is only safe
1657/// to populate from *signed* capability announcements — populating
1658/// it from unsigned wire `origin_hash` fields would let any session
1659/// peer pre-claim arbitrary origins. This map is bridge-local and
1660/// only used by the matching service's response emit, so a malicious
1661/// peer can at most misdirect responses for THEIR own request — they
1662/// already could.
1663///
1664/// **Bounded** ([`OriginKeyedLru`]): the key is the wire-claimed
1665/// `origin_hash` and the bridge inserts it *before* the capability gate,
1666/// so an unbounded map would let one authed peer spray distinct origins and
1667/// amplify server memory. The LRU caps the footprint; eviction costs only a
1668/// response-path cache miss (roster fallback), never correctness.
1669type RpcOriginNodeCache = Arc<OriginKeyedLru<u64>>;
1670
/// Capacity bound for the per-`serve_rpc*` caller-keyed caches
/// ([`RpcOriginNodeCache`] and the §8b reply-channel cache).
///
/// Sized for the legitimate active-caller working set of a single service.
/// Beyond it, the LRU evicts cold origins instead of growing without limit
/// under a crafted-origin flood.
///
/// Entries are small:
/// - the origin cache stores a `u64` node id;
/// - the reply cache stores a `CachedReplyChannel` (the `ChannelName` plus
///   its precomputed channel hash and stream id).
///
/// A full cache should therefore stay in the hundreds-of-KB range per
/// service. This is a rough estimate; the channel-name allocation grows
/// with the service-name length.
const RPC_CALLER_CACHE_CAP: usize = 4096;

/// [`RPC_CALLER_CACHE_CAP`] as a `NonZeroUsize`, checked during const
/// evaluation so `OriginKeyedLru::new` needs no runtime `unwrap`/`expect`.
/// Setting the cap to zero becomes a build error rather than a startup
/// panic.
const RPC_CALLER_CACHE_CAP_NZ: std::num::NonZeroUsize =
    match std::num::NonZeroUsize::new(RPC_CALLER_CACHE_CAP) {
        Some(n) => n,
        None => panic!("RPC_CALLER_CACHE_CAP must be non-zero"),
    };
1688
1689/// Thread-safe, bounded LRU keyed by the wire-claimed caller `origin_hash`.
1690///
1691/// Backs both [`RpcOriginNodeCache`] and the §8b reply-channel cache. Wraps
1692/// `lru::LruCache` (which needs `&mut` even to read, to bump the entry to
1693/// most-recently-used) in a `parking_lot::Mutex`. The per-response lock is
1694/// uncontended in the common case — one fold drives a given service — and is
1695/// far cheaper than the `format!` + `ChannelName` allocation / roster fan-out
1696/// the caches exist to avoid. Eviction is always safe: a miss just recomputes
1697/// the value (channel name) or falls back to the roster lookup.
1698struct OriginKeyedLru<V>(Mutex<lru::LruCache<u64, V>>);
1699
1700impl<V: Clone> OriginKeyedLru<V> {
1701 fn new() -> Self {
1702 Self(Mutex::new(lru::LruCache::new(RPC_CALLER_CACHE_CAP_NZ)))
1703 }
1704
1705 /// Look up `origin`, promoting it to most-recently-used on a hit.
1706 fn get(&self, origin: u64) -> Option<V> {
1707 self.0.lock().get(&origin).cloned()
1708 }
1709
1710 /// Insert / refresh `origin`, evicting the least-recently-used entry
1711 /// when at capacity.
1712 fn insert(&self, origin: u64, value: V) {
1713 self.0.lock().put(origin, value);
1714 }
1715}
1716
1717/// Direct-send a built RESPONSE (or streaming chunk) packet to the
1718/// caller's reply channel, bypassing the roster fan-out path
1719/// [`MeshNode::publish`] uses.
1720///
1721/// **Fast path:** when the bridge has cached the caller's
1722/// `from_node` (i.e. the server processed an inbound REQUEST from
1723/// this caller via an AEAD-authenticated session), or when the
1724/// caller's capability announcement has reached us, the response
1725/// rides `publish_to_peer` — one DashMap lookup instead of roster
1726/// lookup + ACL check + subnet filter + per-recipient `Vec<Bytes>`
1727/// allocation.
1728///
1729/// **Fallback:** when neither lookup resolves — pathological cases
1730/// like a test harness where the caller never announces and the
1731/// bridge cache is empty — fall back to [`MeshNode::publish`] via
1732/// the roster, matching the pre-T1.2 behavior verbatim.
1733/// PERF_AUDIT §3.10 — accepts pre-computed
1734/// `reply_channel_hash` and `reply_stream_id` so the per-response
1735/// path doesn't re-run `ChannelId::new` + xxh3 + `publish_stream_id`
1736/// on every send. The emit closure's `OriginKeyedLru<CachedReplyChannel>`
1737/// caches the triple per caller_origin.
1738async fn publish_response_to_caller(
1739 mesh: &MeshNode,
1740 caller_origin: u64,
1741 target_hint: Option<u64>,
1742 reply_channel: &ChannelName,
1743 reply_channel_hash: ChannelHash,
1744 reply_stream_id: u64,
1745 payload: Bytes,
1746) -> Result<(), AdapterError> {
1747 let resolved = target_hint.or_else(|| mesh.get_node_by_origin_hash(caller_origin));
1748 if let Some(target_node_id) = resolved {
1749 return mesh
1750 .publish_to_peer(
1751 target_node_id,
1752 reply_channel_hash,
1753 reply_stream_id,
1754 /* reliable */ true,
1755 std::slice::from_ref(&payload),
1756 )
1757 .await;
1758 }
1759 // Fallback: roster fan-out. Reached when the caller's origin is
1760 // unknown to both the bridge cache AND the global reverse index.
1761 let publisher = ChannelPublisher::new(reply_channel.clone(), PublishConfig::default());
1762 mesh.publish(&publisher, payload).await.map(|_| ())
1763}
1764
1765/// Shared CANCEL-publish helper: spawn a task that fires a
1766/// CANCEL event for `call_id` to `target` on the request channel.
1767/// Both [`RpcStream::Drop`] and [`UnaryCallGuard::Drop`] use it.
1768fn spawn_cancel_publish(
1769 mesh: Arc<MeshNode>,
1770 target: u64,
1771 request_channel: ChannelName,
1772 self_origin: u64,
1773 call_id: u64,
1774) {
1775 tokio::spawn(async move {
1776 let meta = EventMeta::new(DISPATCH_RPC_CANCEL, 0, self_origin, call_id, 0);
1777 let request_channel_id = ChannelId::new(request_channel);
1778 let request_channel_hash = request_channel_id.hash();
1779 let stream_id = MeshNode::publish_stream_id(&request_channel_id);
1780 let payload = Bytes::from(meta.to_bytes().to_vec());
1781 let _ = mesh
1782 .publish_to_peer(
1783 target,
1784 request_channel_hash,
1785 stream_id,
1786 /* reliable */ true,
1787 std::slice::from_ref(&payload),
1788 )
1789 .await;
1790 });
1791}
1792
1793/// Type alias for the keep-alive sender that streaming-call handles
1794/// store. Its purpose is *only* to signal "stream done" when the
1795/// handle drops: the cancel-watcher task `select!`s on the matching
1796/// receiver, and dropping the sender (which happens on handle Drop)
1797/// resolves the receiver with an `Err` so the watcher exits cleanly.
1798///
1799/// `()` payload because the signal IS the resolution; no data is
1800/// transmitted.
1801type StreamCancelKeepAlive = tokio::sync::oneshot::Sender<()>;
1802
1803/// Spawn a cancel-watcher task for a streaming call (call_streaming,
1804/// call_client_stream, call_duplex). The watcher races
1805/// `cancel_notify.notified()` against the keep-alive oneshot — first
1806/// to fire wins. On cancel, the watcher drops the pending-streaming
1807/// entry (which closes the receiver's mpsc, letting the stream's
1808/// poll_next observe EOF), then releases the registry entry. On
1809/// handle Drop, the keep-alive sender drops, the oneshot resolves
1810/// `Err`, and the watcher exits via the done arm with a registry
1811/// release.
1812///
1813/// When `cancel_token == 0` (the "no token" sentinel), this is a
1814/// no-op: the returned sender is a placeholder whose drop has no
1815/// observable effect, and no task is spawned. Lets the streaming
1816/// call shapes always store a keep-alive on the returned handle
1817/// without branching on whether a token was set.
1818fn spawn_stream_cancel_watcher(
1819 cancel_notify: Arc<tokio::sync::Notify>,
1820 cancel_token: u64,
1821 cancel_registry: Arc<crate::adapter::net::cancel_registry::CancelRegistry>,
1822 pending: Arc<crate::adapter::net::cortex::RpcClientPending>,
1823 call_id: u64,
1824) -> StreamCancelKeepAlive {
1825 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
1826 if cancel_token == 0 {
1827 // No-op fast path. The returned sender is held by the
1828 // handle but never paired with a watcher, so its eventual
1829 // drop has no effect. Avoids spawning a task per
1830 // cancel-less stream.
1831 return done_tx;
1832 }
1833 tokio::spawn(async move {
1834 tokio::select! {
1835 biased;
1836 _ = cancel_notify.notified() => {
1837 // Cancel fired. Drop the pending-stream entry so
1838 // the receiver's mpsc closes (causing the stream's
1839 // poll_next to observe EOF via Ready(None)). The
1840 // handle's Drop will then fire CANCEL on the wire
1841 // via its existing per-shape Drop impl.
1842 pending.cancel(call_id);
1843 cancel_registry.release(cancel_token);
1844 }
1845 _ = done_rx => {
1846 // Stream completed normally — sender dropped on
1847 // handle Drop, recv returns Err. Just release the
1848 // registry entry; no CANCEL emission needed (the
1849 // handle's Drop handles that path itself if it
1850 // wasn't a clean close).
1851 cancel_registry.release(cancel_token);
1852 }
1853 }
1854 });
1855 done_tx
1856}
1857
1858/// One-call helper that registers a cancel-notify against the
1859/// caller's `opts.cancel_token` and spawns the stream cancel
1860/// watcher. Used by every streaming call shape (`call_streaming`,
1861/// `call_client_stream`, `call_duplex`) to keep their bodies free
1862/// of the three-step token/notify/spawn boilerplate.
1863///
1864/// When `opts.cancel_token` is `None` (or `Some(0)`), this is the
1865/// same no-op fast path as [`spawn_stream_cancel_watcher`].
1866fn arm_stream_cancel(
1867 mesh: &Arc<MeshNode>,
1868 opts: &CallOptions,
1869 pending: &Arc<crate::adapter::net::cortex::RpcClientPending>,
1870 call_id: u64,
1871) -> StreamCancelKeepAlive {
1872 let cancel_token = opts.cancel_token.unwrap_or(0);
1873 let cancel_notify = mesh.cancel_registry().register_notify(cancel_token);
1874 spawn_stream_cancel_watcher(
1875 cancel_notify,
1876 cancel_token,
1877 Arc::clone(mesh.cancel_registry()),
1878 Arc::clone(pending),
1879 call_id,
1880 )
1881}
1882
1883/// Side-effects + return value for the unary `call`'s cancel
1884/// branch. Releases the registry entry, records the Transport
1885/// outcome on the metrics guard, fires the Canceled observer
1886/// event, and returns `RpcError::Cancelled`. Both the
1887/// no-deadline and with-deadline `select!` arms invoke this so a
1888/// shape change to the cancel outcome (extra metric, new field
1889/// on the observer event) lands in exactly one place.
1890fn fire_unary_cancel_outcome(
1891 mesh: &Arc<MeshNode>,
1892 metrics_guard: &mut crate::adapter::net::mesh_rpc_metrics::CallMetricsGuard,
1893 cancel_token: u64,
1894 target_node_id: u64,
1895 service: &str,
1896 started_total: Instant,
1897 request_bytes_len: u32,
1898) -> RpcError {
1899 mesh.cancel_registry().release(cancel_token);
1900 metrics_guard.record(crate::adapter::net::mesh_rpc_metrics::CallOutcome::Transport);
1901 mesh.fire_rpc_observer_outbound(
1902 target_node_id,
1903 service,
1904 started_total.elapsed().as_millis() as u32,
1905 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Canceled,
1906 request_bytes_len,
1907 0,
1908 );
1909 RpcError::Cancelled
1910}
1911
1912// ============================================================================
1913// MeshNode extensions.
1914// ============================================================================
1915
1916impl MeshNode {
    /// Register an nRPC handler for `service` on this node.
    ///
    /// Installs an inbound dispatcher (via `register_rpc_inbound`) for the
    /// channel hash of `<service>.requests`. Inbound events are fed through
    /// a bridge task into an [`RpcServerFold`]. The fold's RESPONSE-emit
    /// callback publishes on `<service>.replies.<caller_origin>`: directly
    /// to the caller's node when it is known, otherwise via the roster
    /// fan-out (see [`publish_response_to_caller`]).
    ///
    /// **Local-only registration** (Phase 1). Multi-instance
    /// services that load-balance via `SubscriptionMode::QueueGroup`
    /// require each replica to call `serve_rpc` on its own node;
    /// the mesh-level subscriber roster + `dispatch_recipients`
    /// then routes one-of-N as designed. Each replica's local
    /// `serve_rpc` must use the same service name (which becomes
    /// the queue-group identifier).
    ///
    /// Side effects besides the dispatcher:
    /// - adds `service` to the local nRPC service set and re-indexes the
    ///   self-announcement *before* the dispatcher is installed;
    /// - spawns three tasks: a bridge task (inbound events → callee-side
    ///   capability gate → fold), a response drainer task, and a one-shot
    ///   task that re-announces capabilities to peers.
    ///
    /// Returns a [`ServeHandle`] whose Drop tears down the
    /// registration. Concurrent registrations for the same service
    /// on one node return `Err(ServeError::AlreadyServing)`.
    ///
    /// # Errors
    ///
    /// - [`ServeError::InvalidServiceName`] if `<service>.requests` is not
    ///   a valid channel name.
    /// - [`ServeError::AlreadyServing`] if `register_rpc_inbound` reports
    ///   an existing dispatcher for this channel hash.
    pub fn serve_rpc<H: RpcHandler>(
        self: &Arc<Self>,
        service: &str,
        handler: Arc<H>,
    ) -> Result<ServeHandle, ServeError> {
        let request_channel = ChannelName::new(&format!("{service}.requests"))
            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
        let channel_hash = request_channel.hash();

        // Bridge: a tokio mpsc the inbound dispatcher pushes into.
        // The bridge task drains it and runs each event through
        // the fold. Bounded so a runaway publisher can't OOM the
        // server; over-cap pushes drop the inbound event (which
        // surfaces to the caller as a timeout).
        let (tx, mut rx) = mpsc::channel::<RpcInboundEvent>(1024);

        // T1.2 cache: maps each caller's wire `origin_hash` to the
        // AEAD-verified `from_node` of the session that delivered
        // its REQUEST. Populated by the bridge below; consumed by
        // the emit closure, which passes the hit to
        // [`publish_response_to_caller`] as `target_hint` so the
        // response leg can skip the roster fan-out.
        let origin_node_cache: RpcOriginNodeCache = Arc::new(OriginKeyedLru::new());

        // Build the emit closure. When the handler completes, the
        // fold calls this (synchronously) with `(caller_origin, call_id,
        // response)`. §8a: instead of `tokio::spawn`ing a task per response,
        // the closure builds the wire payload (cheap, no await) and hands a
        // job to a single per-service response drainer task (below), which
        // does the `.await` publish. A `tokio::spawn` per response cost
        // ~1–2 µs of scheduling on a wake-bound path; a channel send is a
        // fraction of that, and the drainer amortizes the wakeup.
        let service_for_emit = service.to_string();
        let server_origin = self.identity_origin_hash();
        let origin_node_cache_for_emit = Arc::clone(&origin_node_cache);
        // §8b reply-channel cache. The reply channel name is
        // `<service>.replies.<caller_origin:016x>`: deterministic from
        // `(service, caller_origin)`. `service` is fixed for this
        // `serve_rpc`, so the name varies only by `caller_origin`.
        //
        // `ChannelName` is `Arc<str>`, so a cache hit is an Arc bump. That
        // removes the per-response `format!` String and `ChannelName::new`
        // (`Arc<str>`) allocation, plus the per-call `service.clone()`,
        // that the emit closure used to pay on every response.
        //
        // Keyed by the wire-claimed `caller_origin`, and so bounded the
        // same way as `origin_node_cache` above: an `OriginKeyedLru`, not
        // an unbounded map, so a crafted-origin flood can't amplify server
        // memory (a miss just rebuilds the name).
        //
        // PERF_AUDIT §3.10: cache the triple (name, hash, stream_id) per
        // caller_origin so the per-response drainer doesn't recompute
        // xxh3 + publish_stream_id on every send.
        let reply_channel_cache: Arc<OriginKeyedLru<CachedReplyChannel>> =
            Arc::new(OriginKeyedLru::new());
        // §8a response drainer channel. Bounded like the inbound channel; a
        // full channel means the drainer can't keep up, so we drop (the
        // caller times out) rather than block the fold.
        let (resp_tx, mut resp_rx) = mpsc::channel::<RpcResponseJob>(1024);
        let emit: RpcResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
            let target_hint = origin_node_cache_for_emit.get(caller_origin);
            // Resolve the reply channel from cache (Arc bump on hit; one
            // `format!` + `ChannelName::new` the first time we see a caller).
            let cached = match reply_channel_cache.get(caller_origin) {
                Some(c) => c,
                None => {
                    let name = format!("{service_for_emit}.replies.{caller_origin:016x}");
                    match ChannelName::new(&name) {
                        Ok(channel_name) => {
                            // Compute hash + stream_id ONCE per caller_origin
                            // and stash them alongside the name.
                            let channel_id = ChannelId::new(channel_name.clone());
                            let triple = CachedReplyChannel {
                                hash: channel_id.hash(),
                                stream_id: MeshNode::publish_stream_id(&channel_id),
                                name: channel_name,
                            };
                            reply_channel_cache.insert(caller_origin, triple.clone());
                            triple
                        }
                        Err(e) => {
                            tracing::warn!(error = %e, channel = %name,
                                "rpc serve_rpc: invalid reply channel name");
                            return;
                        }
                    }
                }
            };
            // Build the RESPONSE event envelope (EventMeta header + encoded
            // payload) synchronously — pure CPU, no await — then hand it to
            // the drainer.
            let meta = EventMeta::new(
                super::cortex::DISPATCH_RPC_RESPONSE,
                0,
                server_origin,
                call_id,
                0,
            );
            let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
            buf.extend_from_slice(&meta.to_bytes());
            resp.encode_into(&mut buf);
            // `try_send` fails when the channel is full *or* when the
            // drainer has exited. The log message below only names the
            // capacity case.
            if resp_tx
                .try_send(RpcResponseJob {
                    caller_origin,
                    call_id,
                    target_hint,
                    reply_channel: cached.name,
                    reply_channel_hash: cached.hash,
                    reply_stream_id: cached.stream_id,
                    payload: Bytes::from(buf),
                })
                .is_err()
            {
                tracing::debug!(
                    caller_origin = format!("{:#x}", caller_origin),
                    call_id,
                    "rpc serve_rpc: response drainer at capacity; dropping response"
                );
            }
        });

        // Build the server fold and wrap it in an Arc<Mutex<...>>
        // so the bridge task can drive it (the trait takes
        // `&mut self`). Attach the per-service metrics handle so
        // the spawned handler tasks bump server-side counters.
        let metrics_handle = self.rpc_metrics_arc().for_service(service);
        // Keep a clone of the emit closure for the callee-side
        // capability-auth defense-in-depth path in the bridge
        // below — the fold owns its own clone, this one only
        // emits the `CapabilityDenied` rejection before the fold
        // sees the event.
        let emit_for_bridge = Arc::clone(&emit);
        // Clone the per-service metrics handle so the bridge can
        // bump `capability_denied_total` on gate rejection. The
        // fold's own clone (passed via `with_metrics`) handles the
        // handler-side counters; this one covers the path BEFORE
        // the handler runs, which the fold-side metrics never see.
        let metrics_for_bridge = Arc::clone(&metrics_handle);
        let fold = Arc::new(Mutex::new(
            RpcServerFold::new(handler as Arc<dyn RpcHandler>, emit).with_metrics(metrics_handle),
        ));

        // Register the inbound dispatcher. Push into the mpsc;
        // the bridge task does the actual fold work.
        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
            // Best-effort send — over-cap means the bridge can't
            // keep up; drop and let the caller time out. Logging
            // here would spam.
            let _ = tx.try_send(ev);
        });
        // Register the service in `rpc_local_services` and refresh
        // the self-indexed announcement BEFORE installing the
        // dispatcher so the callee-side gate (in the bridge below)
        // sees a self-announcement carrying `nrpc:<service>` the
        // moment the first inbound event lands. Without this, the
        // gate was either silently permissive (no self-ann) or
        // silently denying (self-ann from a prior
        // `announce_capabilities` that pre-dated this service's
        // registration). See `docs/misc/CODE_REVIEW_2026_05_19_CAPABILITY_AUTH.md`
        // H1 + H2.
        self.rpc_local_services_arc().insert(service.to_string());
        self.index_self_with_local_services();

        // NOTE(review): on the `AlreadyServing` path the new `dispatcher`
        // has already been handed to `register_rpc_inbound`. If that call
        // *replaces* an existing entry (HashMap-insert semantics) rather
        // than refusing, the live registration is left routing into this
        // dispatcher, whose bridge is never spawned (its `rx` drops on
        // return). Confirm `register_rpc_inbound` is insert-if-absent.
        if self
            .register_rpc_inbound(channel_hash, dispatcher)
            .is_some()
        {
            return Err(ServeError::AlreadyServing(service.to_string()));
        }

        // Spawn the bridge task. It reads inbound events, runs
        // the v0.4 capability-auth callee-side gate (defense in
        // depth — the caller-side gate inside `call_service`
        // covers the well-behaved client path), and on accept
        // feeds them to the fold.
        let mesh_for_bridge = Arc::clone(self);
        let service_for_bridge = service.to_string();
        let origin_node_cache_for_bridge = Arc::clone(&origin_node_cache);
        let bridge = tokio::spawn(async move {
            let tag = format!("nrpc:{}", service_for_bridge);
            use crate::adapter::net::behavior::fold::capability_bridge;
            while let Some(inbound) = rx.recv().await {
                // T1.2 cache populate. `from_node` is the
                // AEAD-verified session peer; `origin_hash` is the
                // wire-claimed entity (untrusted on its own but
                // bound here to a session peer we just authed).
                // `from_node == 0` is the loopback/test sentinel —
                // skip the insert so the response path falls back
                // to the roster lookup instead of trying to send to
                // node 0.
                // NOTE(review): this runs for every event kind and before
                // the capability gate, and it overwrites any existing
                // entry. A peer claiming another caller's `origin_hash`
                // re-points that caller's responses at itself. Confirm
                // whether `origin_hash` is session-bound upstream.
                if inbound.from_node != 0 {
                    origin_node_cache_for_bridge.insert(inbound.origin_hash, inbound.from_node);
                }
                // Defense-in-depth check. Skip only when the wire
                // session resolved no NodeId (`from_node == 0` is
                // the loopback / test sentinel per
                // `RpcInboundEvent::from_node` — production wire
                // delivery drops events that fail NodeId
                // resolution rather than passing 0). The cold-
                // start "no self-ann" skip the original
                // implementation carried was a permissive hole;
                // `index_self_with_local_services` above
                // guarantees a self-ann exists before the
                // dispatcher is wired, so denying when the gate
                // says no is now the safe failure mode.
                let self_node = mesh_for_bridge.node_id();
                let from_node = inbound.from_node;
                if from_node != 0
                    && !capability_bridge::may_execute(
                        mesh_for_bridge.capability_fold(),
                        self_node,
                        &tag,
                        from_node,
                    )
                {
                    // Decode the EventMeta so we can address the
                    // caller's reply channel (keyed on
                    // `caller_origin`) and tag the response with
                    // the correct `call_id`. A garbled meta means
                    // the request would have been rejected by the
                    // fold's own decode path too; drop silently
                    // to match the existing skip-on-malformed
                    // behavior there.
                    // NOTE(review): the event's dispatch type is not
                    // checked here. A denied CANCEL (not just a REQUEST)
                    // also gets a `CapabilityDenied` RESPONSE and bumps
                    // the counter. Confirm that is intended.
                    let Some(meta) = (if inbound.payload.len() >= EVENT_META_SIZE {
                        EventMeta::from_bytes(&inbound.payload[..EVENT_META_SIZE])
                    } else {
                        None
                    }) else {
                        continue;
                    };
                    let resp = super::cortex::RpcResponsePayload {
                        status: RpcStatus::CapabilityDenied,
                        headers: vec![],
                        body: Bytes::from(format!(
                            "callee-side capability-auth gate denied nrpc:{}",
                            service_for_bridge
                        )),
                    };
                    // Server-side metrics: bump `capability_denied_total`
                    // on the per-service counter. The fold-side
                    // metrics never see this path (the handler isn't
                    // invoked), so without this bump a noisy
                    // unauthorized caller is invisible to operators
                    // watching `nrpc_handler_invocations_total` —
                    // the dashboard sees "0 requests" while the
                    // caller sees `CapabilityDenied`.
                    metrics_for_bridge
                        .capability_denied_total
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    // `seq_or_ts` carries the call id: the emit closure
                    // above builds outbound metas with `call_id` in the
                    // same position.
                    (emit_for_bridge)(meta.origin_hash, meta.seq_or_ts, resp);
                    continue;
                }
                let payload = inbound.payload;
                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
                let ev = RedexEvent { entry, payload };
                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
                    tracing::warn!(error = %e, "rpc serve_rpc: fold apply error");
                }
            }
        });

        // §8a response drainer. Drains `resp_rx` and does the `.await`
        // publish that the emit closure used to `tokio::spawn` per response.
        //
        // It exits on its own once every clone of `resp_tx` is dropped.
        // `resp_tx` lives inside the emit closure, whose clones are held by:
        // - the fold;
        // - `emit_for_bridge` in the bridge task;
        // - presumably any handler tasks the fold has spawned.
        // So the drainer stops after the bridge task ends (dropping the
        // fold and `emit_for_bridge`) and in-flight handlers finish.
        let response_drain_mesh = Arc::clone(self);
        let response_drain = tokio::spawn(async move {
            while let Some(job) = resp_rx.recv().await {
                if let Err(e) = publish_response_to_caller(
                    &response_drain_mesh,
                    job.caller_origin,
                    job.target_hint,
                    &job.reply_channel,
                    job.reply_channel_hash,
                    job.reply_stream_id,
                    job.payload,
                )
                .await
                {
                    tracing::warn!(
                        error = %e,
                        caller_origin = format!("{:#x}", job.caller_origin),
                        call_id = job.call_id,
                        "rpc serve_rpc: response publish failed"
                    );
                }
            }
        });

        // Spawn an async re-announce so peers also learn about
        // the new service without the operator having to call
        // `announce_capabilities` manually. The local self-index
        // already happened above; this is purely for peer
        // visibility (the broadcast path also re-runs the
        // self-index, which is a cheap version bump).
        let mesh_for_announce = Arc::clone(self);
        let service_for_log = service.to_string();
        tokio::spawn(async move {
            let baseline = mesh_for_announce.user_caps_snapshot();
            if let Err(e) = mesh_for_announce.announce_capabilities(baseline).await {
                tracing::warn!(
                    error = %e,
                    service = %service_for_log,
                    "serve_rpc: auto re-announce failed",
                );
            }
        });

        Ok(ServeHandle {
            channel_hash,
            service: service.to_string(),
            _bridge: bridge,
            _response_drain: Some(response_drain),
            mesh: Arc::clone(self),
        })
    }
2249
2250 /// Streaming variant of [`Self::serve_rpc`]. The handler
2251 /// receives an [`RpcResponseSink`](super::cortex::RpcResponseSink)
2252 /// it writes chunks to via `sink.send(body)`; returning
2253 /// `Ok(())` closes the stream cleanly, `Err(_)` closes with
2254 /// an error frame.
2255 ///
2256 /// Wire-level identical to the unary path apart from the
2257 /// per-chunk `nrpc-streaming` header markers
2258 /// (`continue` / `end`). Same auto-registration of
2259 /// `<service>.requests` + `<service>.replies.` prefix.
2260 pub fn serve_rpc_streaming<H: RpcStreamingHandler>(
2261 self: &Arc<Self>,
2262 service: &str,
2263 handler: Arc<H>,
2264 ) -> Result<ServeHandle, ServeError> {
2265 let request_channel = ChannelName::new(&format!("{service}.requests"))
2266 .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
2267 let channel_hash = request_channel.hash();
2268 let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
2269
2270 // T1.2 cache: bridge populates from inbound.from_node, emit
2271 // closure consults to skip roster fan-out. See the unary
2272 // serve_rpc above for the full rationale.
2273 let origin_node_cache: RpcOriginNodeCache = Arc::new(OriginKeyedLru::new());
2274
2275 let mesh_for_emit = Arc::clone(self);
2276 let service_for_emit = service.to_string();
2277 let server_origin = self.identity_origin_hash();
2278 let origin_node_cache_for_emit = Arc::clone(&origin_node_cache);
2279 // Async emit so the streaming fold's pump can `.await` each
2280 // publish — guarantees per-call chunk ordering on the wire.
2281 let emit: RpcAsyncResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
2282 let mesh = Arc::clone(&mesh_for_emit);
2283 let service = service_for_emit.clone();
2284 let target_hint = origin_node_cache_for_emit.get(caller_origin);
2285 Box::pin(async move {
2286 let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
2287 let reply_channel = match ChannelName::new(&reply_channel_name) {
2288 Ok(c) => c,
2289 Err(e) => {
2290 tracing::warn!(error = %e, channel = %reply_channel_name,
2291 "rpc serve_rpc_streaming: invalid reply channel name");
2292 return;
2293 }
2294 };
2295 let meta = EventMeta::new(
2296 super::cortex::DISPATCH_RPC_RESPONSE,
2297 0,
2298 server_origin,
2299 call_id,
2300 0,
2301 );
2302 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2303 buf.extend_from_slice(&meta.to_bytes());
2304 resp.encode_into(&mut buf);
2305 // PERF_AUDIT §3.10: compute hash + stream_id at the
2306 // call site. These legacy streaming paths don't yet
2307 // cache the triple via `OriginKeyedLru<CachedReplyChannel>`;
2308 // wiring them up is a follow-up — for now the
2309 // compute happens here per response, same as the
2310 // pre-fix in-function shape.
2311 let reply_channel_id = ChannelId::new(reply_channel.clone());
2312 let reply_channel_hash = reply_channel_id.hash();
2313 let reply_stream_id = MeshNode::publish_stream_id(&reply_channel_id);
2314 if let Err(e) = publish_response_to_caller(
2315 &mesh,
2316 caller_origin,
2317 target_hint,
2318 &reply_channel,
2319 reply_channel_hash,
2320 reply_stream_id,
2321 Bytes::from(buf),
2322 )
2323 .await
2324 {
2325 tracing::warn!(error = %e,
2326 caller_origin = format!("{:#x}", caller_origin),
2327 call_id,
2328 "rpc serve_rpc_streaming: chunk publish failed");
2329 }
2330 })
2331 });
2332
2333 // Attach per-service metrics so the spawned handler tasks
2334 // + pump task bump server-side counters (including the
2335 // streaming-only `streaming_chunks_emitted_total`).
2336 let metrics_handle = self.rpc_metrics_arc().for_service(service);
2337 let fold = Arc::new(Mutex::new(
2338 RpcServerStreamingFold::new(handler as Arc<dyn RpcStreamingHandler>, emit)
2339 .with_metrics(metrics_handle),
2340 ));
2341 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2342 let _ = tx.try_send(ev);
2343 });
2344 if self
2345 .register_rpc_inbound(channel_hash, dispatcher)
2346 .is_some()
2347 {
2348 return Err(ServeError::AlreadyServing(service.to_string()));
2349 }
2350 let origin_node_cache_for_bridge = Arc::clone(&origin_node_cache);
2351 let bridge = tokio::spawn(async move {
2352 while let Some(inbound) = rx.recv().await {
2353 if inbound.from_node != 0 {
2354 origin_node_cache_for_bridge.insert(inbound.origin_hash, inbound.from_node);
2355 }
2356 let payload = inbound.payload;
2357 let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2358 let ev = RedexEvent { entry, payload };
2359 if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2360 tracing::warn!(error = %e, "rpc serve_rpc_streaming: fold apply error");
2361 }
2362 }
2363 });
2364 self.rpc_local_services_arc().insert(service.to_string());
2365 Ok(ServeHandle {
2366 channel_hash,
2367 service: service.to_string(),
2368 _bridge: bridge,
2369 // Streaming/duplex variants still spawn per emit (§8a covers the
2370 // unary hot path); no drainer.
2371 _response_drain: None,
2372 mesh: Arc::clone(self),
2373 })
2374 }
2375
2376 /// Register a client-streaming nRPC handler for `service`.
2377 /// Mirror of [`Self::serve_rpc_streaming`] but using the
2378 /// request-side fold ([`RpcStreamingRequestFold`]) — the
2379 /// handler receives one stream of REQUEST_CHUNK bodies and
2380 /// emits one terminal RESPONSE.
2381 ///
2382 /// Wires two emit callbacks:
2383 /// - A sync [`RpcResponseEmitter`] for the terminal RESPONSE
2384 /// (single emit per call, no ordering concern).
2385 /// - An [`RpcRequestGrantEmitter`] for upload-direction
2386 /// credit grants, which publishes [`DISPATCH_RPC_REQUEST_GRANT`]
2387 /// events on the caller's reply channel.
2388 ///
2389 /// Bidi streaming plan (Phase C).
2390 pub fn serve_rpc_client_stream<H: RpcClientStreamingHandler>(
2391 self: &Arc<Self>,
2392 service: &str,
2393 handler: Arc<H>,
2394 ) -> Result<ServeHandle, ServeError> {
2395 let request_channel = ChannelName::new(&format!("{service}.requests"))
2396 .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
2397 let channel_hash = request_channel.hash();
2398 let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
2399
2400 // T1.2 cache — see serve_rpc above for full rationale.
2401 let origin_node_cache: RpcOriginNodeCache = Arc::new(OriginKeyedLru::new());
2402
2403 let mesh_for_emit = Arc::clone(self);
2404 let service_for_emit = service.to_string();
2405 let server_origin = self.identity_origin_hash();
2406
2407 // Terminal RESPONSE emitter — sync because there's only
2408 // one RESPONSE per call (no per-call ordering concern that
2409 // would require an async-await between chunks).
2410 let emit_resp_mesh = Arc::clone(&mesh_for_emit);
2411 let emit_resp_service = service_for_emit.clone();
2412 let origin_node_cache_for_emit = Arc::clone(&origin_node_cache);
2413 let emit_resp: RpcResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
2414 let mesh = Arc::clone(&emit_resp_mesh);
2415 let service = emit_resp_service.clone();
2416 let target_hint = origin_node_cache_for_emit.get(caller_origin);
2417 tokio::spawn(async move {
2418 let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
2419 let reply_channel = match ChannelName::new(&reply_channel_name) {
2420 Ok(c) => c,
2421 Err(e) => {
2422 tracing::warn!(error = %e, channel = %reply_channel_name,
2423 "rpc serve_rpc_client_stream: invalid reply channel name");
2424 return;
2425 }
2426 };
2427 let meta = EventMeta::new(
2428 super::cortex::DISPATCH_RPC_RESPONSE,
2429 0,
2430 server_origin,
2431 call_id,
2432 0,
2433 );
2434 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2435 buf.extend_from_slice(&meta.to_bytes());
2436 resp.encode_into(&mut buf);
2437 // PERF_AUDIT §3.10: compute hash + stream_id at the
2438 // call site. These legacy streaming paths don't yet
2439 // cache the triple via `OriginKeyedLru<CachedReplyChannel>`;
2440 // wiring them up is a follow-up — for now the
2441 // compute happens here per response, same as the
2442 // pre-fix in-function shape.
2443 let reply_channel_id = ChannelId::new(reply_channel.clone());
2444 let reply_channel_hash = reply_channel_id.hash();
2445 let reply_stream_id = MeshNode::publish_stream_id(&reply_channel_id);
2446 if let Err(e) = publish_response_to_caller(
2447 &mesh,
2448 caller_origin,
2449 target_hint,
2450 &reply_channel,
2451 reply_channel_hash,
2452 reply_stream_id,
2453 Bytes::from(buf),
2454 )
2455 .await
2456 {
2457 tracing::warn!(error = %e,
2458 caller_origin = format!("{:#x}", caller_origin),
2459 call_id,
2460 "rpc serve_rpc_client_stream: terminal RESPONSE publish failed");
2461 }
2462 });
2463 });
2464
2465 // REQUEST_GRANT emitter — coalesces per-chunk credits into
2466 // a single drainer task that batches by call_id. Avoids the
2467 // tokio::spawn-per-emit storm under bursting.
2468 let emit_grant = build_request_grant_emitter(
2469 Arc::clone(&mesh_for_emit),
2470 service_for_emit.clone(),
2471 server_origin,
2472 "serve_rpc_client_stream",
2473 );
2474
2475 let metrics_handle = self.rpc_metrics_arc().for_service(service);
2476 let fold = Arc::new(Mutex::new(
2477 RpcStreamingRequestFold::new(handler as Arc<dyn RpcClientStreamingHandler>, emit_resp)
2478 .with_grant_emitter(emit_grant)
2479 .with_metrics(metrics_handle),
2480 ));
2481 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2482 let _ = tx.try_send(ev);
2483 });
2484 if self
2485 .register_rpc_inbound(channel_hash, dispatcher)
2486 .is_some()
2487 {
2488 return Err(ServeError::AlreadyServing(service.to_string()));
2489 }
2490 let origin_node_cache_for_bridge = Arc::clone(&origin_node_cache);
2491 let bridge = tokio::spawn(async move {
2492 while let Some(inbound) = rx.recv().await {
2493 if inbound.from_node != 0 {
2494 origin_node_cache_for_bridge.insert(inbound.origin_hash, inbound.from_node);
2495 }
2496 let payload = inbound.payload;
2497 let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2498 let ev = RedexEvent { entry, payload };
2499 if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2500 tracing::warn!(error = %e,
2501 "rpc serve_rpc_client_stream: fold apply error");
2502 }
2503 }
2504 });
2505 self.rpc_local_services_arc().insert(service.to_string());
2506 Ok(ServeHandle {
2507 channel_hash,
2508 service: service.to_string(),
2509 _bridge: bridge,
2510 // Streaming/duplex variants still spawn per emit (§8a covers the
2511 // unary hot path); no drainer.
2512 _response_drain: None,
2513 mesh: Arc::clone(self),
2514 })
2515 }
2516
2517 /// Client-streaming variant of [`Self::call`]. Returns a
2518 /// [`ClientStreamCallRaw`] handle the caller pushes N items
2519 /// into via `send`, then `finish` to await the terminal
2520 /// RESPONSE.
2521 ///
2522 /// **Lazy initial REQUEST.** This method does NOT publish a
2523 /// REQUEST to the wire. It only ensures the caller's reply
2524 /// subscription is set up and registers the pending entry; the
2525 /// initial REQUEST is emitted by the first `send` (or by
2526 /// `finish` for the zero-item degenerate path).
2527 ///
2528 /// Sets `FLAG_RPC_CLIENT_STREAMING_REQUEST` on the initial
2529 /// REQUEST so the server's request-streaming fold knows to
2530 /// open a request-side stream. Optional `request_window_initial`
2531 /// header opts into upload-direction flow control.
2532 ///
2533 /// Bidi streaming plan (Phase C).
2534 pub async fn call_client_stream(
2535 self: &Arc<Self>,
2536 target_node_id: u64,
2537 service: &str,
2538 opts: CallOptions,
2539 ) -> Result<ClientStreamCallRaw, RpcError> {
2540 // `request_window_initial = Some(0)` would deadlock the
2541 // caller: every `send` awaits a credit, but the initial
2542 // REQUEST is lazy (not emitted until the first send), so
2543 // the server never sees the call and never publishes a
2544 // GRANT. Reject up front — `None` means "unbounded credit",
2545 // any positive value opts into flow control.
2546 if matches!(opts.request_window_initial, Some(0)) {
2547 return Err(RpcError::Codec {
2548 direction: CodecDirection::Encode,
2549 message: "request_window_initial must be None or >= 1; Some(0) deadlocks send"
2550 .to_string(),
2551 });
2552 }
2553 // T1.3: per-service route cache (see PERF_AUDIT
2554 // 2026-05-19). One DashMap::get + Arc::clone instead of
2555 // 2 format! + 2 ChannelName::new + xxhash per call.
2556 let route = self.rpc_route_or_no_route(target_node_id, service)?;
2557 let self_origin = self.identity_origin_hash();
2558 self.ensure_reply_subscription(
2559 target_node_id,
2560 service,
2561 route.reply_channel.clone(),
2562 route.reply_hash,
2563 )
2564 .await?;
2565
2566 let call_id = mint_random_call_id();
2567 let pending = self.rpc_client_pending();
2568 let (terminal_rx, mut grant_rx) =
2569 pending.register_client_streaming(call_id, target_node_id);
2570
2571 // Build the header set + flags we'll queue for the initial
2572 // REQUEST (deferred to the first send / finish).
2573 let mut initial_flags = FLAG_RPC_CLIENT_STREAMING_REQUEST;
2574 let mut initial_headers: Vec<(String, Vec<u8>)> = Vec::new();
2575 if let Some(tc) = opts.trace_context.as_ref() {
2576 initial_flags |= FLAG_RPC_PROPAGATE_TRACE;
2577 initial_headers.extend(build_trace_headers(tc));
2578 }
2579 if let Some(window) = opts.request_window_initial {
2580 initial_headers.push((
2581 HEADER_NRPC_REQUEST_WINDOW_INITIAL.to_string(),
2582 window.to_string().into_bytes(),
2583 ));
2584 }
2585 initial_headers.extend(opts.request_headers.iter().cloned());
2586
2587 // Per-call credit semaphore when flow control is opted in.
2588 // Initial permits = the caller's declared window. Refilled
2589 // by REQUEST_GRANT events arriving on the reply channel,
2590 // pumped through `grant_rx` by the spawned `grant_pump`.
2591 let credit_sem = opts
2592 .request_window_initial
2593 .map(|n| Arc::new(tokio::sync::Semaphore::new(n as usize)));
2594 let grant_pump = credit_sem.as_ref().map(|sem| {
2595 let sem = Arc::clone(sem);
2596 tokio::spawn(async move {
2597 while let Some(credits) = grant_rx.recv().await {
2598 add_request_grant_credits(&sem, credits);
2599 }
2600 })
2601 });
2602
2603 let deadline_ns = opts.deadline.map(instant_to_unix_nanos).unwrap_or(0);
2604 let observer = StreamingObserverState::new(Arc::clone(self), target_node_id, service, 0);
2605 let cancel_keep_alive = arm_stream_cancel(self, &opts, &pending, call_id);
2606 Ok(ClientStreamCallRaw {
2607 mesh: Arc::clone(self),
2608 target_node_id,
2609 request_channel: route.request_channel.clone(),
2610 request_channel_hash: route.request_channel_hash,
2611 request_stream_id: route.request_stream_id,
2612 self_origin,
2613 call_id,
2614 service: service.to_string(),
2615 initial_headers,
2616 initial_flags,
2617 deadline_ns,
2618 credit_sem,
2619 grant_pump,
2620 terminal_rx: Some(terminal_rx),
2621 state: ClientStreamState::JustOpened,
2622 started: Instant::now(),
2623 observer,
2624 _cancel_keep_alive: cancel_keep_alive,
2625 })
2626 }
2627
2628 /// Register a duplex nRPC handler for `service`. Composes
2629 /// [`Self::serve_rpc_client_stream`] (request-side stream)
2630 /// with [`Self::serve_rpc_streaming`] (response-side multi-
2631 /// fire emit) via [`RpcDuplexFold`].
2632 ///
2633 /// Wires THREE emit callbacks:
2634 /// - Async [`RpcAsyncResponseEmitter`] for response chunks +
2635 /// the terminal frame (per-call ordering required because
2636 /// the response side is multi-fire).
2637 /// - [`RpcRequestGrantEmitter`] for upload-direction credit
2638 /// grants (one per consumed request chunk when flow
2639 /// control is opted into).
2640 ///
2641 /// Bidi streaming plan (Phase D).
2642 pub fn serve_rpc_duplex<H: RpcDuplexHandler>(
2643 self: &Arc<Self>,
2644 service: &str,
2645 handler: Arc<H>,
2646 ) -> Result<ServeHandle, ServeError> {
2647 let request_channel = ChannelName::new(&format!("{service}.requests"))
2648 .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
2649 let channel_hash = request_channel.hash();
2650 let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
2651
2652 // T1.2 cache — see serve_rpc above for full rationale.
2653 let origin_node_cache: RpcOriginNodeCache = Arc::new(OriginKeyedLru::new());
2654
2655 let mesh_for_emit = Arc::clone(self);
2656 let service_for_emit = service.to_string();
2657 let server_origin = self.identity_origin_hash();
2658
2659 // Async response emitter — per-call ordering matters here
2660 // because the response side is multi-fire (same rationale
2661 // as serve_rpc_streaming).
2662 let emit_resp_mesh = Arc::clone(&mesh_for_emit);
2663 let emit_resp_service = service_for_emit.clone();
2664 let origin_node_cache_for_emit = Arc::clone(&origin_node_cache);
2665 let emit_resp: RpcAsyncResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
2666 let mesh = Arc::clone(&emit_resp_mesh);
2667 let service = emit_resp_service.clone();
2668 let target_hint = origin_node_cache_for_emit.get(caller_origin);
2669 Box::pin(async move {
2670 let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
2671 let reply_channel = match ChannelName::new(&reply_channel_name) {
2672 Ok(c) => c,
2673 Err(e) => {
2674 tracing::warn!(error = %e, channel = %reply_channel_name,
2675 "rpc serve_rpc_duplex: invalid reply channel name");
2676 return;
2677 }
2678 };
2679 let meta = EventMeta::new(
2680 super::cortex::DISPATCH_RPC_RESPONSE,
2681 0,
2682 server_origin,
2683 call_id,
2684 0,
2685 );
2686 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2687 buf.extend_from_slice(&meta.to_bytes());
2688 resp.encode_into(&mut buf);
2689 // PERF_AUDIT §3.10: compute hash + stream_id at the
2690 // call site. These legacy streaming paths don't yet
2691 // cache the triple via `OriginKeyedLru<CachedReplyChannel>`;
2692 // wiring them up is a follow-up — for now the
2693 // compute happens here per response, same as the
2694 // pre-fix in-function shape.
2695 let reply_channel_id = ChannelId::new(reply_channel.clone());
2696 let reply_channel_hash = reply_channel_id.hash();
2697 let reply_stream_id = MeshNode::publish_stream_id(&reply_channel_id);
2698 if let Err(e) = publish_response_to_caller(
2699 &mesh,
2700 caller_origin,
2701 target_hint,
2702 &reply_channel,
2703 reply_channel_hash,
2704 reply_stream_id,
2705 Bytes::from(buf),
2706 )
2707 .await
2708 {
2709 tracing::warn!(error = %e,
2710 caller_origin = format!("{:#x}", caller_origin),
2711 call_id,
2712 "rpc serve_rpc_duplex: chunk publish failed");
2713 }
2714 })
2715 });
2716
2717 // Request-direction grant emitter — same coalescing
2718 // drainer shape as serve_rpc_client_stream.
2719 let emit_grant = build_request_grant_emitter(
2720 Arc::clone(&mesh_for_emit),
2721 service_for_emit.clone(),
2722 server_origin,
2723 "serve_rpc_duplex",
2724 );
2725
2726 let metrics_handle = self.rpc_metrics_arc().for_service(service);
2727 let fold = Arc::new(Mutex::new(
2728 RpcDuplexFold::new(handler as Arc<dyn RpcDuplexHandler>, emit_resp)
2729 .with_grant_emitter(emit_grant)
2730 .with_metrics(metrics_handle),
2731 ));
2732 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2733 let _ = tx.try_send(ev);
2734 });
2735 if self
2736 .register_rpc_inbound(channel_hash, dispatcher)
2737 .is_some()
2738 {
2739 return Err(ServeError::AlreadyServing(service.to_string()));
2740 }
2741 let origin_node_cache_for_bridge = Arc::clone(&origin_node_cache);
2742 let bridge = tokio::spawn(async move {
2743 while let Some(inbound) = rx.recv().await {
2744 if inbound.from_node != 0 {
2745 origin_node_cache_for_bridge.insert(inbound.origin_hash, inbound.from_node);
2746 }
2747 let payload = inbound.payload;
2748 let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2749 let ev = RedexEvent { entry, payload };
2750 if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2751 tracing::warn!(error = %e,
2752 "rpc serve_rpc_duplex: fold apply error");
2753 }
2754 }
2755 });
2756 self.rpc_local_services_arc().insert(service.to_string());
2757 Ok(ServeHandle {
2758 channel_hash,
2759 service: service.to_string(),
2760 _bridge: bridge,
2761 // Streaming/duplex variants still spawn per emit (§8a covers the
2762 // unary hot path); no drainer.
2763 _response_drain: None,
2764 mesh: Arc::clone(self),
2765 })
2766 }
2767
2768 /// Duplex variant of [`Self::call`]. Returns a
2769 /// [`DuplexCallRaw`] handle with both upload (`send`,
2770 /// `finish_sending`) and download (`next`, or impl
2771 /// `futures::Stream`) surfaces. Use `into_split` to peel off
2772 /// the two halves for the "encoder task + decoder task"
2773 /// shape.
2774 ///
2775 /// Initial REQUEST sets BOTH `FLAG_RPC_CLIENT_STREAMING_REQUEST`
2776 /// AND `FLAG_RPC_STREAMING_RESPONSE`. Lazy publish — the
2777 /// initial REQUEST flies on the first `send` (or on
2778 /// `finish_sending` for the zero-item degenerate path).
2779 ///
2780 /// Bidi streaming plan (Phase D).
2781 pub async fn call_duplex(
2782 self: &Arc<Self>,
2783 target_node_id: u64,
2784 service: &str,
2785 opts: CallOptions,
2786 ) -> Result<DuplexCallRaw, RpcError> {
2787 // Same deadlock guard as `call_client_stream`: Some(0)
2788 // means "send must await a credit that can never arrive"
2789 // because the initial REQUEST is lazy.
2790 if matches!(opts.request_window_initial, Some(0)) {
2791 return Err(RpcError::Codec {
2792 direction: CodecDirection::Encode,
2793 message: "request_window_initial must be None or >= 1; Some(0) deadlocks send"
2794 .to_string(),
2795 });
2796 }
2797 // T1.3: per-service route cache (see PERF_AUDIT
2798 // 2026-05-19). One DashMap::get + Arc::clone instead of
2799 // 2 format! + 2 ChannelName::new + xxhash per call.
2800 let route = self.rpc_route_or_no_route(target_node_id, service)?;
2801 let self_origin = self.identity_origin_hash();
2802 self.ensure_reply_subscription(
2803 target_node_id,
2804 service,
2805 route.reply_channel.clone(),
2806 route.reply_hash,
2807 )
2808 .await?;
2809
2810 let call_id = mint_random_call_id();
2811 let pending = self.rpc_client_pending();
2812 let (chunks_rx, mut grant_rx) = pending.register_duplex(call_id, target_node_id);
2813
2814 let mut initial_flags = FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_STREAMING_RESPONSE;
2815 let mut initial_headers: Vec<(String, Vec<u8>)> = Vec::new();
2816 if let Some(tc) = opts.trace_context.as_ref() {
2817 initial_flags |= FLAG_RPC_PROPAGATE_TRACE;
2818 initial_headers.extend(build_trace_headers(tc));
2819 }
2820 if let Some(window) = opts.request_window_initial {
2821 initial_headers.push((
2822 HEADER_NRPC_REQUEST_WINDOW_INITIAL.to_string(),
2823 window.to_string().into_bytes(),
2824 ));
2825 }
2826 if let Some(window) = opts.stream_window_initial {
2827 initial_headers.push((
2828 HEADER_NRPC_STREAM_WINDOW_INITIAL.to_string(),
2829 window.to_string().into_bytes(),
2830 ));
2831 }
2832 initial_headers.extend(opts.request_headers.iter().cloned());
2833
2834 let credit_sem = opts
2835 .request_window_initial
2836 .map(|n| Arc::new(tokio::sync::Semaphore::new(n as usize)));
2837 let grant_pump = credit_sem.as_ref().map(|sem| {
2838 let sem = Arc::clone(sem);
2839 tokio::spawn(async move {
2840 while let Some(credits) = grant_rx.recv().await {
2841 add_request_grant_credits(&sem, credits);
2842 }
2843 })
2844 });
2845
2846 let deadline_ns = opts.deadline.map(instant_to_unix_nanos).unwrap_or(0);
2847 let observer = StreamingObserverState::new(Arc::clone(self), target_node_id, service, 0);
2848 // Cancel keep-alive lives on the shared Arc<DuplexInner>
2849 // so it survives into_split — the watcher exits only when
2850 // BOTH the sink AND stream halves drop, matching the
2851 // existing CANCEL-on-drop semantics.
2852 let cancel_keep_alive = arm_stream_cancel(self, &opts, &pending, call_id);
2853 let inner = Arc::new(DuplexInner {
2854 mesh: Arc::clone(self),
2855 target_node_id,
2856 request_channel: route.request_channel.clone(),
2857 request_channel_hash: route.request_channel_hash,
2858 request_stream_id: route.request_stream_id,
2859 self_origin,
2860 call_id,
2861 initial_sent: std::sync::atomic::AtomicBool::new(false),
2862 clean_close: std::sync::atomic::AtomicBool::new(false),
2863 observer,
2864 _cancel_keep_alive: Some(cancel_keep_alive),
2865 });
2866 let sink = DuplexSink {
2867 inner: Arc::clone(&inner),
2868 service: service.to_string(),
2869 initial_headers,
2870 initial_flags,
2871 deadline_ns,
2872 credit_sem,
2873 grant_pump,
2874 state: ClientStreamState::JustOpened,
2875 };
2876 let stream = DuplexStream {
2877 inner,
2878 chunks_rx,
2879 done: false,
2880 };
2881 Ok(DuplexCallRaw { sink, stream })
2882 }
2883
2884 /// Streaming variant of [`Self::call`]. Returns an
2885 /// [`RpcStream`] that yields chunks (as `Result<Bytes, RpcError>`)
2886 /// until the server closes the stream.
2887 ///
2888 /// Sets `FLAG_RPC_STREAMING_RESPONSE` on the request so the
2889 /// server's streaming fold knows to expect multi-fire emits.
2890 /// Same lazy reply-subscription + direct-unicast REQUEST
2891 /// as the unary `call` path.
2892 ///
2893 /// Cancellation: dropping the returned `RpcStream` emits a
2894 /// CANCEL to the server (best-effort) and discards any
2895 /// in-flight chunks.
2896 pub async fn call_streaming(
2897 self: &Arc<Self>,
2898 target_node_id: u64,
2899 service: &str,
2900 payload: Bytes,
2901 opts: CallOptions,
2902 ) -> Result<RpcStream, RpcError> {
2903 // `stream_window_initial = Some(0)` would deadlock the
2904 // RESPONSE direction by default: server's pump awaits one
2905 // credit per chunk, the caller's auto-grant only fires on
2906 // consumed chunks, and the first chunk can never be
2907 // delivered. `None` means "unbounded credit"; any positive
2908 // value opts into flow control. Reject up front — symmetric
2909 // with the request-direction guard in `call_client_stream`.
2910 if matches!(opts.stream_window_initial, Some(0)) {
2911 return Err(RpcError::Codec {
2912 direction: CodecDirection::Encode,
2913 message: "stream_window_initial must be None or >= 1; Some(0) deadlocks the response pump"
2914 .to_string(),
2915 });
2916 }
2917 // T1.3: per-service route cache. One DashMap::get + Arc::clone
2918 // on the hot path instead of 2 format! + 2 ChannelName::new +
2919 // xxhash per call.
2920 let route = self.rpc_route_or_no_route(target_node_id, service)?;
2921 let self_origin = self.identity_origin_hash();
2922 self.ensure_reply_subscription(
2923 target_node_id,
2924 service,
2925 route.reply_channel.clone(),
2926 route.reply_hash,
2927 )
2928 .await?;
2929
2930 let call_id = mint_random_call_id();
2931 let pending = self.rpc_client_pending();
2932 // S-4 part 2: bind the pending entry to the wire-session
2933 // peer the request is dispatched to. The fold's deliver
2934 // gate rejects RESPONSE frames whose from_node doesn't
2935 // match, so a leaked call_id alone can't spoof a reply.
2936 let rx = pending.register_streaming(call_id, target_node_id);
2937
2938 // Build the REQUEST: STREAMING_RESPONSE flag plus optional
2939 // trace-context headers / propagate-trace flag, same as
2940 // unary `call`. Plus the optional flow-control header
2941 // (`nrpc-stream-window-initial`) when the caller opted in
2942 // via `CallOptions::stream_window_initial`.
2943 let mut flags = FLAG_RPC_STREAMING_RESPONSE;
2944 let mut headers = Vec::new();
2945 if let Some(tc) = opts.trace_context.as_ref() {
2946 flags |= FLAG_RPC_PROPAGATE_TRACE;
2947 headers.extend(build_trace_headers(tc));
2948 }
2949 if let Some(window) = opts.stream_window_initial {
2950 headers.push((
2951 HEADER_NRPC_STREAM_WINDOW_INITIAL.to_string(),
2952 window.to_string().into_bytes(),
2953 ));
2954 }
2955 // Append caller-supplied request headers (Phase 9b — same
2956 // semantics as the unary `call` path).
2957 headers.extend(opts.request_headers.iter().cloned());
2958 let req = RpcRequestPayload {
2959 service: service.to_string(),
2960 deadline_ns: opts.deadline.map(instant_to_unix_nanos).unwrap_or(0),
2961 flags,
2962 headers,
2963 body: payload.clone(),
2964 };
2965 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self_origin, call_id, 0);
2966 let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.body.len() + 32);
2967 buf.extend_from_slice(&meta.to_bytes());
2968 req.encode_into(&mut buf);
2969
2970 let payload_bytes = Bytes::from(buf);
2971 if let Err(e) = self
2972 .publish_to_peer(
2973 target_node_id,
2974 route.request_channel_hash,
2975 route.request_stream_id,
2976 /* reliable */ true,
2977 std::slice::from_ref(&payload_bytes),
2978 )
2979 .await
2980 {
2981 pending.cancel(call_id);
2982 return Err(RpcError::Transport(e));
2983 }
2984
2985 let request_bytes_len = payload_bytes.len() as u32;
2986 // Cancel keep-alive lives on the returned RpcStream so the
2987 // watcher exits cleanly when the stream drops without cancel.
2988 let cancel_keep_alive = arm_stream_cancel(self, &opts, &pending, call_id);
2989 Ok(RpcStream {
2990 mesh: Arc::clone(self),
2991 target_node_id,
2992 request_channel: route.request_channel.clone(),
2993 // PERF_AUDIT §3.10 — cache the channel hash + stream
2994 // id from `route` so per-chunk grants in `poll_next`
2995 // don't re-run `ChannelId::new` + xxh3.
2996 request_channel_hash: route.request_channel_hash,
2997 request_stream_id: route.request_stream_id,
2998 self_origin,
2999 call_id,
3000 inner: rx,
3001 done: false,
3002 stream_window: opts.stream_window_initial,
3003 grant_pending: 0,
3004 _cancel_keep_alive: cancel_keep_alive,
3005 observer: StreamingObserverState::new(
3006 Arc::clone(self),
3007 target_node_id,
3008 service,
3009 request_bytes_len,
3010 ),
3011 })
3012 }
3013
3014 /// Find every node currently advertising `service` via the
3015 /// `nrpc:<service>` capability tag. Returns node IDs in
3016 /// roster order; the caller picks one (or use [`Self::call_service`]
3017 /// for the round-robin shortcut).
3018 ///
3019 /// Pre-Phase 2: requires the target nodes to have called
3020 /// `serve_rpc` AND `announce_capabilities` so the
3021 /// `nrpc:<service>` tag has propagated through capability
3022 /// announcements. The local node's own services are NOT
3023 /// automatically included (callers don't typically invoke
3024 /// themselves via the network — for in-process invocation,
3025 /// the user has the handler directly).
3026 pub fn find_service_nodes(&self, service: &str) -> Vec<u64> {
3027 use crate::adapter::net::behavior::capability::CapabilityFilter;
3028 use crate::adapter::net::behavior::fold::capability_bridge;
3029 let tag = format!("nrpc:{service}");
3030 let filter = CapabilityFilter::default().require_tag(tag);
3031 capability_bridge::find_nodes_matching(self.capability_fold(), &filter)
3032 }
3033
3034 /// Issue an RPC call to `service`, picking one node from
3035 /// those advertising the `nrpc:<service>` tag in the local
3036 /// capability index according to `opts.routing_policy`.
3037 ///
3038 /// Returns `RpcError::NoRoute` if no nodes advertise the
3039 /// service (or if `opts.filter_unhealthy` is set and every
3040 /// candidate is unavailable per the local `ProximityGraph`).
3041 pub async fn call_service(
3042 self: &Arc<Self>,
3043 service: &str,
3044 payload: Bytes,
3045 opts: CallOptions,
3046 ) -> Result<RpcReply, RpcError> {
3047 let mut candidates = self.find_service_nodes(service);
3048 if candidates.is_empty() {
3049 return Err(RpcError::NoRoute {
3050 target: 0,
3051 reason: format!(
3052 "no nodes advertise `nrpc:{service}` (have any servers \
3053 for this service called serve_rpc + announce_capabilities?)"
3054 ),
3055 });
3056 }
3057
3058 // Health filtering. Skip candidates the proximity graph
3059 // marks unhealthy (`!is_available()`). Candidates with no
3060 // proximity entry at all are KEPT — absence of evidence
3061 // is not evidence of unhealth, and a freshly-announced
3062 // service shouldn't be filtered just because pingwaves
3063 // haven't propagated yet.
3064 //
3065 // The bridge: each candidate's session-layer `node_id: u64`
3066 // is mapped to the entity-layer `[u8; 32]` via
3067 // `MeshNode::entity_id_for_node`. The proximity graph is
3068 // keyed on the entity id.
3069 if opts.filter_unhealthy {
3070 let proximity = self.proximity_graph();
3071 candidates.retain(|node_id| match self.entity_id_for_node(*node_id) {
3072 Some(entity_id) => match proximity.get_node(&entity_id) {
3073 Some(node) => node.is_available(),
3074 None => true, // no proximity data → keep
3075 },
3076 None => true, // no entity-id mapping → keep
3077 });
3078 if candidates.is_empty() {
3079 return Err(RpcError::NoRoute {
3080 target: 0,
3081 reason: format!(
3082 "every node advertising `nrpc:{service}` is marked \
3083 unhealthy by the local proximity graph",
3084 ),
3085 });
3086 }
3087 }
3088
3089 // Sort once so consistent-hash policies (Sticky) produce
3090 // a stable ordering across calls regardless of how the
3091 // capability index returned the candidates, and so the
3092 // LowestLatency-with-no-proximity-data fallback is
3093 // deterministic. Cheap — the candidate set is typically
3094 // small.
3095 candidates.sort_unstable();
3096
3097 // v0.4 capability-auth caller-side gate. Filter the
3098 // candidate set BEFORE target selection so the routing
3099 // policy never picks a peer the caller can't actually
3100 // reach. Pre-fix `select_target` could pick a denied
3101 // candidate even when authorized peers existed in the
3102 // set, and the resulting `CapabilityDenied` masked the
3103 // fact that the call would have succeeded against B or
3104 // C. Each candidate's own announcement lists
3105 // `nrpc:<service>` (otherwise it wouldn't be a
3106 // `find_service_nodes` candidate), so the gate's
3107 // `has_tag` arm short-circuits in the common case; the
3108 // new work is the allow-list scan. Permissive
3109 // announcements (all three lists empty) admit any
3110 // caller — the byte-identity wire-compat tests pin that
3111 // an unmodified peer's announcement stays unrestricted.
3112 // See `docs/plans/CAPABILITY_AUTH_PLAN.md` §3.
3113 let tag = format!("nrpc:{service}");
3114 use crate::adapter::net::behavior::fold::capability_bridge;
3115 let self_id = self.node_id();
3116 let any_candidate = candidates[0];
3117 let fold = self.capability_fold();
3118 // PERF_AUDIT §4.2 — batch the per-candidate gate so the
3119 // fold read lock is taken once and the caller's subnet +
3120 // groups are parsed once, not N times.
3121 let verdicts = capability_bridge::may_execute_batch(fold, &candidates, &tag, self_id);
3122 let mut iter = verdicts.into_iter();
3123 candidates.retain(|_| iter.next().unwrap_or(false));
3124 if candidates.is_empty() {
3125 return Err(RpcError::CapabilityDenied {
3126 // No authorized target; surface one of the
3127 // originally-advertised candidates so the caller
3128 // can correlate the denial with a real peer. The
3129 // semantic is "no peer advertising `nrpc:<service>`
3130 // authorizes this caller" — `any_candidate` is a
3131 // representative, not necessarily the strictest.
3132 target: any_candidate,
3133 capability: service.to_string(),
3134 });
3135 }
3136
3137 let target = self.select_target(&candidates, &opts.routing_policy);
3138 self.call(target, service, payload, opts).await
3139 }
3140
3141 /// Capability-routed server-streaming call. Same routing as
3142 /// [`call_service`] — capability-fold lookup, health filter,
3143 /// routing-policy sort, capability-auth gate, target selection —
3144 /// but the terminal step is [`call_streaming`] instead of
3145 /// [`call`]. Returns the substrate's `RpcStream` so callers can
3146 /// drive an `async for chunk in stream:` loop.
3147 ///
3148 /// Use cases: an agent invoking a long-running tool that emits
3149 /// progress + a terminal result, a fan-out subscriber that wants
3150 /// streaming chunks from whatever node currently advertises the
3151 /// service, any consumer that today reaches for
3152 /// `find_service_nodes` → manual target selection → `call_streaming`
3153 /// and ends up re-implementing the cap-auth gate `call_service`
3154 /// already enforces.
3155 ///
3156 /// Honors `CallOptions::cancel_token` (v3) and
3157 /// `CallOptions::deadline` exactly like `call_streaming`.
3158 ///
3159 /// [`call_service`]: Self::call_service
3160 /// [`call_streaming`]: Self::call_streaming
3161 /// [`call`]: Self::call
3162 pub async fn call_service_streaming(
3163 self: &Arc<Self>,
3164 service: &str,
3165 payload: Bytes,
3166 opts: CallOptions,
3167 ) -> Result<RpcStream, RpcError> {
3168 let mut candidates = self.find_service_nodes(service);
3169 if candidates.is_empty() {
3170 return Err(RpcError::NoRoute {
3171 target: 0,
3172 reason: format!(
3173 "no nodes advertise `nrpc:{service}` (have any servers \
3174 for this service called serve_rpc + announce_capabilities?)"
3175 ),
3176 });
3177 }
3178
3179 // Health filter — mirrors `call_service`. Candidates with no
3180 // proximity entry are kept (absence of evidence ≠ evidence of
3181 // unhealth); only candidates the proximity graph marks
3182 // explicitly unavailable get dropped.
3183 if opts.filter_unhealthy {
3184 let proximity = self.proximity_graph();
3185 candidates.retain(|node_id| match self.entity_id_for_node(*node_id) {
3186 Some(entity_id) => match proximity.get_node(&entity_id) {
3187 Some(node) => node.is_available(),
3188 None => true,
3189 },
3190 None => true,
3191 });
3192 if candidates.is_empty() {
3193 return Err(RpcError::NoRoute {
3194 target: 0,
3195 reason: format!(
3196 "every node advertising `nrpc:{service}` is marked \
3197 unhealthy by the local proximity graph",
3198 ),
3199 });
3200 }
3201 }
3202
3203 // Deterministic ordering so Sticky / LowestLatency-fallback
3204 // pick stably across calls — mirrors `call_service`.
3205 candidates.sort_unstable();
3206
3207 // v0.4 capability-auth caller-side gate. Same as `call_service`:
3208 // filter the candidate set BEFORE target selection so the
3209 // routing policy never picks a peer the caller can't reach.
3210 let tag = format!("nrpc:{service}");
3211 use crate::adapter::net::behavior::fold::capability_bridge;
3212 let self_id = self.node_id();
3213 let any_candidate = candidates[0];
3214 let fold = self.capability_fold();
3215 // PERF_AUDIT §4.2 — batch the per-candidate gate. See the
3216 // mirror site at `:3093`.
3217 let verdicts = capability_bridge::may_execute_batch(fold, &candidates, &tag, self_id);
3218 let mut iter = verdicts.into_iter();
3219 candidates.retain(|_| iter.next().unwrap_or(false));
3220 if candidates.is_empty() {
3221 return Err(RpcError::CapabilityDenied {
3222 target: any_candidate,
3223 capability: service.to_string(),
3224 });
3225 }
3226
3227 let target = self.select_target(&candidates, &opts.routing_policy);
3228 self.call_streaming(target, service, payload, opts).await
3229 }
3230
3231 /// Select a single target from `candidates` according to
3232 /// `policy`. Caller has already ensured `candidates` is
3233 /// non-empty and sorted (so `Sticky` is consistent across
3234 /// calls).
3235 fn select_target(&self, candidates: &[u64], policy: &RoutingPolicy) -> u64 {
3236 match policy {
3237 RoutingPolicy::RoundRobin => {
3238 // `fetch_add(1)` on a dedicated cursor — NOT a
3239 // `load(call_id)` — so two concurrent
3240 // `call_service` invocations always observe
3241 // distinct values and pick distinct targets.
3242 let n = self
3243 .rpc_round_robin_cursor_arc()
3244 .fetch_add(1, Ordering::Relaxed);
3245 candidates[(n as usize) % candidates.len()]
3246 }
3247 RoutingPolicy::Random => {
3248 // Lightweight RNG via a fresh fetch_add (same
3249 // counter, separate per-call value) mixed through
3250 // xxh3. Sufficient for load distribution;
3251 // not cryptographically random.
3252 let n = self
3253 .rpc_round_robin_cursor_arc()
3254 .fetch_add(1, Ordering::Relaxed);
3255 let mixed = xxhash_rust::xxh3::xxh3_64(&n.to_le_bytes());
3256 candidates[(mixed as usize) % candidates.len()]
3257 }
3258 RoutingPolicy::Sticky { key } => {
3259 // Consistent-hash to a position in the (sorted)
3260 // candidate list. Same key + same candidate set =
3261 // same target. A change to the candidate set
3262 // (server failover) reshuffles roughly 1/N of keys.
3263 let h = xxhash_rust::xxh3::xxh3_64(&key.to_le_bytes());
3264 candidates[(h as usize) % candidates.len()]
3265 }
3266 RoutingPolicy::LowestLatency => {
3267 // Walk candidates, look up each via the bridge
3268 // → proximity graph, pick the smallest
3269 // `latency_us`. Candidates without a proximity
3270 // entry (no observed pingwave or no entity-id
3271 // mapping yet) are treated as `u64::MAX` so they
3272 // sort to the bottom — a known-fast node beats an
3273 // unknown one.
3274 //
3275 // Determinism on tie / no-data: `best_node` starts
3276 // at `candidates[0]` (the lexicographically first
3277 // sorted candidate), so all-ties or all-unknown
3278 // collapse to that consistent fallback.
3279 let proximity = self.proximity_graph();
3280 let mut best_node = candidates[0];
3281 let mut best_latency = u64::MAX;
3282 for &node_id in candidates {
3283 let lat = self
3284 .entity_id_for_node(node_id)
3285 .and_then(|eid| proximity.get_node(&eid))
3286 .map(|n| n.latency_us)
3287 .unwrap_or(u64::MAX);
3288 if lat < best_latency {
3289 best_latency = lat;
3290 best_node = node_id;
3291 }
3292 }
3293 best_node
3294 }
3295 }
3296 }
3297
3298 /// Issue an RPC call to `target_node_id` for `service`.
3299 ///
3300 /// Phase 1 — direct entity-to-entity addressing. The caller
3301 /// specifies which target to send to; service discovery (the
3302 /// "find me a healthy instance of X" lookup) is Phase 2.
3303 ///
3304 /// Lazily subscribes the local node's `RpcClientFold` to
3305 /// `<service>.replies.<self_origin>` from `target_node_id` on
3306 /// the first call to that (target, service) pair. The
3307 /// subscription is reused across subsequent calls.
3308 ///
3309 /// On `opts.deadline` expiring OR the future being dropped,
3310 /// emits a CANCEL event so the server can drop the in-flight
3311 /// handler.
3312 pub async fn call(
3313 self: &Arc<Self>,
3314 target_node_id: u64,
3315 service: &str,
3316 payload: Bytes,
3317 mut opts: CallOptions,
3318 ) -> Result<RpcReply, RpcError> {
3319 // `started_total` brackets the entire call for the
3320 // `RpcObserver` latency field; the substrate-internal
3321 // `started` further down (set after the subscription
3322 // setup) drives the existing `RpcReply::latency_ns`
3323 // accounting so observers and Prometheus metrics
3324 // measure slightly different spans but stay consistent
3325 // within their own surface.
3326 let started_total = Instant::now();
3327 let request_bytes_len = payload.len() as u32;
3328 // Per-service route cache: one `DashMap::get(&str)` +
3329 // `Arc::clone` on the hot path instead of 2 `format!` +
3330 // 2 `ChannelName::new` + xxhash per call (T1.3 perf audit
3331 // — `docs/misc/PERF_AUDIT_2026_05_19_NRPC.md`).
3332 let route = self.rpc_route_or_no_route(target_node_id, service)?;
3333 let self_origin = self.identity_origin_hash();
3334
3335 // Caller-side metrics guard. Bumps `in_flight` immediately;
3336 // each early-return path calls `metrics_guard.record(...)`
3337 // with the outcome, and Drop records the latency + bumps
3338 // the matching counter. A future dropped before any
3339 // `record(...)` call (e.g. a hedge loser) leaves the guard
3340 // with `outcome = None` so `in_flight` decrements but no
3341 // outcome is double-counted.
3342 let metrics_registry = self.rpc_metrics_arc();
3343 let mut metrics_guard = CallMetricsGuard::new(metrics_registry.for_service(service));
3344
3345 // Lazy reply-channel subscription. Once per (target, service).
3346 // Reply channel + hash come from the cached `RpcRoute`; we
3347 // only `.clone()` the `ChannelName` (cheap — internally an
3348 // Arc<str>) instead of building it from scratch.
3349 if let Err(e) = self
3350 .ensure_reply_subscription(
3351 target_node_id,
3352 service,
3353 route.reply_channel.clone(),
3354 route.reply_hash,
3355 )
3356 .await
3357 {
3358 metrics_guard.record(CallOutcome::NoRoute);
3359 self.fire_rpc_observer_outbound(
3360 target_node_id,
3361 service,
3362 started_total.elapsed().as_millis() as u32,
3363 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(e.to_string()),
3364 request_bytes_len,
3365 0,
3366 );
3367 return Err(e);
3368 }
3369
3370 // Allocate a fresh call_id. Random u64 from getrandom; a
3371 // sequential counter would let any session peer that
3372 // observed one of their own call_ids predict the next-
3373 // allocated ids and ship spoofed RESPONSE frames on the
3374 // victim's reply channel. Random u64 collides with
3375 // probability 2^-64 per call and is unguessable from
3376 // another peer's perspective.
3377 let call_id = mint_random_call_id();
3378
3379 // Register the oneshot before publishing the REQUEST so a
3380 // very-fast RESPONSE doesn't arrive before we're ready.
3381 // S-4 part 2: bind the pending entry to target_node_id so
3382 // the fold's deliver gate rejects spoofed RESPONSE frames
3383 // arriving from any other session peer.
3384 let pending = self.rpc_client_pending();
3385 let rx = pending.register(call_id, target_node_id);
3386
3387 // Build the REQUEST envelope. If a trace context is set,
3388 // emit `traceparent` / `tracestate` headers and signal
3389 // via `FLAG_RPC_PROPAGATE_TRACE` so the server's fold
3390 // populates `RpcContext::trace_context`.
3391 let (flags, mut headers) = match opts.trace_context.as_ref() {
3392 Some(tc) => (FLAG_RPC_PROPAGATE_TRACE, build_trace_headers(tc)),
3393 None => (0u16, Vec::new()),
3394 };
3395 // Append caller-supplied request headers (e.g. the
3396 // `net-where` predicate header for Phase 9b
3397 // predicate-pushdown). Auto-generated headers come first
3398 // so name collisions resolve to caller-overrides via the
3399 // server-side `predicate_from_rpc_headers` first-match
3400 // semantics.
3401 // PERF_AUDIT §3.11 — `opts` is owned by this function and
3402 // its `request_headers` are unused after this point;
3403 // `Vec::append(&mut other)` drains `other` into `headers`
3404 // with zero allocation, vs the pre-fix
3405 // `.iter().cloned()` which deep-cloned each
3406 // `(String, Vec<u8>)` pair into a fresh entry.
3407 headers.append(&mut opts.request_headers);
3408 let req = RpcRequestPayload {
3409 service: service.to_string(),
3410 deadline_ns: opts.deadline.map(instant_to_unix_nanos).unwrap_or(0),
3411 flags,
3412 headers,
3413 body: payload.clone(),
3414 };
3415 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self_origin, call_id, 0);
3416 let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.body.len() + 32);
3417 buf.extend_from_slice(&meta.to_bytes());
3418 req.encode_into(&mut buf);
3419
3420 // Send the REQUEST directly to `target_node_id` via
3421 // `publish_to_peer`, bypassing the local subscriber roster
3422 // lookup. The roster-based `Mesh::publish` would consult
3423 // `dispatch_recipients(channel)` against the caller's local
3424 // roster, which has no knowledge of who serves this service
3425 // (no Subscribe message ever propagated from the server back
3426 // to the caller — `serve_rpc` is local-only). For Phase 1
3427 // direct addressing we know the target, so direct-send is
3428 // the right primitive.
3429 //
3430 // The receiver routes via the per-channel-hash dispatcher
3431 // hook (channel_hash is stamped on the wire by
3432 // publish_to_peer).
3433 let started = Instant::now();
3434 // Request channel hash + stream_id come from the cached
3435 // route — no `ChannelId::new` clone + xxhash per call.
3436 let payload_bytes = Bytes::from(buf);
3437 if let Err(e) = self
3438 .publish_to_peer(
3439 target_node_id,
3440 route.request_channel_hash,
3441 route.request_stream_id,
3442 /* reliable */ true,
3443 std::slice::from_ref(&payload_bytes),
3444 )
3445 .await
3446 {
3447 pending.cancel(call_id);
3448 // Distinguish "I don't know how to reach this peer"
3449 // from a generic transport blip: when the publish path
3450 // surfaces a no-session error, that's NoRoute (the
3451 // routing layer's job, retry won't help). Other
3452 // transport errors stay as Transport so retry is
3453 // applicable.
3454 let err = if classify_publish_no_session(&e) {
3455 metrics_guard.record(CallOutcome::NoRoute);
3456 RpcError::NoRoute {
3457 target: target_node_id,
3458 reason: e.to_string(),
3459 }
3460 } else {
3461 metrics_guard.record(CallOutcome::Transport);
3462 RpcError::Transport(e)
3463 };
3464 self.fire_rpc_observer_outbound(
3465 target_node_id,
3466 service,
3467 started_total.elapsed().as_millis() as u32,
3468 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(err.to_string()),
3469 request_bytes_len,
3470 0,
3471 );
3472 return Err(err);
3473 }
3474
3475 // From here on, the REQUEST is in flight on the server.
3476 // Wrap the rest of the call in an RAII guard whose Drop
3477 // fires CANCEL if `guard.completed` isn't set — covering:
3478 // - the call future being dropped mid-flight (e.g. hedge
3479 // loser, select!-cancelled future, caller awaiting a
3480 // `JoinHandle` that gets cancelled).
3481 // - the timeout path (we leave `completed=false` so Drop
3482 // handles CANCEL emission; no need for a separate
3483 // `send_rpc_cancel` call).
3484 // - the cancel_token path (same: leave completed=false,
3485 // Drop emits CANCEL).
3486 let mut guard = UnaryCallGuard {
3487 pending: Arc::clone(&pending),
3488 mesh: Arc::clone(self),
3489 target_node_id,
3490 request_channel: route.request_channel.clone(),
3491 self_origin,
3492 call_id,
3493 completed: false,
3494 };
3495
3496 // Substrate cancel-token plumbing (v3 / C-S1). When the
3497 // caller set `opts.cancel_token`, register a Notify against
3498 // the per-mesh cancel_registry. The select! arm below
3499 // observes the cancel signal and short-circuits to
3500 // RpcError::Cancelled, leaving guard.completed = false so
3501 // Drop fires CANCEL on the wire. Release the registry
3502 // entry once the call resolves so the registry doesn't
3503 // grow unboundedly.
3504 let cancel_token = opts.cancel_token.unwrap_or(0);
3505 let cancel_notify = self.cancel_registry().register_notify(cancel_token);
3506
3507 // Race the receiver against the deadline AND the cancel
3508 // signal. Each branch lifts to the same outcome shape
3509 // (Result<Result<RpcResponsePayload, _>, Elapsed>) so the
3510 // existing post-match logic stays unchanged for the ok /
3511 // timeout paths; the cancel arm returns early via
3512 // fire_unary_cancel_outcome — leaves guard.completed=false
3513 // so Drop emits CANCEL on the wire.
3514 let outcome: Result<Result<RpcResponsePayload, _>, tokio::time::error::Elapsed> =
3515 match opts.deadline {
3516 None => {
3517 tokio::select! {
3518 biased;
3519 _ = cancel_notify.notified() => {
3520 return Err(fire_unary_cancel_outcome(
3521 self,
3522 &mut metrics_guard,
3523 cancel_token,
3524 target_node_id,
3525 service,
3526 started_total,
3527 request_bytes_len,
3528 ));
3529 }
3530 r = rx => Ok(r),
3531 }
3532 }
3533 Some(deadline) => {
3534 let timeout_at = deadline.saturating_duration_since(Instant::now());
3535 tokio::select! {
3536 biased;
3537 _ = cancel_notify.notified() => {
3538 return Err(fire_unary_cancel_outcome(
3539 self,
3540 &mut metrics_guard,
3541 cancel_token,
3542 target_node_id,
3543 service,
3544 started_total,
3545 request_bytes_len,
3546 ));
3547 }
3548 r = tokio::time::timeout(timeout_at, rx) => r,
3549 }
3550 }
3551 };
3552
3553 // Whichever non-cancel path won, release the registry
3554 // entry. Idempotent if the cancel arm already released.
3555 self.cancel_registry().release(cancel_token);
3556
3557 let resp = match outcome {
3558 Ok(Ok(resp)) => {
3559 guard.completed = true;
3560 resp
3561 }
3562 Ok(Err(_recv_err)) => {
3563 // Sender dropped externally — pending entry is
3564 // already gone (someone else removed it). Mark
3565 // completed so Drop doesn't fire a useless CANCEL
3566 // for a server that's no longer tracking this id.
3567 guard.completed = true;
3568 metrics_guard.record(CallOutcome::Transport);
3569 let err = RpcError::Transport(AdapterError::Connection(
3570 "rpc client pending sender dropped (no response will arrive)".into(),
3571 ));
3572 self.fire_rpc_observer_outbound(
3573 target_node_id,
3574 service,
3575 started_total.elapsed().as_millis() as u32,
3576 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(
3577 err.to_string(),
3578 ),
3579 request_bytes_len,
3580 0,
3581 );
3582 return Err(err);
3583 }
3584 Err(_elapsed) => {
3585 // Timeout: leave `completed=false` so Drop emits
3586 // CANCEL automatically; surface Timeout to caller.
3587 metrics_guard.record(CallOutcome::Timeout);
3588 self.fire_rpc_observer_outbound(
3589 target_node_id,
3590 service,
3591 started_total.elapsed().as_millis() as u32,
3592 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Timeout,
3593 request_bytes_len,
3594 0,
3595 );
3596 return Err(RpcError::Timeout {
3597 elapsed_ms: started.elapsed().as_millis() as u64,
3598 });
3599 }
3600 };
3601
3602 // Map the wire status onto the public Result type.
3603 if resp.status.is_ok() {
3604 metrics_guard.record(CallOutcome::Ok);
3605 let response_bytes_len = resp.body.len() as u32;
3606 self.fire_rpc_observer_outbound(
3607 target_node_id,
3608 service,
3609 started_total.elapsed().as_millis() as u32,
3610 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Ok,
3611 request_bytes_len,
3612 response_bytes_len,
3613 );
3614 Ok(RpcReply {
3615 body: resp.body,
3616 headers: resp.headers,
3617 latency_ns: started.elapsed().as_nanos() as u64,
3618 })
3619 } else {
3620 metrics_guard.record(CallOutcome::ServerError);
3621 let status = resp.status.to_wire();
3622 let response_bytes_len = resp.body.len() as u32;
3623 let message = String::from_utf8(resp.body.to_vec())
3624 .unwrap_or_else(|e| format!("<{} bytes of non-utf8 body>", e.into_bytes().len()));
3625 self.fire_rpc_observer_outbound(
3626 target_node_id,
3627 service,
3628 started_total.elapsed().as_millis() as u32,
3629 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(message.clone()),
3630 request_bytes_len,
3631 response_bytes_len,
3632 );
3633 // v0.4 capability-auth: callee-side defense-in-depth
3634 // surfaces as a wire `CapabilityDenied` status. Map it
3635 // back to the typed `RpcError::CapabilityDenied` so
3636 // application code sees the same variant regardless of
3637 // which side of the gate fired.
3638 if matches!(resp.status, RpcStatus::CapabilityDenied) {
3639 return Err(RpcError::CapabilityDenied {
3640 target: target_node_id,
3641 capability: service.to_string(),
3642 });
3643 }
3644 Err(RpcError::ServerError { status, message })
3645 }
3646 }
3647
3648 // ----------------------------------------------------------------
3649 // Internal helpers.
3650 // ----------------------------------------------------------------
3651
3652 /// Lazy-subscribe `reply_channel` from `target_node_id` and
3653 /// register an inbound dispatcher that drives the per-Mesh
3654 /// `RpcClientFold`. Idempotent — subsequent calls for the
3655 /// same (target, service) pair are no-ops.
3656 ///
3657 /// **Bounded** at [`MAX_REPLY_SUBSCRIPTIONS`]: a caller talking
3658 /// to many short-lived (target, service) pairs would otherwise
3659 /// grow the registry indefinitely. Past the cap we refuse the
3660 /// new subscription with `NoRoute` rather than evict an
3661 /// existing one (eviction could rip out a healthy in-flight
3662 /// reply path).
3663 ///
3664 /// **Dispatcher reuse**: the reply-channel name embeds the
3665 /// CALLER's `self_origin`, NOT the target's, so a single
3666 /// caller talking to multiple servers for the same service
3667 /// reuses the same reply channel (same hash). We register the
3668 /// dispatcher only if the slot is unoccupied; subsequent
3669 /// (target, service) pairs that hash to the same slot are
3670 /// allowed to share the existing dispatcher (which routes to
3671 /// the same per-Mesh `pending` map regardless of target). A
3672 /// genuine cross-service hash collision is detected at
3673 /// `serve_rpc` time (the AlreadyServing path) for the server
3674 /// side; on the caller side here, sharing the dispatcher is
3675 /// the correct behavior because all RESPONSE events route
3676 /// through the same `RpcClientPending` keyed by `call_id`.
3677 async fn ensure_reply_subscription(
3678 self: &Arc<Self>,
3679 target_node_id: u64,
3680 service: &str,
3681 reply_channel: ChannelName,
3682 reply_hash: ChannelHash,
3683 ) -> Result<(), RpcError> {
3684 let registry = self.rpc_reply_subscriptions_arc();
3685 // PERF_AUDIT §3.5 — DashMap keyed by
3686 // `(target, xxh3_64(service))`. Pre-fix this was a global
3687 // `Mutex<Vec<(u64, String)>>` that every concurrent RPC
3688 // caller took on every call to scan the Vec with a String
3689 // compare per entry — all callers serialized on it. Now the
3690 // hot path is one shard-local read with a single String
3691 // compare against the slot's stored service name (xxh3 is
3692 // not collision-free; see `reply_subscription_covers`).
3693 let service_hash = xxhash_rust::xxh3::xxh3_64(service.as_bytes());
3694 if reply_subscription_covers(®istry, target_node_id, service_hash, service) {
3695 return Ok(());
3696 }
3697 // Cap the registry. `len()` on DashMap is approximate under
3698 // concurrent churn (it sums shard counts under shard reads,
3699 // not a global lock), which is exactly the semantics we
3700 // want here — the cap is a soft guard against a runaway
3701 // caller, not a precise invariant. Past the cap, new
3702 // entries are refused.
3703 if registry.len() >= MAX_REPLY_SUBSCRIPTIONS {
3704 return Err(RpcError::NoRoute {
3705 target: target_node_id,
3706 reason: format!(
3707 "reply-subscription registry at cap ({} entries); refusing new \
3708 (target={target_node_id:#x}, service={service:?}). Caller should \
3709 reuse an existing target+service pair or shrink the active set.",
3710 MAX_REPLY_SUBSCRIPTIONS,
3711 ),
3712 });
3713 }
3714
3715 // Subscribe to our own reply channel from the target so the
3716 // target's roster has us as a subscriber when the server's
3717 // emit closure publishes the RESPONSE.
3718 self.subscribe_channel(target_node_id, reply_channel.clone())
3719 .await
3720 .map_err(|e| RpcError::NoRoute {
3721 target: target_node_id,
3722 reason: e.to_string(),
3723 })?;
3724
3725 // Register the inbound dispatcher only if the slot is
3726 // unoccupied. The reply-channel name embeds *self_origin*,
3727 // not the target, so multiple targets serving the same
3728 // service share one reply channel + one dispatcher. The
3729 // existing dispatcher routes to the same per-Mesh
3730 // `RpcClientPending` keyed by call_id, so reuse is safe.
3731 if !self.rpc_inbound_dispatcher_registered(reply_hash) {
3732 let pending = self.rpc_client_pending();
3733 let fold = Arc::new(Mutex::new(RpcClientFold::new(pending)));
3734 // S-4 part 2: use `apply_inbound` so the wire-session
3735 // peer's NodeId (resolved in mesh.rs's dispatch site)
3736 // flows into the fold's deliver gate. The legacy
3737 // `RedexFold::apply` shim delivers with from_node=0,
3738 // which would defeat the binding.
3739 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
3740 fold.lock().apply_inbound(&ev);
3741 });
3742 // Race-safe: a concurrent caller might have just
3743 // registered between our check and our insert. In that
3744 // case `register_rpc_inbound` returns the prior
3745 // dispatcher; our new fresh fold is dropped here, and
3746 // the prior dispatcher (which routes to the same
3747 // shared `pending`) keeps doing the job. No collision
3748 // — both folds are functionally equivalent.
3749 if let Some(prev) = self.register_rpc_inbound(reply_hash, dispatcher) {
3750 // Roll back: keep the prior dispatcher (it's
3751 // already wired to the same shared pending map).
3752 let _ = self.register_rpc_inbound(reply_hash, prev);
3753 }
3754 }
3755
3756 let _ = reply_hash; // captured into the dispatcher above; surfaced for debug
3757 // `insert` is idempotent — a concurrent caller that beat us
3758 // to it just overwrote the slot with the identical value.
3759 // On a genuine xxh3 collision between two service names on
3760 // the same target, the slot flips to whichever service
3761 // subscribed last and the other re-subscribes on its next
3762 // call (idempotent, correct, merely un-cached). Cap drift
3763 // past MAX_REPLY_SUBSCRIPTIONS during a concurrent insert
3764 // race is bounded by the number of concurrent callers,
3765 // which operators tune separately.
3766 registry.insert((target_node_id, service_hash), Arc::from(service));
3767 Ok(())
3768 }
3769}
3770
3771/// PERF_AUDIT §3.5 — hot-path membership check for the
3772/// reply-subscription registry. Returns `true` only when the slot
3773/// for `(target, xxh3(service))` exists AND the stored service
3774/// name matches exactly. xxh3_64 is neither collision-free nor
3775/// cryptographic; a hash-only hit that skipped the subscribe for
3776/// a *different* service would silently drop that service's
3777/// replies — the reply channel name embeds the service, so being
3778/// in the target's roster for the colliding service's channel
3779/// does nothing for this one. Verifying the stored name turns a
3780/// collision into a per-call re-subscribe (idempotent, harmless)
3781/// instead of a correctness bug.
3782fn reply_subscription_covers(
3783 registry: &dashmap::DashMap<(u64, u64), Arc<str>>,
3784 target_node_id: u64,
3785 service_hash: u64,
3786 service: &str,
3787) -> bool {
3788 registry
3789 .get(&(target_node_id, service_hash))
3790 .is_some_and(|entry| entry.value().as_ref() == service)
3791}
3792
/// Soft cap on the number of distinct (target_node_id, service)
/// pairs the caller-side reply-subscription registry will hold.
///
/// Once the registry reaches the cap, the lazy-subscribe path
/// inside [`MeshNode::call`] refuses new entries with
/// [`RpcError::NoRoute`].
///
/// The check is a pre-insert length test on a concurrent map, so
/// racing callers can push the registry slightly past this value.
/// The overshoot is bounded by the number of concurrent callers.
/// Treat it as a guard against a runaway caller, not a precise
/// invariant.
///
/// 1024 is generous for any realistic deployment; a caller that
/// needs more should reuse existing reply paths.
pub const MAX_REPLY_SUBSCRIPTIONS: usize = 1024;
3800
3801/// Mint a random 64-bit call_id. Used as the correlation token
3802/// for REQUEST/RESPONSE pairing. The fold keys pending oneshots on
3803/// this value; any session peer with publish access to the reply
3804/// channel could ship a forged RESPONSE if it could guess the
3805/// value. Sequential u64s are predictable from any peer that
3806/// observes a single allocation; random u64s collide with 2^-64
3807/// probability per call and are unpredictable to observing peers.
3808///
3809/// **PERF_AUDIT §3.8** — pre-fix this called `getrandom::fill` for
3810/// 8 bytes per RPC — one OS entropy syscall per call
3811/// (BCryptGenRandom on Windows, ~200-400 ns; somewhat cheaper on
3812/// Linux). Now each thread refills a small pool of raw OS entropy
3813/// ([`CALL_ID_ENTROPY_POOL_BYTES`]) with a single `getrandom`
3814/// syscall and hands out 8 bytes per call, amortizing the syscall
3815/// across [`CALL_ID_ENTROPY_POOL_BYTES`]/8 mints.
3816///
3817/// Every minted id is still raw OS entropy — NOT the output of a
3818/// userspace PRNG — so the unpredictability-to-peers property is
3819/// byte-for-byte identical to the pre-§3.8 per-call fill. (An
3820/// earlier draft of this fix streamed ids from a thread-local
3821/// SplitMix64; that was unsound for this threat model: call_ids
3822/// are sent to callees by design, and SplitMix64's output
3823/// finalizer is a public bijection, so a single observed id
3824/// reveals the generator state and with it every FUTURE call_id
3825/// minted on that thread — letting one callee forge responses to
3826/// races on calls addressed to other peers. Raw pooled entropy
3827/// has no such state to recover.)
3828///
3829/// If the pool refill fails, falls back to a process-global
3830/// monotonic counter rather than returning `0`: two concurrent
3831/// callers that both minted `0` would `register(0, …)` over each
3832/// other, so the first caller's oneshot closes with
3833/// `RecvError::Closed` (a spurious `Transport` error, not the clean
3834/// timeout the all-distinct path yields). The counter keeps ids
3835/// distinct (predictable on entropy failure, but the S-4
3836/// `from_node` gate still blocks cross-peer forgery, and such calls
3837/// time out anyway). `getrandom::fill` failure is a fatal-
3838/// environment signal (no `/dev/urandom`, broken syscall) and the
3839/// broader stack won't be functional anyway; the pool cursor is
3840/// left exhausted so the next mint retries the refill. `0` is
3841/// reserved as a sentinel and never returned.
3842fn mint_random_call_id() -> u64 {
3843 thread_local! {
3844 // (pool, cursor). Cursor starts exhausted so the first
3845 // mint on each thread performs the initial refill.
3846 static CALL_ID_ENTROPY_POOL: std::cell::RefCell<([u8; CALL_ID_ENTROPY_POOL_BYTES], usize)> = const {
3847 std::cell::RefCell::new(([0u8; CALL_ID_ENTROPY_POOL_BYTES], CALL_ID_ENTROPY_POOL_BYTES))
3848 };
3849 }
3850 CALL_ID_ENTROPY_POOL.with(|cell| {
3851 let mut pool = cell.borrow_mut();
3852 let (buf, cursor) = &mut *pool;
3853 if *cursor >= CALL_ID_ENTROPY_POOL_BYTES {
3854 if getrandom::fill(buf).is_err() {
3855 // Entropy unavailable. Do NOT return 0 — concurrent
3856 // callers would all mint 0 and clobber each other's
3857 // pending entries. A process-global counter keeps ids
3858 // distinct (starts at 1, so it is non-zero until it
3859 // wraps the full u64 range, at which point the 0 is
3860 // mapped to 1 below).
3861 static CALL_ID_FALLBACK: std::sync::atomic::AtomicU64 =
3862 std::sync::atomic::AtomicU64::new(1);
3863 let id = CALL_ID_FALLBACK.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3864 return if id == 0 { 1 } else { id };
3865 }
3866 *cursor = 0;
3867 }
3868 let mut id = [0u8; 8];
3869 id.copy_from_slice(&buf[*cursor..*cursor + 8]);
3870 *cursor += 8;
3871 // Reserve 0 as the "no correlation" sentinel: on the ~1-in-2^64
3872 // chance the pool yields all-zero bytes, remap to a fixed non-zero.
3873 match u64::from_le_bytes(id) {
3874 0 => 1,
3875 id => id,
3876 }
3877 })
3878}
3879
/// Per-thread OS-entropy pool size, in bytes, for
/// [`mint_random_call_id`].
///
/// Each `getrandom` syscall fetches 64 ids (512 bytes). The syscall
/// cost is dominated by the fixed kernel round-trip, so batching 64
/// mints recovers ~98% of the per-call overhead. The amount of
/// buffered future-id entropy per thread stays small.
///
/// The value must be a non-zero multiple of 8. The mint hands out the
/// pool in 8-byte slices and refills only once its cursor reaches this
/// value. With a remainder, the last slice would run past the end of
/// the buffer and panic at runtime. With zero, the first slice would
/// already be out of bounds. The `const` assertion below rejects both
/// at compile time.
const CALL_ID_ENTROPY_POOL_BYTES: usize = 64 * std::mem::size_of::<u64>();
const _: () = assert!(
    CALL_ID_ENTROPY_POOL_BYTES != 0 && CALL_ID_ENTROPY_POOL_BYTES % 8 == 0,
    "CALL_ID_ENTROPY_POOL_BYTES must be a non-zero multiple of 8"
);
3886
3887// ============================================================================
3888// Internal: tiny shims so the `serve_rpc` / `call` impls stay
3889// readable. The underlying state lives on `MeshNode`; these just
3890// rename the accessor methods locally.
3891// ============================================================================
3892
3893impl MeshNode {
3894 fn rpc_client_pending(&self) -> Arc<super::cortex::RpcClientPending> {
3895 self.rpc_client_pending_arc()
3896 }
3897 fn identity_origin_hash(&self) -> u64 {
3898 self.public_key_origin_hash()
3899 }
3900
3901 /// Caller-side helper that pairs `rpc_route_for_service` with
3902 /// the `RpcError::NoRoute { target, reason }` mapping every
3903 /// `Mesh::call*` entry point needs. Returning `Arc<RpcRoute>`
3904 /// keeps the hot-path allocation profile of the cache intact
3905 /// (one refcount bump per caller).
3906 fn rpc_route_or_no_route(
3907 &self,
3908 target_node_id: u64,
3909 service: &str,
3910 ) -> Result<Arc<super::mesh::RpcRoute>, RpcError> {
3911 self.rpc_route_for_service(service)
3912 .map_err(|reason| RpcError::NoRoute {
3913 target: target_node_id,
3914 reason,
3915 })
3916 }
3917}
3918
3919// `proximity_graph()` is already a public accessor on MeshNode
3920// (see the existing `pub fn proximity_graph(&self) -> &Arc<...>`).
3921// `select_target` uses it directly; no shim needed.
3922
3923// ============================================================================
3924// Errors.
3925// ============================================================================
3926
3927/// Errors returned by [`MeshNode::serve_rpc`].
3928#[derive(Debug, thiserror::Error)]
3929pub enum ServeError {
3930 /// The service name fails channel-name validation.
3931 #[error("invalid service name: {0}")]
3932 InvalidServiceName(String),
3933 /// A handler for this service is already registered on this
3934 /// node. Drop the prior `ServeHandle` to free the slot.
3935 #[error("already serving service `{0}` on this node")]
3936 AlreadyServing(String),
3937}
3938
3939// ============================================================================
3940// Typed-call helper.
3941// ============================================================================
3942
3943/// Wire-shape failures from [`typed_call`]. Distinct variants
3944/// for transport (no route, timeout, etc.) vs codec (serde /
3945/// postcard) so service-specific client error enums can wrap
3946/// each independently. Server-level (application) errors are
3947/// decoded into `Resp` itself — the client matches on the
3948/// resulting `Resp::Error(...)` variant.
3949#[derive(Debug, thiserror::Error)]
3950pub enum TypedCallError {
3951 /// Transport-level failure surfaced by [`MeshNode::call`].
3952 #[error("transport: {0}")]
3953 Transport(#[from] RpcError),
3954 /// Request serialization or response deserialization failed.
3955 #[error("codec: {0}")]
3956 Codec(String),
3957}
3958
3959impl From<postcard::Error> for TypedCallError {
3960 fn from(e: postcard::Error) -> Self {
3961 Self::Codec(e.to_string())
3962 }
3963}
3964
3965/// Send a postcard-encoded request to a remote RPC service and
3966/// decode the postcard-encoded reply. The shared shape every
3967/// substrate-internal RPC client wants:
3968///
3969/// 1. `postcard::to_allocvec(request)` → wire body.
3970/// 2. `MeshNode::call(target, service, body, opts{deadline})`.
3971/// 3. `postcard::from_bytes::<Resp>(reply.body)`.
3972///
3973/// Caller wraps the returned `Resp` in its own typed-error
3974/// surface (typically a `Server` variant that holds the
3975/// service-specific error enum decoded from `Resp`). Returning
3976/// `TypedCallError` here keeps the wrapper code to a one-line
3977/// `From<TypedCallError>` impl per client.
3978pub async fn typed_call<Req, Resp>(
3979 mesh: &std::sync::Arc<crate::adapter::net::MeshNode>,
3980 target_node_id: u64,
3981 service: &str,
3982 request: &Req,
3983 deadline: std::time::Duration,
3984) -> Result<Resp, TypedCallError>
3985where
3986 Req: serde::Serialize,
3987 Resp: serde::de::DeserializeOwned,
3988{
3989 let body = postcard::to_allocvec(request)?;
3990 let opts = CallOptions {
3991 deadline: Some(std::time::Instant::now() + deadline),
3992 ..Default::default()
3993 };
3994 let reply = mesh
3995 .call(target_node_id, service, Bytes::from(body), opts)
3996 .await?;
3997 Ok(postcard::from_bytes(&reply.body)?)
3998}
3999
4000// ============================================================================
4001// Helpers.
4002// ============================================================================
4003
4004/// Detect the "no session to the target node id" sub-case of
4005/// [`AdapterError::Connection`]. The publish path can surface
4006/// this through one of two messages depending on which inner
4007/// helper landed it:
4008///
4009/// - `"publish: no session for subscriber {hash}"` — emitted
4010/// by `mesh.rs::publish_to_peer` when the subscriber-roster
4011/// path can't find an active session.
4012/// - `"no session to publisher {hash}"` — emitted by the lower
4013/// mesh.rs send path when there's no active session to the
4014/// target's publisher record at all.
4015///
4016/// Both mean "I can't reach this peer". When we observe either,
4017/// we surface as [`RpcError::NoRoute`] rather than `Transport`
4018/// because retrying the same target without a session is
4019/// pointless and the right behavior for a routing helper is to
4020/// try a different target.
4021fn classify_publish_no_session(err: &AdapterError) -> bool {
4022 match err {
4023 AdapterError::Connection(msg) => {
4024 msg.contains("no session for subscriber") || msg.contains("no session to publisher")
4025 }
4026 _ => false,
4027 }
4028}
4029
/// Convert a monotonic [`Instant`] into approximate Unix-epoch
/// nanoseconds for the wire `deadline_ns` field.
///
/// `Instant` is monotonic and not wall-clock. The conversion takes the
/// offset of `instant` from `Instant::now()` and applies it to the
/// current `SystemTime`. The result drifts marginally with wall-clock
/// skew, but that is good enough for server-side
/// deadline-already-passed short-circuits.
///
/// All arithmetic saturates within `u64`:
/// - a wall clock set before the Unix epoch reads as `0`;
/// - an instant too far in the future saturates to `u64::MAX`;
/// - an instant too far in the past saturates to `0`.
///
/// Previously `u128 → u64` `as` casts truncated modulo 2^64. That let a
/// far-future deadline wrap into an arbitrary, possibly already-expired
/// value.
fn instant_to_unix_nanos(instant: Instant) -> u64 {
    // Narrow a nanosecond count to u64, clamping instead of wrapping.
    fn saturating_nanos(nanos: u128) -> u64 {
        nanos.min(u128::from(u64::MAX)) as u64
    }

    let now_instant = Instant::now();
    let now_wall = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| saturating_nanos(d.as_nanos()))
        .unwrap_or(0);
    if instant >= now_instant {
        let ahead = saturating_nanos(instant.duration_since(now_instant).as_nanos());
        now_wall.saturating_add(ahead)
    } else {
        let behind = saturating_nanos(now_instant.duration_since(instant).as_nanos());
        now_wall.saturating_sub(behind)
    }
}
4049
4050#[allow(dead_code)]
4051fn _ensure_send_sync() {
4052 fn assert_send_sync<T: Send + Sync>() {}
4053 assert_send_sync::<ServeHandle>();
4054 assert_send_sync::<RpcCancellationToken>();
4055 assert_send_sync::<RpcContext>();
4056 assert_send_sync::<RpcHandlerError>();
4057 assert_send_sync::<RpcStatus>();
4058 assert_send_sync::<RpcReply>();
4059 assert_send_sync::<CallOptions>();
4060}
4061
#[cfg(test)]
mod origin_cache_tests {
    use super::*;

    /// The crafted-origin memory-amplification guard (cubic P2): the reply-
    /// channel / origin-node caches are keyed by the *wire-claimed*
    /// `caller_origin`, which a single authed peer can vary freely. Spraying
    /// far more distinct origins than the capacity must NOT grow the cache —
    /// it stays pinned at `RPC_CALLER_CACHE_CAP`, evicting the coldest.
    #[test]
    fn origin_keyed_lru_bounds_under_crafted_origin_flood() {
        let cache: OriginKeyedLru<u64> = OriginKeyedLru::new();
        let flood = (RPC_CALLER_CACHE_CAP as u64) * 4;
        for origin in 0..flood {
            cache.insert(origin, origin);
        }
        assert_eq!(
            cache.0.lock().len(),
            RPC_CALLER_CACHE_CAP,
            "cache must stay at its capacity bound under a crafted-origin flood"
        );
        // The most-recently-seen window survives; the cold prefix is evicted.
        assert_eq!(cache.get(flood - 1), Some(flood - 1));
        assert_eq!(cache.get(0), None);
    }

    /// PERF_AUDIT §3.8 — `mint_random_call_id` mints thousands of
    /// values from the pooled-entropy path, all of which must be
    /// distinct in practice (a duplicate would let two in-flight
    /// calls collide on the per-Mesh pending map). 100k samples is
    /// far below the 2^32 birthday-paradox boundary for 64-bit ids,
    /// so a regression that recycled pool bytes (cursor mis-advance,
    /// missed refill) would fail loudly. The loop also crosses the
    /// pool-refill boundary thousands of times (pool holds 64 ids),
    /// pinning the refill/cursor arithmetic.
    #[test]
    fn mint_random_call_id_produces_distinct_values_across_thousands_of_calls() {
        let mut seen = std::collections::HashSet::with_capacity(100_000);
        for _ in 0..100_000 {
            let id = super::mint_random_call_id();
            // 0 is the fallback sentinel — should not appear under
            // a working `getrandom` refill.
            assert_ne!(id, 0, "fallback-zero path triggered unexpectedly");
            assert!(seen.insert(id), "duplicate call_id minted: {:#x}", id);
        }
    }

    /// PERF_AUDIT §3.8 — minted ids are raw OS entropy: count
    /// set-bits across 10k mints and assert the fraction of set bits
    /// is near 0.5. A pool-management bug that handed out the zeroed
    /// initial buffer (or re-served a stale window) would skew this
    /// hard. Properly random 64-bit ids set each bit with probability
    /// 0.5 (~32 set bits per sample). The observed fraction therefore
    /// has standard deviation 0.5 / sqrt(total bits sampled).
    #[test]
    fn mint_random_call_id_set_bit_density_is_balanced() {
        let n = 10_000u64;
        let mut total_set: u64 = 0;
        for _ in 0..n {
            total_set += super::mint_random_call_id().count_ones() as u64;
        }
        let bits_total = n * 64;
        let fraction = total_set as f64 / bits_total as f64;
        // Expected 0.5. With 640k sampled bits σ ≈ 0.5 / 800 ≈ 0.000625,
        // so the ±0.02 tolerance is ~32σ. That is deliberately generous so
        // the test never flakes, yet still catches collapsed-pool
        // regressions (e.g. a zeroed or repeated window).
        assert!(
            (fraction - 0.5).abs() < 0.02,
            "set-bit density {} is too far from 0.5 — pool may be mismanaged",
            fraction
        );
    }

    /// PERF_AUDIT §3.5 — the reply-subscription registry's hot
    /// path must be a `(target, xxh3(service))` lookup, not a
    /// `Mutex<Vec<(u64, String)>>` linear scan. Pin the contract
    /// via `reply_subscription_covers` (the exact hot-path check):
    /// 1. distinct (target, service) pairs are distinct keys —
    ///    same service against two targets, and two services
    ///    against one target, never alias;
    /// 2. repeat insert of the same pair is idempotent and the
    ///    fast path keeps answering `true`;
    /// 3. an xxh3 COLLISION (same hash, different service name)
    ///    must NOT count as covered — a false positive here would
    ///    skip a needed subscribe and silently drop the colliding
    ///    service's replies. The stored-name verification turns it
    ///    into a re-subscribe instead;
    /// 4. the cap is enforced via `len()`, not a separate counter
    ///    that could drift.
    #[test]
    fn reply_subscriptions_keyed_by_target_and_service_hash() {
        use dashmap::DashMap;
        let registry: DashMap<(u64, u64), Arc<str>> = DashMap::new();
        let h_a = xxhash_rust::xxh3::xxh3_64(b"svc-a");
        let h_b = xxhash_rust::xxh3::xxh3_64(b"svc-b");
        // Same target, different services → distinct entries.
        registry.insert((0xAA, h_a), Arc::from("svc-a"));
        registry.insert((0xAA, h_b), Arc::from("svc-b"));
        assert!(super::reply_subscription_covers(
            &registry, 0xAA, h_a, "svc-a"
        ));
        assert!(super::reply_subscription_covers(
            &registry, 0xAA, h_b, "svc-b"
        ));
        // Same service, different targets → distinct entries.
        assert!(!super::reply_subscription_covers(
            &registry, 0xBB, h_a, "svc-a"
        ));
        registry.insert((0xBB, h_a), Arc::from("svc-a"));
        assert!(super::reply_subscription_covers(
            &registry, 0xBB, h_a, "svc-a"
        ));
        // Idempotent — repeat insert overwrites with the identical
        // value; the fast path keeps answering true.
        registry.insert((0xAA, h_a), Arc::from("svc-a"));
        assert!(super::reply_subscription_covers(
            &registry, 0xAA, h_a, "svc-a"
        ));
        assert_eq!(registry.len(), 3);
        // xxh3 collision: "svc-evil" hashing to h_a (forced here —
        // xxh3_64 collisions are computable offline since the hash
        // isn't cryptographic) must NOT cover "svc-a"'s slot, and
        // vice versa. The hash-only DashSet shape this replaced
        // answered `true` and silently skipped the subscribe.
        assert!(
            !super::reply_subscription_covers(&registry, 0xAA, h_a, "svc-evil"),
            "hash collision must not satisfy the membership check for a \
             different service name"
        );
        // After the colliding service legitimately subscribes (slot
        // overwritten), the original service degrades to
        // re-subscribe — covered must flip to false for it, never
        // silently true for both.
        registry.insert((0xAA, h_a), Arc::from("svc-evil"));
        assert!(super::reply_subscription_covers(
            &registry, 0xAA, h_a, "svc-evil"
        ));
        assert!(!super::reply_subscription_covers(
            &registry, 0xAA, h_a, "svc-a"
        ));
    }

    /// PERF_AUDIT §3.3 — grant-stall backstop check for the
    /// window/2 auto-grant coalescing. Simulates the full
    /// credit loop for every window 1..=64: the server starts
    /// with `window` credits and consumes one per chunk; the
    /// client accumulates via `accumulate_auto_grant` and only
    /// flushes at the threshold. Asserts:
    /// 1. liveness — an actively-polling consumer never observes
    ///    the server starved (credits exhausted with nothing left
    ///    to poll), i.e. withholding sub-threshold credits cannot
    ///    deadlock the stream and no timer/drop backstop is needed;
    /// 2. coalescing — grant-packet count stays at
    ///    ~chunks / (window/2), and is strictly fewer than one
    ///    grant per chunk once window ≥ 4 (the integration suite
    ///    only exercises window=2, whose threshold degenerates
    ///    to per-chunk).
    #[test]
    fn auto_grant_coalescing_never_starves_the_server_pump() {
        for window in 1u32..=64 {
            let chunks = 1_000u32;
            let mut server_credits = window as u64;
            let mut pending = 0u32;
            let mut sent = 0u32;
            let mut delivered = 0u32;
            let mut grants = 0u32;
            while delivered < chunks {
                // Server pump: send while credits remain.
                while server_credits > 0 && sent < chunks {
                    server_credits -= 1;
                    sent += 1;
                }
                assert!(
                    sent > delivered,
                    "window {window}: server starved while the consumer is actively \
                     polling (credits {server_credits}, pending {pending}, \
                     sent {sent}, delivered {delivered})"
                );
                // Consumer polls exactly one chunk.
                delivered += 1;
                if let Some(amount) = super::accumulate_auto_grant(&mut pending, window) {
                    grants += 1;
                    server_credits += amount as u64;
                }
            }
            let threshold = (window / 2).max(1);
            assert!(
                grants <= chunks / threshold + 1,
                "window {window}: {grants} grant packets exceeds the \
                 coalesced cadence bound of {}",
                chunks / threshold + 1
            );
            if window >= 4 {
                assert!(
                    grants < chunks,
                    "window {window}: coalescing must emit fewer grants than chunks"
                );
            }
        }
    }

    /// `get` promotes to most-recently-used, so a touched entry outlives an
    /// untouched one when the cache overflows by one — confirming the wrapper
    /// gives true LRU semantics (a hot caller isn't evicted out from under an
    /// in-flight exchange).
    #[test]
    fn origin_keyed_lru_get_promotes_to_mru() {
        let cache: OriginKeyedLru<u64> = OriginKeyedLru::new();
        for origin in 0..(RPC_CALLER_CACHE_CAP as u64) {
            cache.insert(origin, origin);
        }
        // Touch origin 0 (otherwise the LRU), then overflow by one entry.
        assert_eq!(cache.get(0), Some(0));
        cache.insert(u64::MAX, 1);
        assert_eq!(cache.get(0), Some(0), "touched entry must survive eviction");
        assert_eq!(cache.get(1), None, "the now-LRU entry (1) must be evicted");
    }
}