Skip to main content

net/adapter/net/
mesh_rpc.rs

1//! `Mesh::serve_rpc` / `Mesh::call` glue — the wire-up between
2//! `MeshNode`'s pub/sub + per-channel-hash dispatch hook and the
3//! `cortex::rpc` server / client folds.
4//!
5//! See `docs/misc/NRPC_DESIGN.md` for the full architectural framing.
6//! In short:
7//!
8//! - `serve_rpc(service, handler)` registers an inbound dispatcher
9//!   for `<service>.requests`'s channel hash. The dispatcher pushes
10//!   inbound REQUEST/CANCEL events through the
11//!   [`crate::adapter::net::cortex::RpcServerFold`], which spawns
12//!   the user handler. The fold's emit closure publishes RESPONSE
13//!   events on `<service>.replies.<caller_origin>` via
14//!   [`MeshNode::publish`].
15//!
16//! - `call(target, service, payload, opts)` allocates a `call_id`,
17//!   registers a oneshot in the per-Mesh `RpcClientPending`,
18//!   subscribes to its own reply channel from `target` (lazy,
19//!   cached), publishes the REQUEST envelope on `<service>.requests`,
20//!   awaits the oneshot. Drop sends a CANCEL.
21//!
22//! Phase 1 surface — direct entity-to-entity addressing
23//! (`call(target_node_id, ...)`), no service discovery layer yet.
24//! Phase 2 will add `call_service(name, ...)` over the existing
25//! capability-announcement registry.
26
27use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
28use std::sync::Arc;
29use std::time::Instant;
30
31use bytes::Bytes;
32use parking_lot::Mutex;
33use tokio::sync::mpsc;
34use tokio::task::JoinHandle;
35
36use super::channel::{ChannelHash, ChannelId, ChannelName, ChannelPublisher, PublishConfig};
37use super::cortex::{
38    build_trace_headers, encode_request_grant, encode_stream_grant, EventMeta,
39    RpcAsyncResponseEmitter, RpcCancellationToken, RpcClientFold, RpcClientStreamingHandler,
40    RpcContext, RpcDuplexFold, RpcDuplexHandler, RpcHandler, RpcHandlerError, RpcInboundDispatcher,
41    RpcInboundEvent, RpcRequestChunkPayload, RpcRequestGrantEmitter, RpcRequestPayload,
42    RpcResponseEmitter, RpcResponsePayload, RpcServerFold, RpcServerStreamingFold, RpcStatus,
43    RpcStreamingHandler, RpcStreamingRequestFold, StreamItem, TraceContext, DISPATCH_RPC_CANCEL,
44    DISPATCH_RPC_REQUEST, DISPATCH_RPC_REQUEST_CHUNK, DISPATCH_RPC_REQUEST_GRANT,
45    DISPATCH_RPC_STREAM_GRANT, EVENT_META_SIZE, FLAG_RPC_CLIENT_STREAMING_REQUEST,
46    FLAG_RPC_PROPAGATE_TRACE, FLAG_RPC_REQUEST_END, FLAG_RPC_STREAMING_RESPONSE,
47    HEADER_NRPC_REQUEST_WINDOW_INITIAL, HEADER_NRPC_STREAM_WINDOW_INITIAL,
48};
49use super::mesh_rpc_metrics::{CallMetricsGuard, CallOutcome};
50use crate::error::AdapterError;
51
52use super::mesh::MeshNode;
53use super::redex::{RedexEntry, RedexEvent, RedexFold};
54
55// ============================================================================
56// Public types.
57// ============================================================================
58
59/// How `Mesh::call_service` picks a target from the set of nodes
60/// advertising the requested service.
61#[derive(Debug, Clone, Default, PartialEq, Eq)]
62pub enum RoutingPolicy {
63    /// Naive round-robin via the per-Mesh `call_id` counter.
64    /// Distributes calls evenly across candidates regardless of
65    /// load. The default.
66    #[default]
67    RoundRobin,
68    /// Pick a candidate at random per call. Stateless, cheap, and
69    /// gives even distribution under independent calls.
70    Random,
71    /// Consistent-hash to a target by `key`. Same `key` always
72    /// hits the same target as long as the candidate set is
73    /// stable. Useful for session affinity (route a given
74    /// conversation / shard / user to the same backend).
75    Sticky {
76        /// Caller-supplied identifier — hash maps this to the
77        /// target. Use a session id, shard key, or conversation
78        /// id depending on the application.
79        key: u64,
80    },
81    /// Pick the candidate with the smallest measured `latency_us`
82    /// per the local `ProximityGraph`. Candidates the proximity
83    /// graph hasn't observed yet (no entity ↔ node_id mapping or
84    /// no pingwave received) sort to the bottom — better to pick
85    /// a known-fast node than gamble on an unknown one.
86    ///
87    /// Falls back deterministically to the first sorted candidate
88    /// when no candidates have proximity data, so a freshly-
89    /// discovered service still routes consistently.
90    LowestLatency,
91}
92
93/// Options for [`MeshNode::call`] and [`MeshNode::call_service`].
94#[derive(Debug, Clone)]
95pub struct CallOptions {
96    /// Hard deadline for the call. The future returned by `call`
97    /// races a `tokio::time::sleep_until`; whichever fires first
98    /// wins. On timeout the caller emits a CANCEL event for
99    /// `call_id` so the server can drop the in-flight handler.
100    /// `None` means no deadline; the caller waits indefinitely
101    /// (or until the future is dropped).
102    pub deadline: Option<Instant>,
103    /// How `call_service` picks a target. Ignored by `call`
104    /// (which takes an explicit `target_node_id`). Default:
105    /// `RoundRobin`.
106    pub routing_policy: RoutingPolicy,
107    /// Skip candidates whose `ProximityGraph` entry reports
108    /// `!is_available()` (i.e. `Unhealthy` or `Unknown`).
109    /// Default `true`. Candidates with no proximity entry at all
110    /// are KEPT — absence of evidence is not evidence of
111    /// unhealth, and a freshly-announced service shouldn't be
112    /// filtered just because pingwaves haven't propagated yet.
113    pub filter_unhealthy: bool,
114    /// W3C Trace Context to propagate to the server. When `Some`,
115    /// the call sets `FLAG_RPC_PROPAGATE_TRACE` on the request and
116    /// emits `traceparent` / `tracestate` headers; the server's
117    /// `RpcContext::trace_context` will be populated with the same
118    /// values. nRPC is transport-only — application code on both
119    /// sides reads / writes this via whatever tracing backend it
120    /// has wired up (tracing-opentelemetry, Datadog, etc.).
121    pub trace_context: Option<TraceContext>,
122    /// Per-call concurrency cap. Future Phase 2 work; v1 ignores
123    /// this and the per-Mesh `RpcClientPending` doesn't bound
124    /// in-flight count.
125    pub max_in_flight_per_target: u32,
126    /// **Streaming responses only.** Initial credit window for
127    /// per-streaming-response flow control. When `Some(n)`, the
128    /// caller emits `nrpc-stream-window-initial: n` on the
129    /// REQUEST and the server's pump task awaits one credit per
130    /// emitted chunk. The returned [`RpcStream`] auto-grants 1
131    /// credit per consumed chunk so the in-flight credit holds
132    /// near `n` (or use [`RpcStream::grant`] for batched / custom
133    /// cadence). `None` (the default) → unbounded: server pumps
134    /// chunks as fast as the publish path can take them
135    /// (back-compat / pre-flow-control behavior). Ignored by
136    /// non-streaming `call` / `call_service`.
137    pub stream_window_initial: Option<u32>,
138    /// **Client-streaming / duplex only.** Initial credit window
139    /// for per-call request-direction flow control. Mirror of
140    /// [`Self::stream_window_initial`] for the upload direction. When
141    /// `Some(n)`, the caller emits `nrpc-request-window-initial: n`
142    /// on the REQUEST and its `send().await` sink awaits one
143    /// credit per pushed chunk; the server refills via
144    /// [`DISPATCH_RPC_REQUEST_GRANT`] events. `None` → unbounded:
145    /// caller's send sink doesn't block (legacy / fast-path).
146    /// Ignored by unary `call` / `call_streaming`.
147    ///
148    /// Bidi streaming plan (Phase C).
149    pub request_window_initial: Option<u32>,
150    /// Caller-supplied request headers. Appended to the wire
151    /// `RpcRequestPayload::headers` after any auto-generated
152    /// headers (trace context, stream-window). Useful for
153    /// application-level metadata the server needs at
154    /// dispatch-time — e.g., the `net-where` predicate
155    /// header (Phase 9b of `CAPABILITY_SYSTEM_SDK_PLAN.md`) that
156    /// services consult for predicate-pushdown filtering.
157    ///
158    /// Each entry is `(name, value_bytes)`. Names use the lowercase
159    /// `cyberdeck-*` / `nrpc-*` convention; the substrate doesn't
160    /// validate names beyond the `MAX_RPC_HEADER_NAME_LEN` cap
161    /// enforced at encode time.
162    ///
163    /// Default: empty.
164    pub request_headers: Vec<(String, Vec<u8>)>,
165}
166
167impl Default for CallOptions {
168    fn default() -> Self {
169        Self {
170            deadline: None,
171            routing_policy: RoutingPolicy::default(),
172            filter_unhealthy: true,
173            trace_context: None,
174            max_in_flight_per_target: 64,
175            stream_window_initial: None,
176            request_window_initial: None,
177            request_headers: Vec::new(),
178        }
179    }
180}
181
182/// What [`MeshNode::call`] returns on success.
183#[derive(Debug, Clone)]
184pub struct RpcReply {
185    /// Response payload from the server's handler. Caller decodes
186    /// according to its application protocol.
187    pub body: Bytes,
188    /// Headers attached by the server's response.
189    pub headers: Vec<(String, Vec<u8>)>,
190    /// Wall-clock latency from `call(...)` to RESPONSE arrival.
191    pub latency_ns: u64,
192}
193
194/// What [`MeshNode::call`] returns on failure.
195#[derive(Debug, thiserror::Error)]
196pub enum RpcError {
197    /// No subscription / no route to the target. Either
198    /// `target_node_id` is unknown to the local mesh, or the
199    /// caller's reply-channel subscription couldn't be set up.
200    #[error("no route to target {target:#x}: {reason}")]
201    NoRoute {
202        /// Target node id the call was directed at.
203        target: u64,
204        /// Diagnostic — typically the underlying transport error.
205        reason: String,
206    },
207    /// Caller's deadline elapsed before a RESPONSE arrived. The
208    /// caller emits a CANCEL on timeout so the server can drop
209    /// the in-flight handler; this variant is returned to the
210    /// awaiting caller.
211    #[error("timeout after {elapsed_ms}ms")]
212    Timeout {
213        /// Wall-clock milliseconds elapsed before timeout fired.
214        elapsed_ms: u64,
215    },
216    /// Server returned a non-`Ok` status. Body carries the
217    /// server's diagnostic (UTF-8) when available.
218    #[error("server returned status {status:#06x}: {message}")]
219    ServerError {
220        /// Wire-level `RpcStatus` value the server returned.
221        status: u16,
222        /// UTF-8 diagnostic from the response body, when the body
223        /// decodes as valid UTF-8; otherwise hex-truncated.
224        message: String,
225    },
226    /// Underlying transport error (publish failure, encryption,
227    /// etc.).
228    #[error("transport: {0}")]
229    Transport(#[from] AdapterError),
230    /// Client-local serialization or deserialization failure.
231    /// `direction = Encode` means the typed wrapper failed to
232    /// encode the request before it ever hit the wire;
233    /// `direction = Decode` means the response landed but the
234    /// typed wrapper failed to decode it. Either way this is a
235    /// caller-fixable bug (wrong codec, schema drift, malformed
236    /// `Serialize` impl) — NOT a transient infra failure — so
237    /// retry / circuit-breaker predicates skip it by default.
238    #[error("codec ({direction:?}): {message}")]
239    Codec {
240        /// Which side of the call the codec failure happened on.
241        direction: CodecDirection,
242        /// Decode/encode diagnostic from the underlying serde impl.
243        message: String,
244    },
245    /// v0.4 capability-auth gate denied the call. Either the
246    /// target's latest `CapabilityAnnouncement` does not list
247    /// the requested `nrpc:<service>` tag, or it lists the tag
248    /// with allow-lists the caller does not match. See
249    /// `docs/plans/CAPABILITY_AUTH_PLAN.md` §3 for the model.
250    ///
251    /// Raised by the caller-side gate inside
252    /// [`MeshNode::call_service`] BEFORE the request hits the
253    /// wire, and surfaced by the caller on receipt of a
254    /// `RpcStatus::CapabilityDenied` response (the callee-side
255    /// defense-in-depth path).
256    #[error("capability denied: target {target:#x} does not authorize nrpc:{capability}")]
257    CapabilityDenied {
258        /// Target node id the gate denied.
259        target: u64,
260        /// Service / capability tag (without the `nrpc:` prefix)
261        /// the gate denied.
262        capability: String,
263    },
264}
265
266/// Which side of the call surfaced a [`RpcError::Codec`] failure.
267#[derive(Debug, Clone, Copy, PartialEq, Eq)]
268pub enum CodecDirection {
269    /// Encoding the outbound request failed before the call was issued.
270    Encode,
271    /// Decoding the inbound response failed after the call returned Ok.
272    Decode,
273}
274
275/// RAII handle returned by [`MeshNode::serve_rpc`]. Dropping it
276/// unregisters the inbound dispatcher and removes the service
277/// from the local-services registry (so subsequent
278/// `announce_capabilities` calls stop emitting the
279/// `nrpc:<service>` tag).
280///
281/// **Bridge task lifecycle.** The bridge task that drains the
282/// inbound mpsc into the fold is NOT aborted on Drop. The
283/// `register_rpc_inbound` dispatcher closure owns the only
284/// `mpsc::Sender` clone, so `unregister_rpc_inbound` (which drops
285/// the dispatcher) closes the channel; the bridge's `rx.recv()`
286/// then yields `None` and the task exits cleanly after draining
287/// any queued events. Aborting would race events that are
288/// mid-`fold.lock().apply()` — those events would be killed
289/// without their RESPONSE being emitted, so the corresponding
290/// callers would just time out.
291///
292/// Outstanding handler executions (already-spawned tokio tasks)
293/// continue to completion regardless.
294pub struct ServeHandle {
295    /// Channel hash to unregister on Drop.
296    channel_hash: ChannelHash,
297    /// Service name to remove from `rpc_local_services` on Drop.
298    service: String,
299    /// The bridge task. Held only so callers can introspect /
300    /// detach it; Drop does NOT abort it (see struct doc-comment).
301    /// Detaches naturally when the handle is dropped — the bridge
302    /// exits on its own once the dispatcher's `mpsc::Sender` is
303    /// dropped via `unregister_rpc_inbound`.
304    _bridge: JoinHandle<()>,
305    /// Hold an Arc back to the mesh so we can unregister on Drop
306    /// without the mesh having to track us.
307    mesh: Arc<MeshNode>,
308}
309
310impl Drop for ServeHandle {
311    fn drop(&mut self) {
312        // Order matters: unregister the dispatcher FIRST so no new
313        // events can land in the bridge's mpsc, THEN drop the
314        // service-tag entry. The bridge task drains any in-flight
315        // events naturally and exits when its `rx.recv()` yields
316        // `None` (which happens as soon as the dispatcher closure
317        // — the sole `tx` owner — is dropped above).
318        self.mesh.unregister_rpc_inbound(self.channel_hash);
319        self.mesh.rpc_local_services_arc().remove(&self.service);
320    }
321}
322
323// ============================================================================
324// Streaming caller-side: RpcStream.
325// ============================================================================
326
327/// An open streaming RPC call. Implements `Stream<Item =
328/// Result<Bytes, RpcError>>` — yields chunks as the server emits
329/// them, terminates on a clean stream-end frame OR a non-`Ok`
330/// status (which is yielded as the last `Err` item before the
331/// stream closes).
332///
333/// Dropping the stream emits a CANCEL to the server (best-effort)
334/// and discards the pending entry — any chunks the server emits
335/// after the drop are silently discarded by the client fold.
336pub struct RpcStream {
337    mesh: Arc<MeshNode>,
338    target_node_id: u64,
339    request_channel: ChannelName,
340    self_origin: u64,
341    call_id: u64,
342    inner: tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
343    /// Set true once we've yielded the terminal item (or an
344    /// error). Subsequent polls return `None`.
345    done: bool,
346    /// `Some(_)` if this stream uses flow control (caller set
347    /// `CallOptions::stream_window_initial`). Auto-grant emits 1
348    /// credit per delivered chunk, which keeps the server's
349    /// credit at roughly the initial window. `None` → no flow
350    /// control; `poll_next` does not emit grants.
351    stream_window: Option<u32>,
352    /// Observer-fire bookkeeping. Latched on terminal observation
353    /// in `poll_next`; fired once from `Drop` so the Deck NRPC
354    /// tab + every other `RpcObserver` consumer sees one event
355    /// per streaming-response call.
356    observer: StreamingObserverState,
357}
358
359impl RpcStream {
360    /// Server-assigned `call_id`. Useful for trace correlation /
361    /// custom logging at the call site.
362    pub fn call_id(&self) -> u64 {
363        self.call_id
364    }
365
366    /// Whether this stream is flow-controlled (caller set
367    /// `CallOptions::stream_window_initial`). Useful for tests +
368    /// diagnostics; user code typically doesn't need to inspect
369    /// this.
370    pub fn flow_controlled(&self) -> bool {
371        self.stream_window.is_some()
372    }
373
374    /// Explicitly grant `amount` more credits to the server's
375    /// pump. Spawns a fire-and-forget publish; doesn't await
376    /// acknowledgement. **No-op when flow control was not enabled
377    /// for this stream** — the server would silently drop the
378    /// grant anyway, and emitting wire traffic with no purpose
379    /// would just burn bandwidth.
380    ///
381    /// Auto-grant (1 credit per delivered chunk) covers the
382    /// common case; use this for batched cadence (e.g. grant
383    /// `window/2` after every `window/2` chunks consumed) when
384    /// `auto_grant`-style amortization isn't enough.
385    pub fn grant(&self, amount: u32) {
386        if !self.flow_controlled() || amount == 0 {
387            return;
388        }
389        spawn_grant_publish(
390            Arc::clone(&self.mesh),
391            self.target_node_id,
392            self.request_channel.clone(),
393            self.self_origin,
394            self.call_id,
395            amount,
396        );
397    }
398}
399
400/// Shared fire-and-forget GRANT-publish helper. Used by
401/// [`RpcStream::grant`] (explicit) and the auto-grant in
402/// [`RpcStream::poll_next`]. Same direct-unicast publish path as
403/// [`spawn_cancel_publish`], just with a different dispatch byte
404/// + a 4-byte u32 payload.
405fn spawn_grant_publish(
406    mesh: Arc<MeshNode>,
407    target: u64,
408    request_channel: ChannelName,
409    self_origin: u64,
410    call_id: u64,
411    amount: u32,
412) {
413    tokio::spawn(async move {
414        let meta = EventMeta::new(DISPATCH_RPC_STREAM_GRANT, 0, self_origin, call_id, 0);
415        let request_channel_id = ChannelId::new(request_channel);
416        let request_channel_hash = request_channel_id.hash();
417        let stream_id = MeshNode::publish_stream_id(&request_channel_id);
418        let mut buf = Vec::with_capacity(EVENT_META_SIZE + 4);
419        buf.extend_from_slice(&meta.to_bytes());
420        buf.extend_from_slice(&encode_stream_grant(amount));
421        let payload = Bytes::from(buf);
422        let _ = mesh
423            .publish_to_peer(
424                target,
425                request_channel_hash,
426                stream_id,
427                /* reliable */ true,
428                std::slice::from_ref(&payload),
429            )
430            .await;
431    });
432}
433
434impl futures::Stream for RpcStream {
435    type Item = Result<Bytes, RpcError>;
436
437    fn poll_next(
438        mut self: std::pin::Pin<&mut Self>,
439        cx: &mut std::task::Context<'_>,
440    ) -> std::task::Poll<Option<Self::Item>> {
441        if self.done {
442            return std::task::Poll::Ready(None);
443        }
444        match self.inner.poll_recv(cx) {
445            std::task::Poll::Ready(Some(StreamItem::Chunk(body))) => {
446                // Auto-grant 1 credit per delivered chunk so the
447                // server's pump stays at roughly the initial
448                // window. No-op when the stream isn't flow-
449                // controlled. For batched cadence, callers can
450                // skip auto-grant by NOT setting
451                // `stream_window_initial` and using `RpcStream::grant`
452                // directly with their preferred batching.
453                if self.stream_window.is_some() {
454                    spawn_grant_publish(
455                        Arc::clone(&self.mesh),
456                        self.target_node_id,
457                        self.request_channel.clone(),
458                        self.self_origin,
459                        self.call_id,
460                        1,
461                    );
462                }
463                self.observer.add_response_bytes(body.len() as u32);
464                std::task::Poll::Ready(Some(Ok(body)))
465            }
466            std::task::Poll::Ready(Some(StreamItem::End)) => {
467                self.done = true;
468                self.observer.latch_ok();
469                std::task::Poll::Ready(None)
470            }
471            std::task::Poll::Ready(Some(StreamItem::Error(resp))) => {
472                self.done = true;
473                let status = resp.status.to_wire();
474                let message = String::from_utf8(resp.body.to_vec()).unwrap_or_else(|e| {
475                    format!("<{} bytes of non-utf8 body>", e.into_bytes().len())
476                });
477                self.observer
478                    .latch_error(format!("server returned status {status:#06x}: {message}"));
479                std::task::Poll::Ready(Some(Err(RpcError::ServerError { status, message })))
480            }
481            std::task::Poll::Ready(None) => {
482                self.done = true;
483                std::task::Poll::Ready(None)
484            }
485            std::task::Poll::Pending => std::task::Poll::Pending,
486        }
487    }
488}
489
490impl Drop for RpcStream {
491    fn drop(&mut self) {
492        // Best-effort CANCEL to the server. Spawn a task because
493        // Drop can't be async; the publish happens off-thread.
494        // Also clear our pending entry so any in-flight chunks
495        // are dropped on arrival.
496        self.mesh.rpc_client_pending_arc().cancel(self.call_id);
497        spawn_cancel_publish(
498            Arc::clone(&self.mesh),
499            self.target_node_id,
500            self.request_channel.clone(),
501            self.self_origin,
502            self.call_id,
503        );
504        // Fire the observer with the latched status (Ok / Error /
505        // Canceled). Idempotent — only the first fire emits.
506        self.observer.fire();
507    }
508}
509
510// ============================================================================
511// Phase C — caller-side client-streaming / duplex primitive.
512// ============================================================================
513
514/// Shared REQUEST_CHUNK-publish helper. Builds the wire frame and
515/// fires through `publish_to_peer` direct-unicast (same routing
516/// pattern as the initial REQUEST — caller knows the target).
517async fn publish_request_chunk(
518    mesh: &Arc<MeshNode>,
519    target: u64,
520    request_channel: &ChannelName,
521    self_origin: u64,
522    chunk: &RpcRequestChunkPayload,
523) -> Result<(), RpcError> {
524    let meta = EventMeta::new(DISPATCH_RPC_REQUEST_CHUNK, 0, self_origin, chunk.call_id, 0);
525    let mut buf = Vec::with_capacity(EVENT_META_SIZE + chunk.encoded_len());
526    buf.extend_from_slice(&meta.to_bytes());
527    buf.extend_from_slice(&chunk.encode());
528    let request_channel_id = ChannelId::new(request_channel.clone());
529    let request_channel_hash = request_channel_id.hash();
530    let stream_id = MeshNode::publish_stream_id(&request_channel_id);
531    let payload = Bytes::from(buf);
532    mesh.publish_to_peer(
533        target,
534        request_channel_hash,
535        stream_id,
536        /* reliable */ true,
537        std::slice::from_ref(&payload),
538    )
539    .await
540    .map_err(RpcError::Transport)
541}
542
543/// Internal state of a [`ClientStreamCallRaw`]. The state machine
544/// is small: open the call (initial REQUEST not yet sent), then
545/// send N items (the first becomes the initial REQUEST, subsequent
546/// become REQUEST_CHUNKs), then finish (terminal REQUEST_END
547/// frame). After finish, no further sends are accepted.
548#[derive(Debug, Clone, Copy, PartialEq, Eq)]
549enum ClientStreamState {
550    /// Pending entry registered, reply subscription ensured, but
551    /// the initial REQUEST has NOT been published to the wire yet.
552    /// First `send` flips this to `Sending`.
553    JustOpened,
554    /// Initial REQUEST has been published; subsequent sends ride
555    /// as REQUEST_CHUNKs.
556    Sending,
557    /// `finish` has been called; the terminal REQUEST_END frame
558    /// (or the initial REQUEST with FLAG_END for the degenerate
559    /// zero-send path) has been published. The terminal RESPONSE
560    /// has not necessarily arrived yet — that's awaited on the
561    /// caller's terminal_rx.
562    Finishing,
563    /// Terminal RESPONSE has been delivered. Drop is a no-op.
564    Done,
565}
566
567/// Caller-side handle for a client-streaming (or duplex Phase D)
568/// RPC. Push N items via [`ClientStreamCallRaw::send`], then
569/// [`ClientStreamCallRaw::finish`] to await the terminal RESPONSE.
570///
571/// **Lazy initial REQUEST.** The initial REQUEST is published on
572/// the FIRST `send()` (or on `finish()` if the caller sends nothing
573/// — that's the "zero-item upload" degenerate path that opens and
574/// closes the call in one frame). Constructing the handle does
575/// NOT yet emit any wire traffic beyond the reply-channel
576/// subscription setup.
577///
578/// **Flow control.** When the caller set
579/// [`CallOptions::request_window_initial`] to `Some(n)`, the
580/// handle holds an `n`-permit `Semaphore` that gates `send`. The
581/// server's [`DISPATCH_RPC_REQUEST_GRANT`] events refill the
582/// semaphore. When `None`, `send` doesn't block (caller is on the
583/// unbounded-credit fast path).
584///
585/// **Cancellation.** Dropping the handle BEFORE `finish` returns
586/// `Ok` fires a best-effort CANCEL to the server and clears the
587/// pending entry. Dropping after a successful `finish` is a no-op
588/// (terminal RESPONSE already delivered + entry removed).
589///
590/// Bidi streaming plan (Phase C).
591pub struct ClientStreamCallRaw {
592    mesh: Arc<MeshNode>,
593    target_node_id: u64,
594    request_channel: ChannelName,
595    self_origin: u64,
596    call_id: u64,
597    service: String,
598    /// Header set queued for the initial REQUEST. Drained on the
599    /// first publish (either `send` or `finish`).
600    initial_headers: Vec<(String, Vec<u8>)>,
601    /// Flag bits queued for the initial REQUEST. Always carries
602    /// `FLAG_RPC_CLIENT_STREAMING_REQUEST`; may also carry
603    /// `FLAG_RPC_PROPAGATE_TRACE` when the caller supplied a
604    /// trace context.
605    initial_flags: u16,
606    /// `deadline_ns` from `CallOptions::deadline`. Embedded in the
607    /// initial REQUEST.
608    deadline_ns: u64,
609    /// Per-call semaphore for upload credits. `None` when the
610    /// caller didn't opt into flow control (`request_window_initial`
611    /// was `None` on the `CallOptions`).
612    credit_sem: Option<Arc<tokio::sync::Semaphore>>,
613    /// Background task that drains REQUEST_GRANT credits from the
614    /// pending entry's grant mpsc into `credit_sem`. Aborted on
615    /// Drop. `None` when flow control is off.
616    grant_pump: Option<JoinHandle<()>>,
617    /// Single-shot terminal-RESPONSE receiver. Taken by `finish`;
618    /// after that `Drop` doesn't attempt to await again.
619    terminal_rx: Option<tokio::sync::oneshot::Receiver<RpcResponsePayload>>,
620    /// State machine. See [`ClientStreamState`].
621    state: ClientStreamState,
622    /// Wall-clock start (for `RpcReply::latency_ns` reporting).
623    started: Instant,
624    /// Observer-fire bookkeeping. Latched on terminal observation
625    /// in `finish`; fired once from `Drop` so the Deck NRPC tab +
626    /// every `RpcObserver` consumer sees one event per
627    /// client-streaming call.
628    observer: StreamingObserverState,
629}
630
631impl ClientStreamCallRaw {
632    /// Server-assigned `call_id`. Useful for trace correlation /
633    /// custom logging.
634    pub fn call_id(&self) -> u64 {
635        self.call_id
636    }
637
638    /// Whether this call is flow-controlled (caller set
639    /// `CallOptions::request_window_initial`).
640    pub fn flow_controlled(&self) -> bool {
641        self.credit_sem.is_some()
642    }
643
644    /// Push one body chunk to the server. Encodes as the initial
645    /// REQUEST (first call) or as a REQUEST_CHUNK (subsequent
646    /// calls). When flow control is opted into, awaits one credit
647    /// before publishing.
648    ///
649    /// Returns `Err(RpcError::Codec)` if called after [`Self::finish`].
650    pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
651        match self.state {
652            ClientStreamState::Finishing | ClientStreamState::Done => {
653                return Err(RpcError::Codec {
654                    direction: CodecDirection::Encode,
655                    message: "send() called after finish()".to_string(),
656                });
657            }
658            _ => {}
659        }
660        // Gate on credit when flow control is opted into.
661        if let Some(sem) = self.credit_sem.as_ref() {
662            let permit = sem.clone().acquire_owned().await.map_err(|_| {
663                RpcError::Transport(AdapterError::Connection("credit semaphore closed".into()))
664            })?;
665            permit.forget();
666        }
667        self.observer.add_request_bytes(body.len() as u32);
668        match self.state {
669            ClientStreamState::JustOpened => {
670                // First send → initial REQUEST.
671                let req = RpcRequestPayload {
672                    service: self.service.clone(),
673                    deadline_ns: self.deadline_ns,
674                    flags: self.initial_flags,
675                    headers: std::mem::take(&mut self.initial_headers),
676                    body: body.clone(),
677                };
678                self.publish_initial_request(&req).await?;
679                self.state = ClientStreamState::Sending;
680            }
681            ClientStreamState::Sending => {
682                let chunk = RpcRequestChunkPayload {
683                    call_id: self.call_id,
684                    flags: 0,
685                    headers: vec![],
686                    body: body.clone(),
687                };
688                publish_request_chunk(
689                    &self.mesh,
690                    self.target_node_id,
691                    &self.request_channel,
692                    self.self_origin,
693                    &chunk,
694                )
695                .await?;
696            }
697            ClientStreamState::Finishing | ClientStreamState::Done => unreachable!(),
698        }
699        Ok(())
700    }
701
702    /// Close the upload direction and await the server's terminal
703    /// RESPONSE. Emits a REQUEST_CHUNK with `FLAG_RPC_REQUEST_END`
704    /// (empty body) if the call has already published its initial
705    /// REQUEST, or an initial REQUEST with both
706    /// `FLAG_RPC_CLIENT_STREAMING_REQUEST` and
707    /// `FLAG_RPC_REQUEST_END` set (the degenerate "zero-item
708    /// upload" path) if nothing was sent.
709    ///
710    /// Consumes the handle — Drop after `finish` is a no-op.
711    pub async fn finish(mut self) -> Result<RpcReply, RpcError> {
712        match self.state {
713            ClientStreamState::JustOpened => {
714                let req = RpcRequestPayload {
715                    service: self.service.clone(),
716                    deadline_ns: self.deadline_ns,
717                    flags: self.initial_flags | FLAG_RPC_REQUEST_END,
718                    headers: std::mem::take(&mut self.initial_headers),
719                    body: Bytes::new(),
720                };
721                self.publish_initial_request(&req).await?;
722            }
723            ClientStreamState::Sending => {
724                let chunk = RpcRequestChunkPayload {
725                    call_id: self.call_id,
726                    flags: FLAG_RPC_REQUEST_END,
727                    headers: vec![],
728                    body: Bytes::new(),
729                };
730                publish_request_chunk(
731                    &self.mesh,
732                    self.target_node_id,
733                    &self.request_channel,
734                    self.self_origin,
735                    &chunk,
736                )
737                .await?;
738            }
739            ClientStreamState::Finishing | ClientStreamState::Done => {
740                return Err(RpcError::Codec {
741                    direction: CodecDirection::Encode,
742                    message: "finish() called twice".to_string(),
743                });
744            }
745        }
746        self.state = ClientStreamState::Finishing;
747        let terminal_rx = self.terminal_rx.take().ok_or_else(|| {
748            RpcError::Transport(AdapterError::Connection(
749                "terminal receiver already consumed".into(),
750            ))
751        })?;
752        // Honor the deadline if the caller set one.
753        let resp = if self.deadline_ns > 0 {
754            let now = std::time::SystemTime::now()
755                .duration_since(std::time::UNIX_EPOCH)
756                .map(|d| d.as_nanos() as u64)
757                .unwrap_or(0);
758            let remaining = self.deadline_ns.saturating_sub(now);
759            match tokio::time::timeout(std::time::Duration::from_nanos(remaining), terminal_rx)
760                .await
761            {
762                Ok(Ok(r)) => r,
763                Ok(Err(_)) => {
764                    let msg = "terminal sender dropped before response arrived";
765                    self.observer.latch_error(msg);
766                    return Err(RpcError::Transport(AdapterError::Connection(msg.into())));
767                }
768                Err(_elapsed) => {
769                    let elapsed_ms = self.started.elapsed().as_millis() as u64;
770                    self.observer.latch_timeout();
771                    return Err(RpcError::Timeout { elapsed_ms });
772                }
773            }
774        } else {
775            match terminal_rx.await {
776                Ok(r) => r,
777                Err(_) => {
778                    let msg = "terminal sender dropped before response arrived";
779                    self.observer.latch_error(msg);
780                    return Err(RpcError::Transport(AdapterError::Connection(msg.into())));
781                }
782            }
783        };
784        self.state = ClientStreamState::Done;
785        self.observer.add_response_bytes(resp.body.len() as u32);
786        if !resp.status.is_ok() {
787            // String::from_utf8 takes `Vec<u8>`. `Bytes::to_vec()`
788            // matches the prior `resp.body.clone()` semantics (full
789            // copy of the body for the error-formatting path);
790            // bulk-throughput improvement lives on the decode side,
791            // not here.
792            let message = String::from_utf8(resp.body.to_vec())
793                .unwrap_or_else(|e| format!("<{} bytes of non-utf8 body>", e.into_bytes().len()));
794            self.observer.latch_error(format!(
795                "server returned status {:#06x}: {message}",
796                resp.status.to_wire()
797            ));
798            return Err(RpcError::ServerError {
799                status: resp.status.to_wire(),
800                message,
801            });
802        }
803        self.observer.latch_ok();
804        let latency_ns = self.started.elapsed().as_nanos() as u64;
805        Ok(RpcReply {
806            body: resp.body,
807            headers: resp.headers,
808            latency_ns,
809        })
810    }
811
812    async fn publish_initial_request(&self, req: &RpcRequestPayload) -> Result<(), RpcError> {
813        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self.self_origin, self.call_id, 0);
814        let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.encoded_len());
815        buf.extend_from_slice(&meta.to_bytes());
816        buf.extend_from_slice(&req.encode());
817        let request_channel_id = ChannelId::new(self.request_channel.clone());
818        let request_channel_hash = request_channel_id.hash();
819        let stream_id = MeshNode::publish_stream_id(&request_channel_id);
820        let payload = Bytes::from(buf);
821        self.mesh
822            .publish_to_peer(
823                self.target_node_id,
824                request_channel_hash,
825                stream_id,
826                /* reliable */ true,
827                std::slice::from_ref(&payload),
828            )
829            .await
830            .map_err(RpcError::Transport)
831    }
832}
833
834impl Drop for ClientStreamCallRaw {
835    fn drop(&mut self) {
836        if let Some(task) = self.grant_pump.take() {
837            task.abort();
838        }
839        // Fire the observer with whatever status was latched
840        // (Ok / Error / Timeout / Canceled). Idempotent — only
841        // the first call emits.
842        self.observer.fire();
843        if matches!(self.state, ClientStreamState::Done) {
844            // Successful completion — pending entry already gone,
845            // no CANCEL needed.
846            return;
847        }
848        self.mesh.rpc_client_pending_arc().cancel(self.call_id);
849        // Only fire CANCEL on the wire if the server has actually
850        // seen the initial REQUEST. A `JustOpened` Drop means we
851        // never published anything; no need to CANCEL a call the
852        // server doesn't know about.
853        if !matches!(self.state, ClientStreamState::JustOpened) {
854            spawn_cancel_publish(
855                Arc::clone(&self.mesh),
856                self.target_node_id,
857                self.request_channel.clone(),
858                self.self_origin,
859                self.call_id,
860            );
861        }
862    }
863}
864
865// ============================================================================
866// Phase D — caller-side duplex primitive.
867// ============================================================================
868
869/// Shared state between a `DuplexSink` and its sibling
870/// `DuplexStream`. Both halves hold an `Arc<DuplexInner>`; when
871/// the refcount hits zero (i.e. both halves dropped) the Drop
872/// fires CANCEL to the server unless the call was cleanly closed
873/// (`clean_close = true`).
874struct DuplexInner {
875    mesh: Arc<MeshNode>,
876    target_node_id: u64,
877    request_channel: ChannelName,
878    self_origin: u64,
879    call_id: u64,
880    /// Whether the initial REQUEST was successfully published.
881    /// `false` means we never reached the wire — no CANCEL needed
882    /// (server doesn't know about the call).
883    initial_sent: std::sync::atomic::AtomicBool,
884    /// Set true when the call closes cleanly — terminal RESPONSE
885    /// (or terminal Error) was observed on the response stream.
886    /// Suppresses CANCEL-on-drop.
887    clean_close: std::sync::atomic::AtomicBool,
888    /// Observer-fire bookkeeping. Latched from the various
889    /// terminal-observation sites (DuplexCall::next /
890    /// DuplexStream::poll_next yielding End or Error); fired
891    /// once on Drop. The DuplexCall / DuplexSink / DuplexStream
892    /// each share access via the surrounding Arc<DuplexInner>.
893    observer: StreamingObserverState,
894}
895
896impl Drop for DuplexInner {
897    fn drop(&mut self) {
898        self.mesh.rpc_client_pending_arc().cancel(self.call_id);
899        // Fire the observer with the latched status (Ok / Error /
900        // Canceled). Idempotent — only the first call emits.
901        self.observer.fire();
902        if self.clean_close.load(Ordering::SeqCst) {
903            return;
904        }
905        if !self.initial_sent.load(Ordering::SeqCst) {
906            return;
907        }
908        spawn_cancel_publish(
909            Arc::clone(&self.mesh),
910            self.target_node_id,
911            self.request_channel.clone(),
912            self.self_origin,
913            self.call_id,
914        );
915    }
916}
917
918/// Send half of a duplex call. Push items via `send`; emit the
919/// terminal REQUEST_END frame via `finish_sending`. After
920/// `finish_sending` the upload side is closed but the sibling
921/// `DuplexStream` continues yielding response chunks until the
922/// server's terminal frame arrives.
923///
924/// Bidi streaming plan (Phase D).
925pub struct DuplexSink {
926    inner: Arc<DuplexInner>,
927    service: String,
928    initial_headers: Vec<(String, Vec<u8>)>,
929    initial_flags: u16,
930    deadline_ns: u64,
931    credit_sem: Option<Arc<tokio::sync::Semaphore>>,
932    grant_pump: Option<JoinHandle<()>>,
933    state: ClientStreamState,
934}
935
936impl DuplexSink {
937    /// Push one body chunk to the server. Same semantics as
938    /// [`ClientStreamCallRaw::send`].
939    pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
940        match self.state {
941            ClientStreamState::Finishing | ClientStreamState::Done => {
942                return Err(RpcError::Codec {
943                    direction: CodecDirection::Encode,
944                    message: "send() called after finish_sending()".to_string(),
945                });
946            }
947            _ => {}
948        }
949        if let Some(sem) = self.credit_sem.as_ref() {
950            let permit = sem.clone().acquire_owned().await.map_err(|_| {
951                RpcError::Transport(AdapterError::Connection("credit semaphore closed".into()))
952            })?;
953            permit.forget();
954        }
955        self.inner.observer.add_request_bytes(body.len() as u32);
956        match self.state {
957            ClientStreamState::JustOpened => {
958                let req = RpcRequestPayload {
959                    service: self.service.clone(),
960                    deadline_ns: self.deadline_ns,
961                    flags: self.initial_flags,
962                    headers: std::mem::take(&mut self.initial_headers),
963                    body: body.clone(),
964                };
965                self.publish_initial_request(&req).await?;
966                self.inner.initial_sent.store(true, Ordering::SeqCst);
967                self.state = ClientStreamState::Sending;
968            }
969            ClientStreamState::Sending => {
970                let chunk = RpcRequestChunkPayload {
971                    call_id: self.inner.call_id,
972                    flags: 0,
973                    headers: vec![],
974                    body: body.clone(),
975                };
976                publish_request_chunk(
977                    &self.inner.mesh,
978                    self.inner.target_node_id,
979                    &self.inner.request_channel,
980                    self.inner.self_origin,
981                    &chunk,
982                )
983                .await?;
984            }
985            ClientStreamState::Finishing | ClientStreamState::Done => unreachable!(),
986        }
987        Ok(())
988    }
989
990    /// Close the upload direction. Emits the terminal REQUEST_END
991    /// frame. The response stream continues until the server's
992    /// terminal RESPONSE arrives (use the sibling `DuplexStream`).
993    pub async fn finish_sending(mut self) -> Result<(), RpcError> {
994        match self.state {
995            ClientStreamState::JustOpened => {
996                let req = RpcRequestPayload {
997                    service: self.service.clone(),
998                    deadline_ns: self.deadline_ns,
999                    flags: self.initial_flags | FLAG_RPC_REQUEST_END,
1000                    headers: std::mem::take(&mut self.initial_headers),
1001                    body: Bytes::new(),
1002                };
1003                self.publish_initial_request(&req).await?;
1004                self.inner.initial_sent.store(true, Ordering::SeqCst);
1005            }
1006            ClientStreamState::Sending => {
1007                let chunk = RpcRequestChunkPayload {
1008                    call_id: self.inner.call_id,
1009                    flags: FLAG_RPC_REQUEST_END,
1010                    headers: vec![],
1011                    body: Bytes::new(),
1012                };
1013                publish_request_chunk(
1014                    &self.inner.mesh,
1015                    self.inner.target_node_id,
1016                    &self.inner.request_channel,
1017                    self.inner.self_origin,
1018                    &chunk,
1019                )
1020                .await?;
1021            }
1022            ClientStreamState::Finishing | ClientStreamState::Done => {
1023                return Err(RpcError::Codec {
1024                    direction: CodecDirection::Encode,
1025                    message: "finish_sending() called twice".to_string(),
1026                });
1027            }
1028        }
1029        self.state = ClientStreamState::Finishing;
1030        Ok(())
1031    }
1032
1033    /// Server-assigned `call_id`. Same value on the sibling
1034    /// `DuplexStream`.
1035    pub fn call_id(&self) -> u64 {
1036        self.inner.call_id
1037    }
1038
1039    /// Whether this call is flow-controlled on the upload side.
1040    pub fn flow_controlled(&self) -> bool {
1041        self.credit_sem.is_some()
1042    }
1043
1044    async fn publish_initial_request(&self, req: &RpcRequestPayload) -> Result<(), RpcError> {
1045        let meta = EventMeta::new(
1046            DISPATCH_RPC_REQUEST,
1047            0,
1048            self.inner.self_origin,
1049            self.inner.call_id,
1050            0,
1051        );
1052        let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.encoded_len());
1053        buf.extend_from_slice(&meta.to_bytes());
1054        buf.extend_from_slice(&req.encode());
1055        let request_channel_id = ChannelId::new(self.inner.request_channel.clone());
1056        let request_channel_hash = request_channel_id.hash();
1057        let stream_id = MeshNode::publish_stream_id(&request_channel_id);
1058        let payload = Bytes::from(buf);
1059        self.inner
1060            .mesh
1061            .publish_to_peer(
1062                self.inner.target_node_id,
1063                request_channel_hash,
1064                stream_id,
1065                /* reliable */ true,
1066                std::slice::from_ref(&payload),
1067            )
1068            .await
1069            .map_err(RpcError::Transport)
1070    }
1071}
1072
1073impl Drop for DuplexSink {
1074    fn drop(&mut self) {
1075        if let Some(task) = self.grant_pump.take() {
1076            task.abort();
1077        }
1078        // The shared DuplexInner's Drop (when refcount hits 0)
1079        // does the CANCEL — nothing to do here beyond aborting
1080        // the grant pump.
1081    }
1082}
1083
1084/// Receive half of a duplex call. Implements `futures::Stream`
1085/// yielding `Result<Bytes, RpcError>` per inbound RESPONSE chunk.
1086/// EOF on terminal Ok; one final `Err(RpcError::ServerError)` on
1087/// terminal non-Ok.
1088///
1089/// Bidi streaming plan (Phase D).
1090pub struct DuplexStream {
1091    inner: Arc<DuplexInner>,
1092    chunks_rx: tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
1093    done: bool,
1094}
1095
1096impl DuplexStream {
1097    /// Server-assigned `call_id`. Same value on the sibling
1098    /// `DuplexSink`.
1099    pub fn call_id(&self) -> u64 {
1100        self.inner.call_id
1101    }
1102}
1103
1104impl futures::Stream for DuplexStream {
1105    type Item = Result<Bytes, RpcError>;
1106
1107    fn poll_next(
1108        mut self: std::pin::Pin<&mut Self>,
1109        cx: &mut std::task::Context<'_>,
1110    ) -> std::task::Poll<Option<Self::Item>> {
1111        if self.done {
1112            return std::task::Poll::Ready(None);
1113        }
1114        match self.chunks_rx.poll_recv(cx) {
1115            std::task::Poll::Ready(Some(StreamItem::Chunk(body))) => {
1116                self.inner.observer.add_response_bytes(body.len() as u32);
1117                std::task::Poll::Ready(Some(Ok(body)))
1118            }
1119            std::task::Poll::Ready(Some(StreamItem::End)) => {
1120                self.done = true;
1121                self.inner.clean_close.store(true, Ordering::SeqCst);
1122                self.inner.observer.latch_ok();
1123                std::task::Poll::Ready(None)
1124            }
1125            std::task::Poll::Ready(Some(StreamItem::Error(resp))) => {
1126                self.done = true;
1127                self.inner.clean_close.store(true, Ordering::SeqCst);
1128                let status = resp.status.to_wire();
1129                let message = String::from_utf8(resp.body.to_vec()).unwrap_or_else(|e| {
1130                    format!("<{} bytes of non-utf8 body>", e.into_bytes().len())
1131                });
1132                self.inner
1133                    .observer
1134                    .latch_error(format!("server returned status {status:#06x}: {message}"));
1135                std::task::Poll::Ready(Some(Err(RpcError::ServerError { status, message })))
1136            }
1137            std::task::Poll::Ready(None) => {
1138                self.done = true;
1139                std::task::Poll::Ready(None)
1140            }
1141            std::task::Poll::Pending => std::task::Poll::Pending,
1142        }
1143    }
1144}
1145
1146/// Caller-side handle for a duplex RPC. Combines a `DuplexSink`
1147/// (upload) and `DuplexStream` (download). For application code
1148/// that wants to encode requests in one task and decode responses
1149/// in another, use [`Self::into_split`] to peel off the two halves.
1150///
1151/// Bidi streaming plan (Phase D).
1152pub struct DuplexCallRaw {
1153    sink: DuplexSink,
1154    stream: DuplexStream,
1155}
1156
1157impl DuplexCallRaw {
1158    /// Server-assigned `call_id`.
1159    pub fn call_id(&self) -> u64 {
1160        self.sink.call_id()
1161    }
1162
1163    /// Whether the upload side is flow-controlled.
1164    pub fn flow_controlled(&self) -> bool {
1165        self.sink.flow_controlled()
1166    }
1167
1168    /// Push one body chunk to the server. Delegates to the inner
1169    /// `DuplexSink::send`.
1170    pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
1171        self.sink.send(body).await
1172    }
1173
1174    /// Close the upload direction. Delegates to the inner
1175    /// `DuplexSink::finish_sending` but keeps the receive side
1176    /// alive so the caller can keep polling response chunks.
1177    ///
1178    /// NOTE: consumes the sink half but not the stream half.
1179    /// Internally, we replace `self.sink` with a no-op
1180    /// placeholder so subsequent send() / finish_sending()
1181    /// surface a clear error (`send() after finish_sending()`).
1182    pub async fn finish_sending(&mut self) -> Result<(), RpcError> {
1183        // Take the sink out by swapping in a placeholder whose
1184        // state is `Done` so subsequent sends error cleanly.
1185        let placeholder = DuplexSink {
1186            inner: Arc::clone(&self.sink.inner),
1187            service: String::new(),
1188            initial_headers: Vec::new(),
1189            initial_flags: 0,
1190            deadline_ns: 0,
1191            credit_sem: None,
1192            grant_pump: None,
1193            state: ClientStreamState::Done,
1194        };
1195        let sink = std::mem::replace(&mut self.sink, placeholder);
1196        sink.finish_sending().await
1197    }
1198
1199    /// Pull the next response chunk. `None` on terminal Ok;
1200    /// `Some(Err)` then `None` on terminal non-Ok. Same shape as
1201    /// `futures::StreamExt::next`.
1202    pub async fn next(&mut self) -> Option<Result<Bytes, RpcError>> {
1203        use futures::StreamExt;
1204        self.stream.next().await
1205    }
1206
1207    /// Split into independent send / receive halves. Both halves
1208    /// hold an `Arc<DuplexInner>`; CANCEL fires only when BOTH
1209    /// halves drop without a clean close.
1210    pub fn into_split(self) -> (DuplexSink, DuplexStream) {
1211        (self.sink, self.stream)
1212    }
1213}
1214
1215impl futures::Stream for DuplexCallRaw {
1216    type Item = Result<Bytes, RpcError>;
1217
1218    fn poll_next(
1219        mut self: std::pin::Pin<&mut Self>,
1220        cx: &mut std::task::Context<'_>,
1221    ) -> std::task::Poll<Option<Self::Item>> {
1222        std::pin::Pin::new(&mut self.stream).poll_next(cx)
1223    }
1224}
1225
1226// ============================================================================
1227// Unary call: CANCEL-on-drop guard.
1228// ============================================================================
1229
1230/// RAII guard that fires CANCEL to the server if the unary call
1231/// future is dropped before a response arrives. Without this, a
1232/// `select!`-loser future (e.g. hedge runner-up) would leave the
1233/// server-side handler running to completion — wasting CPU on a
1234/// reply nobody will read.
1235///
1236/// The guard is built *after* the REQUEST has been successfully
1237/// published — if the publish fails, no guard is constructed and
1238/// no CANCEL is sent. On the success path the call function flips
1239/// `completed = true` so Drop becomes a no-op (the server already
1240/// finished and removed its in-flight entry).
1241struct UnaryCallGuard {
1242    pending: Arc<super::cortex::RpcClientPending>,
1243    mesh: Arc<MeshNode>,
1244    target_node_id: u64,
1245    request_channel: ChannelName,
1246    self_origin: u64,
1247    call_id: u64,
1248    /// True after the call resolved Ok or got a definitive
1249    /// non-cancellable Err. Drop checks this — `false` fires
1250    /// CANCEL, `true` is a no-op (still removes the pending
1251    /// entry).
1252    completed: bool,
1253}
1254
1255impl Drop for UnaryCallGuard {
1256    fn drop(&mut self) {
1257        self.pending.cancel(self.call_id);
1258        if !self.completed {
1259            spawn_cancel_publish(
1260                Arc::clone(&self.mesh),
1261                self.target_node_id,
1262                self.request_channel.clone(),
1263                self.self_origin,
1264                self.call_id,
1265            );
1266        }
1267    }
1268}
1269
1270// ============================================================================
1271// Streaming/duplex observer-fire bookkeeping.
1272//
1273// The unary `MeshNode::call` fires `RpcObserver::on_call` at each
1274// terminal return path (see line ~2306). The streaming /
1275// client-streaming / duplex paths have multiple terminal points
1276// (poll_next sees End / Error; finish() returns; Drop without
1277// terminal observation). To avoid sprinkling `fire_rpc_observer_outbound`
1278// at every terminal site, each handle holds a
1279// `StreamingObserverState` that latches the terminal status on
1280// observation and fires exactly once on Drop. The Deck NRPC tab
1281// + every consumer of `RpcObserver` get one event per streaming
1282// / duplex call, same as for unary today.
1283// ============================================================================
1284
1285/// Per-call observer-fire bookkeeping shared between the
1286/// streaming + client-streaming + duplex caller-side handles.
1287/// Latches terminal status on observation; `fire()` (called from
1288/// the handle's Drop) emits one `RpcCallEvent` with the latched
1289/// status (or `Canceled` if nothing latched — i.e. the handle
1290/// was dropped before observing its terminator).
1291///
1292/// Status discriminator:
1293///   0 = none latched (Drop → Canceled)
1294///   1 = Ok
1295///   2 = Error (message in `observer_msg`)
1296///   3 = Timeout
1297pub(crate) struct StreamingObserverState {
1298    mesh: Arc<MeshNode>,
1299    target_node_id: u64,
1300    service: String,
1301    started: Instant,
1302    request_bytes: AtomicU32,
1303    response_bytes: AtomicU32,
1304    observer_status: AtomicU8,
1305    observer_msg: parking_lot::Mutex<Option<String>>,
1306    fired: AtomicBool,
1307}
1308
1309impl StreamingObserverState {
1310    pub(crate) fn new(
1311        mesh: Arc<MeshNode>,
1312        target_node_id: u64,
1313        service: impl Into<String>,
1314        request_bytes: u32,
1315    ) -> Self {
1316        Self {
1317            mesh,
1318            target_node_id,
1319            service: service.into(),
1320            started: Instant::now(),
1321            request_bytes: AtomicU32::new(request_bytes),
1322            response_bytes: AtomicU32::new(0),
1323            observer_status: AtomicU8::new(0),
1324            observer_msg: parking_lot::Mutex::new(None),
1325            fired: AtomicBool::new(false),
1326        }
1327    }
1328
1329    pub(crate) fn add_request_bytes(&self, n: u32) {
1330        self.request_bytes.fetch_add(n, Ordering::Relaxed);
1331    }
1332
1333    pub(crate) fn add_response_bytes(&self, n: u32) {
1334        self.response_bytes.fetch_add(n, Ordering::Relaxed);
1335    }
1336
1337    pub(crate) fn latch_ok(&self) {
1338        self.observer_status.store(1, Ordering::Relaxed);
1339    }
1340
1341    pub(crate) fn latch_error(&self, msg: impl Into<String>) {
1342        *self.observer_msg.lock() = Some(msg.into());
1343        self.observer_status.store(2, Ordering::Relaxed);
1344    }
1345
1346    pub(crate) fn latch_timeout(&self) {
1347        self.observer_status.store(3, Ordering::Relaxed);
1348    }
1349
1350    /// Fire the observer event. Idempotent — only the first call
1351    /// actually emits; subsequent are no-ops. Called from each
1352    /// streaming handle's Drop.
1353    pub(crate) fn fire(&self) {
1354        if self.fired.swap(true, Ordering::SeqCst) {
1355            return;
1356        }
1357        let status_code = self.observer_status.load(Ordering::Relaxed);
1358        let status = match status_code {
1359            1 => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Ok,
1360            2 => {
1361                let msg = self.observer_msg.lock().clone().unwrap_or_default();
1362                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(msg)
1363            }
1364            3 => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Timeout,
1365            _ => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Canceled,
1366        };
1367        self.mesh.fire_rpc_observer_outbound(
1368            self.target_node_id,
1369            &self.service,
1370            self.started.elapsed().as_millis() as u32,
1371            status,
1372            self.request_bytes.load(Ordering::Relaxed),
1373            self.response_bytes.load(Ordering::Relaxed),
1374        );
1375    }
1376}
1377
1378/// Per-call cap on in-flight request-direction credits. Tokio's
1379/// `Semaphore::MAX_PERMITS` is `usize::MAX >> 3`; we cap the
1380/// caller-side accumulator at this value so a misbehaving server
1381/// can't make the caller hold an unbounded outstanding window.
1382/// 1M credits is already orders of magnitude beyond any sane
1383/// request burst — a caller sitting on 1M unconsumed credits is
1384/// either misconfigured or under attack.
1385const REQUEST_GRANT_PER_CALL_CAP: usize = 1_000_000;
1386
1387/// Add `credits` to a caller-side request-direction credit
1388/// semaphore, capped so the accumulator never exceeds
1389/// [`REQUEST_GRANT_PER_CALL_CAP`]. Per-frame cap of `usize::MAX >> 4`
1390/// remains as a second line of defense against pathological frame
1391/// values.
1392fn add_request_grant_credits(sem: &tokio::sync::Semaphore, credits: u32) {
1393    if credits == 0 {
1394        return;
1395    }
1396    let current = sem.available_permits();
1397    let remaining = REQUEST_GRANT_PER_CALL_CAP.saturating_sub(current);
1398    let safe = (credits as usize).min(usize::MAX >> 4).min(remaining);
1399    if safe > 0 {
1400        sem.add_permits(safe);
1401    }
1402}
1403
1404/// Build a coalescing REQUEST_GRANT emitter.
1405///
1406/// Naive emitters `tokio::spawn` one publish task per consumed
1407/// chunk, which becomes a spawn-storm + AEAD-storm under bursting.
1408/// This helper hands back an emitter that pushes `(caller_origin,
1409/// call_id, credits)` into an unbounded mpsc; a single dedicated
1410/// drainer task `try_recv`s the queue to drain whatever is
1411/// immediately available, coalesces credits per call_id, and
1412/// publishes ONE batched REQUEST_GRANT per call per drain cycle.
1413///
1414/// Lifecycle: the drainer task lives as long as any clone of the
1415/// returned emitter (mpsc sender count > 0). When the fold and all
1416/// in-flight handlers release the emitter, `rx.recv` returns `None`
1417/// and the drainer exits naturally.
1418fn build_request_grant_emitter(
1419    mesh: Arc<MeshNode>,
1420    service: String,
1421    server_origin: u64,
1422    diag_tag: &'static str,
1423) -> RpcRequestGrantEmitter {
1424    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(u64, u64, u32)>();
1425    tokio::spawn(async move {
1426        while let Some(first) = rx.recv().await {
1427            let mut summed: std::collections::HashMap<(u64, u64), u32> =
1428                std::collections::HashMap::new();
1429            let (caller, call_id, credits) = first;
1430            summed.insert((caller, call_id), credits);
1431            // Coalesce anything immediately queued behind the first
1432            // wake. Bounded by what the substrate has produced so
1433            // far; doesn't add latency since `try_recv` returns
1434            // immediately when the queue is empty.
1435            while let Ok((caller, call_id, credits)) = rx.try_recv() {
1436                let entry = summed.entry((caller, call_id)).or_insert(0);
1437                *entry = entry.saturating_add(credits);
1438            }
1439            for ((caller, call_id), credits) in summed {
1440                let reply_channel_name = format!("{service}.replies.{caller:016x}");
1441                let reply_channel = match ChannelName::new(&reply_channel_name) {
1442                    Ok(c) => c,
1443                    Err(e) => {
1444                        tracing::warn!(
1445                            error = %e,
1446                            channel = %reply_channel_name,
1447                            tag = diag_tag,
1448                            "rpc grant drainer: invalid reply channel name");
1449                        continue;
1450                    }
1451                };
1452                let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, server_origin, call_id, 0);
1453                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 12);
1454                buf.extend_from_slice(&meta.to_bytes());
1455                buf.extend_from_slice(&encode_request_grant(call_id, credits));
1456                let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1457                if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1458                    tracing::warn!(
1459                        error = %e,
1460                        caller_origin = format!("{:#x}", caller),
1461                        call_id,
1462                        tag = diag_tag,
1463                        "rpc grant drainer: REQUEST_GRANT publish failed");
1464                }
1465            }
1466        }
1467    });
1468    Arc::new(move |caller_origin, call_id, credits| {
1469        // Send failure means the drainer has exited (all sender
1470        // clones dropped, then we somehow cloned a stale one).
1471        // Treat as a no-op — the call is tearing down anyway.
1472        let _ = tx.send((caller_origin, call_id, credits));
1473    })
1474}
1475
1476/// Shared CANCEL-publish helper: spawn a task that fires a
1477/// CANCEL event for `call_id` to `target` on the request channel.
1478/// Both [`RpcStream::Drop`] and [`UnaryCallGuard::Drop`] use it.
1479fn spawn_cancel_publish(
1480    mesh: Arc<MeshNode>,
1481    target: u64,
1482    request_channel: ChannelName,
1483    self_origin: u64,
1484    call_id: u64,
1485) {
1486    tokio::spawn(async move {
1487        let meta = EventMeta::new(DISPATCH_RPC_CANCEL, 0, self_origin, call_id, 0);
1488        let request_channel_id = ChannelId::new(request_channel);
1489        let request_channel_hash = request_channel_id.hash();
1490        let stream_id = MeshNode::publish_stream_id(&request_channel_id);
1491        let payload = Bytes::from(meta.to_bytes().to_vec());
1492        let _ = mesh
1493            .publish_to_peer(
1494                target,
1495                request_channel_hash,
1496                stream_id,
1497                /* reliable */ true,
1498                std::slice::from_ref(&payload),
1499            )
1500            .await;
1501    });
1502}
1503
1504// ============================================================================
1505// MeshNode extensions.
1506// ============================================================================
1507
1508impl MeshNode {
1509    /// Register an nRPC handler for `service` on this node.
1510    ///
1511    /// Subscribes this node to `<service>.requests` (so the local
1512    /// `register_rpc_inbound` dispatcher feeds inbound REQUEST
1513    /// events into the [`RpcServerFold`]) and wires the fold's
1514    /// RESPONSE-emit callback to publish on
1515    /// `<service>.replies.<caller_origin>` via the existing
1516    /// pub/sub path.
1517    ///
1518    /// **Local-only registration** (Phase 1). Multi-instance
1519    /// services that load-balance via `SubscriptionMode::QueueGroup`
1520    /// require each replica to call `serve_rpc` on its own node;
1521    /// the mesh-level subscriber roster + `dispatch_recipients`
1522    /// then routes one-of-N as designed. Each replica's local
1523    /// `serve_rpc` must use the same service name (which becomes
1524    /// the queue-group identifier).
1525    ///
1526    /// Returns a [`ServeHandle`] whose Drop tears down the
1527    /// registration. Concurrent registrations for the same service
1528    /// on one node return `Err(ServeError::AlreadyServing)`.
1529    pub fn serve_rpc<H: RpcHandler>(
1530        self: &Arc<Self>,
1531        service: &str,
1532        handler: Arc<H>,
1533    ) -> Result<ServeHandle, ServeError> {
1534        let request_channel = ChannelName::new(&format!("{service}.requests"))
1535            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
1536        let channel_hash = request_channel.hash();
1537
1538        // Bridge: a tokio mpsc the inbound dispatcher pushes into.
1539        // The bridge task drains it and runs each event through
1540        // the fold. Bounded so a runaway publisher can't OOM the
1541        // server; over-cap pushes drop the inbound event (which
1542        // surfaces to the caller as a timeout).
1543        let (tx, mut rx) = mpsc::channel::<RpcInboundEvent>(1024);
1544
1545        // Build the emit closure. When the handler completes, the
1546        // fold calls this with `(caller_origin, call_id, response)`.
1547        // The closure publishes a RESPONSE event on
1548        // `<service>.replies.<caller_origin>` via the existing
1549        // pub/sub path. `tokio::spawn` keeps the closure
1550        // synchronous (the fold doesn't await).
1551        let mesh_for_emit = Arc::clone(self);
1552        let service_for_emit = service.to_string();
1553        let server_origin = self.identity_origin_hash();
1554        let emit: RpcResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
1555            let mesh = Arc::clone(&mesh_for_emit);
1556            let service = service_for_emit.clone();
1557            tokio::spawn(async move {
1558                let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
1559                let reply_channel = match ChannelName::new(&reply_channel_name) {
1560                    Ok(c) => c,
1561                    Err(e) => {
1562                        tracing::warn!(error = %e, channel = %reply_channel_name,
1563                            "rpc serve_rpc: invalid reply channel name");
1564                        return;
1565                    }
1566                };
1567                // Build the RESPONSE event envelope: 24-byte meta
1568                // + encoded RpcResponsePayload.
1569                let meta = EventMeta::new(
1570                    super::cortex::DISPATCH_RPC_RESPONSE,
1571                    0,
1572                    server_origin,
1573                    call_id,
1574                    0,
1575                );
1576                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
1577                buf.extend_from_slice(&meta.to_bytes());
1578                buf.extend_from_slice(&resp.encode());
1579
1580                let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1581                if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1582                    tracing::warn!(error = %e, caller_origin = format!("{:#x}", caller_origin),
1583                        call_id, "rpc serve_rpc: response publish failed");
1584                }
1585            });
1586        });
1587
1588        // Build the server fold and wrap it in an Arc<Mutex<...>>
1589        // so the bridge task can drive it (the trait takes
1590        // `&mut self`). Attach the per-service metrics handle so
1591        // the spawned handler tasks bump server-side counters.
1592        let metrics_handle = self.rpc_metrics_arc().for_service(service);
1593        // Keep a clone of the emit closure for the callee-side
1594        // capability-auth defense-in-depth path in the bridge
1595        // below — the fold owns its own clone, this one only
1596        // emits the `CapabilityDenied` rejection before the fold
1597        // sees the event.
1598        let emit_for_bridge = Arc::clone(&emit);
1599        // Clone the per-service metrics handle so the bridge can
1600        // bump `capability_denied_total` on gate rejection. The
1601        // fold's own clone (passed via `with_metrics`) handles the
1602        // handler-side counters; this one covers the path BEFORE
1603        // the handler runs, which the fold-side metrics never see.
1604        let metrics_for_bridge = Arc::clone(&metrics_handle);
1605        let fold = Arc::new(Mutex::new(
1606            RpcServerFold::new(handler as Arc<dyn RpcHandler>, emit).with_metrics(metrics_handle),
1607        ));
1608
1609        // Register the inbound dispatcher. Push into the mpsc;
1610        // the bridge task does the actual fold work.
1611        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
1612            // Best-effort send — over-cap means the bridge can't
1613            // keep up; drop and let the caller time out. Logging
1614            // here would spam.
1615            let _ = tx.try_send(ev);
1616        });
1617        // Register the service in `rpc_local_services` and refresh
1618        // the self-indexed announcement BEFORE installing the
1619        // dispatcher so the callee-side gate (in the bridge below)
1620        // sees a self-announcement carrying `nrpc:<service>` the
1621        // moment the first inbound event lands. Without this, the
1622        // gate was either silently permissive (no self-ann) or
1623        // silently denying (self-ann from a prior
1624        // `announce_capabilities` that pre-dated this service's
1625        // registration). See `docs/misc/CODE_REVIEW_2026_05_19_CAPABILITY_AUTH.md`
1626        // H1 + H2.
1627        self.rpc_local_services_arc().insert(service.to_string());
1628        self.index_self_with_local_services();
1629
1630        if self
1631            .register_rpc_inbound(channel_hash, dispatcher)
1632            .is_some()
1633        {
1634            return Err(ServeError::AlreadyServing(service.to_string()));
1635        }
1636
1637        // Spawn the bridge task. It reads inbound events, runs
1638        // the v0.4 capability-auth callee-side gate (defense in
1639        // depth — the caller-side gate inside `call_service`
1640        // covers the well-behaved client path), and on accept
1641        // feeds them to the fold.
1642        let mesh_for_bridge = Arc::clone(self);
1643        let service_for_bridge = service.to_string();
1644        let bridge = tokio::spawn(async move {
1645            let tag = format!("nrpc:{}", service_for_bridge);
1646            while let Some(inbound) = rx.recv().await {
1647                // Defense-in-depth check. Skip only when the wire
1648                // session resolved no NodeId (`from_node == 0` is
1649                // the loopback / test sentinel per
1650                // `RpcInboundEvent::from_node` — production wire
1651                // delivery drops events that fail NodeId
1652                // resolution rather than passing 0). The cold-
1653                // start "no self-ann" skip the original
1654                // implementation carried was a permissive hole;
1655                // `index_self_with_local_services` above
1656                // guarantees a self-ann exists before the
1657                // dispatcher is wired, so denying when the gate
1658                // says no is now the safe failure mode.
1659                let self_node = mesh_for_bridge.node_id();
1660                let index = mesh_for_bridge.capability_index_arc();
1661                let from_node = inbound.from_node;
1662                if from_node != 0 && !index.may_execute(self_node, &tag, from_node) {
1663                    // Decode the EventMeta so we can address the
1664                    // caller's reply channel (keyed on
1665                    // `caller_origin`) and tag the response with
1666                    // the correct `call_id`. A garbled meta means
1667                    // the request would have been rejected by the
1668                    // fold's own decode path too; drop silently
1669                    // to match the existing skip-on-malformed
1670                    // behavior there.
1671                    let Some(meta) = (if inbound.payload.len() >= EVENT_META_SIZE {
1672                        EventMeta::from_bytes(&inbound.payload[..EVENT_META_SIZE])
1673                    } else {
1674                        None
1675                    }) else {
1676                        continue;
1677                    };
1678                    let resp = super::cortex::RpcResponsePayload {
1679                        status: RpcStatus::CapabilityDenied,
1680                        headers: vec![],
1681                        body: Bytes::from(format!(
1682                            "callee-side capability-auth gate denied nrpc:{}",
1683                            service_for_bridge
1684                        )),
1685                    };
1686                    // Server-side metrics: bump `capability_denied_total`
1687                    // on the per-service counter. The fold-side
1688                    // metrics never see this path (the handler isn't
1689                    // invoked), so without this bump a noisy
1690                    // unauthorized caller is invisible to operators
1691                    // watching `nrpc_handler_invocations_total` —
1692                    // the dashboard sees "0 requests" while the
1693                    // caller sees `CapabilityDenied`.
1694                    metrics_for_bridge
1695                        .capability_denied_total
1696                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1697                    (emit_for_bridge)(meta.origin_hash, meta.seq_or_ts, resp);
1698                    continue;
1699                }
1700                let payload = inbound.payload;
1701                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
1702                let ev = RedexEvent { entry, payload };
1703                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
1704                    tracing::warn!(error = %e, "rpc serve_rpc: fold apply error");
1705                }
1706            }
1707        });
1708
1709        // Spawn an async re-announce so peers also learn about
1710        // the new service without the operator having to call
1711        // `announce_capabilities` manually. The local self-index
1712        // already happened above; this is purely for peer
1713        // visibility (the broadcast path also re-runs the
1714        // self-index, which is a cheap version bump).
1715        let mesh_for_announce = Arc::clone(self);
1716        let service_for_log = service.to_string();
1717        tokio::spawn(async move {
1718            let baseline = mesh_for_announce.user_caps_snapshot();
1719            if let Err(e) = mesh_for_announce.announce_capabilities(baseline).await {
1720                tracing::warn!(
1721                    error = %e,
1722                    service = %service_for_log,
1723                    "serve_rpc: auto re-announce failed",
1724                );
1725            }
1726        });
1727
1728        Ok(ServeHandle {
1729            channel_hash,
1730            service: service.to_string(),
1731            _bridge: bridge,
1732            mesh: Arc::clone(self),
1733        })
1734    }
1735
1736    /// Streaming variant of [`Self::serve_rpc`]. The handler
1737    /// receives an [`RpcResponseSink`](super::cortex::RpcResponseSink)
1738    /// it writes chunks to via `sink.send(body)`; returning
1739    /// `Ok(())` closes the stream cleanly, `Err(_)` closes with
1740    /// an error frame.
1741    ///
1742    /// Wire-level identical to the unary path apart from the
1743    /// per-chunk `nrpc-streaming` header markers
1744    /// (`continue` / `end`). Same auto-registration of
1745    /// `<service>.requests` + `<service>.replies.` prefix.
1746    pub fn serve_rpc_streaming<H: RpcStreamingHandler>(
1747        self: &Arc<Self>,
1748        service: &str,
1749        handler: Arc<H>,
1750    ) -> Result<ServeHandle, ServeError> {
1751        let request_channel = ChannelName::new(&format!("{service}.requests"))
1752            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
1753        let channel_hash = request_channel.hash();
1754        let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
1755
1756        let mesh_for_emit = Arc::clone(self);
1757        let service_for_emit = service.to_string();
1758        let server_origin = self.identity_origin_hash();
1759        // Async emit so the streaming fold's pump can `.await` each
1760        // publish — guarantees per-call chunk ordering on the wire.
1761        let emit: RpcAsyncResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
1762            let mesh = Arc::clone(&mesh_for_emit);
1763            let service = service_for_emit.clone();
1764            Box::pin(async move {
1765                let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
1766                let reply_channel = match ChannelName::new(&reply_channel_name) {
1767                    Ok(c) => c,
1768                    Err(e) => {
1769                        tracing::warn!(error = %e, channel = %reply_channel_name,
1770                                "rpc serve_rpc_streaming: invalid reply channel name");
1771                        return;
1772                    }
1773                };
1774                let meta = EventMeta::new(
1775                    super::cortex::DISPATCH_RPC_RESPONSE,
1776                    0,
1777                    server_origin,
1778                    call_id,
1779                    0,
1780                );
1781                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
1782                buf.extend_from_slice(&meta.to_bytes());
1783                buf.extend_from_slice(&resp.encode());
1784                let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1785                if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1786                    tracing::warn!(error = %e,
1787                            caller_origin = format!("{:#x}", caller_origin),
1788                            call_id,
1789                            "rpc serve_rpc_streaming: chunk publish failed");
1790                }
1791            })
1792        });
1793
1794        // Attach per-service metrics so the spawned handler tasks
1795        // + pump task bump server-side counters (including the
1796        // streaming-only `streaming_chunks_emitted_total`).
1797        let metrics_handle = self.rpc_metrics_arc().for_service(service);
1798        let fold = Arc::new(Mutex::new(
1799            RpcServerStreamingFold::new(handler as Arc<dyn RpcStreamingHandler>, emit)
1800                .with_metrics(metrics_handle),
1801        ));
1802        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
1803            let _ = tx.try_send(ev);
1804        });
1805        if self
1806            .register_rpc_inbound(channel_hash, dispatcher)
1807            .is_some()
1808        {
1809            return Err(ServeError::AlreadyServing(service.to_string()));
1810        }
1811        let bridge = tokio::spawn(async move {
1812            while let Some(inbound) = rx.recv().await {
1813                let payload = inbound.payload;
1814                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
1815                let ev = RedexEvent { entry, payload };
1816                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
1817                    tracing::warn!(error = %e, "rpc serve_rpc_streaming: fold apply error");
1818                }
1819            }
1820        });
1821        self.rpc_local_services_arc().insert(service.to_string());
1822        Ok(ServeHandle {
1823            channel_hash,
1824            service: service.to_string(),
1825            _bridge: bridge,
1826            mesh: Arc::clone(self),
1827        })
1828    }
1829
1830    /// Register a client-streaming nRPC handler for `service`.
1831    /// Mirror of [`Self::serve_rpc_streaming`] but using the
1832    /// request-side fold ([`RpcStreamingRequestFold`]) — the
1833    /// handler receives one stream of REQUEST_CHUNK bodies and
1834    /// emits one terminal RESPONSE.
1835    ///
1836    /// Wires two emit callbacks:
1837    /// - A sync [`RpcResponseEmitter`] for the terminal RESPONSE
1838    ///   (single emit per call, no ordering concern).
1839    /// - An [`RpcRequestGrantEmitter`] for upload-direction
1840    ///   credit grants, which publishes [`DISPATCH_RPC_REQUEST_GRANT`]
1841    ///   events on the caller's reply channel.
1842    ///
1843    /// Bidi streaming plan (Phase C).
1844    pub fn serve_rpc_client_stream<H: RpcClientStreamingHandler>(
1845        self: &Arc<Self>,
1846        service: &str,
1847        handler: Arc<H>,
1848    ) -> Result<ServeHandle, ServeError> {
1849        let request_channel = ChannelName::new(&format!("{service}.requests"))
1850            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
1851        let channel_hash = request_channel.hash();
1852        let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
1853
1854        let mesh_for_emit = Arc::clone(self);
1855        let service_for_emit = service.to_string();
1856        let server_origin = self.identity_origin_hash();
1857
1858        // Terminal RESPONSE emitter — sync because there's only
1859        // one RESPONSE per call (no per-call ordering concern that
1860        // would require an async-await between chunks).
1861        let emit_resp_mesh = Arc::clone(&mesh_for_emit);
1862        let emit_resp_service = service_for_emit.clone();
1863        let emit_resp: RpcResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
1864            let mesh = Arc::clone(&emit_resp_mesh);
1865            let service = emit_resp_service.clone();
1866            tokio::spawn(async move {
1867                let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
1868                let reply_channel = match ChannelName::new(&reply_channel_name) {
1869                    Ok(c) => c,
1870                    Err(e) => {
1871                        tracing::warn!(error = %e, channel = %reply_channel_name,
1872                                "rpc serve_rpc_client_stream: invalid reply channel name");
1873                        return;
1874                    }
1875                };
1876                let meta = EventMeta::new(
1877                    super::cortex::DISPATCH_RPC_RESPONSE,
1878                    0,
1879                    server_origin,
1880                    call_id,
1881                    0,
1882                );
1883                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
1884                buf.extend_from_slice(&meta.to_bytes());
1885                buf.extend_from_slice(&resp.encode());
1886                let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1887                if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1888                    tracing::warn!(error = %e,
1889                            caller_origin = format!("{:#x}", caller_origin),
1890                            call_id,
1891                            "rpc serve_rpc_client_stream: terminal RESPONSE publish failed");
1892                }
1893            });
1894        });
1895
1896        // REQUEST_GRANT emitter — coalesces per-chunk credits into
1897        // a single drainer task that batches by call_id. Avoids the
1898        // tokio::spawn-per-emit storm under bursting.
1899        let emit_grant = build_request_grant_emitter(
1900            Arc::clone(&mesh_for_emit),
1901            service_for_emit.clone(),
1902            server_origin,
1903            "serve_rpc_client_stream",
1904        );
1905
1906        let metrics_handle = self.rpc_metrics_arc().for_service(service);
1907        let fold = Arc::new(Mutex::new(
1908            RpcStreamingRequestFold::new(handler as Arc<dyn RpcClientStreamingHandler>, emit_resp)
1909                .with_grant_emitter(emit_grant)
1910                .with_metrics(metrics_handle),
1911        ));
1912        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
1913            let _ = tx.try_send(ev);
1914        });
1915        if self
1916            .register_rpc_inbound(channel_hash, dispatcher)
1917            .is_some()
1918        {
1919            return Err(ServeError::AlreadyServing(service.to_string()));
1920        }
1921        let bridge = tokio::spawn(async move {
1922            while let Some(inbound) = rx.recv().await {
1923                let payload = inbound.payload;
1924                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
1925                let ev = RedexEvent { entry, payload };
1926                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
1927                    tracing::warn!(error = %e,
1928                        "rpc serve_rpc_client_stream: fold apply error");
1929                }
1930            }
1931        });
1932        self.rpc_local_services_arc().insert(service.to_string());
1933        Ok(ServeHandle {
1934            channel_hash,
1935            service: service.to_string(),
1936            _bridge: bridge,
1937            mesh: Arc::clone(self),
1938        })
1939    }
1940
1941    /// Client-streaming variant of [`Self::call`]. Returns a
1942    /// [`ClientStreamCallRaw`] handle the caller pushes N items
1943    /// into via `send`, then `finish` to await the terminal
1944    /// RESPONSE.
1945    ///
1946    /// **Lazy initial REQUEST.** This method does NOT publish a
1947    /// REQUEST to the wire. It only ensures the caller's reply
1948    /// subscription is set up and registers the pending entry; the
1949    /// initial REQUEST is emitted by the first `send` (or by
1950    /// `finish` for the zero-item degenerate path).
1951    ///
1952    /// Sets `FLAG_RPC_CLIENT_STREAMING_REQUEST` on the initial
1953    /// REQUEST so the server's request-streaming fold knows to
1954    /// open a request-side stream. Optional `request_window_initial`
1955    /// header opts into upload-direction flow control.
1956    ///
1957    /// Bidi streaming plan (Phase C).
1958    pub async fn call_client_stream(
1959        self: &Arc<Self>,
1960        target_node_id: u64,
1961        service: &str,
1962        opts: CallOptions,
1963    ) -> Result<ClientStreamCallRaw, RpcError> {
1964        // `request_window_initial = Some(0)` would deadlock the
1965        // caller: every `send` awaits a credit, but the initial
1966        // REQUEST is lazy (not emitted until the first send), so
1967        // the server never sees the call and never publishes a
1968        // GRANT. Reject up front — `None` means "unbounded credit",
1969        // any positive value opts into flow control.
1970        if matches!(opts.request_window_initial, Some(0)) {
1971            return Err(RpcError::Codec {
1972                direction: CodecDirection::Encode,
1973                message: "request_window_initial must be None or >= 1; Some(0) deadlocks send"
1974                    .to_string(),
1975            });
1976        }
1977        // T1.3: per-service route cache (see PERF_AUDIT
1978        // 2026-05-19). One DashMap::get + Arc::clone instead of
1979        // 2 format! + 2 ChannelName::new + xxhash per call.
1980        let route = self.rpc_route_or_no_route(target_node_id, service)?;
1981        let self_origin = self.identity_origin_hash();
1982        self.ensure_reply_subscription(
1983            target_node_id,
1984            service,
1985            route.reply_channel.clone(),
1986            route.reply_hash,
1987        )
1988        .await?;
1989
1990        let call_id = mint_random_call_id();
1991        let pending = self.rpc_client_pending();
1992        let (terminal_rx, mut grant_rx) =
1993            pending.register_client_streaming(call_id, target_node_id);
1994
1995        // Build the header set + flags we'll queue for the initial
1996        // REQUEST (deferred to the first send / finish).
1997        let mut initial_flags = FLAG_RPC_CLIENT_STREAMING_REQUEST;
1998        let mut initial_headers: Vec<(String, Vec<u8>)> = Vec::new();
1999        if let Some(tc) = opts.trace_context.as_ref() {
2000            initial_flags |= FLAG_RPC_PROPAGATE_TRACE;
2001            initial_headers.extend(build_trace_headers(tc));
2002        }
2003        if let Some(window) = opts.request_window_initial {
2004            initial_headers.push((
2005                HEADER_NRPC_REQUEST_WINDOW_INITIAL.to_string(),
2006                window.to_string().into_bytes(),
2007            ));
2008        }
2009        initial_headers.extend(opts.request_headers.iter().cloned());
2010
2011        // Per-call credit semaphore when flow control is opted in.
2012        // Initial permits = the caller's declared window. Refilled
2013        // by REQUEST_GRANT events arriving on the reply channel,
2014        // pumped through `grant_rx` by the spawned `grant_pump`.
2015        let credit_sem = opts
2016            .request_window_initial
2017            .map(|n| Arc::new(tokio::sync::Semaphore::new(n as usize)));
2018        let grant_pump = credit_sem.as_ref().map(|sem| {
2019            let sem = Arc::clone(sem);
2020            tokio::spawn(async move {
2021                while let Some(credits) = grant_rx.recv().await {
2022                    add_request_grant_credits(&sem, credits);
2023                }
2024            })
2025        });
2026
2027        let deadline_ns = opts.deadline.map(instant_to_unix_nanos).unwrap_or(0);
2028        let observer = StreamingObserverState::new(Arc::clone(self), target_node_id, service, 0);
2029        Ok(ClientStreamCallRaw {
2030            mesh: Arc::clone(self),
2031            target_node_id,
2032            request_channel: route.request_channel.clone(),
2033            self_origin,
2034            call_id,
2035            service: service.to_string(),
2036            initial_headers,
2037            initial_flags,
2038            deadline_ns,
2039            credit_sem,
2040            grant_pump,
2041            terminal_rx: Some(terminal_rx),
2042            state: ClientStreamState::JustOpened,
2043            started: Instant::now(),
2044            observer,
2045        })
2046    }
2047
2048    /// Register a duplex nRPC handler for `service`. Composes
2049    /// [`Self::serve_rpc_client_stream`] (request-side stream)
2050    /// with [`Self::serve_rpc_streaming`] (response-side multi-
2051    /// fire emit) via [`RpcDuplexFold`].
2052    ///
2053    /// Wires THREE emit callbacks:
2054    /// - Async [`RpcAsyncResponseEmitter`] for response chunks +
2055    ///   the terminal frame (per-call ordering required because
2056    ///   the response side is multi-fire).
2057    /// - [`RpcRequestGrantEmitter`] for upload-direction credit
2058    ///   grants (one per consumed request chunk when flow
2059    ///   control is opted into).
2060    ///
2061    /// Bidi streaming plan (Phase D).
2062    pub fn serve_rpc_duplex<H: RpcDuplexHandler>(
2063        self: &Arc<Self>,
2064        service: &str,
2065        handler: Arc<H>,
2066    ) -> Result<ServeHandle, ServeError> {
2067        let request_channel = ChannelName::new(&format!("{service}.requests"))
2068            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
2069        let channel_hash = request_channel.hash();
2070        let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
2071
2072        let mesh_for_emit = Arc::clone(self);
2073        let service_for_emit = service.to_string();
2074        let server_origin = self.identity_origin_hash();
2075
2076        // Async response emitter — per-call ordering matters here
2077        // because the response side is multi-fire (same rationale
2078        // as serve_rpc_streaming).
2079        let emit_resp_mesh = Arc::clone(&mesh_for_emit);
2080        let emit_resp_service = service_for_emit.clone();
2081        let emit_resp: RpcAsyncResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
2082            let mesh = Arc::clone(&emit_resp_mesh);
2083            let service = emit_resp_service.clone();
2084            Box::pin(async move {
2085                let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
2086                let reply_channel = match ChannelName::new(&reply_channel_name) {
2087                    Ok(c) => c,
2088                    Err(e) => {
2089                        tracing::warn!(error = %e, channel = %reply_channel_name,
2090                                "rpc serve_rpc_duplex: invalid reply channel name");
2091                        return;
2092                    }
2093                };
2094                let meta = EventMeta::new(
2095                    super::cortex::DISPATCH_RPC_RESPONSE,
2096                    0,
2097                    server_origin,
2098                    call_id,
2099                    0,
2100                );
2101                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2102                buf.extend_from_slice(&meta.to_bytes());
2103                buf.extend_from_slice(&resp.encode());
2104                let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
2105                if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
2106                    tracing::warn!(error = %e,
2107                            caller_origin = format!("{:#x}", caller_origin),
2108                            call_id,
2109                            "rpc serve_rpc_duplex: chunk publish failed");
2110                }
2111            })
2112        });
2113
2114        // Request-direction grant emitter — same coalescing
2115        // drainer shape as serve_rpc_client_stream.
2116        let emit_grant = build_request_grant_emitter(
2117            Arc::clone(&mesh_for_emit),
2118            service_for_emit.clone(),
2119            server_origin,
2120            "serve_rpc_duplex",
2121        );
2122
2123        let metrics_handle = self.rpc_metrics_arc().for_service(service);
2124        let fold = Arc::new(Mutex::new(
2125            RpcDuplexFold::new(handler as Arc<dyn RpcDuplexHandler>, emit_resp)
2126                .with_grant_emitter(emit_grant)
2127                .with_metrics(metrics_handle),
2128        ));
2129        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2130            let _ = tx.try_send(ev);
2131        });
2132        if self
2133            .register_rpc_inbound(channel_hash, dispatcher)
2134            .is_some()
2135        {
2136            return Err(ServeError::AlreadyServing(service.to_string()));
2137        }
2138        let bridge = tokio::spawn(async move {
2139            while let Some(inbound) = rx.recv().await {
2140                let payload = inbound.payload;
2141                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2142                let ev = RedexEvent { entry, payload };
2143                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2144                    tracing::warn!(error = %e,
2145                        "rpc serve_rpc_duplex: fold apply error");
2146                }
2147            }
2148        });
2149        self.rpc_local_services_arc().insert(service.to_string());
2150        Ok(ServeHandle {
2151            channel_hash,
2152            service: service.to_string(),
2153            _bridge: bridge,
2154            mesh: Arc::clone(self),
2155        })
2156    }
2157
2158    /// Duplex variant of [`Self::call`]. Returns a
2159    /// [`DuplexCallRaw`] handle with both upload (`send`,
2160    /// `finish_sending`) and download (`next`, or impl
2161    /// `futures::Stream`) surfaces. Use `into_split` to peel off
2162    /// the two halves for the "encoder task + decoder task"
2163    /// shape.
2164    ///
2165    /// Initial REQUEST sets BOTH `FLAG_RPC_CLIENT_STREAMING_REQUEST`
2166    /// AND `FLAG_RPC_STREAMING_RESPONSE`. Lazy publish — the
2167    /// initial REQUEST flies on the first `send` (or on
2168    /// `finish_sending` for the zero-item degenerate path).
2169    ///
2170    /// Bidi streaming plan (Phase D).
2171    pub async fn call_duplex(
2172        self: &Arc<Self>,
2173        target_node_id: u64,
2174        service: &str,
2175        opts: CallOptions,
2176    ) -> Result<DuplexCallRaw, RpcError> {
2177        // Same deadlock guard as `call_client_stream`: Some(0)
2178        // means "send must await a credit that can never arrive"
2179        // because the initial REQUEST is lazy.
2180        if matches!(opts.request_window_initial, Some(0)) {
2181            return Err(RpcError::Codec {
2182                direction: CodecDirection::Encode,
2183                message: "request_window_initial must be None or >= 1; Some(0) deadlocks send"
2184                    .to_string(),
2185            });
2186        }
2187        // T1.3: per-service route cache (see PERF_AUDIT
2188        // 2026-05-19). One DashMap::get + Arc::clone instead of
2189        // 2 format! + 2 ChannelName::new + xxhash per call.
2190        let route = self.rpc_route_or_no_route(target_node_id, service)?;
2191        let self_origin = self.identity_origin_hash();
2192        self.ensure_reply_subscription(
2193            target_node_id,
2194            service,
2195            route.reply_channel.clone(),
2196            route.reply_hash,
2197        )
2198        .await?;
2199
2200        let call_id = mint_random_call_id();
2201        let pending = self.rpc_client_pending();
2202        let (chunks_rx, mut grant_rx) = pending.register_duplex(call_id, target_node_id);
2203
2204        let mut initial_flags = FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_STREAMING_RESPONSE;
2205        let mut initial_headers: Vec<(String, Vec<u8>)> = Vec::new();
2206        if let Some(tc) = opts.trace_context.as_ref() {
2207            initial_flags |= FLAG_RPC_PROPAGATE_TRACE;
2208            initial_headers.extend(build_trace_headers(tc));
2209        }
2210        if let Some(window) = opts.request_window_initial {
2211            initial_headers.push((
2212                HEADER_NRPC_REQUEST_WINDOW_INITIAL.to_string(),
2213                window.to_string().into_bytes(),
2214            ));
2215        }
2216        if let Some(window) = opts.stream_window_initial {
2217            initial_headers.push((
2218                HEADER_NRPC_STREAM_WINDOW_INITIAL.to_string(),
2219                window.to_string().into_bytes(),
2220            ));
2221        }
2222        initial_headers.extend(opts.request_headers.iter().cloned());
2223
2224        let credit_sem = opts
2225            .request_window_initial
2226            .map(|n| Arc::new(tokio::sync::Semaphore::new(n as usize)));
2227        let grant_pump = credit_sem.as_ref().map(|sem| {
2228            let sem = Arc::clone(sem);
2229            tokio::spawn(async move {
2230                while let Some(credits) = grant_rx.recv().await {
2231                    add_request_grant_credits(&sem, credits);
2232                }
2233            })
2234        });
2235
2236        let deadline_ns = opts.deadline.map(instant_to_unix_nanos).unwrap_or(0);
2237        let observer = StreamingObserverState::new(Arc::clone(self), target_node_id, service, 0);
2238        let inner = Arc::new(DuplexInner {
2239            mesh: Arc::clone(self),
2240            target_node_id,
2241            request_channel: route.request_channel.clone(),
2242            self_origin,
2243            call_id,
2244            initial_sent: std::sync::atomic::AtomicBool::new(false),
2245            clean_close: std::sync::atomic::AtomicBool::new(false),
2246            observer,
2247        });
2248        let sink = DuplexSink {
2249            inner: Arc::clone(&inner),
2250            service: service.to_string(),
2251            initial_headers,
2252            initial_flags,
2253            deadline_ns,
2254            credit_sem,
2255            grant_pump,
2256            state: ClientStreamState::JustOpened,
2257        };
2258        let stream = DuplexStream {
2259            inner,
2260            chunks_rx,
2261            done: false,
2262        };
2263        Ok(DuplexCallRaw { sink, stream })
2264    }
2265
2266    /// Streaming variant of [`Self::call`]. Returns an
2267    /// [`RpcStream`] that yields chunks (as `Result<Bytes, RpcError>`)
2268    /// until the server closes the stream.
2269    ///
2270    /// Sets `FLAG_RPC_STREAMING_RESPONSE` on the request so the
2271    /// server's streaming fold knows to expect multi-fire emits.
2272    /// Same lazy reply-subscription + direct-unicast REQUEST
2273    /// as the unary `call` path.
2274    ///
2275    /// Cancellation: dropping the returned `RpcStream` emits a
2276    /// CANCEL to the server (best-effort) and discards any
2277    /// in-flight chunks.
2278    pub async fn call_streaming(
2279        self: &Arc<Self>,
2280        target_node_id: u64,
2281        service: &str,
2282        payload: Bytes,
2283        opts: CallOptions,
2284    ) -> Result<RpcStream, RpcError> {
2285        // `stream_window_initial = Some(0)` would deadlock the
2286        // RESPONSE direction by default: server's pump awaits one
2287        // credit per chunk, the caller's auto-grant only fires on
2288        // consumed chunks, and the first chunk can never be
2289        // delivered. `None` means "unbounded credit"; any positive
2290        // value opts into flow control. Reject up front — symmetric
2291        // with the request-direction guard in `call_client_stream`.
2292        if matches!(opts.stream_window_initial, Some(0)) {
2293            return Err(RpcError::Codec {
2294                direction: CodecDirection::Encode,
2295                message: "stream_window_initial must be None or >= 1; Some(0) deadlocks the response pump"
2296                    .to_string(),
2297            });
2298        }
2299        // T1.3: per-service route cache. One DashMap::get + Arc::clone
2300        // on the hot path instead of 2 format! + 2 ChannelName::new +
2301        // xxhash per call.
2302        let route = self.rpc_route_or_no_route(target_node_id, service)?;
2303        let self_origin = self.identity_origin_hash();
2304        self.ensure_reply_subscription(
2305            target_node_id,
2306            service,
2307            route.reply_channel.clone(),
2308            route.reply_hash,
2309        )
2310        .await?;
2311
2312        let call_id = mint_random_call_id();
2313        let pending = self.rpc_client_pending();
2314        // S-4 part 2: bind the pending entry to the wire-session
2315        // peer the request is dispatched to. The fold's deliver
2316        // gate rejects RESPONSE frames whose from_node doesn't
2317        // match, so a leaked call_id alone can't spoof a reply.
2318        let rx = pending.register_streaming(call_id, target_node_id);
2319
2320        // Build the REQUEST: STREAMING_RESPONSE flag plus optional
2321        // trace-context headers / propagate-trace flag, same as
2322        // unary `call`. Plus the optional flow-control header
2323        // (`nrpc-stream-window-initial`) when the caller opted in
2324        // via `CallOptions::stream_window_initial`.
2325        let mut flags = FLAG_RPC_STREAMING_RESPONSE;
2326        let mut headers = Vec::new();
2327        if let Some(tc) = opts.trace_context.as_ref() {
2328            flags |= FLAG_RPC_PROPAGATE_TRACE;
2329            headers.extend(build_trace_headers(tc));
2330        }
2331        if let Some(window) = opts.stream_window_initial {
2332            headers.push((
2333                HEADER_NRPC_STREAM_WINDOW_INITIAL.to_string(),
2334                window.to_string().into_bytes(),
2335            ));
2336        }
2337        // Append caller-supplied request headers (Phase 9b — same
2338        // semantics as the unary `call` path).
2339        headers.extend(opts.request_headers.iter().cloned());
2340        let req = RpcRequestPayload {
2341            service: service.to_string(),
2342            deadline_ns: opts.deadline.map(instant_to_unix_nanos).unwrap_or(0),
2343            flags,
2344            headers,
2345            body: payload.clone(),
2346        };
2347        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self_origin, call_id, 0);
2348        let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.body.len() + 32);
2349        buf.extend_from_slice(&meta.to_bytes());
2350        buf.extend_from_slice(&req.encode());
2351
2352        let payload_bytes = Bytes::from(buf);
2353        if let Err(e) = self
2354            .publish_to_peer(
2355                target_node_id,
2356                route.request_channel_hash,
2357                route.request_stream_id,
2358                /* reliable */ true,
2359                std::slice::from_ref(&payload_bytes),
2360            )
2361            .await
2362        {
2363            pending.cancel(call_id);
2364            return Err(RpcError::Transport(e));
2365        }
2366
2367        let request_bytes_len = payload_bytes.len() as u32;
2368        Ok(RpcStream {
2369            mesh: Arc::clone(self),
2370            target_node_id,
2371            request_channel: route.request_channel.clone(),
2372            self_origin,
2373            call_id,
2374            inner: rx,
2375            done: false,
2376            stream_window: opts.stream_window_initial,
2377            observer: StreamingObserverState::new(
2378                Arc::clone(self),
2379                target_node_id,
2380                service,
2381                request_bytes_len,
2382            ),
2383        })
2384    }
2385
2386    /// Find every node currently advertising `service` via the
2387    /// `nrpc:<service>` capability tag. Returns node IDs in
2388    /// roster order; the caller picks one (or use [`Self::call_service`]
2389    /// for the round-robin shortcut).
2390    ///
2391    /// Pre-Phase 2: requires the target nodes to have called
2392    /// `serve_rpc` AND `announce_capabilities` so the
2393    /// `nrpc:<service>` tag has propagated through capability
2394    /// announcements. The local node's own services are NOT
2395    /// automatically included (callers don't typically invoke
2396    /// themselves via the network — for in-process invocation,
2397    /// the user has the handler directly).
2398    pub fn find_service_nodes(&self, service: &str) -> Vec<u64> {
2399        use crate::adapter::net::behavior::capability::CapabilityFilter;
2400        let tag = format!("nrpc:{service}");
2401        let filter = CapabilityFilter::default().require_tag(tag);
2402        self.capability_index_arc().query(&filter)
2403    }
2404
2405    /// Issue an RPC call to `service`, picking one node from
2406    /// those advertising the `nrpc:<service>` tag in the local
2407    /// capability index according to `opts.routing_policy`.
2408    ///
2409    /// Returns `RpcError::NoRoute` if no nodes advertise the
2410    /// service (or if `opts.filter_unhealthy` is set and every
2411    /// candidate is unavailable per the local `ProximityGraph`).
2412    pub async fn call_service(
2413        self: &Arc<Self>,
2414        service: &str,
2415        payload: Bytes,
2416        opts: CallOptions,
2417    ) -> Result<RpcReply, RpcError> {
2418        let mut candidates = self.find_service_nodes(service);
2419        if candidates.is_empty() {
2420            return Err(RpcError::NoRoute {
2421                target: 0,
2422                reason: format!(
2423                    "no nodes advertise `nrpc:{service}` (have any servers \
2424                     for this service called serve_rpc + announce_capabilities?)"
2425                ),
2426            });
2427        }
2428
2429        // Health filtering. Skip candidates the proximity graph
2430        // marks unhealthy (`!is_available()`). Candidates with no
2431        // proximity entry at all are KEPT — absence of evidence
2432        // is not evidence of unhealth, and a freshly-announced
2433        // service shouldn't be filtered just because pingwaves
2434        // haven't propagated yet.
2435        //
2436        // The bridge: each candidate's session-layer `node_id: u64`
2437        // is mapped to the entity-layer `[u8; 32]` via
2438        // `MeshNode::entity_id_for_node`. The proximity graph is
2439        // keyed on the entity id.
2440        if opts.filter_unhealthy {
2441            let proximity = self.proximity_graph();
2442            candidates.retain(|node_id| match self.entity_id_for_node(*node_id) {
2443                Some(entity_id) => match proximity.get_node(&entity_id) {
2444                    Some(node) => node.is_available(),
2445                    None => true, // no proximity data → keep
2446                },
2447                None => true, // no entity-id mapping → keep
2448            });
2449            if candidates.is_empty() {
2450                return Err(RpcError::NoRoute {
2451                    target: 0,
2452                    reason: format!(
2453                        "every node advertising `nrpc:{service}` is marked \
2454                         unhealthy by the local proximity graph",
2455                    ),
2456                });
2457            }
2458        }
2459
2460        // Sort once so consistent-hash policies (Sticky) produce
2461        // a stable ordering across calls regardless of how the
2462        // capability index returned the candidates, and so the
2463        // LowestLatency-with-no-proximity-data fallback is
2464        // deterministic. Cheap — the candidate set is typically
2465        // small.
2466        candidates.sort_unstable();
2467
2468        // v0.4 capability-auth caller-side gate. Filter the
2469        // candidate set BEFORE target selection so the routing
2470        // policy never picks a peer the caller can't actually
2471        // reach. Pre-fix `select_target` could pick a denied
2472        // candidate even when authorized peers existed in the
2473        // set, and the resulting `CapabilityDenied` masked the
2474        // fact that the call would have succeeded against B or
2475        // C. Each candidate's own announcement lists
2476        // `nrpc:<service>` (otherwise it wouldn't be a
2477        // `find_service_nodes` candidate), so the gate's
2478        // `has_tag` arm short-circuits in the common case; the
2479        // new work is the allow-list scan. Permissive
2480        // announcements (all three lists empty) admit any
2481        // caller — the byte-identity wire-compat tests pin that
2482        // an unmodified peer's announcement stays unrestricted.
2483        // See `docs/plans/CAPABILITY_AUTH_PLAN.md` §3.
2484        let tag = format!("nrpc:{service}");
2485        let index = self.capability_index_arc();
2486        let self_id = self.node_id();
2487        let any_candidate = candidates[0];
2488        candidates.retain(|c| index.may_execute(*c, &tag, self_id));
2489        if candidates.is_empty() {
2490            return Err(RpcError::CapabilityDenied {
2491                // No authorized target; surface one of the
2492                // originally-advertised candidates so the caller
2493                // can correlate the denial with a real peer. The
2494                // semantic is "no peer advertising `nrpc:<service>`
2495                // authorizes this caller" — `any_candidate` is a
2496                // representative, not necessarily the strictest.
2497                target: any_candidate,
2498                capability: service.to_string(),
2499            });
2500        }
2501
2502        let target = self.select_target(&candidates, &opts.routing_policy);
2503        self.call(target, service, payload, opts).await
2504    }
2505
2506    /// Select a single target from `candidates` according to
2507    /// `policy`. Caller has already ensured `candidates` is
2508    /// non-empty and sorted (so `Sticky` is consistent across
2509    /// calls).
2510    fn select_target(&self, candidates: &[u64], policy: &RoutingPolicy) -> u64 {
2511        match policy {
2512            RoutingPolicy::RoundRobin => {
2513                // `fetch_add(1)` on a dedicated cursor — NOT a
2514                // `load(call_id)` — so two concurrent
2515                // `call_service` invocations always observe
2516                // distinct values and pick distinct targets.
2517                let n = self
2518                    .rpc_round_robin_cursor_arc()
2519                    .fetch_add(1, Ordering::Relaxed);
2520                candidates[(n as usize) % candidates.len()]
2521            }
2522            RoutingPolicy::Random => {
2523                // Lightweight RNG via a fresh fetch_add (same
2524                // counter, separate per-call value) mixed through
2525                // xxh3. Sufficient for load distribution;
2526                // not cryptographically random.
2527                let n = self
2528                    .rpc_round_robin_cursor_arc()
2529                    .fetch_add(1, Ordering::Relaxed);
2530                let mixed = xxhash_rust::xxh3::xxh3_64(&n.to_le_bytes());
2531                candidates[(mixed as usize) % candidates.len()]
2532            }
2533            RoutingPolicy::Sticky { key } => {
2534                // Consistent-hash to a position in the (sorted)
2535                // candidate list. Same key + same candidate set =
2536                // same target. A change to the candidate set
2537                // (server failover) reshuffles roughly 1/N of keys.
2538                let h = xxhash_rust::xxh3::xxh3_64(&key.to_le_bytes());
2539                candidates[(h as usize) % candidates.len()]
2540            }
2541            RoutingPolicy::LowestLatency => {
2542                // Walk candidates, look up each via the bridge
2543                // → proximity graph, pick the smallest
2544                // `latency_us`. Candidates without a proximity
2545                // entry (no observed pingwave or no entity-id
2546                // mapping yet) are treated as `u64::MAX` so they
2547                // sort to the bottom — a known-fast node beats an
2548                // unknown one.
2549                //
2550                // Determinism on tie / no-data: `best_node` starts
2551                // at `candidates[0]` (the lexicographically first
2552                // sorted candidate), so all-ties or all-unknown
2553                // collapse to that consistent fallback.
2554                let proximity = self.proximity_graph();
2555                let mut best_node = candidates[0];
2556                let mut best_latency = u64::MAX;
2557                for &node_id in candidates {
2558                    let lat = self
2559                        .entity_id_for_node(node_id)
2560                        .and_then(|eid| proximity.get_node(&eid))
2561                        .map(|n| n.latency_us)
2562                        .unwrap_or(u64::MAX);
2563                    if lat < best_latency {
2564                        best_latency = lat;
2565                        best_node = node_id;
2566                    }
2567                }
2568                best_node
2569            }
2570        }
2571    }
2572
2573    /// Issue an RPC call to `target_node_id` for `service`.
2574    ///
2575    /// Phase 1 — direct entity-to-entity addressing. The caller
2576    /// specifies which target to send to; service discovery (the
2577    /// "find me a healthy instance of X" lookup) is Phase 2.
2578    ///
2579    /// Lazily subscribes the local node's `RpcClientFold` to
2580    /// `<service>.replies.<self_origin>` from `target_node_id` on
2581    /// the first call to that (target, service) pair. The
2582    /// subscription is reused across subsequent calls.
2583    ///
2584    /// On `opts.deadline` expiring OR the future being dropped,
2585    /// emits a CANCEL event so the server can drop the in-flight
2586    /// handler.
2587    pub async fn call(
2588        self: &Arc<Self>,
2589        target_node_id: u64,
2590        service: &str,
2591        payload: Bytes,
2592        opts: CallOptions,
2593    ) -> Result<RpcReply, RpcError> {
2594        // `started_total` brackets the entire call for the
2595        // `RpcObserver` latency field; the substrate-internal
2596        // `started` further down (set after the subscription
2597        // setup) drives the existing `RpcReply::latency_ns`
2598        // accounting so observers and Prometheus metrics
2599        // measure slightly different spans but stay consistent
2600        // within their own surface.
2601        let started_total = Instant::now();
2602        let request_bytes_len = payload.len() as u32;
2603        // Per-service route cache: one `DashMap::get(&str)` +
2604        // `Arc::clone` on the hot path instead of 2 `format!` +
2605        // 2 `ChannelName::new` + xxhash per call (T1.3 perf audit
2606        // — `docs/misc/PERF_AUDIT_2026_05_19_NRPC.md`).
2607        let route = self.rpc_route_or_no_route(target_node_id, service)?;
2608        let self_origin = self.identity_origin_hash();
2609
2610        // Caller-side metrics guard. Bumps `in_flight` immediately;
2611        // each early-return path calls `metrics_guard.record(...)`
2612        // with the outcome, and Drop records the latency + bumps
2613        // the matching counter. A future dropped before any
2614        // `record(...)` call (e.g. a hedge loser) leaves the guard
2615        // with `outcome = None` so `in_flight` decrements but no
2616        // outcome is double-counted.
2617        let metrics_registry = self.rpc_metrics_arc();
2618        let mut metrics_guard = CallMetricsGuard::new(metrics_registry.for_service(service));
2619
2620        // Lazy reply-channel subscription. Once per (target, service).
2621        // Reply channel + hash come from the cached `RpcRoute`; we
2622        // only `.clone()` the `ChannelName` (cheap — internally an
2623        // Arc<str>) instead of building it from scratch.
2624        if let Err(e) = self
2625            .ensure_reply_subscription(
2626                target_node_id,
2627                service,
2628                route.reply_channel.clone(),
2629                route.reply_hash,
2630            )
2631            .await
2632        {
2633            metrics_guard.record(CallOutcome::NoRoute);
2634            self.fire_rpc_observer_outbound(
2635                target_node_id,
2636                service,
2637                started_total.elapsed().as_millis() as u32,
2638                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(e.to_string()),
2639                request_bytes_len,
2640                0,
2641            );
2642            return Err(e);
2643        }
2644
2645        // Allocate a fresh call_id. Random u64 from getrandom; a
2646        // sequential counter would let any session peer that
2647        // observed one of their own call_ids predict the next-
2648        // allocated ids and ship spoofed RESPONSE frames on the
2649        // victim's reply channel. Random u64 collides with
2650        // probability 2^-64 per call and is unguessable from
2651        // another peer's perspective.
2652        let call_id = mint_random_call_id();
2653
2654        // Register the oneshot before publishing the REQUEST so a
2655        // very-fast RESPONSE doesn't arrive before we're ready.
2656        // S-4 part 2: bind the pending entry to target_node_id so
2657        // the fold's deliver gate rejects spoofed RESPONSE frames
2658        // arriving from any other session peer.
2659        let pending = self.rpc_client_pending();
2660        let rx = pending.register(call_id, target_node_id);
2661
2662        // Build the REQUEST envelope. If a trace context is set,
2663        // emit `traceparent` / `tracestate` headers and signal
2664        // via `FLAG_RPC_PROPAGATE_TRACE` so the server's fold
2665        // populates `RpcContext::trace_context`.
2666        let (flags, mut headers) = match opts.trace_context.as_ref() {
2667            Some(tc) => (FLAG_RPC_PROPAGATE_TRACE, build_trace_headers(tc)),
2668            None => (0u16, Vec::new()),
2669        };
2670        // Append caller-supplied request headers (e.g. the
2671        // `net-where` predicate header for Phase 9b
2672        // predicate-pushdown). Auto-generated headers come first
2673        // so name collisions resolve to caller-overrides via the
2674        // server-side `predicate_from_rpc_headers` first-match
2675        // semantics.
2676        headers.extend(opts.request_headers.iter().cloned());
2677        let req = RpcRequestPayload {
2678            service: service.to_string(),
2679            deadline_ns: opts.deadline.map(instant_to_unix_nanos).unwrap_or(0),
2680            flags,
2681            headers,
2682            body: payload.clone(),
2683        };
2684        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self_origin, call_id, 0);
2685        let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.body.len() + 32);
2686        buf.extend_from_slice(&meta.to_bytes());
2687        buf.extend_from_slice(&req.encode());
2688
2689        // Send the REQUEST directly to `target_node_id` via
2690        // `publish_to_peer`, bypassing the local subscriber roster
2691        // lookup. The roster-based `Mesh::publish` would consult
2692        // `dispatch_recipients(channel)` against the caller's local
2693        // roster, which has no knowledge of who serves this service
2694        // (no Subscribe message ever propagated from the server back
2695        // to the caller — `serve_rpc` is local-only). For Phase 1
2696        // direct addressing we know the target, so direct-send is
2697        // the right primitive.
2698        //
2699        // The receiver routes via the per-channel-hash dispatcher
2700        // hook (channel_hash is stamped on the wire by
2701        // publish_to_peer).
2702        let started = Instant::now();
2703        // Request channel hash + stream_id come from the cached
2704        // route — no `ChannelId::new` clone + xxhash per call.
2705        let payload_bytes = Bytes::from(buf);
2706        if let Err(e) = self
2707            .publish_to_peer(
2708                target_node_id,
2709                route.request_channel_hash,
2710                route.request_stream_id,
2711                /* reliable */ true,
2712                std::slice::from_ref(&payload_bytes),
2713            )
2714            .await
2715        {
2716            pending.cancel(call_id);
2717            // Distinguish "I don't know how to reach this peer"
2718            // from a generic transport blip: when the publish path
2719            // surfaces a no-session error, that's NoRoute (the
2720            // routing layer's job, retry won't help). Other
2721            // transport errors stay as Transport so retry is
2722            // applicable.
2723            let err = if classify_publish_no_session(&e) {
2724                metrics_guard.record(CallOutcome::NoRoute);
2725                RpcError::NoRoute {
2726                    target: target_node_id,
2727                    reason: e.to_string(),
2728                }
2729            } else {
2730                metrics_guard.record(CallOutcome::Transport);
2731                RpcError::Transport(e)
2732            };
2733            self.fire_rpc_observer_outbound(
2734                target_node_id,
2735                service,
2736                started_total.elapsed().as_millis() as u32,
2737                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(err.to_string()),
2738                request_bytes_len,
2739                0,
2740            );
2741            return Err(err);
2742        }
2743
2744        // From here on, the REQUEST is in flight on the server.
2745        // Wrap the rest of the call in an RAII guard whose Drop
2746        // fires CANCEL if `guard.completed` isn't set — covering:
2747        //  - the call future being dropped mid-flight (e.g. hedge
2748        //    loser, select!-cancelled future, caller awaiting a
2749        //    `JoinHandle` that gets cancelled).
2750        //  - the timeout path (we leave `completed=false` so Drop
2751        //    handles CANCEL emission; no need for a separate
2752        //    `send_rpc_cancel` call).
2753        let mut guard = UnaryCallGuard {
2754            pending: Arc::clone(&pending),
2755            mesh: Arc::clone(self),
2756            target_node_id,
2757            request_channel: route.request_channel.clone(),
2758            self_origin,
2759            call_id,
2760            completed: false,
2761        };
2762
2763        // Race the receiver against the deadline.
2764        let outcome: Result<Result<RpcResponsePayload, _>, tokio::time::error::Elapsed> =
2765            match opts.deadline {
2766                None => Ok(rx.await),
2767                Some(deadline) => {
2768                    let timeout_at = deadline.saturating_duration_since(Instant::now());
2769                    tokio::time::timeout(timeout_at, rx).await
2770                }
2771            };
2772
2773        let resp = match outcome {
2774            Ok(Ok(resp)) => {
2775                guard.completed = true;
2776                resp
2777            }
2778            Ok(Err(_recv_err)) => {
2779                // Sender dropped externally — pending entry is
2780                // already gone (someone else removed it). Mark
2781                // completed so Drop doesn't fire a useless CANCEL
2782                // for a server that's no longer tracking this id.
2783                guard.completed = true;
2784                metrics_guard.record(CallOutcome::Transport);
2785                let err = RpcError::Transport(AdapterError::Connection(
2786                    "rpc client pending sender dropped (no response will arrive)".into(),
2787                ));
2788                self.fire_rpc_observer_outbound(
2789                    target_node_id,
2790                    service,
2791                    started_total.elapsed().as_millis() as u32,
2792                    crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(
2793                        err.to_string(),
2794                    ),
2795                    request_bytes_len,
2796                    0,
2797                );
2798                return Err(err);
2799            }
2800            Err(_elapsed) => {
2801                // Timeout: leave `completed=false` so Drop emits
2802                // CANCEL automatically; surface Timeout to caller.
2803                metrics_guard.record(CallOutcome::Timeout);
2804                self.fire_rpc_observer_outbound(
2805                    target_node_id,
2806                    service,
2807                    started_total.elapsed().as_millis() as u32,
2808                    crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Timeout,
2809                    request_bytes_len,
2810                    0,
2811                );
2812                return Err(RpcError::Timeout {
2813                    elapsed_ms: started.elapsed().as_millis() as u64,
2814                });
2815            }
2816        };
2817
2818        // Map the wire status onto the public Result type.
2819        if resp.status.is_ok() {
2820            metrics_guard.record(CallOutcome::Ok);
2821            let response_bytes_len = resp.body.len() as u32;
2822            self.fire_rpc_observer_outbound(
2823                target_node_id,
2824                service,
2825                started_total.elapsed().as_millis() as u32,
2826                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Ok,
2827                request_bytes_len,
2828                response_bytes_len,
2829            );
2830            Ok(RpcReply {
2831                body: resp.body,
2832                headers: resp.headers,
2833                latency_ns: started.elapsed().as_nanos() as u64,
2834            })
2835        } else {
2836            metrics_guard.record(CallOutcome::ServerError);
2837            let status = resp.status.to_wire();
2838            let response_bytes_len = resp.body.len() as u32;
2839            let message = String::from_utf8(resp.body.to_vec())
2840                .unwrap_or_else(|e| format!("<{} bytes of non-utf8 body>", e.into_bytes().len()));
2841            self.fire_rpc_observer_outbound(
2842                target_node_id,
2843                service,
2844                started_total.elapsed().as_millis() as u32,
2845                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(message.clone()),
2846                request_bytes_len,
2847                response_bytes_len,
2848            );
2849            // v0.4 capability-auth: callee-side defense-in-depth
2850            // surfaces as a wire `CapabilityDenied` status. Map it
2851            // back to the typed `RpcError::CapabilityDenied` so
2852            // application code sees the same variant regardless of
2853            // which side of the gate fired.
2854            if matches!(resp.status, RpcStatus::CapabilityDenied) {
2855                return Err(RpcError::CapabilityDenied {
2856                    target: target_node_id,
2857                    capability: service.to_string(),
2858                });
2859            }
2860            Err(RpcError::ServerError { status, message })
2861        }
2862    }
2863
2864    // ----------------------------------------------------------------
2865    // Internal helpers.
2866    // ----------------------------------------------------------------
2867
2868    /// Lazy-subscribe `reply_channel` from `target_node_id` and
2869    /// register an inbound dispatcher that drives the per-Mesh
2870    /// `RpcClientFold`. Idempotent — subsequent calls for the
2871    /// same (target, service) pair are no-ops.
2872    ///
2873    /// **Bounded** at [`MAX_REPLY_SUBSCRIPTIONS`]: a caller talking
2874    /// to many short-lived (target, service) pairs would otherwise
2875    /// grow the registry indefinitely. Past the cap we refuse the
2876    /// new subscription with `NoRoute` rather than evict an
2877    /// existing one (eviction could rip out a healthy in-flight
2878    /// reply path).
2879    ///
2880    /// **Dispatcher reuse**: the reply-channel name embeds the
2881    /// CALLER's `self_origin`, NOT the target's, so a single
2882    /// caller talking to multiple servers for the same service
2883    /// reuses the same reply channel (same hash). We register the
2884    /// dispatcher only if the slot is unoccupied; subsequent
2885    /// (target, service) pairs that hash to the same slot are
2886    /// allowed to share the existing dispatcher (which routes to
2887    /// the same per-Mesh `pending` map regardless of target). A
2888    /// genuine cross-service hash collision is detected at
2889    /// `serve_rpc` time (the AlreadyServing path) for the server
2890    /// side; on the caller side here, sharing the dispatcher is
2891    /// the correct behavior because all RESPONSE events route
2892    /// through the same `RpcClientPending` keyed by `call_id`.
2893    async fn ensure_reply_subscription(
2894        self: &Arc<Self>,
2895        target_node_id: u64,
2896        service: &str,
2897        reply_channel: ChannelName,
2898        reply_hash: ChannelHash,
2899    ) -> Result<(), RpcError> {
2900        let registry = self.rpc_reply_subscriptions_arc();
2901        {
2902            let entries = registry.lock();
2903            if entries
2904                .iter()
2905                .any(|(t, s)| *t == target_node_id && s == service)
2906            {
2907                return Ok(());
2908            }
2909            // Cap the registry. New entries past the cap are
2910            // refused — caller should reuse an existing
2911            // (target, service) pair or operate on fewer.
2912            if entries.len() >= MAX_REPLY_SUBSCRIPTIONS {
2913                return Err(RpcError::NoRoute {
2914                    target: target_node_id,
2915                    reason: format!(
2916                        "reply-subscription registry at cap ({} entries); refusing new \
2917                         (target={target_node_id:#x}, service={service:?}). Caller should \
2918                         reuse an existing target+service pair or shrink the active set.",
2919                        MAX_REPLY_SUBSCRIPTIONS,
2920                    ),
2921                });
2922            }
2923        }
2924
2925        // Subscribe to our own reply channel from the target so the
2926        // target's roster has us as a subscriber when the server's
2927        // emit closure publishes the RESPONSE.
2928        self.subscribe_channel(target_node_id, reply_channel.clone())
2929            .await
2930            .map_err(|e| RpcError::NoRoute {
2931                target: target_node_id,
2932                reason: e.to_string(),
2933            })?;
2934
2935        // Register the inbound dispatcher only if the slot is
2936        // unoccupied. The reply-channel name embeds *self_origin*,
2937        // not the target, so multiple targets serving the same
2938        // service share one reply channel + one dispatcher. The
2939        // existing dispatcher routes to the same per-Mesh
2940        // `RpcClientPending` keyed by call_id, so reuse is safe.
2941        if !self.rpc_inbound_dispatcher_registered(reply_hash) {
2942            let pending = self.rpc_client_pending();
2943            let fold = Arc::new(Mutex::new(RpcClientFold::new(pending)));
2944            // S-4 part 2: use `apply_inbound` so the wire-session
2945            // peer's NodeId (resolved in mesh.rs's dispatch site)
2946            // flows into the fold's deliver gate. The legacy
2947            // `RedexFold::apply` shim delivers with from_node=0,
2948            // which would defeat the binding.
2949            let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2950                fold.lock().apply_inbound(&ev);
2951            });
2952            // Race-safe: a concurrent caller might have just
2953            // registered between our check and our insert. In that
2954            // case `register_rpc_inbound` returns the prior
2955            // dispatcher; our new fresh fold is dropped here, and
2956            // the prior dispatcher (which routes to the same
2957            // shared `pending`) keeps doing the job. No collision
2958            // — both folds are functionally equivalent.
2959            if let Some(prev) = self.register_rpc_inbound(reply_hash, dispatcher) {
2960                // Roll back: keep the prior dispatcher (it's
2961                // already wired to the same shared pending map).
2962                let _ = self.register_rpc_inbound(reply_hash, prev);
2963            }
2964        }
2965
2966        let _ = reply_hash; // captured into the dispatcher above; surfaced for debug
2967        registry.lock().push((target_node_id, service.to_string()));
2968        Ok(())
2969    }
2970}
2971
2972/// Hard cap on the number of distinct (target_node_id, service)
2973/// pairs the caller-side reply-subscription registry will hold.
2974/// Past the cap, the lazy-subscribe path inside [`MeshNode::call`]
2975/// refuses new entries with [`RpcError::NoRoute`]. 1024 is
2976/// generous for any realistic deployment — a caller that needs
2977/// more should reuse existing reply paths.
2978pub const MAX_REPLY_SUBSCRIPTIONS: usize = 1024;
2979
2980/// Mint a random 64-bit call_id from `getrandom` entropy. Used as
2981/// the correlation token for REQUEST/RESPONSE pairing. The fold
2982/// keys pending oneshots on this value; any session peer with
2983/// publish access to the reply channel could ship a forged
2984/// RESPONSE if it could guess the value. Sequential u64s are
2985/// predictable from any peer that observes a single allocation;
2986/// random u64s collide with 2^-64 probability per call and are
2987/// independent across peers.
2988///
2989/// Falls back to a zero call_id on entropy failure — that
2990/// effectively disables correlation for this call (the oneshot
2991/// will time out) rather than panic, but in practice
2992/// `getrandom::fill` failure is a fatal-environment signal
2993/// (no `/dev/urandom`, broken syscall) and the broader stack
2994/// won't be functional anyway.
2995fn mint_random_call_id() -> u64 {
2996    let mut buf = [0u8; 8];
2997    if getrandom::fill(&mut buf).is_err() {
2998        return 0;
2999    }
3000    u64::from_le_bytes(buf)
3001}
3002
3003// ============================================================================
3004// Internal: tiny shims so the `serve_rpc` / `call` impls stay
3005// readable. The underlying state lives on `MeshNode`; these just
3006// rename the accessor methods locally.
3007// ============================================================================
3008
3009impl MeshNode {
3010    fn rpc_client_pending(&self) -> Arc<super::cortex::RpcClientPending> {
3011        self.rpc_client_pending_arc()
3012    }
3013    fn identity_origin_hash(&self) -> u64 {
3014        self.public_key_origin_hash()
3015    }
3016
3017    /// Caller-side helper that pairs `rpc_route_for_service` with
3018    /// the `RpcError::NoRoute { target, reason }` mapping every
3019    /// `Mesh::call*` entry point needs. Returning `Arc<RpcRoute>`
3020    /// keeps the hot-path allocation profile of the cache intact
3021    /// (one refcount bump per caller).
3022    fn rpc_route_or_no_route(
3023        &self,
3024        target_node_id: u64,
3025        service: &str,
3026    ) -> Result<Arc<super::mesh::RpcRoute>, RpcError> {
3027        self.rpc_route_for_service(service)
3028            .map_err(|reason| RpcError::NoRoute {
3029                target: target_node_id,
3030                reason,
3031            })
3032    }
3033}
3034
3035// `proximity_graph()` is already a public accessor on MeshNode
3036// (see the existing `pub fn proximity_graph(&self) -> &Arc<...>`).
3037// `select_target` uses it directly; no shim needed.
3038
3039// ============================================================================
3040// Errors.
3041// ============================================================================
3042
3043/// Errors returned by [`MeshNode::serve_rpc`].
3044#[derive(Debug, thiserror::Error)]
3045pub enum ServeError {
3046    /// The service name fails channel-name validation.
3047    #[error("invalid service name: {0}")]
3048    InvalidServiceName(String),
3049    /// A handler for this service is already registered on this
3050    /// node. Drop the prior `ServeHandle` to free the slot.
3051    #[error("already serving service `{0}` on this node")]
3052    AlreadyServing(String),
3053}
3054
3055// ============================================================================
3056// Helpers.
3057// ============================================================================
3058
3059/// Detect the "no session to the target node id" sub-case of
3060/// [`AdapterError::Connection`]. The publish path can surface
3061/// this through one of two messages depending on which inner
3062/// helper landed it:
3063///
3064///   - `"publish: no session for subscriber {hash}"` — emitted
3065///     by `mesh.rs::publish_to_peer` when the subscriber-roster
3066///     path can't find an active session.
3067///   - `"no session to publisher {hash}"` — emitted by the lower
3068///     mesh.rs send path when there's no active session to the
3069///     target's publisher record at all.
3070///
3071/// Both mean "I can't reach this peer". When we observe either,
3072/// we surface as [`RpcError::NoRoute`] rather than `Transport`
3073/// because retrying the same target without a session is
3074/// pointless and the right behavior for a routing helper is to
3075/// try a different target.
3076fn classify_publish_no_session(err: &AdapterError) -> bool {
3077    match err {
3078        AdapterError::Connection(msg) => {
3079            msg.contains("no session for subscriber") || msg.contains("no session to publisher")
3080        }
3081        _ => false,
3082    }
3083}
3084
3085fn instant_to_unix_nanos(instant: Instant) -> u64 {
3086    // `Instant` is monotonic and not wall-clock — convert via the
3087    // delta from now plus current SystemTime. The result drifts
3088    // marginally with wall-clock skew but is good enough for
3089    // server-side deadline-already-passed short-circuits (which are
3090    // the only consumer of `deadline_ns`).
3091    let now_instant = Instant::now();
3092    let now_wall = std::time::SystemTime::now()
3093        .duration_since(std::time::UNIX_EPOCH)
3094        .map(|d| d.as_nanos() as u64)
3095        .unwrap_or(0);
3096    if instant >= now_instant {
3097        let delta = instant.duration_since(now_instant);
3098        now_wall.saturating_add(delta.as_nanos() as u64)
3099    } else {
3100        let delta = now_instant.duration_since(instant);
3101        now_wall.saturating_sub(delta.as_nanos() as u64)
3102    }
3103}
3104
3105#[allow(dead_code)]
3106fn _ensure_send_sync() {
3107    fn assert_send_sync<T: Send + Sync>() {}
3108    assert_send_sync::<ServeHandle>();
3109    assert_send_sync::<RpcCancellationToken>();
3110    assert_send_sync::<RpcContext>();
3111    assert_send_sync::<RpcHandlerError>();
3112    assert_send_sync::<RpcStatus>();
3113    assert_send_sync::<RpcReply>();
3114    assert_send_sync::<CallOptions>();
3115}