net/adapter/net/mesh_rpc.rs
1//! `Mesh::serve_rpc` / `Mesh::call` glue — the wire-up between
2//! `MeshNode`'s pub/sub + per-channel-hash dispatch hook and the
3//! `cortex::rpc` server / client folds.
4//!
5//! See `docs/misc/NRPC_DESIGN.md` for the full architectural framing.
6//! In short:
7//!
8//! - `serve_rpc(service, handler)` registers an inbound dispatcher
9//! for `<service>.requests`'s channel hash. The dispatcher pushes
10//! inbound REQUEST/CANCEL events through the
11//! [`crate::adapter::net::cortex::RpcServerFold`], which spawns
12//! the user handler. The fold's emit closure publishes RESPONSE
13//! events on `<service>.replies.<caller_origin>` via
14//! [`MeshNode::publish`].
15//!
16//! - `call(target, service, payload, opts)` allocates a `call_id`,
17//! registers a oneshot in the per-Mesh `RpcClientPending`,
18//! subscribes to its own reply channel from `target` (lazy,
19//! cached), publishes the REQUEST envelope on `<service>.requests`,
//! awaits the oneshot. Dropping the pending call, or hitting its deadline, sends a CANCEL.
21//!
//! Phase 1 surface is direct entity-to-entity addressing
//! (`call(target_node_id, ...)`). Phase 2 adds
//! `call_service(name, ...)`, which picks a target from the nodes
//! advertising the service via the existing capability-announcement
//! registry (see [`RoutingPolicy`]).
26
27use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
28use std::sync::Arc;
29use std::time::Instant;
30
31use bytes::Bytes;
32use parking_lot::Mutex;
33use tokio::sync::mpsc;
34use tokio::task::JoinHandle;
35
36use super::channel::{ChannelHash, ChannelId, ChannelName, ChannelPublisher, PublishConfig};
37use super::cortex::{
38 build_trace_headers, encode_request_grant, encode_stream_grant, EventMeta,
39 RpcAsyncResponseEmitter, RpcCancellationToken, RpcClientFold, RpcClientStreamingHandler,
40 RpcContext, RpcDuplexFold, RpcDuplexHandler, RpcHandler, RpcHandlerError, RpcInboundDispatcher,
41 RpcInboundEvent, RpcRequestChunkPayload, RpcRequestGrantEmitter, RpcRequestPayload,
42 RpcResponseEmitter, RpcResponsePayload, RpcServerFold, RpcServerStreamingFold, RpcStatus,
43 RpcStreamingHandler, RpcStreamingRequestFold, StreamItem, TraceContext, DISPATCH_RPC_CANCEL,
44 DISPATCH_RPC_REQUEST, DISPATCH_RPC_REQUEST_CHUNK, DISPATCH_RPC_REQUEST_GRANT,
45 DISPATCH_RPC_STREAM_GRANT, EVENT_META_SIZE, FLAG_RPC_CLIENT_STREAMING_REQUEST,
46 FLAG_RPC_PROPAGATE_TRACE, FLAG_RPC_REQUEST_END, FLAG_RPC_STREAMING_RESPONSE,
47 HEADER_NRPC_REQUEST_WINDOW_INITIAL, HEADER_NRPC_STREAM_WINDOW_INITIAL,
48};
49use super::mesh_rpc_metrics::{CallMetricsGuard, CallOutcome};
50use crate::error::AdapterError;
51
52use super::mesh::MeshNode;
53use super::redex::{RedexEntry, RedexEvent, RedexFold};
54
55// ============================================================================
56// Public types.
57// ============================================================================
58
/// Strategy `Mesh::call_service` uses to choose one node among all
/// nodes currently advertising the requested service.
///
/// Defaults to [`RoutingPolicy::RoundRobin`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum RoutingPolicy {
    /// Rotate through the candidates using the per-Mesh `call_id`
    /// counter. Load-agnostic, but spreads calls evenly. This is the
    /// default policy.
    #[default]
    RoundRobin,
    /// Choose a candidate at random on every call. Keeps no state, is
    /// cheap, and spreads independent calls evenly.
    Random,
    /// Consistent-hash `key` onto a candidate: a given `key` keeps
    /// landing on the same target while the candidate set does not
    /// change. Use it for session affinity — pinning a conversation,
    /// shard, or user to one backend.
    Sticky {
        /// Caller-chosen affinity identifier (session id, shard key,
        /// conversation id, ...) that is hashed to pick the target.
        key: u64,
    },
    /// Choose the candidate with the lowest `latency_us` in the local
    /// `ProximityGraph`. Candidates with no proximity data yet (no
    /// entity ↔ node_id mapping, or no pingwave received) rank last,
    /// preferring a known-fast node over an unmeasured one.
    ///
    /// If no candidate has proximity data at all, the first candidate
    /// in sorted order is chosen, so a freshly discovered service
    /// still routes deterministically.
    LowestLatency,
}
92
93/// Options for [`MeshNode::call`] and [`MeshNode::call_service`].
94#[derive(Debug, Clone)]
95pub struct CallOptions {
96 /// Hard deadline for the call. The future returned by `call`
97 /// races a `tokio::time::sleep_until`; whichever fires first
98 /// wins. On timeout the caller emits a CANCEL event for
99 /// `call_id` so the server can drop the in-flight handler.
100 /// `None` means no deadline; the caller waits indefinitely
101 /// (or until the future is dropped).
102 pub deadline: Option<Instant>,
103 /// How `call_service` picks a target. Ignored by `call`
104 /// (which takes an explicit `target_node_id`). Default:
105 /// `RoundRobin`.
106 pub routing_policy: RoutingPolicy,
107 /// Skip candidates whose `ProximityGraph` entry reports
108 /// `!is_available()` (i.e. `Unhealthy` or `Unknown`).
109 /// Default `true`. Candidates with no proximity entry at all
110 /// are KEPT — absence of evidence is not evidence of
111 /// unhealth, and a freshly-announced service shouldn't be
112 /// filtered just because pingwaves haven't propagated yet.
113 pub filter_unhealthy: bool,
114 /// W3C Trace Context to propagate to the server. When `Some`,
115 /// the call sets `FLAG_RPC_PROPAGATE_TRACE` on the request and
116 /// emits `traceparent` / `tracestate` headers; the server's
117 /// `RpcContext::trace_context` will be populated with the same
118 /// values. nRPC is transport-only — application code on both
119 /// sides reads / writes this via whatever tracing backend it
120 /// has wired up (tracing-opentelemetry, Datadog, etc.).
121 pub trace_context: Option<TraceContext>,
122 /// Per-call concurrency cap. Future Phase 2 work; v1 ignores
123 /// this and the per-Mesh `RpcClientPending` doesn't bound
124 /// in-flight count.
125 pub max_in_flight_per_target: u32,
126 /// **Streaming responses only.** Initial credit window for
127 /// per-streaming-response flow control. When `Some(n)`, the
128 /// caller emits `nrpc-stream-window-initial: n` on the
129 /// REQUEST and the server's pump task awaits one credit per
130 /// emitted chunk. The returned [`RpcStream`] auto-grants 1
131 /// credit per consumed chunk so the in-flight credit holds
132 /// near `n` (or use [`RpcStream::grant`] for batched / custom
133 /// cadence). `None` (the default) → unbounded: server pumps
134 /// chunks as fast as the publish path can take them
135 /// (back-compat / pre-flow-control behavior). Ignored by
136 /// non-streaming `call` / `call_service`.
137 pub stream_window_initial: Option<u32>,
138 /// **Client-streaming / duplex only.** Initial credit window
139 /// for per-call request-direction flow control. Mirror of
140 /// [`Self::stream_window_initial`] for the upload direction. When
141 /// `Some(n)`, the caller emits `nrpc-request-window-initial: n`
142 /// on the REQUEST and its `send().await` sink awaits one
143 /// credit per pushed chunk; the server refills via
144 /// [`DISPATCH_RPC_REQUEST_GRANT`] events. `None` → unbounded:
145 /// caller's send sink doesn't block (legacy / fast-path).
146 /// Ignored by unary `call` / `call_streaming`.
147 ///
148 /// Bidi streaming plan (Phase C).
149 pub request_window_initial: Option<u32>,
150 /// Caller-supplied request headers. Appended to the wire
151 /// `RpcRequestPayload::headers` after any auto-generated
152 /// headers (trace context, stream-window). Useful for
153 /// application-level metadata the server needs at
154 /// dispatch-time — e.g., the `net-where` predicate
155 /// header (Phase 9b of `CAPABILITY_SYSTEM_SDK_PLAN.md`) that
156 /// services consult for predicate-pushdown filtering.
157 ///
158 /// Each entry is `(name, value_bytes)`. Names use the lowercase
159 /// `cyberdeck-*` / `nrpc-*` convention; the substrate doesn't
160 /// validate names beyond the `MAX_RPC_HEADER_NAME_LEN` cap
161 /// enforced at encode time.
162 ///
163 /// Default: empty.
164 pub request_headers: Vec<(String, Vec<u8>)>,
165}
166
167impl Default for CallOptions {
168 fn default() -> Self {
169 Self {
170 deadline: None,
171 routing_policy: RoutingPolicy::default(),
172 filter_unhealthy: true,
173 trace_context: None,
174 max_in_flight_per_target: 64,
175 stream_window_initial: None,
176 request_window_initial: None,
177 request_headers: Vec::new(),
178 }
179 }
180}
181
182/// What [`MeshNode::call`] returns on success.
183#[derive(Debug, Clone)]
184pub struct RpcReply {
185 /// Response payload from the server's handler. Caller decodes
186 /// according to its application protocol.
187 pub body: Bytes,
188 /// Headers attached by the server's response.
189 pub headers: Vec<(String, Vec<u8>)>,
190 /// Wall-clock latency from `call(...)` to RESPONSE arrival.
191 pub latency_ns: u64,
192}
193
194/// What [`MeshNode::call`] returns on failure.
195#[derive(Debug, thiserror::Error)]
196pub enum RpcError {
197 /// No subscription / no route to the target. Either
198 /// `target_node_id` is unknown to the local mesh, or the
199 /// caller's reply-channel subscription couldn't be set up.
200 #[error("no route to target {target:#x}: {reason}")]
201 NoRoute {
202 /// Target node id the call was directed at.
203 target: u64,
204 /// Diagnostic — typically the underlying transport error.
205 reason: String,
206 },
207 /// Caller's deadline elapsed before a RESPONSE arrived. The
208 /// caller emits a CANCEL on timeout so the server can drop
209 /// the in-flight handler; this variant is returned to the
210 /// awaiting caller.
211 #[error("timeout after {elapsed_ms}ms")]
212 Timeout {
213 /// Wall-clock milliseconds elapsed before timeout fired.
214 elapsed_ms: u64,
215 },
216 /// Server returned a non-`Ok` status. Body carries the
217 /// server's diagnostic (UTF-8) when available.
218 #[error("server returned status {status:#06x}: {message}")]
219 ServerError {
220 /// Wire-level `RpcStatus` value the server returned.
221 status: u16,
222 /// UTF-8 diagnostic from the response body, when the body
223 /// decodes as valid UTF-8; otherwise hex-truncated.
224 message: String,
225 },
226 /// Underlying transport error (publish failure, encryption,
227 /// etc.).
228 #[error("transport: {0}")]
229 Transport(#[from] AdapterError),
230 /// Client-local serialization or deserialization failure.
231 /// `direction = Encode` means the typed wrapper failed to
232 /// encode the request before it ever hit the wire;
233 /// `direction = Decode` means the response landed but the
234 /// typed wrapper failed to decode it. Either way this is a
235 /// caller-fixable bug (wrong codec, schema drift, malformed
236 /// `Serialize` impl) — NOT a transient infra failure — so
237 /// retry / circuit-breaker predicates skip it by default.
238 #[error("codec ({direction:?}): {message}")]
239 Codec {
240 /// Which side of the call the codec failure happened on.
241 direction: CodecDirection,
242 /// Decode/encode diagnostic from the underlying serde impl.
243 message: String,
244 },
245 /// v0.4 capability-auth gate denied the call. Either the
246 /// target's latest `CapabilityAnnouncement` does not list
247 /// the requested `nrpc:<service>` tag, or it lists the tag
248 /// with allow-lists the caller does not match. See
249 /// `docs/plans/CAPABILITY_AUTH_PLAN.md` §3 for the model.
250 ///
251 /// Raised by the caller-side gate inside
252 /// [`MeshNode::call_service`] BEFORE the request hits the
253 /// wire, and surfaced by the caller on receipt of a
254 /// `RpcStatus::CapabilityDenied` response (the callee-side
255 /// defense-in-depth path).
256 #[error("capability denied: target {target:#x} does not authorize nrpc:{capability}")]
257 CapabilityDenied {
258 /// Target node id the gate denied.
259 target: u64,
260 /// Service / capability tag (without the `nrpc:` prefix)
261 /// the gate denied.
262 capability: String,
263 },
264}
265
/// Where a [`RpcError::Codec`] failure originated.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CodecDirection {
    /// The outbound request could not be encoded, so the call was
    /// never issued.
    Encode,
    /// The call came back `Ok`, but its inbound response could not
    /// be decoded.
    Decode,
}
274
275/// RAII handle returned by [`MeshNode::serve_rpc`]. Dropping it
276/// unregisters the inbound dispatcher and removes the service
277/// from the local-services registry (so subsequent
278/// `announce_capabilities` calls stop emitting the
279/// `nrpc:<service>` tag).
280///
281/// **Bridge task lifecycle.** The bridge task that drains the
282/// inbound mpsc into the fold is NOT aborted on Drop. The
283/// `register_rpc_inbound` dispatcher closure owns the only
284/// `mpsc::Sender` clone, so `unregister_rpc_inbound` (which drops
285/// the dispatcher) closes the channel; the bridge's `rx.recv()`
286/// then yields `None` and the task exits cleanly after draining
287/// any queued events. Aborting would race events that are
288/// mid-`fold.lock().apply()` — those events would be killed
289/// without their RESPONSE being emitted, so the corresponding
290/// callers would just time out.
291///
292/// Outstanding handler executions (already-spawned tokio tasks)
293/// continue to completion regardless.
294pub struct ServeHandle {
295 /// Channel hash to unregister on Drop.
296 channel_hash: ChannelHash,
297 /// Service name to remove from `rpc_local_services` on Drop.
298 service: String,
299 /// The bridge task. Held only so callers can introspect /
300 /// detach it; Drop does NOT abort it (see struct doc-comment).
301 /// Detaches naturally when the handle is dropped — the bridge
302 /// exits on its own once the dispatcher's `mpsc::Sender` is
303 /// dropped via `unregister_rpc_inbound`.
304 _bridge: JoinHandle<()>,
305 /// Hold an Arc back to the mesh so we can unregister on Drop
306 /// without the mesh having to track us.
307 mesh: Arc<MeshNode>,
308}
309
310impl Drop for ServeHandle {
311 fn drop(&mut self) {
312 // Order matters: unregister the dispatcher FIRST so no new
313 // events can land in the bridge's mpsc, THEN drop the
314 // service-tag entry. The bridge task drains any in-flight
315 // events naturally and exits when its `rx.recv()` yields
316 // `None` (which happens as soon as the dispatcher closure
317 // — the sole `tx` owner — is dropped above).
318 self.mesh.unregister_rpc_inbound(self.channel_hash);
319 self.mesh.rpc_local_services_arc().remove(&self.service);
320 }
321}
322
323// ============================================================================
324// Streaming caller-side: RpcStream.
325// ============================================================================
326
327/// An open streaming RPC call. Implements `Stream<Item =
328/// Result<Bytes, RpcError>>` — yields chunks as the server emits
329/// them, terminates on a clean stream-end frame OR a non-`Ok`
330/// status (which is yielded as the last `Err` item before the
331/// stream closes).
332///
333/// Dropping the stream emits a CANCEL to the server (best-effort)
334/// and discards the pending entry — any chunks the server emits
335/// after the drop are silently discarded by the client fold.
336pub struct RpcStream {
337 mesh: Arc<MeshNode>,
338 target_node_id: u64,
339 request_channel: ChannelName,
340 self_origin: u64,
341 call_id: u64,
342 inner: tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
343 /// Set true once we've yielded the terminal item (or an
344 /// error). Subsequent polls return `None`.
345 done: bool,
346 /// `Some(_)` if this stream uses flow control (caller set
347 /// `CallOptions::stream_window_initial`). Auto-grant emits 1
348 /// credit per delivered chunk, which keeps the server's
349 /// credit at roughly the initial window. `None` → no flow
350 /// control; `poll_next` does not emit grants.
351 stream_window: Option<u32>,
352 /// Observer-fire bookkeeping. Latched on terminal observation
353 /// in `poll_next`; fired once from `Drop` so the Deck NRPC
354 /// tab + every other `RpcObserver` consumer sees one event
355 /// per streaming-response call.
356 observer: StreamingObserverState,
357}
358
359impl RpcStream {
360 /// Server-assigned `call_id`. Useful for trace correlation /
361 /// custom logging at the call site.
362 pub fn call_id(&self) -> u64 {
363 self.call_id
364 }
365
366 /// Whether this stream is flow-controlled (caller set
367 /// `CallOptions::stream_window_initial`). Useful for tests +
368 /// diagnostics; user code typically doesn't need to inspect
369 /// this.
370 pub fn flow_controlled(&self) -> bool {
371 self.stream_window.is_some()
372 }
373
374 /// Explicitly grant `amount` more credits to the server's
375 /// pump. Spawns a fire-and-forget publish; doesn't await
376 /// acknowledgement. **No-op when flow control was not enabled
377 /// for this stream** — the server would silently drop the
378 /// grant anyway, and emitting wire traffic with no purpose
379 /// would just burn bandwidth.
380 ///
381 /// Auto-grant (1 credit per delivered chunk) covers the
382 /// common case; use this for batched cadence (e.g. grant
383 /// `window/2` after every `window/2` chunks consumed) when
384 /// `auto_grant`-style amortization isn't enough.
385 pub fn grant(&self, amount: u32) {
386 if !self.flow_controlled() || amount == 0 {
387 return;
388 }
389 spawn_grant_publish(
390 Arc::clone(&self.mesh),
391 self.target_node_id,
392 self.request_channel.clone(),
393 self.self_origin,
394 self.call_id,
395 amount,
396 );
397 }
398}
399
400/// Shared fire-and-forget GRANT-publish helper. Used by
401/// [`RpcStream::grant`] (explicit) and the auto-grant in
402/// [`RpcStream::poll_next`]. Same direct-unicast publish path as
403/// [`spawn_cancel_publish`], just with a different dispatch byte
404/// + a 4-byte u32 payload.
405fn spawn_grant_publish(
406 mesh: Arc<MeshNode>,
407 target: u64,
408 request_channel: ChannelName,
409 self_origin: u64,
410 call_id: u64,
411 amount: u32,
412) {
413 tokio::spawn(async move {
414 let meta = EventMeta::new(DISPATCH_RPC_STREAM_GRANT, 0, self_origin, call_id, 0);
415 let request_channel_id = ChannelId::new(request_channel);
416 let request_channel_hash = request_channel_id.hash();
417 let stream_id = MeshNode::publish_stream_id(&request_channel_id);
418 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 4);
419 buf.extend_from_slice(&meta.to_bytes());
420 buf.extend_from_slice(&encode_stream_grant(amount));
421 let payload = Bytes::from(buf);
422 let _ = mesh
423 .publish_to_peer(
424 target,
425 request_channel_hash,
426 stream_id,
427 /* reliable */ true,
428 std::slice::from_ref(&payload),
429 )
430 .await;
431 });
432}
433
434impl futures::Stream for RpcStream {
435 type Item = Result<Bytes, RpcError>;
436
437 fn poll_next(
438 mut self: std::pin::Pin<&mut Self>,
439 cx: &mut std::task::Context<'_>,
440 ) -> std::task::Poll<Option<Self::Item>> {
441 if self.done {
442 return std::task::Poll::Ready(None);
443 }
444 match self.inner.poll_recv(cx) {
445 std::task::Poll::Ready(Some(StreamItem::Chunk(body))) => {
446 // Auto-grant 1 credit per delivered chunk so the
447 // server's pump stays at roughly the initial
448 // window. No-op when the stream isn't flow-
449 // controlled. For batched cadence, callers can
450 // skip auto-grant by NOT setting
451 // `stream_window_initial` and using `RpcStream::grant`
452 // directly with their preferred batching.
453 if self.stream_window.is_some() {
454 spawn_grant_publish(
455 Arc::clone(&self.mesh),
456 self.target_node_id,
457 self.request_channel.clone(),
458 self.self_origin,
459 self.call_id,
460 1,
461 );
462 }
463 self.observer.add_response_bytes(body.len() as u32);
464 std::task::Poll::Ready(Some(Ok(body)))
465 }
466 std::task::Poll::Ready(Some(StreamItem::End)) => {
467 self.done = true;
468 self.observer.latch_ok();
469 std::task::Poll::Ready(None)
470 }
471 std::task::Poll::Ready(Some(StreamItem::Error(resp))) => {
472 self.done = true;
473 let status = resp.status.to_wire();
474 let message = String::from_utf8(resp.body.to_vec()).unwrap_or_else(|e| {
475 format!("<{} bytes of non-utf8 body>", e.into_bytes().len())
476 });
477 self.observer
478 .latch_error(format!("server returned status {status:#06x}: {message}"));
479 std::task::Poll::Ready(Some(Err(RpcError::ServerError { status, message })))
480 }
481 std::task::Poll::Ready(None) => {
482 self.done = true;
483 std::task::Poll::Ready(None)
484 }
485 std::task::Poll::Pending => std::task::Poll::Pending,
486 }
487 }
488}
489
490impl Drop for RpcStream {
491 fn drop(&mut self) {
492 // Best-effort CANCEL to the server. Spawn a task because
493 // Drop can't be async; the publish happens off-thread.
494 // Also clear our pending entry so any in-flight chunks
495 // are dropped on arrival.
496 self.mesh.rpc_client_pending_arc().cancel(self.call_id);
497 spawn_cancel_publish(
498 Arc::clone(&self.mesh),
499 self.target_node_id,
500 self.request_channel.clone(),
501 self.self_origin,
502 self.call_id,
503 );
504 // Fire the observer with the latched status (Ok / Error /
505 // Canceled). Idempotent — only the first fire emits.
506 self.observer.fire();
507 }
508}
509
510// ============================================================================
511// Phase C — caller-side client-streaming / duplex primitive.
512// ============================================================================
513
514/// Shared REQUEST_CHUNK-publish helper. Builds the wire frame and
515/// fires through `publish_to_peer` direct-unicast (same routing
516/// pattern as the initial REQUEST — caller knows the target).
517async fn publish_request_chunk(
518 mesh: &Arc<MeshNode>,
519 target: u64,
520 request_channel: &ChannelName,
521 self_origin: u64,
522 chunk: &RpcRequestChunkPayload,
523) -> Result<(), RpcError> {
524 let meta = EventMeta::new(DISPATCH_RPC_REQUEST_CHUNK, 0, self_origin, chunk.call_id, 0);
525 let mut buf = Vec::with_capacity(EVENT_META_SIZE + chunk.encoded_len());
526 buf.extend_from_slice(&meta.to_bytes());
527 buf.extend_from_slice(&chunk.encode());
528 let request_channel_id = ChannelId::new(request_channel.clone());
529 let request_channel_hash = request_channel_id.hash();
530 let stream_id = MeshNode::publish_stream_id(&request_channel_id);
531 let payload = Bytes::from(buf);
532 mesh.publish_to_peer(
533 target,
534 request_channel_hash,
535 stream_id,
536 /* reliable */ true,
537 std::slice::from_ref(&payload),
538 )
539 .await
540 .map_err(RpcError::Transport)
541}
542
/// Internal state of a [`ClientStreamCallRaw`].
///
/// The call moves through these stages:
/// 1. It is opened; the initial REQUEST is not sent yet.
/// 2. N items are sent. The first becomes the initial REQUEST; the
///    rest become REQUEST_CHUNKs.
/// 3. `finish` publishes the terminal REQUEST_END frame, after which
///    no further sends are accepted.
/// 4. The terminal RESPONSE is received.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientStreamState {
    /// Pending entry registered, reply subscription ensured, but
    /// the initial REQUEST has NOT been published to the wire yet.
    /// The first successful `send` flips this to `Sending`.
    JustOpened,
    /// Initial REQUEST has been published; subsequent sends ride
    /// as REQUEST_CHUNKs.
    Sending,
    /// `finish` has published the terminal frame. That is the
    /// REQUEST_END chunk, or, on the degenerate zero-send path, the
    /// initial REQUEST with `FLAG_RPC_REQUEST_END`. The terminal
    /// RESPONSE has not necessarily arrived yet; `finish` awaits it
    /// on `terminal_rx`.
    Finishing,
    /// The terminal RESPONSE (of any status) has been received. Drop
    /// sends no CANCEL in this state. It still aborts the grant pump
    /// and fires the observer.
    Done,
}
566
567/// Caller-side handle for a client-streaming (or duplex Phase D)
568/// RPC. Push N items via [`ClientStreamCallRaw::send`], then
569/// [`ClientStreamCallRaw::finish`] to await the terminal RESPONSE.
570///
571/// **Lazy initial REQUEST.** The initial REQUEST is published on
572/// the FIRST `send()` (or on `finish()` if the caller sends nothing
573/// — that's the "zero-item upload" degenerate path that opens and
574/// closes the call in one frame). Constructing the handle does
575/// NOT yet emit any wire traffic beyond the reply-channel
576/// subscription setup.
577///
578/// **Flow control.** When the caller set
579/// [`CallOptions::request_window_initial`] to `Some(n)`, the
580/// handle holds an `n`-permit `Semaphore` that gates `send`. The
581/// server's [`DISPATCH_RPC_REQUEST_GRANT`] events refill the
582/// semaphore. When `None`, `send` doesn't block (caller is on the
583/// unbounded-credit fast path).
584///
585/// **Cancellation.** Dropping the handle BEFORE `finish` returns
586/// `Ok` fires a best-effort CANCEL to the server and clears the
587/// pending entry. Dropping after a successful `finish` is a no-op
588/// (terminal RESPONSE already delivered + entry removed).
589///
590/// Bidi streaming plan (Phase C).
591pub struct ClientStreamCallRaw {
592 mesh: Arc<MeshNode>,
593 target_node_id: u64,
594 request_channel: ChannelName,
595 self_origin: u64,
596 call_id: u64,
597 service: String,
598 /// Header set queued for the initial REQUEST. Drained on the
599 /// first publish (either `send` or `finish`).
600 initial_headers: Vec<(String, Vec<u8>)>,
601 /// Flag bits queued for the initial REQUEST. Always carries
602 /// `FLAG_RPC_CLIENT_STREAMING_REQUEST`; may also carry
603 /// `FLAG_RPC_PROPAGATE_TRACE` when the caller supplied a
604 /// trace context.
605 initial_flags: u16,
606 /// `deadline_ns` from `CallOptions::deadline`. Embedded in the
607 /// initial REQUEST.
608 deadline_ns: u64,
609 /// Per-call semaphore for upload credits. `None` when the
610 /// caller didn't opt into flow control (`request_window_initial`
611 /// was `None` on the `CallOptions`).
612 credit_sem: Option<Arc<tokio::sync::Semaphore>>,
613 /// Background task that drains REQUEST_GRANT credits from the
614 /// pending entry's grant mpsc into `credit_sem`. Aborted on
615 /// Drop. `None` when flow control is off.
616 grant_pump: Option<JoinHandle<()>>,
617 /// Single-shot terminal-RESPONSE receiver. Taken by `finish`;
618 /// after that `Drop` doesn't attempt to await again.
619 terminal_rx: Option<tokio::sync::oneshot::Receiver<RpcResponsePayload>>,
620 /// State machine. See [`ClientStreamState`].
621 state: ClientStreamState,
622 /// Wall-clock start (for `RpcReply::latency_ns` reporting).
623 started: Instant,
624 /// Observer-fire bookkeeping. Latched on terminal observation
625 /// in `finish`; fired once from `Drop` so the Deck NRPC tab +
626 /// every `RpcObserver` consumer sees one event per
627 /// client-streaming call.
628 observer: StreamingObserverState,
629}
630
631impl ClientStreamCallRaw {
632 /// Server-assigned `call_id`. Useful for trace correlation /
633 /// custom logging.
634 pub fn call_id(&self) -> u64 {
635 self.call_id
636 }
637
638 /// Whether this call is flow-controlled (caller set
639 /// `CallOptions::request_window_initial`).
640 pub fn flow_controlled(&self) -> bool {
641 self.credit_sem.is_some()
642 }
643
644 /// Push one body chunk to the server. Encodes as the initial
645 /// REQUEST (first call) or as a REQUEST_CHUNK (subsequent
646 /// calls). When flow control is opted into, awaits one credit
647 /// before publishing.
648 ///
649 /// Returns `Err(RpcError::Codec)` if called after [`Self::finish`].
650 pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
651 match self.state {
652 ClientStreamState::Finishing | ClientStreamState::Done => {
653 return Err(RpcError::Codec {
654 direction: CodecDirection::Encode,
655 message: "send() called after finish()".to_string(),
656 });
657 }
658 _ => {}
659 }
660 // Gate on credit when flow control is opted into.
661 if let Some(sem) = self.credit_sem.as_ref() {
662 let permit = sem.clone().acquire_owned().await.map_err(|_| {
663 RpcError::Transport(AdapterError::Connection("credit semaphore closed".into()))
664 })?;
665 permit.forget();
666 }
667 self.observer.add_request_bytes(body.len() as u32);
668 match self.state {
669 ClientStreamState::JustOpened => {
670 // First send → initial REQUEST.
671 let req = RpcRequestPayload {
672 service: self.service.clone(),
673 deadline_ns: self.deadline_ns,
674 flags: self.initial_flags,
675 headers: std::mem::take(&mut self.initial_headers),
676 body: body.clone(),
677 };
678 self.publish_initial_request(&req).await?;
679 self.state = ClientStreamState::Sending;
680 }
681 ClientStreamState::Sending => {
682 let chunk = RpcRequestChunkPayload {
683 call_id: self.call_id,
684 flags: 0,
685 headers: vec![],
686 body: body.clone(),
687 };
688 publish_request_chunk(
689 &self.mesh,
690 self.target_node_id,
691 &self.request_channel,
692 self.self_origin,
693 &chunk,
694 )
695 .await?;
696 }
697 ClientStreamState::Finishing | ClientStreamState::Done => unreachable!(),
698 }
699 Ok(())
700 }
701
702 /// Close the upload direction and await the server's terminal
703 /// RESPONSE. Emits a REQUEST_CHUNK with `FLAG_RPC_REQUEST_END`
704 /// (empty body) if the call has already published its initial
705 /// REQUEST, or an initial REQUEST with both
706 /// `FLAG_RPC_CLIENT_STREAMING_REQUEST` and
707 /// `FLAG_RPC_REQUEST_END` set (the degenerate "zero-item
708 /// upload" path) if nothing was sent.
709 ///
710 /// Consumes the handle — Drop after `finish` is a no-op.
711 pub async fn finish(mut self) -> Result<RpcReply, RpcError> {
712 match self.state {
713 ClientStreamState::JustOpened => {
714 let req = RpcRequestPayload {
715 service: self.service.clone(),
716 deadline_ns: self.deadline_ns,
717 flags: self.initial_flags | FLAG_RPC_REQUEST_END,
718 headers: std::mem::take(&mut self.initial_headers),
719 body: Bytes::new(),
720 };
721 self.publish_initial_request(&req).await?;
722 }
723 ClientStreamState::Sending => {
724 let chunk = RpcRequestChunkPayload {
725 call_id: self.call_id,
726 flags: FLAG_RPC_REQUEST_END,
727 headers: vec![],
728 body: Bytes::new(),
729 };
730 publish_request_chunk(
731 &self.mesh,
732 self.target_node_id,
733 &self.request_channel,
734 self.self_origin,
735 &chunk,
736 )
737 .await?;
738 }
739 ClientStreamState::Finishing | ClientStreamState::Done => {
740 return Err(RpcError::Codec {
741 direction: CodecDirection::Encode,
742 message: "finish() called twice".to_string(),
743 });
744 }
745 }
746 self.state = ClientStreamState::Finishing;
747 let terminal_rx = self.terminal_rx.take().ok_or_else(|| {
748 RpcError::Transport(AdapterError::Connection(
749 "terminal receiver already consumed".into(),
750 ))
751 })?;
752 // Honor the deadline if the caller set one.
753 let resp = if self.deadline_ns > 0 {
754 let now = std::time::SystemTime::now()
755 .duration_since(std::time::UNIX_EPOCH)
756 .map(|d| d.as_nanos() as u64)
757 .unwrap_or(0);
758 let remaining = self.deadline_ns.saturating_sub(now);
759 match tokio::time::timeout(std::time::Duration::from_nanos(remaining), terminal_rx)
760 .await
761 {
762 Ok(Ok(r)) => r,
763 Ok(Err(_)) => {
764 let msg = "terminal sender dropped before response arrived";
765 self.observer.latch_error(msg);
766 return Err(RpcError::Transport(AdapterError::Connection(msg.into())));
767 }
768 Err(_elapsed) => {
769 let elapsed_ms = self.started.elapsed().as_millis() as u64;
770 self.observer.latch_timeout();
771 return Err(RpcError::Timeout { elapsed_ms });
772 }
773 }
774 } else {
775 match terminal_rx.await {
776 Ok(r) => r,
777 Err(_) => {
778 let msg = "terminal sender dropped before response arrived";
779 self.observer.latch_error(msg);
780 return Err(RpcError::Transport(AdapterError::Connection(msg.into())));
781 }
782 }
783 };
784 self.state = ClientStreamState::Done;
785 self.observer.add_response_bytes(resp.body.len() as u32);
786 if !resp.status.is_ok() {
787 // String::from_utf8 takes `Vec<u8>`. `Bytes::to_vec()`
788 // matches the prior `resp.body.clone()` semantics (full
789 // copy of the body for the error-formatting path);
790 // bulk-throughput improvement lives on the decode side,
791 // not here.
792 let message = String::from_utf8(resp.body.to_vec())
793 .unwrap_or_else(|e| format!("<{} bytes of non-utf8 body>", e.into_bytes().len()));
794 self.observer.latch_error(format!(
795 "server returned status {:#06x}: {message}",
796 resp.status.to_wire()
797 ));
798 return Err(RpcError::ServerError {
799 status: resp.status.to_wire(),
800 message,
801 });
802 }
803 self.observer.latch_ok();
804 let latency_ns = self.started.elapsed().as_nanos() as u64;
805 Ok(RpcReply {
806 body: resp.body,
807 headers: resp.headers,
808 latency_ns,
809 })
810 }
811
812 async fn publish_initial_request(&self, req: &RpcRequestPayload) -> Result<(), RpcError> {
813 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self.self_origin, self.call_id, 0);
814 let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.encoded_len());
815 buf.extend_from_slice(&meta.to_bytes());
816 buf.extend_from_slice(&req.encode());
817 let request_channel_id = ChannelId::new(self.request_channel.clone());
818 let request_channel_hash = request_channel_id.hash();
819 let stream_id = MeshNode::publish_stream_id(&request_channel_id);
820 let payload = Bytes::from(buf);
821 self.mesh
822 .publish_to_peer(
823 self.target_node_id,
824 request_channel_hash,
825 stream_id,
826 /* reliable */ true,
827 std::slice::from_ref(&payload),
828 )
829 .await
830 .map_err(RpcError::Transport)
831 }
832}
833
834impl Drop for ClientStreamCallRaw {
835 fn drop(&mut self) {
836 if let Some(task) = self.grant_pump.take() {
837 task.abort();
838 }
839 // Fire the observer with whatever status was latched
840 // (Ok / Error / Timeout / Canceled). Idempotent — only
841 // the first call emits.
842 self.observer.fire();
843 if matches!(self.state, ClientStreamState::Done) {
844 // Successful completion — pending entry already gone,
845 // no CANCEL needed.
846 return;
847 }
848 self.mesh.rpc_client_pending_arc().cancel(self.call_id);
849 // Only fire CANCEL on the wire if the server has actually
850 // seen the initial REQUEST. A `JustOpened` Drop means we
851 // never published anything; no need to CANCEL a call the
852 // server doesn't know about.
853 if !matches!(self.state, ClientStreamState::JustOpened) {
854 spawn_cancel_publish(
855 Arc::clone(&self.mesh),
856 self.target_node_id,
857 self.request_channel.clone(),
858 self.self_origin,
859 self.call_id,
860 );
861 }
862 }
863}
864
865// ============================================================================
866// Phase D — caller-side duplex primitive.
867// ============================================================================
868
/// Shared state between a `DuplexSink` and its sibling
/// `DuplexStream`. Both halves hold an `Arc<DuplexInner>`; when
/// the refcount hits zero (i.e. both halves dropped) the Drop
/// fires CANCEL to the server unless the call was cleanly closed
/// (`clean_close = true`) or the initial REQUEST never reached the
/// wire (`initial_sent = false`).
///
/// NOTE: field declaration order determines field drop order
/// (after `Drop::drop` runs); keep that in mind when reordering.
struct DuplexInner {
    /// Mesh node used to publish CANCEL and to release the pending
    /// client entry on drop.
    mesh: Arc<MeshNode>,
    /// Node the call was addressed to; CANCEL is published to it.
    target_node_id: u64,
    /// `<service>.requests` channel that REQUEST / CANCEL frames use.
    request_channel: ChannelName,
    /// This caller's origin hash, stamped into outbound `EventMeta`.
    self_origin: u64,
    /// Caller-allocated identifier for this call (also the key into
    /// the mesh's pending-call table).
    call_id: u64,
    /// Whether the initial REQUEST was successfully published.
    /// `false` means we never reached the wire — no CANCEL needed
    /// (server doesn't know about the call).
    initial_sent: std::sync::atomic::AtomicBool,
    /// Set true when the call closes cleanly — terminal RESPONSE
    /// (or terminal Error) was observed on the response stream.
    /// Suppresses CANCEL-on-drop.
    clean_close: std::sync::atomic::AtomicBool,
    /// Observer-fire bookkeeping. Latched from the terminal-observation
    /// sites (`DuplexStream::poll_next`, which `DuplexCallRaw::next`
    /// and `DuplexCallRaw`'s `Stream` impl delegate to); fired
    /// once on Drop. The DuplexCallRaw / DuplexSink / DuplexStream
    /// each share access via the surrounding Arc<DuplexInner>.
    observer: StreamingObserverState,
}
896impl Drop for DuplexInner {
897 fn drop(&mut self) {
898 self.mesh.rpc_client_pending_arc().cancel(self.call_id);
899 // Fire the observer with the latched status (Ok / Error /
900 // Canceled). Idempotent — only the first call emits.
901 self.observer.fire();
902 if self.clean_close.load(Ordering::SeqCst) {
903 return;
904 }
905 if !self.initial_sent.load(Ordering::SeqCst) {
906 return;
907 }
908 spawn_cancel_publish(
909 Arc::clone(&self.mesh),
910 self.target_node_id,
911 self.request_channel.clone(),
912 self.self_origin,
913 self.call_id,
914 );
915 }
916}
917
/// Send half of a duplex call. Push items via `send`; emit the
/// terminal REQUEST_END frame via `finish_sending`. After
/// `finish_sending` the upload side is closed but the sibling
/// `DuplexStream` continues yielding response chunks until the
/// server's terminal frame arrives.
///
/// Dropping the sink only aborts its grant pump; CANCEL (if any)
/// is issued by the shared `DuplexInner` once both halves are gone.
///
/// Bidi streaming plan (Phase D).
pub struct DuplexSink {
    /// State shared with the sibling `DuplexStream`.
    inner: Arc<DuplexInner>,
    /// Service name carried in the initial REQUEST.
    service: String,
    /// Headers for the initial REQUEST; moved out (left empty) when
    /// that REQUEST is built.
    initial_headers: Vec<(String, Vec<u8>)>,
    /// Flags for the initial REQUEST. `finish_sending` ORs in
    /// `FLAG_RPC_REQUEST_END` when no chunk was sent beforehand.
    initial_flags: u16,
    /// Deadline forwarded verbatim in the initial REQUEST.
    deadline_ns: u64,
    /// Request-direction credit semaphore; `Some` iff the upload side
    /// is flow-controlled. `send` consumes (forgets) one permit per
    /// chunk.
    credit_sem: Option<Arc<tokio::sync::Semaphore>>,
    /// Background grant-pump task (presumably feeds REQUEST_GRANT
    /// credits into `credit_sem` — confirm at construction site);
    /// aborted when the sink drops.
    grant_pump: Option<JoinHandle<()>>,
    /// Upload-side lifecycle: `JustOpened` → `Sending` → `Finishing`.
    state: ClientStreamState,
}
935
936impl DuplexSink {
937 /// Push one body chunk to the server. Same semantics as
938 /// [`ClientStreamCallRaw::send`].
939 pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
940 match self.state {
941 ClientStreamState::Finishing | ClientStreamState::Done => {
942 return Err(RpcError::Codec {
943 direction: CodecDirection::Encode,
944 message: "send() called after finish_sending()".to_string(),
945 });
946 }
947 _ => {}
948 }
949 if let Some(sem) = self.credit_sem.as_ref() {
950 let permit = sem.clone().acquire_owned().await.map_err(|_| {
951 RpcError::Transport(AdapterError::Connection("credit semaphore closed".into()))
952 })?;
953 permit.forget();
954 }
955 self.inner.observer.add_request_bytes(body.len() as u32);
956 match self.state {
957 ClientStreamState::JustOpened => {
958 let req = RpcRequestPayload {
959 service: self.service.clone(),
960 deadline_ns: self.deadline_ns,
961 flags: self.initial_flags,
962 headers: std::mem::take(&mut self.initial_headers),
963 body: body.clone(),
964 };
965 self.publish_initial_request(&req).await?;
966 self.inner.initial_sent.store(true, Ordering::SeqCst);
967 self.state = ClientStreamState::Sending;
968 }
969 ClientStreamState::Sending => {
970 let chunk = RpcRequestChunkPayload {
971 call_id: self.inner.call_id,
972 flags: 0,
973 headers: vec![],
974 body: body.clone(),
975 };
976 publish_request_chunk(
977 &self.inner.mesh,
978 self.inner.target_node_id,
979 &self.inner.request_channel,
980 self.inner.self_origin,
981 &chunk,
982 )
983 .await?;
984 }
985 ClientStreamState::Finishing | ClientStreamState::Done => unreachable!(),
986 }
987 Ok(())
988 }
989
990 /// Close the upload direction. Emits the terminal REQUEST_END
991 /// frame. The response stream continues until the server's
992 /// terminal RESPONSE arrives (use the sibling `DuplexStream`).
993 pub async fn finish_sending(mut self) -> Result<(), RpcError> {
994 match self.state {
995 ClientStreamState::JustOpened => {
996 let req = RpcRequestPayload {
997 service: self.service.clone(),
998 deadline_ns: self.deadline_ns,
999 flags: self.initial_flags | FLAG_RPC_REQUEST_END,
1000 headers: std::mem::take(&mut self.initial_headers),
1001 body: Bytes::new(),
1002 };
1003 self.publish_initial_request(&req).await?;
1004 self.inner.initial_sent.store(true, Ordering::SeqCst);
1005 }
1006 ClientStreamState::Sending => {
1007 let chunk = RpcRequestChunkPayload {
1008 call_id: self.inner.call_id,
1009 flags: FLAG_RPC_REQUEST_END,
1010 headers: vec![],
1011 body: Bytes::new(),
1012 };
1013 publish_request_chunk(
1014 &self.inner.mesh,
1015 self.inner.target_node_id,
1016 &self.inner.request_channel,
1017 self.inner.self_origin,
1018 &chunk,
1019 )
1020 .await?;
1021 }
1022 ClientStreamState::Finishing | ClientStreamState::Done => {
1023 return Err(RpcError::Codec {
1024 direction: CodecDirection::Encode,
1025 message: "finish_sending() called twice".to_string(),
1026 });
1027 }
1028 }
1029 self.state = ClientStreamState::Finishing;
1030 Ok(())
1031 }
1032
1033 /// Server-assigned `call_id`. Same value on the sibling
1034 /// `DuplexStream`.
1035 pub fn call_id(&self) -> u64 {
1036 self.inner.call_id
1037 }
1038
1039 /// Whether this call is flow-controlled on the upload side.
1040 pub fn flow_controlled(&self) -> bool {
1041 self.credit_sem.is_some()
1042 }
1043
1044 async fn publish_initial_request(&self, req: &RpcRequestPayload) -> Result<(), RpcError> {
1045 let meta = EventMeta::new(
1046 DISPATCH_RPC_REQUEST,
1047 0,
1048 self.inner.self_origin,
1049 self.inner.call_id,
1050 0,
1051 );
1052 let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.encoded_len());
1053 buf.extend_from_slice(&meta.to_bytes());
1054 buf.extend_from_slice(&req.encode());
1055 let request_channel_id = ChannelId::new(self.inner.request_channel.clone());
1056 let request_channel_hash = request_channel_id.hash();
1057 let stream_id = MeshNode::publish_stream_id(&request_channel_id);
1058 let payload = Bytes::from(buf);
1059 self.inner
1060 .mesh
1061 .publish_to_peer(
1062 self.inner.target_node_id,
1063 request_channel_hash,
1064 stream_id,
1065 /* reliable */ true,
1066 std::slice::from_ref(&payload),
1067 )
1068 .await
1069 .map_err(RpcError::Transport)
1070 }
1071}
1072
1073impl Drop for DuplexSink {
1074 fn drop(&mut self) {
1075 if let Some(task) = self.grant_pump.take() {
1076 task.abort();
1077 }
1078 // The shared DuplexInner's Drop (when refcount hits 0)
1079 // does the CANCEL — nothing to do here beyond aborting
1080 // the grant pump.
1081 }
1082}
1083
/// Receive half of a duplex call. Implements `futures::Stream`
/// yielding `Result<Bytes, RpcError>` per inbound RESPONSE chunk.
/// EOF on terminal Ok; one final `Err(RpcError::ServerError)` on
/// terminal non-Ok.
///
/// Bidi streaming plan (Phase D).
pub struct DuplexStream {
    /// State shared with the sibling `DuplexSink`.
    inner: Arc<DuplexInner>,
    /// Inbound response items for this call: body chunks plus the
    /// terminal `End` / `Error` item.
    chunks_rx: tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
    /// Set once a terminal item was observed or the channel closed;
    /// every later poll returns `Ready(None)`.
    done: bool,
}
1095
1096impl DuplexStream {
1097 /// Server-assigned `call_id`. Same value on the sibling
1098 /// `DuplexSink`.
1099 pub fn call_id(&self) -> u64 {
1100 self.inner.call_id
1101 }
1102}
1103
1104impl futures::Stream for DuplexStream {
1105 type Item = Result<Bytes, RpcError>;
1106
1107 fn poll_next(
1108 mut self: std::pin::Pin<&mut Self>,
1109 cx: &mut std::task::Context<'_>,
1110 ) -> std::task::Poll<Option<Self::Item>> {
1111 if self.done {
1112 return std::task::Poll::Ready(None);
1113 }
1114 match self.chunks_rx.poll_recv(cx) {
1115 std::task::Poll::Ready(Some(StreamItem::Chunk(body))) => {
1116 self.inner.observer.add_response_bytes(body.len() as u32);
1117 std::task::Poll::Ready(Some(Ok(body)))
1118 }
1119 std::task::Poll::Ready(Some(StreamItem::End)) => {
1120 self.done = true;
1121 self.inner.clean_close.store(true, Ordering::SeqCst);
1122 self.inner.observer.latch_ok();
1123 std::task::Poll::Ready(None)
1124 }
1125 std::task::Poll::Ready(Some(StreamItem::Error(resp))) => {
1126 self.done = true;
1127 self.inner.clean_close.store(true, Ordering::SeqCst);
1128 let status = resp.status.to_wire();
1129 let message = String::from_utf8(resp.body.to_vec()).unwrap_or_else(|e| {
1130 format!("<{} bytes of non-utf8 body>", e.into_bytes().len())
1131 });
1132 self.inner
1133 .observer
1134 .latch_error(format!("server returned status {status:#06x}: {message}"));
1135 std::task::Poll::Ready(Some(Err(RpcError::ServerError { status, message })))
1136 }
1137 std::task::Poll::Ready(None) => {
1138 self.done = true;
1139 std::task::Poll::Ready(None)
1140 }
1141 std::task::Poll::Pending => std::task::Poll::Pending,
1142 }
1143 }
1144}
1145
/// Caller-side handle for a duplex RPC. Combines a `DuplexSink`
/// (upload) and `DuplexStream` (download). For application code
/// that wants to encode requests in one task and decode responses
/// in another, use [`Self::into_split`] to peel off the two halves.
///
/// Bidi streaming plan (Phase D).
pub struct DuplexCallRaw {
    /// Upload half (REQUEST / REQUEST_CHUNK frames).
    sink: DuplexSink,
    /// Download half (RESPONSE chunks and the terminal frame).
    stream: DuplexStream,
}
1156
1157impl DuplexCallRaw {
1158 /// Server-assigned `call_id`.
1159 pub fn call_id(&self) -> u64 {
1160 self.sink.call_id()
1161 }
1162
1163 /// Whether the upload side is flow-controlled.
1164 pub fn flow_controlled(&self) -> bool {
1165 self.sink.flow_controlled()
1166 }
1167
1168 /// Push one body chunk to the server. Delegates to the inner
1169 /// `DuplexSink::send`.
1170 pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
1171 self.sink.send(body).await
1172 }
1173
1174 /// Close the upload direction. Delegates to the inner
1175 /// `DuplexSink::finish_sending` but keeps the receive side
1176 /// alive so the caller can keep polling response chunks.
1177 ///
1178 /// NOTE: consumes the sink half but not the stream half.
1179 /// Internally, we replace `self.sink` with a no-op
1180 /// placeholder so subsequent send() / finish_sending()
1181 /// surface a clear error (`send() after finish_sending()`).
1182 pub async fn finish_sending(&mut self) -> Result<(), RpcError> {
1183 // Take the sink out by swapping in a placeholder whose
1184 // state is `Done` so subsequent sends error cleanly.
1185 let placeholder = DuplexSink {
1186 inner: Arc::clone(&self.sink.inner),
1187 service: String::new(),
1188 initial_headers: Vec::new(),
1189 initial_flags: 0,
1190 deadline_ns: 0,
1191 credit_sem: None,
1192 grant_pump: None,
1193 state: ClientStreamState::Done,
1194 };
1195 let sink = std::mem::replace(&mut self.sink, placeholder);
1196 sink.finish_sending().await
1197 }
1198
1199 /// Pull the next response chunk. `None` on terminal Ok;
1200 /// `Some(Err)` then `None` on terminal non-Ok. Same shape as
1201 /// `futures::StreamExt::next`.
1202 pub async fn next(&mut self) -> Option<Result<Bytes, RpcError>> {
1203 use futures::StreamExt;
1204 self.stream.next().await
1205 }
1206
1207 /// Split into independent send / receive halves. Both halves
1208 /// hold an `Arc<DuplexInner>`; CANCEL fires only when BOTH
1209 /// halves drop without a clean close.
1210 pub fn into_split(self) -> (DuplexSink, DuplexStream) {
1211 (self.sink, self.stream)
1212 }
1213}
1214
1215impl futures::Stream for DuplexCallRaw {
1216 type Item = Result<Bytes, RpcError>;
1217
1218 fn poll_next(
1219 mut self: std::pin::Pin<&mut Self>,
1220 cx: &mut std::task::Context<'_>,
1221 ) -> std::task::Poll<Option<Self::Item>> {
1222 std::pin::Pin::new(&mut self.stream).poll_next(cx)
1223 }
1224}
1225
1226// ============================================================================
1227// Unary call: CANCEL-on-drop guard.
1228// ============================================================================
1229
/// RAII guard that fires CANCEL to the server if the unary call
/// future is dropped before a response arrives. Without this, a
/// `select!`-loser future (e.g. hedge runner-up) would leave the
/// server-side handler running to completion — wasting CPU on a
/// reply nobody will read.
///
/// The guard is built *after* the REQUEST has been successfully
/// published — if the publish fails, no guard is constructed and
/// no CANCEL is sent. On the success path the call function flips
/// `completed = true` so Drop skips the CANCEL (the server already
/// finished and removed its in-flight entry); Drop still releases
/// the local pending entry either way.
struct UnaryCallGuard {
    /// Client pending-call table; the `call_id` entry is removed on Drop.
    pending: Arc<super::cortex::RpcClientPending>,
    /// Mesh node used to publish the CANCEL.
    mesh: Arc<MeshNode>,
    /// Node the REQUEST was sent to (CANCEL destination).
    target_node_id: u64,
    /// `<service>.requests` channel the CANCEL is published on.
    request_channel: ChannelName,
    /// This caller's origin hash, stamped into the CANCEL `EventMeta`.
    self_origin: u64,
    /// Caller-allocated identifier of the call being guarded.
    call_id: u64,
    /// True after the call resolved Ok or got a definitive
    /// non-cancellable Err. Drop checks this — `false` fires
    /// CANCEL, `true` skips it (Drop still removes the pending
    /// entry).
    completed: bool,
}
1254
1255impl Drop for UnaryCallGuard {
1256 fn drop(&mut self) {
1257 self.pending.cancel(self.call_id);
1258 if !self.completed {
1259 spawn_cancel_publish(
1260 Arc::clone(&self.mesh),
1261 self.target_node_id,
1262 self.request_channel.clone(),
1263 self.self_origin,
1264 self.call_id,
1265 );
1266 }
1267 }
1268}
1269
1270// ============================================================================
1271// Streaming/duplex observer-fire bookkeeping.
1272//
// The unary `MeshNode::call` fires `RpcObserver::on_call` at each
// of its terminal return paths. The streaming /
1275// client-streaming / duplex paths have multiple terminal points
1276// (poll_next sees End / Error; finish() returns; Drop without
1277// terminal observation). To avoid sprinkling `fire_rpc_observer_outbound`
1278// at every terminal site, each handle holds a
1279// `StreamingObserverState` that latches the terminal status on
1280// observation and fires exactly once on Drop. The Deck NRPC tab
1281// + every consumer of `RpcObserver` get one event per streaming
1282// / duplex call, same as for unary today.
1283// ============================================================================
1284
/// Per-call observer-fire bookkeeping shared between the
/// streaming + client-streaming + duplex caller-side handles.
/// Latches terminal status on observation; `fire()` (called from
/// the handle's Drop) emits one `RpcCallEvent` with the latched
/// status (or `Canceled` if nothing latched — i.e. the handle
/// was dropped before observing its terminator).
///
/// Status discriminator:
///   0 = none latched (Drop → Canceled)
///   1 = Ok
///   2 = Error (message in `observer_msg`)
///   3 = Timeout
pub(crate) struct StreamingObserverState {
    /// Node whose `fire_rpc_observer_outbound` receives the event.
    mesh: Arc<MeshNode>,
    /// Call target reported in the event.
    target_node_id: u64,
    /// Service name reported in the event.
    service: String,
    /// Start time; the event's duration is measured from here.
    started: Instant,
    /// Running total of request body bytes.
    request_bytes: AtomicU32,
    /// Running total of response body bytes.
    response_bytes: AtomicU32,
    /// Latched status discriminator (see table above).
    observer_status: AtomicU8,
    /// Error message accompanying status 2.
    observer_msg: parking_lot::Mutex<Option<String>>,
    /// Set by the first `fire()`; makes later calls no-ops.
    fired: AtomicBool,
}
1308
1309impl StreamingObserverState {
1310 pub(crate) fn new(
1311 mesh: Arc<MeshNode>,
1312 target_node_id: u64,
1313 service: impl Into<String>,
1314 request_bytes: u32,
1315 ) -> Self {
1316 Self {
1317 mesh,
1318 target_node_id,
1319 service: service.into(),
1320 started: Instant::now(),
1321 request_bytes: AtomicU32::new(request_bytes),
1322 response_bytes: AtomicU32::new(0),
1323 observer_status: AtomicU8::new(0),
1324 observer_msg: parking_lot::Mutex::new(None),
1325 fired: AtomicBool::new(false),
1326 }
1327 }
1328
1329 pub(crate) fn add_request_bytes(&self, n: u32) {
1330 self.request_bytes.fetch_add(n, Ordering::Relaxed);
1331 }
1332
1333 pub(crate) fn add_response_bytes(&self, n: u32) {
1334 self.response_bytes.fetch_add(n, Ordering::Relaxed);
1335 }
1336
1337 pub(crate) fn latch_ok(&self) {
1338 self.observer_status.store(1, Ordering::Relaxed);
1339 }
1340
1341 pub(crate) fn latch_error(&self, msg: impl Into<String>) {
1342 *self.observer_msg.lock() = Some(msg.into());
1343 self.observer_status.store(2, Ordering::Relaxed);
1344 }
1345
1346 pub(crate) fn latch_timeout(&self) {
1347 self.observer_status.store(3, Ordering::Relaxed);
1348 }
1349
1350 /// Fire the observer event. Idempotent — only the first call
1351 /// actually emits; subsequent are no-ops. Called from each
1352 /// streaming handle's Drop.
1353 pub(crate) fn fire(&self) {
1354 if self.fired.swap(true, Ordering::SeqCst) {
1355 return;
1356 }
1357 let status_code = self.observer_status.load(Ordering::Relaxed);
1358 let status = match status_code {
1359 1 => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Ok,
1360 2 => {
1361 let msg = self.observer_msg.lock().clone().unwrap_or_default();
1362 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(msg)
1363 }
1364 3 => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Timeout,
1365 _ => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Canceled,
1366 };
1367 self.mesh.fire_rpc_observer_outbound(
1368 self.target_node_id,
1369 &self.service,
1370 self.started.elapsed().as_millis() as u32,
1371 status,
1372 self.request_bytes.load(Ordering::Relaxed),
1373 self.response_bytes.load(Ordering::Relaxed),
1374 );
1375 }
1376}
1377
/// Per-call cap on in-flight request-direction credits. Tokio's
/// `Semaphore::MAX_PERMITS` is `usize::MAX >> 3`; we cap the
/// caller-side accumulator (the credit semaphore's available
/// permits) at this value so a misbehaving server can't make the
/// caller hold an unbounded outstanding window.
/// 1M credits is already orders of magnitude beyond any sane
/// request burst — a caller sitting on 1M unconsumed credits is
/// either misconfigured or under attack.
const REQUEST_GRANT_PER_CALL_CAP: usize = 1_000_000;
1386
1387/// Add `credits` to a caller-side request-direction credit
1388/// semaphore, capped so the accumulator never exceeds
1389/// [`REQUEST_GRANT_PER_CALL_CAP`]. Per-frame cap of `usize::MAX >> 4`
1390/// remains as a second line of defense against pathological frame
1391/// values.
1392fn add_request_grant_credits(sem: &tokio::sync::Semaphore, credits: u32) {
1393 if credits == 0 {
1394 return;
1395 }
1396 let current = sem.available_permits();
1397 let remaining = REQUEST_GRANT_PER_CALL_CAP.saturating_sub(current);
1398 let safe = (credits as usize).min(usize::MAX >> 4).min(remaining);
1399 if safe > 0 {
1400 sem.add_permits(safe);
1401 }
1402}
1403
1404/// Build a coalescing REQUEST_GRANT emitter.
1405///
1406/// Naive emitters `tokio::spawn` one publish task per consumed
1407/// chunk, which becomes a spawn-storm + AEAD-storm under bursting.
1408/// This helper hands back an emitter that pushes `(caller_origin,
1409/// call_id, credits)` into an unbounded mpsc; a single dedicated
1410/// drainer task `try_recv`s the queue to drain whatever is
1411/// immediately available, coalesces credits per call_id, and
1412/// publishes ONE batched REQUEST_GRANT per call per drain cycle.
1413///
1414/// Lifecycle: the drainer task lives as long as any clone of the
1415/// returned emitter (mpsc sender count > 0). When the fold and all
1416/// in-flight handlers release the emitter, `rx.recv` returns `None`
1417/// and the drainer exits naturally.
1418fn build_request_grant_emitter(
1419 mesh: Arc<MeshNode>,
1420 service: String,
1421 server_origin: u64,
1422 diag_tag: &'static str,
1423) -> RpcRequestGrantEmitter {
1424 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(u64, u64, u32)>();
1425 tokio::spawn(async move {
1426 while let Some(first) = rx.recv().await {
1427 let mut summed: std::collections::HashMap<(u64, u64), u32> =
1428 std::collections::HashMap::new();
1429 let (caller, call_id, credits) = first;
1430 summed.insert((caller, call_id), credits);
1431 // Coalesce anything immediately queued behind the first
1432 // wake. Bounded by what the substrate has produced so
1433 // far; doesn't add latency since `try_recv` returns
1434 // immediately when the queue is empty.
1435 while let Ok((caller, call_id, credits)) = rx.try_recv() {
1436 let entry = summed.entry((caller, call_id)).or_insert(0);
1437 *entry = entry.saturating_add(credits);
1438 }
1439 for ((caller, call_id), credits) in summed {
1440 let reply_channel_name = format!("{service}.replies.{caller:016x}");
1441 let reply_channel = match ChannelName::new(&reply_channel_name) {
1442 Ok(c) => c,
1443 Err(e) => {
1444 tracing::warn!(
1445 error = %e,
1446 channel = %reply_channel_name,
1447 tag = diag_tag,
1448 "rpc grant drainer: invalid reply channel name");
1449 continue;
1450 }
1451 };
1452 let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, server_origin, call_id, 0);
1453 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 12);
1454 buf.extend_from_slice(&meta.to_bytes());
1455 buf.extend_from_slice(&encode_request_grant(call_id, credits));
1456 let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1457 if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1458 tracing::warn!(
1459 error = %e,
1460 caller_origin = format!("{:#x}", caller),
1461 call_id,
1462 tag = diag_tag,
1463 "rpc grant drainer: REQUEST_GRANT publish failed");
1464 }
1465 }
1466 }
1467 });
1468 Arc::new(move |caller_origin, call_id, credits| {
1469 // Send failure means the drainer has exited (all sender
1470 // clones dropped, then we somehow cloned a stale one).
1471 // Treat as a no-op — the call is tearing down anyway.
1472 let _ = tx.send((caller_origin, call_id, credits));
1473 })
1474}
1475
1476/// Shared CANCEL-publish helper: spawn a task that fires a
1477/// CANCEL event for `call_id` to `target` on the request channel.
1478/// Both [`RpcStream::Drop`] and [`UnaryCallGuard::Drop`] use it.
1479fn spawn_cancel_publish(
1480 mesh: Arc<MeshNode>,
1481 target: u64,
1482 request_channel: ChannelName,
1483 self_origin: u64,
1484 call_id: u64,
1485) {
1486 tokio::spawn(async move {
1487 let meta = EventMeta::new(DISPATCH_RPC_CANCEL, 0, self_origin, call_id, 0);
1488 let request_channel_id = ChannelId::new(request_channel);
1489 let request_channel_hash = request_channel_id.hash();
1490 let stream_id = MeshNode::publish_stream_id(&request_channel_id);
1491 let payload = Bytes::from(meta.to_bytes().to_vec());
1492 let _ = mesh
1493 .publish_to_peer(
1494 target,
1495 request_channel_hash,
1496 stream_id,
1497 /* reliable */ true,
1498 std::slice::from_ref(&payload),
1499 )
1500 .await;
1501 });
1502}
1503
1504// ============================================================================
1505// MeshNode extensions.
1506// ============================================================================
1507
1508impl MeshNode {
1509 /// Register an nRPC handler for `service` on this node.
1510 ///
1511 /// Subscribes this node to `<service>.requests` (so the local
1512 /// `register_rpc_inbound` dispatcher feeds inbound REQUEST
1513 /// events into the [`RpcServerFold`]) and wires the fold's
1514 /// RESPONSE-emit callback to publish on
1515 /// `<service>.replies.<caller_origin>` via the existing
1516 /// pub/sub path.
1517 ///
1518 /// **Local-only registration** (Phase 1). Multi-instance
1519 /// services that load-balance via `SubscriptionMode::QueueGroup`
1520 /// require each replica to call `serve_rpc` on its own node;
1521 /// the mesh-level subscriber roster + `dispatch_recipients`
1522 /// then routes one-of-N as designed. Each replica's local
1523 /// `serve_rpc` must use the same service name (which becomes
1524 /// the queue-group identifier).
1525 ///
1526 /// Returns a [`ServeHandle`] whose Drop tears down the
1527 /// registration. Concurrent registrations for the same service
1528 /// on one node return `Err(ServeError::AlreadyServing)`.
1529 pub fn serve_rpc<H: RpcHandler>(
1530 self: &Arc<Self>,
1531 service: &str,
1532 handler: Arc<H>,
1533 ) -> Result<ServeHandle, ServeError> {
1534 let request_channel = ChannelName::new(&format!("{service}.requests"))
1535 .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
1536 let channel_hash = request_channel.hash();
1537
1538 // Bridge: a tokio mpsc the inbound dispatcher pushes into.
1539 // The bridge task drains it and runs each event through
1540 // the fold. Bounded so a runaway publisher can't OOM the
1541 // server; over-cap pushes drop the inbound event (which
1542 // surfaces to the caller as a timeout).
1543 let (tx, mut rx) = mpsc::channel::<RpcInboundEvent>(1024);
1544
1545 // Build the emit closure. When the handler completes, the
1546 // fold calls this with `(caller_origin, call_id, response)`.
1547 // The closure publishes a RESPONSE event on
1548 // `<service>.replies.<caller_origin>` via the existing
1549 // pub/sub path. `tokio::spawn` keeps the closure
1550 // synchronous (the fold doesn't await).
1551 let mesh_for_emit = Arc::clone(self);
1552 let service_for_emit = service.to_string();
1553 let server_origin = self.identity_origin_hash();
1554 let emit: RpcResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
1555 let mesh = Arc::clone(&mesh_for_emit);
1556 let service = service_for_emit.clone();
1557 tokio::spawn(async move {
1558 let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
1559 let reply_channel = match ChannelName::new(&reply_channel_name) {
1560 Ok(c) => c,
1561 Err(e) => {
1562 tracing::warn!(error = %e, channel = %reply_channel_name,
1563 "rpc serve_rpc: invalid reply channel name");
1564 return;
1565 }
1566 };
1567 // Build the RESPONSE event envelope: 24-byte meta
1568 // + encoded RpcResponsePayload.
1569 let meta = EventMeta::new(
1570 super::cortex::DISPATCH_RPC_RESPONSE,
1571 0,
1572 server_origin,
1573 call_id,
1574 0,
1575 );
1576 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
1577 buf.extend_from_slice(&meta.to_bytes());
1578 buf.extend_from_slice(&resp.encode());
1579
1580 let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1581 if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1582 tracing::warn!(error = %e, caller_origin = format!("{:#x}", caller_origin),
1583 call_id, "rpc serve_rpc: response publish failed");
1584 }
1585 });
1586 });
1587
1588 // Build the server fold and wrap it in an Arc<Mutex<...>>
1589 // so the bridge task can drive it (the trait takes
1590 // `&mut self`). Attach the per-service metrics handle so
1591 // the spawned handler tasks bump server-side counters.
1592 let metrics_handle = self.rpc_metrics_arc().for_service(service);
1593 // Keep a clone of the emit closure for the callee-side
1594 // capability-auth defense-in-depth path in the bridge
1595 // below — the fold owns its own clone, this one only
1596 // emits the `CapabilityDenied` rejection before the fold
1597 // sees the event.
1598 let emit_for_bridge = Arc::clone(&emit);
1599 // Clone the per-service metrics handle so the bridge can
1600 // bump `capability_denied_total` on gate rejection. The
1601 // fold's own clone (passed via `with_metrics`) handles the
1602 // handler-side counters; this one covers the path BEFORE
1603 // the handler runs, which the fold-side metrics never see.
1604 let metrics_for_bridge = Arc::clone(&metrics_handle);
1605 let fold = Arc::new(Mutex::new(
1606 RpcServerFold::new(handler as Arc<dyn RpcHandler>, emit).with_metrics(metrics_handle),
1607 ));
1608
1609 // Register the inbound dispatcher. Push into the mpsc;
1610 // the bridge task does the actual fold work.
1611 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
1612 // Best-effort send — over-cap means the bridge can't
1613 // keep up; drop and let the caller time out. Logging
1614 // here would spam.
1615 let _ = tx.try_send(ev);
1616 });
1617 // Register the service in `rpc_local_services` and refresh
1618 // the self-indexed announcement BEFORE installing the
1619 // dispatcher so the callee-side gate (in the bridge below)
1620 // sees a self-announcement carrying `nrpc:<service>` the
1621 // moment the first inbound event lands. Without this, the
1622 // gate was either silently permissive (no self-ann) or
1623 // silently denying (self-ann from a prior
1624 // `announce_capabilities` that pre-dated this service's
1625 // registration). See `docs/misc/CODE_REVIEW_2026_05_19_CAPABILITY_AUTH.md`
1626 // H1 + H2.
1627 self.rpc_local_services_arc().insert(service.to_string());
1628 self.index_self_with_local_services();
1629
1630 if self
1631 .register_rpc_inbound(channel_hash, dispatcher)
1632 .is_some()
1633 {
1634 return Err(ServeError::AlreadyServing(service.to_string()));
1635 }
1636
1637 // Spawn the bridge task. It reads inbound events, runs
1638 // the v0.4 capability-auth callee-side gate (defense in
1639 // depth — the caller-side gate inside `call_service`
1640 // covers the well-behaved client path), and on accept
1641 // feeds them to the fold.
1642 let mesh_for_bridge = Arc::clone(self);
1643 let service_for_bridge = service.to_string();
1644 let bridge = tokio::spawn(async move {
1645 let tag = format!("nrpc:{}", service_for_bridge);
1646 while let Some(inbound) = rx.recv().await {
1647 // Defense-in-depth check. Skip only when the wire
1648 // session resolved no NodeId (`from_node == 0` is
1649 // the loopback / test sentinel per
1650 // `RpcInboundEvent::from_node` — production wire
1651 // delivery drops events that fail NodeId
1652 // resolution rather than passing 0). The cold-
1653 // start "no self-ann" skip the original
1654 // implementation carried was a permissive hole;
1655 // `index_self_with_local_services` above
1656 // guarantees a self-ann exists before the
1657 // dispatcher is wired, so denying when the gate
1658 // says no is now the safe failure mode.
1659 let self_node = mesh_for_bridge.node_id();
1660 let index = mesh_for_bridge.capability_index_arc();
1661 let from_node = inbound.from_node;
1662 if from_node != 0 && !index.may_execute(self_node, &tag, from_node) {
1663 // Decode the EventMeta so we can address the
1664 // caller's reply channel (keyed on
1665 // `caller_origin`) and tag the response with
1666 // the correct `call_id`. A garbled meta means
1667 // the request would have been rejected by the
1668 // fold's own decode path too; drop silently
1669 // to match the existing skip-on-malformed
1670 // behavior there.
1671 let Some(meta) = (if inbound.payload.len() >= EVENT_META_SIZE {
1672 EventMeta::from_bytes(&inbound.payload[..EVENT_META_SIZE])
1673 } else {
1674 None
1675 }) else {
1676 continue;
1677 };
1678 let resp = super::cortex::RpcResponsePayload {
1679 status: RpcStatus::CapabilityDenied,
1680 headers: vec![],
1681 body: Bytes::from(format!(
1682 "callee-side capability-auth gate denied nrpc:{}",
1683 service_for_bridge
1684 )),
1685 };
1686 // Server-side metrics: bump `capability_denied_total`
1687 // on the per-service counter. The fold-side
1688 // metrics never see this path (the handler isn't
1689 // invoked), so without this bump a noisy
1690 // unauthorized caller is invisible to operators
1691 // watching `nrpc_handler_invocations_total` —
1692 // the dashboard sees "0 requests" while the
1693 // caller sees `CapabilityDenied`.
1694 metrics_for_bridge
1695 .capability_denied_total
1696 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1697 (emit_for_bridge)(meta.origin_hash, meta.seq_or_ts, resp);
1698 continue;
1699 }
1700 let payload = inbound.payload;
1701 let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
1702 let ev = RedexEvent { entry, payload };
1703 if let Err(e) = fold.lock().apply(&ev, &mut ()) {
1704 tracing::warn!(error = %e, "rpc serve_rpc: fold apply error");
1705 }
1706 }
1707 });
1708
1709 // Spawn an async re-announce so peers also learn about
1710 // the new service without the operator having to call
1711 // `announce_capabilities` manually. The local self-index
1712 // already happened above; this is purely for peer
1713 // visibility (the broadcast path also re-runs the
1714 // self-index, which is a cheap version bump).
1715 let mesh_for_announce = Arc::clone(self);
1716 let service_for_log = service.to_string();
1717 tokio::spawn(async move {
1718 let baseline = mesh_for_announce.user_caps_snapshot();
1719 if let Err(e) = mesh_for_announce.announce_capabilities(baseline).await {
1720 tracing::warn!(
1721 error = %e,
1722 service = %service_for_log,
1723 "serve_rpc: auto re-announce failed",
1724 );
1725 }
1726 });
1727
1728 Ok(ServeHandle {
1729 channel_hash,
1730 service: service.to_string(),
1731 _bridge: bridge,
1732 mesh: Arc::clone(self),
1733 })
1734 }
1735
1736 /// Streaming variant of [`Self::serve_rpc`]. The handler
1737 /// receives an [`RpcResponseSink`](super::cortex::RpcResponseSink)
1738 /// it writes chunks to via `sink.send(body)`; returning
1739 /// `Ok(())` closes the stream cleanly, `Err(_)` closes with
1740 /// an error frame.
1741 ///
1742 /// Wire-level identical to the unary path apart from the
1743 /// per-chunk `nrpc-streaming` header markers
1744 /// (`continue` / `end`). Same auto-registration of
1745 /// `<service>.requests` + `<service>.replies.` prefix.
1746 pub fn serve_rpc_streaming<H: RpcStreamingHandler>(
1747 self: &Arc<Self>,
1748 service: &str,
1749 handler: Arc<H>,
1750 ) -> Result<ServeHandle, ServeError> {
1751 let request_channel = ChannelName::new(&format!("{service}.requests"))
1752 .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
1753 let channel_hash = request_channel.hash();
1754 let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
1755
1756 let mesh_for_emit = Arc::clone(self);
1757 let service_for_emit = service.to_string();
1758 let server_origin = self.identity_origin_hash();
1759 // Async emit so the streaming fold's pump can `.await` each
1760 // publish — guarantees per-call chunk ordering on the wire.
1761 let emit: RpcAsyncResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
1762 let mesh = Arc::clone(&mesh_for_emit);
1763 let service = service_for_emit.clone();
1764 Box::pin(async move {
1765 let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
1766 let reply_channel = match ChannelName::new(&reply_channel_name) {
1767 Ok(c) => c,
1768 Err(e) => {
1769 tracing::warn!(error = %e, channel = %reply_channel_name,
1770 "rpc serve_rpc_streaming: invalid reply channel name");
1771 return;
1772 }
1773 };
1774 let meta = EventMeta::new(
1775 super::cortex::DISPATCH_RPC_RESPONSE,
1776 0,
1777 server_origin,
1778 call_id,
1779 0,
1780 );
1781 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
1782 buf.extend_from_slice(&meta.to_bytes());
1783 buf.extend_from_slice(&resp.encode());
1784 let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1785 if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1786 tracing::warn!(error = %e,
1787 caller_origin = format!("{:#x}", caller_origin),
1788 call_id,
1789 "rpc serve_rpc_streaming: chunk publish failed");
1790 }
1791 })
1792 });
1793
1794 // Attach per-service metrics so the spawned handler tasks
1795 // + pump task bump server-side counters (including the
1796 // streaming-only `streaming_chunks_emitted_total`).
1797 let metrics_handle = self.rpc_metrics_arc().for_service(service);
1798 let fold = Arc::new(Mutex::new(
1799 RpcServerStreamingFold::new(handler as Arc<dyn RpcStreamingHandler>, emit)
1800 .with_metrics(metrics_handle),
1801 ));
1802 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
1803 let _ = tx.try_send(ev);
1804 });
1805 if self
1806 .register_rpc_inbound(channel_hash, dispatcher)
1807 .is_some()
1808 {
1809 return Err(ServeError::AlreadyServing(service.to_string()));
1810 }
1811 let bridge = tokio::spawn(async move {
1812 while let Some(inbound) = rx.recv().await {
1813 let payload = inbound.payload;
1814 let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
1815 let ev = RedexEvent { entry, payload };
1816 if let Err(e) = fold.lock().apply(&ev, &mut ()) {
1817 tracing::warn!(error = %e, "rpc serve_rpc_streaming: fold apply error");
1818 }
1819 }
1820 });
1821 self.rpc_local_services_arc().insert(service.to_string());
1822 Ok(ServeHandle {
1823 channel_hash,
1824 service: service.to_string(),
1825 _bridge: bridge,
1826 mesh: Arc::clone(self),
1827 })
1828 }
1829
1830 /// Register a client-streaming nRPC handler for `service`.
1831 /// Mirror of [`Self::serve_rpc_streaming`] but using the
1832 /// request-side fold ([`RpcStreamingRequestFold`]) — the
1833 /// handler receives one stream of REQUEST_CHUNK bodies and
1834 /// emits one terminal RESPONSE.
1835 ///
1836 /// Wires two emit callbacks:
1837 /// - A sync [`RpcResponseEmitter`] for the terminal RESPONSE
1838 /// (single emit per call, no ordering concern).
1839 /// - An [`RpcRequestGrantEmitter`] for upload-direction
1840 /// credit grants, which publishes [`DISPATCH_RPC_REQUEST_GRANT`]
1841 /// events on the caller's reply channel.
1842 ///
1843 /// Bidi streaming plan (Phase C).
1844 pub fn serve_rpc_client_stream<H: RpcClientStreamingHandler>(
1845 self: &Arc<Self>,
1846 service: &str,
1847 handler: Arc<H>,
1848 ) -> Result<ServeHandle, ServeError> {
1849 let request_channel = ChannelName::new(&format!("{service}.requests"))
1850 .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
1851 let channel_hash = request_channel.hash();
1852 let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
1853
1854 let mesh_for_emit = Arc::clone(self);
1855 let service_for_emit = service.to_string();
1856 let server_origin = self.identity_origin_hash();
1857
1858 // Terminal RESPONSE emitter — sync because there's only
1859 // one RESPONSE per call (no per-call ordering concern that
1860 // would require an async-await between chunks).
1861 let emit_resp_mesh = Arc::clone(&mesh_for_emit);
1862 let emit_resp_service = service_for_emit.clone();
1863 let emit_resp: RpcResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
1864 let mesh = Arc::clone(&emit_resp_mesh);
1865 let service = emit_resp_service.clone();
1866 tokio::spawn(async move {
1867 let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
1868 let reply_channel = match ChannelName::new(&reply_channel_name) {
1869 Ok(c) => c,
1870 Err(e) => {
1871 tracing::warn!(error = %e, channel = %reply_channel_name,
1872 "rpc serve_rpc_client_stream: invalid reply channel name");
1873 return;
1874 }
1875 };
1876 let meta = EventMeta::new(
1877 super::cortex::DISPATCH_RPC_RESPONSE,
1878 0,
1879 server_origin,
1880 call_id,
1881 0,
1882 );
1883 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
1884 buf.extend_from_slice(&meta.to_bytes());
1885 buf.extend_from_slice(&resp.encode());
1886 let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1887 if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1888 tracing::warn!(error = %e,
1889 caller_origin = format!("{:#x}", caller_origin),
1890 call_id,
1891 "rpc serve_rpc_client_stream: terminal RESPONSE publish failed");
1892 }
1893 });
1894 });
1895
1896 // REQUEST_GRANT emitter — coalesces per-chunk credits into
1897 // a single drainer task that batches by call_id. Avoids the
1898 // tokio::spawn-per-emit storm under bursting.
1899 let emit_grant = build_request_grant_emitter(
1900 Arc::clone(&mesh_for_emit),
1901 service_for_emit.clone(),
1902 server_origin,
1903 "serve_rpc_client_stream",
1904 );
1905
1906 let metrics_handle = self.rpc_metrics_arc().for_service(service);
1907 let fold = Arc::new(Mutex::new(
1908 RpcStreamingRequestFold::new(handler as Arc<dyn RpcClientStreamingHandler>, emit_resp)
1909 .with_grant_emitter(emit_grant)
1910 .with_metrics(metrics_handle),
1911 ));
1912 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
1913 let _ = tx.try_send(ev);
1914 });
1915 if self
1916 .register_rpc_inbound(channel_hash, dispatcher)
1917 .is_some()
1918 {
1919 return Err(ServeError::AlreadyServing(service.to_string()));
1920 }
1921 let bridge = tokio::spawn(async move {
1922 while let Some(inbound) = rx.recv().await {
1923 let payload = inbound.payload;
1924 let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
1925 let ev = RedexEvent { entry, payload };
1926 if let Err(e) = fold.lock().apply(&ev, &mut ()) {
1927 tracing::warn!(error = %e,
1928 "rpc serve_rpc_client_stream: fold apply error");
1929 }
1930 }
1931 });
1932 self.rpc_local_services_arc().insert(service.to_string());
1933 Ok(ServeHandle {
1934 channel_hash,
1935 service: service.to_string(),
1936 _bridge: bridge,
1937 mesh: Arc::clone(self),
1938 })
1939 }
1940
1941 /// Client-streaming variant of [`Self::call`]. Returns a
1942 /// [`ClientStreamCallRaw`] handle the caller pushes N items
1943 /// into via `send`, then `finish` to await the terminal
1944 /// RESPONSE.
1945 ///
1946 /// **Lazy initial REQUEST.** This method does NOT publish a
1947 /// REQUEST to the wire. It only ensures the caller's reply
1948 /// subscription is set up and registers the pending entry; the
1949 /// initial REQUEST is emitted by the first `send` (or by
1950 /// `finish` for the zero-item degenerate path).
1951 ///
1952 /// Sets `FLAG_RPC_CLIENT_STREAMING_REQUEST` on the initial
1953 /// REQUEST so the server's request-streaming fold knows to
1954 /// open a request-side stream. Optional `request_window_initial`
1955 /// header opts into upload-direction flow control.
1956 ///
1957 /// Bidi streaming plan (Phase C).
1958 pub async fn call_client_stream(
1959 self: &Arc<Self>,
1960 target_node_id: u64,
1961 service: &str,
1962 opts: CallOptions,
1963 ) -> Result<ClientStreamCallRaw, RpcError> {
1964 // `request_window_initial = Some(0)` would deadlock the
1965 // caller: every `send` awaits a credit, but the initial
1966 // REQUEST is lazy (not emitted until the first send), so
1967 // the server never sees the call and never publishes a
1968 // GRANT. Reject up front — `None` means "unbounded credit",
1969 // any positive value opts into flow control.
1970 if matches!(opts.request_window_initial, Some(0)) {
1971 return Err(RpcError::Codec {
1972 direction: CodecDirection::Encode,
1973 message: "request_window_initial must be None or >= 1; Some(0) deadlocks send"
1974 .to_string(),
1975 });
1976 }
1977 // T1.3: per-service route cache (see PERF_AUDIT
1978 // 2026-05-19). One DashMap::get + Arc::clone instead of
1979 // 2 format! + 2 ChannelName::new + xxhash per call.
1980 let route = self.rpc_route_or_no_route(target_node_id, service)?;
1981 let self_origin = self.identity_origin_hash();
1982 self.ensure_reply_subscription(
1983 target_node_id,
1984 service,
1985 route.reply_channel.clone(),
1986 route.reply_hash,
1987 )
1988 .await?;
1989
1990 let call_id = mint_random_call_id();
1991 let pending = self.rpc_client_pending();
1992 let (terminal_rx, mut grant_rx) =
1993 pending.register_client_streaming(call_id, target_node_id);
1994
1995 // Build the header set + flags we'll queue for the initial
1996 // REQUEST (deferred to the first send / finish).
1997 let mut initial_flags = FLAG_RPC_CLIENT_STREAMING_REQUEST;
1998 let mut initial_headers: Vec<(String, Vec<u8>)> = Vec::new();
1999 if let Some(tc) = opts.trace_context.as_ref() {
2000 initial_flags |= FLAG_RPC_PROPAGATE_TRACE;
2001 initial_headers.extend(build_trace_headers(tc));
2002 }
2003 if let Some(window) = opts.request_window_initial {
2004 initial_headers.push((
2005 HEADER_NRPC_REQUEST_WINDOW_INITIAL.to_string(),
2006 window.to_string().into_bytes(),
2007 ));
2008 }
2009 initial_headers.extend(opts.request_headers.iter().cloned());
2010
2011 // Per-call credit semaphore when flow control is opted in.
2012 // Initial permits = the caller's declared window. Refilled
2013 // by REQUEST_GRANT events arriving on the reply channel,
2014 // pumped through `grant_rx` by the spawned `grant_pump`.
2015 let credit_sem = opts
2016 .request_window_initial
2017 .map(|n| Arc::new(tokio::sync::Semaphore::new(n as usize)));
2018 let grant_pump = credit_sem.as_ref().map(|sem| {
2019 let sem = Arc::clone(sem);
2020 tokio::spawn(async move {
2021 while let Some(credits) = grant_rx.recv().await {
2022 add_request_grant_credits(&sem, credits);
2023 }
2024 })
2025 });
2026
2027 let deadline_ns = opts.deadline.map(instant_to_unix_nanos).unwrap_or(0);
2028 let observer = StreamingObserverState::new(Arc::clone(self), target_node_id, service, 0);
2029 Ok(ClientStreamCallRaw {
2030 mesh: Arc::clone(self),
2031 target_node_id,
2032 request_channel: route.request_channel.clone(),
2033 self_origin,
2034 call_id,
2035 service: service.to_string(),
2036 initial_headers,
2037 initial_flags,
2038 deadline_ns,
2039 credit_sem,
2040 grant_pump,
2041 terminal_rx: Some(terminal_rx),
2042 state: ClientStreamState::JustOpened,
2043 started: Instant::now(),
2044 observer,
2045 })
2046 }
2047
2048 /// Register a duplex nRPC handler for `service`. Composes
2049 /// [`Self::serve_rpc_client_stream`] (request-side stream)
2050 /// with [`Self::serve_rpc_streaming`] (response-side multi-
2051 /// fire emit) via [`RpcDuplexFold`].
2052 ///
2053 /// Wires THREE emit callbacks:
2054 /// - Async [`RpcAsyncResponseEmitter`] for response chunks +
2055 /// the terminal frame (per-call ordering required because
2056 /// the response side is multi-fire).
2057 /// - [`RpcRequestGrantEmitter`] for upload-direction credit
2058 /// grants (one per consumed request chunk when flow
2059 /// control is opted into).
2060 ///
2061 /// Bidi streaming plan (Phase D).
2062 pub fn serve_rpc_duplex<H: RpcDuplexHandler>(
2063 self: &Arc<Self>,
2064 service: &str,
2065 handler: Arc<H>,
2066 ) -> Result<ServeHandle, ServeError> {
2067 let request_channel = ChannelName::new(&format!("{service}.requests"))
2068 .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
2069 let channel_hash = request_channel.hash();
2070 let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
2071
2072 let mesh_for_emit = Arc::clone(self);
2073 let service_for_emit = service.to_string();
2074 let server_origin = self.identity_origin_hash();
2075
2076 // Async response emitter — per-call ordering matters here
2077 // because the response side is multi-fire (same rationale
2078 // as serve_rpc_streaming).
2079 let emit_resp_mesh = Arc::clone(&mesh_for_emit);
2080 let emit_resp_service = service_for_emit.clone();
2081 let emit_resp: RpcAsyncResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
2082 let mesh = Arc::clone(&emit_resp_mesh);
2083 let service = emit_resp_service.clone();
2084 Box::pin(async move {
2085 let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
2086 let reply_channel = match ChannelName::new(&reply_channel_name) {
2087 Ok(c) => c,
2088 Err(e) => {
2089 tracing::warn!(error = %e, channel = %reply_channel_name,
2090 "rpc serve_rpc_duplex: invalid reply channel name");
2091 return;
2092 }
2093 };
2094 let meta = EventMeta::new(
2095 super::cortex::DISPATCH_RPC_RESPONSE,
2096 0,
2097 server_origin,
2098 call_id,
2099 0,
2100 );
2101 let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2102 buf.extend_from_slice(&meta.to_bytes());
2103 buf.extend_from_slice(&resp.encode());
2104 let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
2105 if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
2106 tracing::warn!(error = %e,
2107 caller_origin = format!("{:#x}", caller_origin),
2108 call_id,
2109 "rpc serve_rpc_duplex: chunk publish failed");
2110 }
2111 })
2112 });
2113
2114 // Request-direction grant emitter — same coalescing
2115 // drainer shape as serve_rpc_client_stream.
2116 let emit_grant = build_request_grant_emitter(
2117 Arc::clone(&mesh_for_emit),
2118 service_for_emit.clone(),
2119 server_origin,
2120 "serve_rpc_duplex",
2121 );
2122
2123 let metrics_handle = self.rpc_metrics_arc().for_service(service);
2124 let fold = Arc::new(Mutex::new(
2125 RpcDuplexFold::new(handler as Arc<dyn RpcDuplexHandler>, emit_resp)
2126 .with_grant_emitter(emit_grant)
2127 .with_metrics(metrics_handle),
2128 ));
2129 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2130 let _ = tx.try_send(ev);
2131 });
2132 if self
2133 .register_rpc_inbound(channel_hash, dispatcher)
2134 .is_some()
2135 {
2136 return Err(ServeError::AlreadyServing(service.to_string()));
2137 }
2138 let bridge = tokio::spawn(async move {
2139 while let Some(inbound) = rx.recv().await {
2140 let payload = inbound.payload;
2141 let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2142 let ev = RedexEvent { entry, payload };
2143 if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2144 tracing::warn!(error = %e,
2145 "rpc serve_rpc_duplex: fold apply error");
2146 }
2147 }
2148 });
2149 self.rpc_local_services_arc().insert(service.to_string());
2150 Ok(ServeHandle {
2151 channel_hash,
2152 service: service.to_string(),
2153 _bridge: bridge,
2154 mesh: Arc::clone(self),
2155 })
2156 }
2157
2158 /// Duplex variant of [`Self::call`]. Returns a
2159 /// [`DuplexCallRaw`] handle with both upload (`send`,
2160 /// `finish_sending`) and download (`next`, or impl
2161 /// `futures::Stream`) surfaces. Use `into_split` to peel off
2162 /// the two halves for the "encoder task + decoder task"
2163 /// shape.
2164 ///
2165 /// Initial REQUEST sets BOTH `FLAG_RPC_CLIENT_STREAMING_REQUEST`
2166 /// AND `FLAG_RPC_STREAMING_RESPONSE`. Lazy publish — the
2167 /// initial REQUEST flies on the first `send` (or on
2168 /// `finish_sending` for the zero-item degenerate path).
2169 ///
2170 /// Bidi streaming plan (Phase D).
2171 pub async fn call_duplex(
2172 self: &Arc<Self>,
2173 target_node_id: u64,
2174 service: &str,
2175 opts: CallOptions,
2176 ) -> Result<DuplexCallRaw, RpcError> {
2177 // Same deadlock guard as `call_client_stream`: Some(0)
2178 // means "send must await a credit that can never arrive"
2179 // because the initial REQUEST is lazy.
2180 if matches!(opts.request_window_initial, Some(0)) {
2181 return Err(RpcError::Codec {
2182 direction: CodecDirection::Encode,
2183 message: "request_window_initial must be None or >= 1; Some(0) deadlocks send"
2184 .to_string(),
2185 });
2186 }
2187 // T1.3: per-service route cache (see PERF_AUDIT
2188 // 2026-05-19). One DashMap::get + Arc::clone instead of
2189 // 2 format! + 2 ChannelName::new + xxhash per call.
2190 let route = self.rpc_route_or_no_route(target_node_id, service)?;
2191 let self_origin = self.identity_origin_hash();
2192 self.ensure_reply_subscription(
2193 target_node_id,
2194 service,
2195 route.reply_channel.clone(),
2196 route.reply_hash,
2197 )
2198 .await?;
2199
2200 let call_id = mint_random_call_id();
2201 let pending = self.rpc_client_pending();
2202 let (chunks_rx, mut grant_rx) = pending.register_duplex(call_id, target_node_id);
2203
2204 let mut initial_flags = FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_STREAMING_RESPONSE;
2205 let mut initial_headers: Vec<(String, Vec<u8>)> = Vec::new();
2206 if let Some(tc) = opts.trace_context.as_ref() {
2207 initial_flags |= FLAG_RPC_PROPAGATE_TRACE;
2208 initial_headers.extend(build_trace_headers(tc));
2209 }
2210 if let Some(window) = opts.request_window_initial {
2211 initial_headers.push((
2212 HEADER_NRPC_REQUEST_WINDOW_INITIAL.to_string(),
2213 window.to_string().into_bytes(),
2214 ));
2215 }
2216 if let Some(window) = opts.stream_window_initial {
2217 initial_headers.push((
2218 HEADER_NRPC_STREAM_WINDOW_INITIAL.to_string(),
2219 window.to_string().into_bytes(),
2220 ));
2221 }
2222 initial_headers.extend(opts.request_headers.iter().cloned());
2223
2224 let credit_sem = opts
2225 .request_window_initial
2226 .map(|n| Arc::new(tokio::sync::Semaphore::new(n as usize)));
2227 let grant_pump = credit_sem.as_ref().map(|sem| {
2228 let sem = Arc::clone(sem);
2229 tokio::spawn(async move {
2230 while let Some(credits) = grant_rx.recv().await {
2231 add_request_grant_credits(&sem, credits);
2232 }
2233 })
2234 });
2235
2236 let deadline_ns = opts.deadline.map(instant_to_unix_nanos).unwrap_or(0);
2237 let observer = StreamingObserverState::new(Arc::clone(self), target_node_id, service, 0);
2238 let inner = Arc::new(DuplexInner {
2239 mesh: Arc::clone(self),
2240 target_node_id,
2241 request_channel: route.request_channel.clone(),
2242 self_origin,
2243 call_id,
2244 initial_sent: std::sync::atomic::AtomicBool::new(false),
2245 clean_close: std::sync::atomic::AtomicBool::new(false),
2246 observer,
2247 });
2248 let sink = DuplexSink {
2249 inner: Arc::clone(&inner),
2250 service: service.to_string(),
2251 initial_headers,
2252 initial_flags,
2253 deadline_ns,
2254 credit_sem,
2255 grant_pump,
2256 state: ClientStreamState::JustOpened,
2257 };
2258 let stream = DuplexStream {
2259 inner,
2260 chunks_rx,
2261 done: false,
2262 };
2263 Ok(DuplexCallRaw { sink, stream })
2264 }
2265
2266 /// Streaming variant of [`Self::call`]. Returns an
2267 /// [`RpcStream`] that yields chunks (as `Result<Bytes, RpcError>`)
2268 /// until the server closes the stream.
2269 ///
2270 /// Sets `FLAG_RPC_STREAMING_RESPONSE` on the request so the
2271 /// server's streaming fold knows to expect multi-fire emits.
2272 /// Same lazy reply-subscription + direct-unicast REQUEST
2273 /// as the unary `call` path.
2274 ///
2275 /// Cancellation: dropping the returned `RpcStream` emits a
2276 /// CANCEL to the server (best-effort) and discards any
2277 /// in-flight chunks.
2278 pub async fn call_streaming(
2279 self: &Arc<Self>,
2280 target_node_id: u64,
2281 service: &str,
2282 payload: Bytes,
2283 opts: CallOptions,
2284 ) -> Result<RpcStream, RpcError> {
2285 // `stream_window_initial = Some(0)` would deadlock the
2286 // RESPONSE direction by default: server's pump awaits one
2287 // credit per chunk, the caller's auto-grant only fires on
2288 // consumed chunks, and the first chunk can never be
2289 // delivered. `None` means "unbounded credit"; any positive
2290 // value opts into flow control. Reject up front — symmetric
2291 // with the request-direction guard in `call_client_stream`.
2292 if matches!(opts.stream_window_initial, Some(0)) {
2293 return Err(RpcError::Codec {
2294 direction: CodecDirection::Encode,
2295 message: "stream_window_initial must be None or >= 1; Some(0) deadlocks the response pump"
2296 .to_string(),
2297 });
2298 }
2299 // T1.3: per-service route cache. One DashMap::get + Arc::clone
2300 // on the hot path instead of 2 format! + 2 ChannelName::new +
2301 // xxhash per call.
2302 let route = self.rpc_route_or_no_route(target_node_id, service)?;
2303 let self_origin = self.identity_origin_hash();
2304 self.ensure_reply_subscription(
2305 target_node_id,
2306 service,
2307 route.reply_channel.clone(),
2308 route.reply_hash,
2309 )
2310 .await?;
2311
2312 let call_id = mint_random_call_id();
2313 let pending = self.rpc_client_pending();
2314 // S-4 part 2: bind the pending entry to the wire-session
2315 // peer the request is dispatched to. The fold's deliver
2316 // gate rejects RESPONSE frames whose from_node doesn't
2317 // match, so a leaked call_id alone can't spoof a reply.
2318 let rx = pending.register_streaming(call_id, target_node_id);
2319
2320 // Build the REQUEST: STREAMING_RESPONSE flag plus optional
2321 // trace-context headers / propagate-trace flag, same as
2322 // unary `call`. Plus the optional flow-control header
2323 // (`nrpc-stream-window-initial`) when the caller opted in
2324 // via `CallOptions::stream_window_initial`.
2325 let mut flags = FLAG_RPC_STREAMING_RESPONSE;
2326 let mut headers = Vec::new();
2327 if let Some(tc) = opts.trace_context.as_ref() {
2328 flags |= FLAG_RPC_PROPAGATE_TRACE;
2329 headers.extend(build_trace_headers(tc));
2330 }
2331 if let Some(window) = opts.stream_window_initial {
2332 headers.push((
2333 HEADER_NRPC_STREAM_WINDOW_INITIAL.to_string(),
2334 window.to_string().into_bytes(),
2335 ));
2336 }
2337 // Append caller-supplied request headers (Phase 9b — same
2338 // semantics as the unary `call` path).
2339 headers.extend(opts.request_headers.iter().cloned());
2340 let req = RpcRequestPayload {
2341 service: service.to_string(),
2342 deadline_ns: opts.deadline.map(instant_to_unix_nanos).unwrap_or(0),
2343 flags,
2344 headers,
2345 body: payload.clone(),
2346 };
2347 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self_origin, call_id, 0);
2348 let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.body.len() + 32);
2349 buf.extend_from_slice(&meta.to_bytes());
2350 buf.extend_from_slice(&req.encode());
2351
2352 let payload_bytes = Bytes::from(buf);
2353 if let Err(e) = self
2354 .publish_to_peer(
2355 target_node_id,
2356 route.request_channel_hash,
2357 route.request_stream_id,
2358 /* reliable */ true,
2359 std::slice::from_ref(&payload_bytes),
2360 )
2361 .await
2362 {
2363 pending.cancel(call_id);
2364 return Err(RpcError::Transport(e));
2365 }
2366
2367 let request_bytes_len = payload_bytes.len() as u32;
2368 Ok(RpcStream {
2369 mesh: Arc::clone(self),
2370 target_node_id,
2371 request_channel: route.request_channel.clone(),
2372 self_origin,
2373 call_id,
2374 inner: rx,
2375 done: false,
2376 stream_window: opts.stream_window_initial,
2377 observer: StreamingObserverState::new(
2378 Arc::clone(self),
2379 target_node_id,
2380 service,
2381 request_bytes_len,
2382 ),
2383 })
2384 }
2385
2386 /// Find every node currently advertising `service` via the
2387 /// `nrpc:<service>` capability tag. Returns node IDs in
2388 /// roster order; the caller picks one (or use [`Self::call_service`]
2389 /// for the round-robin shortcut).
2390 ///
2391 /// Pre-Phase 2: requires the target nodes to have called
2392 /// `serve_rpc` AND `announce_capabilities` so the
2393 /// `nrpc:<service>` tag has propagated through capability
2394 /// announcements. The local node's own services are NOT
2395 /// automatically included (callers don't typically invoke
2396 /// themselves via the network — for in-process invocation,
2397 /// the user has the handler directly).
2398 pub fn find_service_nodes(&self, service: &str) -> Vec<u64> {
2399 use crate::adapter::net::behavior::capability::CapabilityFilter;
2400 let tag = format!("nrpc:{service}");
2401 let filter = CapabilityFilter::default().require_tag(tag);
2402 self.capability_index_arc().query(&filter)
2403 }
2404
2405 /// Issue an RPC call to `service`, picking one node from
2406 /// those advertising the `nrpc:<service>` tag in the local
2407 /// capability index according to `opts.routing_policy`.
2408 ///
2409 /// Returns `RpcError::NoRoute` if no nodes advertise the
2410 /// service (or if `opts.filter_unhealthy` is set and every
2411 /// candidate is unavailable per the local `ProximityGraph`).
2412 pub async fn call_service(
2413 self: &Arc<Self>,
2414 service: &str,
2415 payload: Bytes,
2416 opts: CallOptions,
2417 ) -> Result<RpcReply, RpcError> {
2418 let mut candidates = self.find_service_nodes(service);
2419 if candidates.is_empty() {
2420 return Err(RpcError::NoRoute {
2421 target: 0,
2422 reason: format!(
2423 "no nodes advertise `nrpc:{service}` (have any servers \
2424 for this service called serve_rpc + announce_capabilities?)"
2425 ),
2426 });
2427 }
2428
2429 // Health filtering. Skip candidates the proximity graph
2430 // marks unhealthy (`!is_available()`). Candidates with no
2431 // proximity entry at all are KEPT — absence of evidence
2432 // is not evidence of unhealth, and a freshly-announced
2433 // service shouldn't be filtered just because pingwaves
2434 // haven't propagated yet.
2435 //
2436 // The bridge: each candidate's session-layer `node_id: u64`
2437 // is mapped to the entity-layer `[u8; 32]` via
2438 // `MeshNode::entity_id_for_node`. The proximity graph is
2439 // keyed on the entity id.
2440 if opts.filter_unhealthy {
2441 let proximity = self.proximity_graph();
2442 candidates.retain(|node_id| match self.entity_id_for_node(*node_id) {
2443 Some(entity_id) => match proximity.get_node(&entity_id) {
2444 Some(node) => node.is_available(),
2445 None => true, // no proximity data → keep
2446 },
2447 None => true, // no entity-id mapping → keep
2448 });
2449 if candidates.is_empty() {
2450 return Err(RpcError::NoRoute {
2451 target: 0,
2452 reason: format!(
2453 "every node advertising `nrpc:{service}` is marked \
2454 unhealthy by the local proximity graph",
2455 ),
2456 });
2457 }
2458 }
2459
2460 // Sort once so consistent-hash policies (Sticky) produce
2461 // a stable ordering across calls regardless of how the
2462 // capability index returned the candidates, and so the
2463 // LowestLatency-with-no-proximity-data fallback is
2464 // deterministic. Cheap — the candidate set is typically
2465 // small.
2466 candidates.sort_unstable();
2467
2468 // v0.4 capability-auth caller-side gate. Filter the
2469 // candidate set BEFORE target selection so the routing
2470 // policy never picks a peer the caller can't actually
2471 // reach. Pre-fix `select_target` could pick a denied
2472 // candidate even when authorized peers existed in the
2473 // set, and the resulting `CapabilityDenied` masked the
2474 // fact that the call would have succeeded against B or
2475 // C. Each candidate's own announcement lists
2476 // `nrpc:<service>` (otherwise it wouldn't be a
2477 // `find_service_nodes` candidate), so the gate's
2478 // `has_tag` arm short-circuits in the common case; the
2479 // new work is the allow-list scan. Permissive
2480 // announcements (all three lists empty) admit any
2481 // caller — the byte-identity wire-compat tests pin that
2482 // an unmodified peer's announcement stays unrestricted.
2483 // See `docs/plans/CAPABILITY_AUTH_PLAN.md` §3.
2484 let tag = format!("nrpc:{service}");
2485 let index = self.capability_index_arc();
2486 let self_id = self.node_id();
2487 let any_candidate = candidates[0];
2488 candidates.retain(|c| index.may_execute(*c, &tag, self_id));
2489 if candidates.is_empty() {
2490 return Err(RpcError::CapabilityDenied {
2491 // No authorized target; surface one of the
2492 // originally-advertised candidates so the caller
2493 // can correlate the denial with a real peer. The
2494 // semantic is "no peer advertising `nrpc:<service>`
2495 // authorizes this caller" — `any_candidate` is a
2496 // representative, not necessarily the strictest.
2497 target: any_candidate,
2498 capability: service.to_string(),
2499 });
2500 }
2501
2502 let target = self.select_target(&candidates, &opts.routing_policy);
2503 self.call(target, service, payload, opts).await
2504 }
2505
2506 /// Select a single target from `candidates` according to
2507 /// `policy`. Caller has already ensured `candidates` is
2508 /// non-empty and sorted (so `Sticky` is consistent across
2509 /// calls).
2510 fn select_target(&self, candidates: &[u64], policy: &RoutingPolicy) -> u64 {
2511 match policy {
2512 RoutingPolicy::RoundRobin => {
2513 // `fetch_add(1)` on a dedicated cursor — NOT a
2514 // `load(call_id)` — so two concurrent
2515 // `call_service` invocations always observe
2516 // distinct values and pick distinct targets.
2517 let n = self
2518 .rpc_round_robin_cursor_arc()
2519 .fetch_add(1, Ordering::Relaxed);
2520 candidates[(n as usize) % candidates.len()]
2521 }
2522 RoutingPolicy::Random => {
2523 // Lightweight RNG via a fresh fetch_add (same
2524 // counter, separate per-call value) mixed through
2525 // xxh3. Sufficient for load distribution;
2526 // not cryptographically random.
2527 let n = self
2528 .rpc_round_robin_cursor_arc()
2529 .fetch_add(1, Ordering::Relaxed);
2530 let mixed = xxhash_rust::xxh3::xxh3_64(&n.to_le_bytes());
2531 candidates[(mixed as usize) % candidates.len()]
2532 }
2533 RoutingPolicy::Sticky { key } => {
2534 // Consistent-hash to a position in the (sorted)
2535 // candidate list. Same key + same candidate set =
2536 // same target. A change to the candidate set
2537 // (server failover) reshuffles roughly 1/N of keys.
2538 let h = xxhash_rust::xxh3::xxh3_64(&key.to_le_bytes());
2539 candidates[(h as usize) % candidates.len()]
2540 }
2541 RoutingPolicy::LowestLatency => {
2542 // Walk candidates, look up each via the bridge
2543 // → proximity graph, pick the smallest
2544 // `latency_us`. Candidates without a proximity
2545 // entry (no observed pingwave or no entity-id
2546 // mapping yet) are treated as `u64::MAX` so they
2547 // sort to the bottom — a known-fast node beats an
2548 // unknown one.
2549 //
2550 // Determinism on tie / no-data: `best_node` starts
2551 // at `candidates[0]` (the lexicographically first
2552 // sorted candidate), so all-ties or all-unknown
2553 // collapse to that consistent fallback.
2554 let proximity = self.proximity_graph();
2555 let mut best_node = candidates[0];
2556 let mut best_latency = u64::MAX;
2557 for &node_id in candidates {
2558 let lat = self
2559 .entity_id_for_node(node_id)
2560 .and_then(|eid| proximity.get_node(&eid))
2561 .map(|n| n.latency_us)
2562 .unwrap_or(u64::MAX);
2563 if lat < best_latency {
2564 best_latency = lat;
2565 best_node = node_id;
2566 }
2567 }
2568 best_node
2569 }
2570 }
2571 }
2572
2573 /// Issue an RPC call to `target_node_id` for `service`.
2574 ///
2575 /// Phase 1 — direct entity-to-entity addressing. The caller
2576 /// specifies which target to send to; service discovery (the
2577 /// "find me a healthy instance of X" lookup) is Phase 2.
2578 ///
2579 /// Lazily subscribes the local node's `RpcClientFold` to
2580 /// `<service>.replies.<self_origin>` from `target_node_id` on
2581 /// the first call to that (target, service) pair. The
2582 /// subscription is reused across subsequent calls.
2583 ///
2584 /// On `opts.deadline` expiring OR the future being dropped,
2585 /// emits a CANCEL event so the server can drop the in-flight
2586 /// handler.
2587 pub async fn call(
2588 self: &Arc<Self>,
2589 target_node_id: u64,
2590 service: &str,
2591 payload: Bytes,
2592 opts: CallOptions,
2593 ) -> Result<RpcReply, RpcError> {
2594 // `started_total` brackets the entire call for the
2595 // `RpcObserver` latency field; the substrate-internal
2596 // `started` further down (set after the subscription
2597 // setup) drives the existing `RpcReply::latency_ns`
2598 // accounting so observers and Prometheus metrics
2599 // measure slightly different spans but stay consistent
2600 // within their own surface.
2601 let started_total = Instant::now();
2602 let request_bytes_len = payload.len() as u32;
2603 // Per-service route cache: one `DashMap::get(&str)` +
2604 // `Arc::clone` on the hot path instead of 2 `format!` +
2605 // 2 `ChannelName::new` + xxhash per call (T1.3 perf audit
2606 // — `docs/misc/PERF_AUDIT_2026_05_19_NRPC.md`).
2607 let route = self.rpc_route_or_no_route(target_node_id, service)?;
2608 let self_origin = self.identity_origin_hash();
2609
2610 // Caller-side metrics guard. Bumps `in_flight` immediately;
2611 // each early-return path calls `metrics_guard.record(...)`
2612 // with the outcome, and Drop records the latency + bumps
2613 // the matching counter. A future dropped before any
2614 // `record(...)` call (e.g. a hedge loser) leaves the guard
2615 // with `outcome = None` so `in_flight` decrements but no
2616 // outcome is double-counted.
2617 let metrics_registry = self.rpc_metrics_arc();
2618 let mut metrics_guard = CallMetricsGuard::new(metrics_registry.for_service(service));
2619
2620 // Lazy reply-channel subscription. Once per (target, service).
2621 // Reply channel + hash come from the cached `RpcRoute`; we
2622 // only `.clone()` the `ChannelName` (cheap — internally an
2623 // Arc<str>) instead of building it from scratch.
2624 if let Err(e) = self
2625 .ensure_reply_subscription(
2626 target_node_id,
2627 service,
2628 route.reply_channel.clone(),
2629 route.reply_hash,
2630 )
2631 .await
2632 {
2633 metrics_guard.record(CallOutcome::NoRoute);
2634 self.fire_rpc_observer_outbound(
2635 target_node_id,
2636 service,
2637 started_total.elapsed().as_millis() as u32,
2638 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(e.to_string()),
2639 request_bytes_len,
2640 0,
2641 );
2642 return Err(e);
2643 }
2644
2645 // Allocate a fresh call_id. Random u64 from getrandom; a
2646 // sequential counter would let any session peer that
2647 // observed one of their own call_ids predict the next-
2648 // allocated ids and ship spoofed RESPONSE frames on the
2649 // victim's reply channel. Random u64 collides with
2650 // probability 2^-64 per call and is unguessable from
2651 // another peer's perspective.
2652 let call_id = mint_random_call_id();
2653
2654 // Register the oneshot before publishing the REQUEST so a
2655 // very-fast RESPONSE doesn't arrive before we're ready.
2656 // S-4 part 2: bind the pending entry to target_node_id so
2657 // the fold's deliver gate rejects spoofed RESPONSE frames
2658 // arriving from any other session peer.
2659 let pending = self.rpc_client_pending();
2660 let rx = pending.register(call_id, target_node_id);
2661
2662 // Build the REQUEST envelope. If a trace context is set,
2663 // emit `traceparent` / `tracestate` headers and signal
2664 // via `FLAG_RPC_PROPAGATE_TRACE` so the server's fold
2665 // populates `RpcContext::trace_context`.
2666 let (flags, mut headers) = match opts.trace_context.as_ref() {
2667 Some(tc) => (FLAG_RPC_PROPAGATE_TRACE, build_trace_headers(tc)),
2668 None => (0u16, Vec::new()),
2669 };
2670 // Append caller-supplied request headers (e.g. the
2671 // `net-where` predicate header for Phase 9b
2672 // predicate-pushdown). Auto-generated headers come first
2673 // so name collisions resolve to caller-overrides via the
2674 // server-side `predicate_from_rpc_headers` first-match
2675 // semantics.
2676 headers.extend(opts.request_headers.iter().cloned());
2677 let req = RpcRequestPayload {
2678 service: service.to_string(),
2679 deadline_ns: opts.deadline.map(instant_to_unix_nanos).unwrap_or(0),
2680 flags,
2681 headers,
2682 body: payload.clone(),
2683 };
2684 let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self_origin, call_id, 0);
2685 let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.body.len() + 32);
2686 buf.extend_from_slice(&meta.to_bytes());
2687 buf.extend_from_slice(&req.encode());
2688
2689 // Send the REQUEST directly to `target_node_id` via
2690 // `publish_to_peer`, bypassing the local subscriber roster
2691 // lookup. The roster-based `Mesh::publish` would consult
2692 // `dispatch_recipients(channel)` against the caller's local
2693 // roster, which has no knowledge of who serves this service
2694 // (no Subscribe message ever propagated from the server back
2695 // to the caller — `serve_rpc` is local-only). For Phase 1
2696 // direct addressing we know the target, so direct-send is
2697 // the right primitive.
2698 //
2699 // The receiver routes via the per-channel-hash dispatcher
2700 // hook (channel_hash is stamped on the wire by
2701 // publish_to_peer).
2702 let started = Instant::now();
2703 // Request channel hash + stream_id come from the cached
2704 // route — no `ChannelId::new` clone + xxhash per call.
2705 let payload_bytes = Bytes::from(buf);
2706 if let Err(e) = self
2707 .publish_to_peer(
2708 target_node_id,
2709 route.request_channel_hash,
2710 route.request_stream_id,
2711 /* reliable */ true,
2712 std::slice::from_ref(&payload_bytes),
2713 )
2714 .await
2715 {
2716 pending.cancel(call_id);
2717 // Distinguish "I don't know how to reach this peer"
2718 // from a generic transport blip: when the publish path
2719 // surfaces a no-session error, that's NoRoute (the
2720 // routing layer's job, retry won't help). Other
2721 // transport errors stay as Transport so retry is
2722 // applicable.
2723 let err = if classify_publish_no_session(&e) {
2724 metrics_guard.record(CallOutcome::NoRoute);
2725 RpcError::NoRoute {
2726 target: target_node_id,
2727 reason: e.to_string(),
2728 }
2729 } else {
2730 metrics_guard.record(CallOutcome::Transport);
2731 RpcError::Transport(e)
2732 };
2733 self.fire_rpc_observer_outbound(
2734 target_node_id,
2735 service,
2736 started_total.elapsed().as_millis() as u32,
2737 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(err.to_string()),
2738 request_bytes_len,
2739 0,
2740 );
2741 return Err(err);
2742 }
2743
2744 // From here on, the REQUEST is in flight on the server.
2745 // Wrap the rest of the call in an RAII guard whose Drop
2746 // fires CANCEL if `guard.completed` isn't set — covering:
2747 // - the call future being dropped mid-flight (e.g. hedge
2748 // loser, select!-cancelled future, caller awaiting a
2749 // `JoinHandle` that gets cancelled).
2750 // - the timeout path (we leave `completed=false` so Drop
2751 // handles CANCEL emission; no need for a separate
2752 // `send_rpc_cancel` call).
2753 let mut guard = UnaryCallGuard {
2754 pending: Arc::clone(&pending),
2755 mesh: Arc::clone(self),
2756 target_node_id,
2757 request_channel: route.request_channel.clone(),
2758 self_origin,
2759 call_id,
2760 completed: false,
2761 };
2762
2763 // Race the receiver against the deadline.
2764 let outcome: Result<Result<RpcResponsePayload, _>, tokio::time::error::Elapsed> =
2765 match opts.deadline {
2766 None => Ok(rx.await),
2767 Some(deadline) => {
2768 let timeout_at = deadline.saturating_duration_since(Instant::now());
2769 tokio::time::timeout(timeout_at, rx).await
2770 }
2771 };
2772
2773 let resp = match outcome {
2774 Ok(Ok(resp)) => {
2775 guard.completed = true;
2776 resp
2777 }
2778 Ok(Err(_recv_err)) => {
2779 // Sender dropped externally — pending entry is
2780 // already gone (someone else removed it). Mark
2781 // completed so Drop doesn't fire a useless CANCEL
2782 // for a server that's no longer tracking this id.
2783 guard.completed = true;
2784 metrics_guard.record(CallOutcome::Transport);
2785 let err = RpcError::Transport(AdapterError::Connection(
2786 "rpc client pending sender dropped (no response will arrive)".into(),
2787 ));
2788 self.fire_rpc_observer_outbound(
2789 target_node_id,
2790 service,
2791 started_total.elapsed().as_millis() as u32,
2792 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(
2793 err.to_string(),
2794 ),
2795 request_bytes_len,
2796 0,
2797 );
2798 return Err(err);
2799 }
2800 Err(_elapsed) => {
2801 // Timeout: leave `completed=false` so Drop emits
2802 // CANCEL automatically; surface Timeout to caller.
2803 metrics_guard.record(CallOutcome::Timeout);
2804 self.fire_rpc_observer_outbound(
2805 target_node_id,
2806 service,
2807 started_total.elapsed().as_millis() as u32,
2808 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Timeout,
2809 request_bytes_len,
2810 0,
2811 );
2812 return Err(RpcError::Timeout {
2813 elapsed_ms: started.elapsed().as_millis() as u64,
2814 });
2815 }
2816 };
2817
2818 // Map the wire status onto the public Result type.
2819 if resp.status.is_ok() {
2820 metrics_guard.record(CallOutcome::Ok);
2821 let response_bytes_len = resp.body.len() as u32;
2822 self.fire_rpc_observer_outbound(
2823 target_node_id,
2824 service,
2825 started_total.elapsed().as_millis() as u32,
2826 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Ok,
2827 request_bytes_len,
2828 response_bytes_len,
2829 );
2830 Ok(RpcReply {
2831 body: resp.body,
2832 headers: resp.headers,
2833 latency_ns: started.elapsed().as_nanos() as u64,
2834 })
2835 } else {
2836 metrics_guard.record(CallOutcome::ServerError);
2837 let status = resp.status.to_wire();
2838 let response_bytes_len = resp.body.len() as u32;
2839 let message = String::from_utf8(resp.body.to_vec())
2840 .unwrap_or_else(|e| format!("<{} bytes of non-utf8 body>", e.into_bytes().len()));
2841 self.fire_rpc_observer_outbound(
2842 target_node_id,
2843 service,
2844 started_total.elapsed().as_millis() as u32,
2845 crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(message.clone()),
2846 request_bytes_len,
2847 response_bytes_len,
2848 );
2849 // v0.4 capability-auth: callee-side defense-in-depth
2850 // surfaces as a wire `CapabilityDenied` status. Map it
2851 // back to the typed `RpcError::CapabilityDenied` so
2852 // application code sees the same variant regardless of
2853 // which side of the gate fired.
2854 if matches!(resp.status, RpcStatus::CapabilityDenied) {
2855 return Err(RpcError::CapabilityDenied {
2856 target: target_node_id,
2857 capability: service.to_string(),
2858 });
2859 }
2860 Err(RpcError::ServerError { status, message })
2861 }
2862 }
2863
2864 // ----------------------------------------------------------------
2865 // Internal helpers.
2866 // ----------------------------------------------------------------
2867
2868 /// Lazy-subscribe `reply_channel` from `target_node_id` and
2869 /// register an inbound dispatcher that drives the per-Mesh
2870 /// `RpcClientFold`. Idempotent — subsequent calls for the
2871 /// same (target, service) pair are no-ops.
2872 ///
2873 /// **Bounded** at [`MAX_REPLY_SUBSCRIPTIONS`]: a caller talking
2874 /// to many short-lived (target, service) pairs would otherwise
2875 /// grow the registry indefinitely. Past the cap we refuse the
2876 /// new subscription with `NoRoute` rather than evict an
2877 /// existing one (eviction could rip out a healthy in-flight
2878 /// reply path).
2879 ///
2880 /// **Dispatcher reuse**: the reply-channel name embeds the
2881 /// CALLER's `self_origin`, NOT the target's, so a single
2882 /// caller talking to multiple servers for the same service
2883 /// reuses the same reply channel (same hash). We register the
2884 /// dispatcher only if the slot is unoccupied; subsequent
2885 /// (target, service) pairs that hash to the same slot are
2886 /// allowed to share the existing dispatcher (which routes to
2887 /// the same per-Mesh `pending` map regardless of target). A
2888 /// genuine cross-service hash collision is detected at
2889 /// `serve_rpc` time (the AlreadyServing path) for the server
2890 /// side; on the caller side here, sharing the dispatcher is
2891 /// the correct behavior because all RESPONSE events route
2892 /// through the same `RpcClientPending` keyed by `call_id`.
2893 async fn ensure_reply_subscription(
2894 self: &Arc<Self>,
2895 target_node_id: u64,
2896 service: &str,
2897 reply_channel: ChannelName,
2898 reply_hash: ChannelHash,
2899 ) -> Result<(), RpcError> {
2900 let registry = self.rpc_reply_subscriptions_arc();
2901 {
2902 let entries = registry.lock();
2903 if entries
2904 .iter()
2905 .any(|(t, s)| *t == target_node_id && s == service)
2906 {
2907 return Ok(());
2908 }
2909 // Cap the registry. New entries past the cap are
2910 // refused — caller should reuse an existing
2911 // (target, service) pair or operate on fewer.
2912 if entries.len() >= MAX_REPLY_SUBSCRIPTIONS {
2913 return Err(RpcError::NoRoute {
2914 target: target_node_id,
2915 reason: format!(
2916 "reply-subscription registry at cap ({} entries); refusing new \
2917 (target={target_node_id:#x}, service={service:?}). Caller should \
2918 reuse an existing target+service pair or shrink the active set.",
2919 MAX_REPLY_SUBSCRIPTIONS,
2920 ),
2921 });
2922 }
2923 }
2924
2925 // Subscribe to our own reply channel from the target so the
2926 // target's roster has us as a subscriber when the server's
2927 // emit closure publishes the RESPONSE.
2928 self.subscribe_channel(target_node_id, reply_channel.clone())
2929 .await
2930 .map_err(|e| RpcError::NoRoute {
2931 target: target_node_id,
2932 reason: e.to_string(),
2933 })?;
2934
2935 // Register the inbound dispatcher only if the slot is
2936 // unoccupied. The reply-channel name embeds *self_origin*,
2937 // not the target, so multiple targets serving the same
2938 // service share one reply channel + one dispatcher. The
2939 // existing dispatcher routes to the same per-Mesh
2940 // `RpcClientPending` keyed by call_id, so reuse is safe.
2941 if !self.rpc_inbound_dispatcher_registered(reply_hash) {
2942 let pending = self.rpc_client_pending();
2943 let fold = Arc::new(Mutex::new(RpcClientFold::new(pending)));
2944 // S-4 part 2: use `apply_inbound` so the wire-session
2945 // peer's NodeId (resolved in mesh.rs's dispatch site)
2946 // flows into the fold's deliver gate. The legacy
2947 // `RedexFold::apply` shim delivers with from_node=0,
2948 // which would defeat the binding.
2949 let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2950 fold.lock().apply_inbound(&ev);
2951 });
2952 // Race-safe: a concurrent caller might have just
2953 // registered between our check and our insert. In that
2954 // case `register_rpc_inbound` returns the prior
2955 // dispatcher; our new fresh fold is dropped here, and
2956 // the prior dispatcher (which routes to the same
2957 // shared `pending`) keeps doing the job. No collision
2958 // — both folds are functionally equivalent.
2959 if let Some(prev) = self.register_rpc_inbound(reply_hash, dispatcher) {
2960 // Roll back: keep the prior dispatcher (it's
2961 // already wired to the same shared pending map).
2962 let _ = self.register_rpc_inbound(reply_hash, prev);
2963 }
2964 }
2965
2966 let _ = reply_hash; // captured into the dispatcher above; surfaced for debug
2967 registry.lock().push((target_node_id, service.to_string()));
2968 Ok(())
2969 }
2970}
2971
/// Upper bound (1024) on the distinct `(target_node_id, service)`
/// pairs held by the caller-side reply-subscription registry.
///
/// Once the registry holds this many pairs, the lazy-subscribe step
/// inside [`MeshNode::call`] rejects any *new* pair with
/// [`RpcError::NoRoute`]. Pairs that are already registered keep
/// working, and the lazy-subscribe path never evicts an entry to make
/// room. A caller that needs more reply paths should reuse existing
/// ones.
pub const MAX_REPLY_SUBSCRIPTIONS: usize = 1 << 10;
2979
2980/// Mint a random 64-bit call_id from `getrandom` entropy. Used as
2981/// the correlation token for REQUEST/RESPONSE pairing. The fold
2982/// keys pending oneshots on this value; any session peer with
2983/// publish access to the reply channel could ship a forged
2984/// RESPONSE if it could guess the value. Sequential u64s are
2985/// predictable from any peer that observes a single allocation;
2986/// random u64s collide with 2^-64 probability per call and are
2987/// independent across peers.
2988///
2989/// Falls back to a zero call_id on entropy failure — that
2990/// effectively disables correlation for this call (the oneshot
2991/// will time out) rather than panic, but in practice
2992/// `getrandom::fill` failure is a fatal-environment signal
2993/// (no `/dev/urandom`, broken syscall) and the broader stack
2994/// won't be functional anyway.
2995fn mint_random_call_id() -> u64 {
2996 let mut buf = [0u8; 8];
2997 if getrandom::fill(&mut buf).is_err() {
2998 return 0;
2999 }
3000 u64::from_le_bytes(buf)
3001}
3002
3003// ============================================================================
// Internal: tiny shims so the `serve_rpc` / `call` impls stay
// readable. The underlying state lives on `MeshNode`. These mostly
// rename the accessor methods locally; one helper also maps a failed
// route lookup onto `RpcError::NoRoute`.
3007// ============================================================================
3008
3009impl MeshNode {
3010 fn rpc_client_pending(&self) -> Arc<super::cortex::RpcClientPending> {
3011 self.rpc_client_pending_arc()
3012 }
3013 fn identity_origin_hash(&self) -> u64 {
3014 self.public_key_origin_hash()
3015 }
3016
3017 /// Caller-side helper that pairs `rpc_route_for_service` with
3018 /// the `RpcError::NoRoute { target, reason }` mapping every
3019 /// `Mesh::call*` entry point needs. Returning `Arc<RpcRoute>`
3020 /// keeps the hot-path allocation profile of the cache intact
3021 /// (one refcount bump per caller).
3022 fn rpc_route_or_no_route(
3023 &self,
3024 target_node_id: u64,
3025 service: &str,
3026 ) -> Result<Arc<super::mesh::RpcRoute>, RpcError> {
3027 self.rpc_route_for_service(service)
3028 .map_err(|reason| RpcError::NoRoute {
3029 target: target_node_id,
3030 reason,
3031 })
3032 }
3033}
3034
// `proximity_graph()` is already a public accessor on MeshNode
// (see the existing `pub fn proximity_graph(&self) -> &Arc<...>`).
// `call_service` uses it for the unhealthy-candidate filter and
// `select_target` for `LowestLatency`; both call it directly, so no
// shim is needed.
3038
3039// ============================================================================
3040// Errors.
3041// ============================================================================
3042
/// Errors returned by [`MeshNode::serve_rpc`].
///
/// Each variant wraps a single `String` that is interpolated into its
/// `Display` text (via `thiserror`).
#[derive(Debug, thiserror::Error)]
pub enum ServeError {
    /// The service name fails channel-name validation.
    ///
    /// NOTE(review): the payload is presumably the rejected name or the
    /// validator's message — confirm against `serve_rpc`.
    #[error("invalid service name: {0}")]
    InvalidServiceName(String),
    /// A handler for this service is already registered on this
    /// node. Drop the prior `ServeHandle` to free the slot.
    ///
    /// The payload is the service name, as shown in the message.
    #[error("already serving service `{0}` on this node")]
    AlreadyServing(String),
}
3054
3055// ============================================================================
3056// Helpers.
3057// ============================================================================
3058
3059/// Detect the "no session to the target node id" sub-case of
3060/// [`AdapterError::Connection`]. The publish path can surface
3061/// this through one of two messages depending on which inner
3062/// helper landed it:
3063///
3064/// - `"publish: no session for subscriber {hash}"` — emitted
3065/// by `mesh.rs::publish_to_peer` when the subscriber-roster
3066/// path can't find an active session.
3067/// - `"no session to publisher {hash}"` — emitted by the lower
3068/// mesh.rs send path when there's no active session to the
3069/// target's publisher record at all.
3070///
3071/// Both mean "I can't reach this peer". When we observe either,
3072/// we surface as [`RpcError::NoRoute`] rather than `Transport`
3073/// because retrying the same target without a session is
3074/// pointless and the right behavior for a routing helper is to
3075/// try a different target.
3076fn classify_publish_no_session(err: &AdapterError) -> bool {
3077 match err {
3078 AdapterError::Connection(msg) => {
3079 msg.contains("no session for subscriber") || msg.contains("no session to publisher")
3080 }
3081 _ => false,
3082 }
3083}
3084
/// Convert a monotonic `Instant` (an RPC deadline) into Unix-epoch
/// nanoseconds for the wire `deadline_ns` field.
///
/// `Instant` is not wall-clock time. The conversion takes the offset
/// of `instant` from "now" and applies it to the current `SystemTime`.
/// Wall-clock skew makes the result drift slightly, which is fine for
/// the server-side "deadline already passed" short-circuit.
///
/// Two edge cases are handled explicitly:
/// - Offsets too large for `u64` nanoseconds saturate instead of
///   wrapping. A wrapped value could turn a far-future deadline into an
///   arbitrary, possibly already-expired, timestamp.
/// - The result is never `0`. `call` encodes "no deadline" as
///   `deadline_ns == 0`, so a real (if long-expired) deadline must not
///   collide with that sentinel. It is clamped to `1` instead.
fn instant_to_unix_nanos(instant: Instant) -> u64 {
    // Saturating `Duration` → u64 nanoseconds (`as_nanos` is u128).
    fn saturating_nanos(d: std::time::Duration) -> u64 {
        d.as_nanos().min(u128::from(u64::MAX)) as u64
    }

    let now_instant = Instant::now();
    let now_wall = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(saturating_nanos)
        .unwrap_or(0);
    let unix_nanos = if instant >= now_instant {
        now_wall.saturating_add(saturating_nanos(instant.duration_since(now_instant)))
    } else {
        now_wall.saturating_sub(saturating_nanos(now_instant.duration_since(instant)))
    };
    // Keep clear of the `0` = "no deadline" wire sentinel.
    unix_nanos.max(1)
}
3104
3105#[allow(dead_code)]
3106fn _ensure_send_sync() {
3107 fn assert_send_sync<T: Send + Sync>() {}
3108 assert_send_sync::<ServeHandle>();
3109 assert_send_sync::<RpcCancellationToken>();
3110 assert_send_sync::<RpcContext>();
3111 assert_send_sync::<RpcHandlerError>();
3112 assert_send_sync::<RpcStatus>();
3113 assert_send_sync::<RpcReply>();
3114 assert_send_sync::<CallOptions>();
3115}