Skip to main content

net/adapter/net/
mesh_rpc.rs

1//! `Mesh::serve_rpc` / `Mesh::call` glue — the wire-up between
2//! `MeshNode`'s pub/sub + per-channel-hash dispatch hook and the
3//! `cortex::rpc` server / client folds.
4//!
5//! See `docs/misc/NRPC_DESIGN.md` for the full architectural framing.
6//! In short:
7//!
8//! - `serve_rpc(service, handler)` registers an inbound dispatcher
9//!   for `<service>.requests`'s channel hash. The dispatcher pushes
10//!   inbound REQUEST/CANCEL events through the
11//!   [`crate::adapter::net::cortex::RpcServerFold`], which spawns
12//!   the user handler. The fold's emit closure publishes RESPONSE
13//!   events on `<service>.replies.<caller_origin>` via
14//!   [`MeshNode::publish`].
15//!
//! - `call(target, service, payload, opts)` allocates a `call_id`,
//!   registers a oneshot in the per-Mesh `RpcClientPending`,
//!   subscribes to its own reply channel from `target` (lazy,
//!   cached), publishes the REQUEST envelope on `<service>.requests`,
//!   and awaits the oneshot. Dropping the call future sends a CANCEL.
21//!
22//! Phase 1 surface — direct entity-to-entity addressing
23//! (`call(target_node_id, ...)`), no service discovery layer yet.
24//! Phase 2 will add `call_service(name, ...)` over the existing
25//! capability-announcement registry.
26
27use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
28use std::sync::Arc;
29use std::time::Instant;
30
31use bytes::Bytes;
32use parking_lot::Mutex;
33use tokio::sync::mpsc;
34use tokio::task::JoinHandle;
35
36use super::channel::{ChannelHash, ChannelId, ChannelName, ChannelPublisher, PublishConfig};
37use super::cortex::{
38    build_trace_headers, encode_request_grant, encode_stream_grant, EventMeta,
39    RpcAsyncResponseEmitter, RpcCancellationToken, RpcClientFold, RpcClientStreamingHandler,
40    RpcContext, RpcDuplexFold, RpcDuplexHandler, RpcHandler, RpcHandlerError, RpcInboundDispatcher,
41    RpcInboundEvent, RpcRequestChunkPayload, RpcRequestGrantEmitter, RpcRequestPayload,
42    RpcResponseEmitter, RpcResponsePayload, RpcServerFold, RpcServerStreamingFold, RpcStatus,
43    RpcStreamingHandler, RpcStreamingRequestFold, StreamItem, TraceContext, DISPATCH_RPC_CANCEL,
44    DISPATCH_RPC_REQUEST, DISPATCH_RPC_REQUEST_CHUNK, DISPATCH_RPC_REQUEST_GRANT,
45    DISPATCH_RPC_STREAM_GRANT, EVENT_META_SIZE, FLAG_RPC_CLIENT_STREAMING_REQUEST,
46    FLAG_RPC_PROPAGATE_TRACE, FLAG_RPC_REQUEST_END, FLAG_RPC_STREAMING_RESPONSE,
47    HEADER_NRPC_REQUEST_WINDOW_INITIAL, HEADER_NRPC_STREAM_WINDOW_INITIAL,
48};
49use super::mesh_rpc_metrics::{CallMetricsGuard, CallOutcome};
50use crate::error::AdapterError;
51
52use super::mesh::MeshNode;
53use super::redex::{RedexEntry, RedexEvent, RedexFold};
54
55// ============================================================================
56// Public types.
57// ============================================================================
58
/// Target-selection strategy used by `Mesh::call_service` when more
/// than one node advertises the requested service.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum RoutingPolicy {
    /// Plain round-robin, driven by the per-Mesh `call_id` counter.
    /// Spreads calls evenly over the candidates and takes no
    /// account of load. This is the default policy.
    #[default]
    RoundRobin,
    /// Choose a random candidate for every call. Needs no state,
    /// is cheap, and yields an even spread when calls are
    /// independent.
    Random,
    /// Consistent-hash `key` onto a candidate. A given `key` keeps
    /// landing on the same target for as long as the candidate
    /// set does not change, which makes this the policy for
    /// session affinity (pinning a conversation / shard / user to
    /// one backend).
    Sticky {
        /// Identifier supplied by the caller; its hash selects
        /// the target. Typically a session id, shard key, or
        /// conversation id, depending on the application.
        key: u64,
    },
    /// Choose the candidate whose measured `latency_us` in the
    /// local `ProximityGraph` is smallest. Candidates the graph
    /// has not observed yet (no entity ↔ node_id mapping, or no
    /// pingwave received) are ranked last — a node known to be
    /// fast is preferred over an unmeasured one.
    ///
    /// If no candidate has proximity data at all, the first
    /// sorted candidate is chosen deterministically, so a
    /// freshly-discovered service is still routed consistently.
    LowestLatency,
}
92
93/// Options for [`MeshNode::call`] and [`MeshNode::call_service`].
94#[derive(Debug, Clone)]
95pub struct CallOptions {
96    /// Hard deadline for the call. The future returned by `call`
97    /// races a `tokio::time::sleep_until`; whichever fires first
98    /// wins. On timeout the caller emits a CANCEL event for
99    /// `call_id` so the server can drop the in-flight handler.
100    /// `None` means no deadline; the caller waits indefinitely
101    /// (or until the future is dropped).
102    pub deadline: Option<Instant>,
103    /// How `call_service` picks a target. Ignored by `call`
104    /// (which takes an explicit `target_node_id`). Default:
105    /// `RoundRobin`.
106    pub routing_policy: RoutingPolicy,
107    /// Skip candidates whose `ProximityGraph` entry reports
108    /// `!is_available()` (i.e. `Unhealthy` or `Unknown`).
109    /// Default `true`. Candidates with no proximity entry at all
110    /// are KEPT — absence of evidence is not evidence of
111    /// unhealth, and a freshly-announced service shouldn't be
112    /// filtered just because pingwaves haven't propagated yet.
113    pub filter_unhealthy: bool,
114    /// W3C Trace Context to propagate to the server. When `Some`,
115    /// the call sets `FLAG_RPC_PROPAGATE_TRACE` on the request and
116    /// emits `traceparent` / `tracestate` headers; the server's
117    /// `RpcContext::trace_context` will be populated with the same
118    /// values. nRPC is transport-only — application code on both
119    /// sides reads / writes this via whatever tracing backend it
120    /// has wired up (tracing-opentelemetry, Datadog, etc.).
121    pub trace_context: Option<TraceContext>,
122    /// Per-call concurrency cap. Future Phase 2 work; v1 ignores
123    /// this and the per-Mesh `RpcClientPending` doesn't bound
124    /// in-flight count.
125    pub max_in_flight_per_target: u32,
126    /// **Streaming responses only.** Initial credit window for
127    /// per-streaming-response flow control. When `Some(n)`, the
128    /// caller emits `nrpc-stream-window-initial: n` on the
129    /// REQUEST and the server's pump task awaits one credit per
130    /// emitted chunk. The returned [`RpcStream`] auto-grants 1
131    /// credit per consumed chunk so the in-flight credit holds
132    /// near `n` (or use [`RpcStream::grant`] for batched / custom
133    /// cadence). `None` (the default) → unbounded: server pumps
134    /// chunks as fast as the publish path can take them
135    /// (back-compat / pre-flow-control behavior). Ignored by
136    /// non-streaming `call` / `call_service`.
137    pub stream_window_initial: Option<u32>,
138    /// **Client-streaming / duplex only.** Initial credit window
139    /// for per-call request-direction flow control. Mirror of
140    /// [`Self::stream_window_initial`] for the upload direction. When
141    /// `Some(n)`, the caller emits `nrpc-request-window-initial: n`
142    /// on the REQUEST and its `send().await` sink awaits one
143    /// credit per pushed chunk; the server refills via
144    /// [`DISPATCH_RPC_REQUEST_GRANT`] events. `None` → unbounded:
145    /// caller's send sink doesn't block (legacy / fast-path).
146    /// Ignored by unary `call` / `call_streaming`.
147    ///
148    /// Bidi streaming plan (Phase C).
149    pub request_window_initial: Option<u32>,
150    /// Caller-supplied request headers. Appended to the wire
151    /// `RpcRequestPayload::headers` after any auto-generated
152    /// headers (trace context, stream-window). Useful for
153    /// application-level metadata the server needs at
154    /// dispatch-time — e.g., the `net-where` predicate
155    /// header (Phase 9b of `CAPABILITY_SYSTEM_SDK_PLAN.md`) that
156    /// services consult for predicate-pushdown filtering.
157    ///
158    /// Each entry is `(name, value_bytes)`. Names use the lowercase
159    /// `cyberdeck-*` / `nrpc-*` convention; the substrate doesn't
160    /// validate names beyond the `MAX_RPC_HEADER_NAME_LEN` cap
161    /// enforced at encode time.
162    ///
163    /// Default: empty.
164    pub request_headers: Vec<(String, Vec<u8>)>,
165    /// Caller-side cancel token. Mint via
166    /// [`MeshNode::reserve_cancel_token`]; pair with
167    /// [`MeshNode::cancel`] from any thread to abort the in-flight
168    /// call. `None` (or `Some(0)` — the "no token" sentinel) → no
169    /// cancel slot is reserved and the call has no external abort
170    /// path beyond Drop-on-future-cancellation.
171    ///
172    /// Honored uniformly by every call shape: `call`, `call_service`,
173    /// `call_streaming`, `call_client_stream`, `call_duplex`. The
174    /// substrate registers the token in a per-mesh cancel registry
175    /// at call construction and removes it on resolution (success,
176    /// error, or Drop). A cancel that fires mid-flight surfaces to
177    /// the caller as [`RpcError::Cancelled`] and emits CANCEL on
178    /// the wire via the existing per-call-shape guards (UnaryCallGuard,
179    /// ClientStreamCallRaw::Drop, DuplexCallRaw::Drop).
180    ///
181    /// Cancel-before-register is race-safe: a cancel that arrives
182    /// in the gap between `reserve_cancel_token` and the call's
183    /// internal register step latches a pre-cancel flag on the
184    /// registry's orphan entry; the subsequent register observes
185    /// it and the call short-circuits to [`RpcError::Cancelled`]
186    /// without ever publishing the REQUEST.
187    pub cancel_token: Option<u64>,
188}
189
190impl Default for CallOptions {
191    fn default() -> Self {
192        Self {
193            deadline: None,
194            routing_policy: RoutingPolicy::default(),
195            filter_unhealthy: true,
196            trace_context: None,
197            max_in_flight_per_target: 64,
198            stream_window_initial: None,
199            request_window_initial: None,
200            request_headers: Vec::new(),
201            cancel_token: None,
202        }
203    }
204}
205
206/// What [`MeshNode::call`] returns on success.
207#[derive(Debug, Clone)]
208pub struct RpcReply {
209    /// Response payload from the server's handler. Caller decodes
210    /// according to its application protocol.
211    pub body: Bytes,
212    /// Headers attached by the server's response.
213    pub headers: Vec<(String, Vec<u8>)>,
214    /// Wall-clock latency from `call(...)` to RESPONSE arrival.
215    pub latency_ns: u64,
216}
217
218/// What [`MeshNode::call`] returns on failure.
219#[derive(Debug, thiserror::Error)]
220pub enum RpcError {
221    /// No subscription / no route to the target. Either
222    /// `target_node_id` is unknown to the local mesh, or the
223    /// caller's reply-channel subscription couldn't be set up.
224    #[error("no route to target {target:#x}: {reason}")]
225    NoRoute {
226        /// Target node id the call was directed at.
227        target: u64,
228        /// Diagnostic — typically the underlying transport error.
229        reason: String,
230    },
231    /// Caller's deadline elapsed before a RESPONSE arrived. The
232    /// caller emits a CANCEL on timeout so the server can drop
233    /// the in-flight handler; this variant is returned to the
234    /// awaiting caller.
235    #[error("timeout after {elapsed_ms}ms")]
236    Timeout {
237        /// Wall-clock milliseconds elapsed before timeout fired.
238        elapsed_ms: u64,
239    },
240    /// Server returned a non-`Ok` status. Body carries the
241    /// server's diagnostic (UTF-8) when available.
242    #[error("server returned status {status:#06x}: {message}")]
243    ServerError {
244        /// Wire-level `RpcStatus` value the server returned.
245        status: u16,
246        /// UTF-8 diagnostic from the response body, when the body
247        /// decodes as valid UTF-8; otherwise hex-truncated.
248        message: String,
249    },
250    /// Underlying transport error (publish failure, encryption,
251    /// etc.).
252    #[error("transport: {0}")]
253    Transport(#[from] AdapterError),
254    /// Client-local serialization or deserialization failure.
255    /// `direction = Encode` means the typed wrapper failed to
256    /// encode the request before it ever hit the wire;
257    /// `direction = Decode` means the response landed but the
258    /// typed wrapper failed to decode it. Either way this is a
259    /// caller-fixable bug (wrong codec, schema drift, malformed
260    /// `Serialize` impl) — NOT a transient infra failure — so
261    /// retry / circuit-breaker predicates skip it by default.
262    #[error("codec ({direction:?}): {message}")]
263    Codec {
264        /// Which side of the call the codec failure happened on.
265        direction: CodecDirection,
266        /// Decode/encode diagnostic from the underlying serde impl.
267        message: String,
268    },
269    /// v0.4 capability-auth gate denied the call. Either the
270    /// target's latest `CapabilityAnnouncement` does not list
271    /// the requested `nrpc:<service>` tag, or it lists the tag
272    /// with allow-lists the caller does not match. See
273    /// `docs/plans/CAPABILITY_AUTH_PLAN.md` §3 for the model.
274    ///
275    /// Raised by the caller-side gate inside
276    /// [`MeshNode::call_service`] BEFORE the request hits the
277    /// wire, and surfaced by the caller on receipt of a
278    /// `RpcStatus::CapabilityDenied` response (the callee-side
279    /// defense-in-depth path).
280    #[error("capability denied: target {target:#x} does not authorize nrpc:{capability}")]
281    CapabilityDenied {
282        /// Target node id the gate denied.
283        target: u64,
284        /// Service / capability tag (without the `nrpc:` prefix)
285        /// the gate denied.
286        capability: String,
287    },
288    /// Caller-side cancellation fired via
289    /// [`MeshNode::cancel`] with the call's `cancel_token`.
290    /// Triggers a Drop-on-cancel CANCEL frame on the wire so the
291    /// server's in-flight handler observes the cancel; the
292    /// awaiting caller returns this variant. NOT retried by the
293    /// default retry policy — cancellation is caller-driven and
294    /// re-issuing the call defeats the point.
295    #[error("call cancelled by caller")]
296    Cancelled,
297}
298
/// Side of the call on which a [`RpcError::Codec`] failure occurred.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CodecDirection {
    /// The outbound request could not be encoded, so the call was
    /// never issued.
    Encode,
    /// The call returned Ok but the inbound response could not be
    /// decoded.
    Decode,
}
307
308/// RAII handle returned by [`MeshNode::serve_rpc`]. Dropping it
309/// unregisters the inbound dispatcher and removes the service
310/// from the local-services registry (so subsequent
311/// `announce_capabilities` calls stop emitting the
312/// `nrpc:<service>` tag).
313///
314/// **Bridge task lifecycle.** The bridge task that drains the
315/// inbound mpsc into the fold is NOT aborted on Drop. The
316/// `register_rpc_inbound` dispatcher closure owns the only
317/// `mpsc::Sender` clone, so `unregister_rpc_inbound` (which drops
318/// the dispatcher) closes the channel; the bridge's `rx.recv()`
319/// then yields `None` and the task exits cleanly after draining
320/// any queued events. Aborting would race events that are
321/// mid-`fold.lock().apply()` — those events would be killed
322/// without their RESPONSE being emitted, so the corresponding
323/// callers would just time out.
324///
325/// Outstanding handler executions (already-spawned tokio tasks)
326/// continue to completion regardless.
327pub struct ServeHandle {
328    /// Channel hash to unregister on Drop.
329    channel_hash: ChannelHash,
330    /// Service name to remove from `rpc_local_services` on Drop.
331    service: String,
332    /// The bridge task. Held only so callers can introspect /
333    /// detach it; Drop does NOT abort it (see struct doc-comment).
334    /// Detaches naturally when the handle is dropped — the bridge
335    /// exits on its own once the dispatcher's `mpsc::Sender` is
336    /// dropped via `unregister_rpc_inbound`.
337    _bridge: JoinHandle<()>,
338    /// The per-service response drainer task (unary `serve_rpc` only;
339    /// `None` for the streaming/duplex variants, which still spawn per
340    /// emit). Like `_bridge`, held only to detach — it exits on its own
341    /// once the emit closure (the sole `Sender` owner, dropped when the
342    /// bridge task ends and the fold drops) is gone. See §8a.
343    _response_drain: Option<JoinHandle<()>>,
344    /// Hold an Arc back to the mesh so we can unregister on Drop
345    /// without the mesh having to track us.
346    mesh: Arc<MeshNode>,
347}
348
349impl Drop for ServeHandle {
350    fn drop(&mut self) {
351        // Order matters: unregister the dispatcher FIRST so no new
352        // events can land in the bridge's mpsc, THEN drop the
353        // service-tag entry. The bridge task drains any in-flight
354        // events naturally and exits when its `rx.recv()` yields
355        // `None` (which happens as soon as the dispatcher closure
356        // — the sole `tx` owner — is dropped above).
357        self.mesh.unregister_rpc_inbound(self.channel_hash);
358        self.mesh.rpc_local_services_arc().remove(&self.service);
359    }
360}
361
362/// A response ready to publish, handed from a (synchronous) `serve_rpc`
363/// emit closure to the per-service response drainer task. Replaces the
364/// pre-§8a `tokio::spawn`-per-response: the emit closure builds the wire
365/// payload (cheap, sync) and `try_send`s this job; one drain task does the
366/// `.await` publish. The reply `ChannelName` is `Arc<str>` and `payload` is
367/// `Bytes`, so the hand-off is a couple of moves — no copy, no per-response
368/// task allocation/scheduling.
369struct RpcResponseJob {
370    caller_origin: u64,
371    call_id: u64,
372    target_hint: Option<u64>,
373    reply_channel: ChannelName,
374    /// PERF_AUDIT §3.10 — cached
375    /// `ChannelId::new(reply_channel).hash()`, populated by the
376    /// emit closure's `reply_channel_cache` lookup. Pre-fix the
377    /// drainer re-ran xxh3 over the channel name per response;
378    /// the same `OriginKeyedLru` now caches the triple so a
379    /// cache hit is one Arc bump + two `u64` copies.
380    reply_channel_hash: ChannelHash,
381    /// PERF_AUDIT §3.10 — cached
382    /// `MeshNode::publish_stream_id(&reply_channel_id)`.
383    reply_stream_id: u64,
384    payload: Bytes,
385}
386
387/// Cached triple `(ChannelName, ChannelHash, stream_id)` for the
388/// per-caller reply channel. Stored in the per-`serve_rpc`
389/// `OriginKeyedLru` so each subsequent response to the same
390/// caller is one Arc bump on the name + two `u64` copies — no
391/// xxh3, no `publish_stream_id`.
392///
393/// Per PERF_AUDIT §3.10.
394#[derive(Clone)]
395struct CachedReplyChannel {
396    name: ChannelName,
397    hash: ChannelHash,
398    stream_id: u64,
399}
400
401// ============================================================================
402// Streaming caller-side: RpcStream.
403// ============================================================================
404
405/// An open streaming RPC call. Implements `Stream<Item =
406/// Result<Bytes, RpcError>>` — yields chunks as the server emits
407/// them, terminates on a clean stream-end frame OR a non-`Ok`
408/// status (which is yielded as the last `Err` item before the
409/// stream closes).
410///
411/// Dropping the stream emits a CANCEL to the server (best-effort)
412/// and discards the pending entry — any chunks the server emits
413/// after the drop are silently discarded by the client fold.
414pub struct RpcStream {
415    mesh: Arc<MeshNode>,
416    target_node_id: u64,
417    request_channel: ChannelName,
418    /// Cached `ChannelId::new(request_channel).hash()`. Pre-fix
419    /// `spawn_grant_publish` re-ran xxh3 over the channel name on
420    /// every auto/explicit grant — per PERF_AUDIT §3.10 the value
421    /// is invariant for the stream's lifetime, so we cache it once
422    /// at construction.
423    request_channel_hash: ChannelHash,
424    /// Cached `MeshNode::publish_stream_id(&request_channel_id)`.
425    /// Same reasoning as `request_channel_hash`.
426    request_stream_id: u64,
427    self_origin: u64,
428    call_id: u64,
429    inner: tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
430    /// Set true once we've yielded the terminal item (or an
431    /// error). Subsequent polls return `None`.
432    done: bool,
433    /// `Some(_)` if this stream uses flow control (caller set
434    /// `CallOptions::stream_window_initial`). Auto-grant
435    /// accumulates 1 credit per delivered chunk and fires one
436    /// batched `spawn_grant_publish` once the accumulator reaches
437    /// `window / 2` (or 1 for tiny windows). Keeps the server's
438    /// pump fed at roughly the configured rate without the per-
439    /// chunk spawn-storm + AEAD-storm the pre-fix path produced.
440    /// `None` → no flow control; `poll_next` does not emit grants.
441    /// Per PERF_AUDIT_2026_06_10_FULL_CRATE.md §3.3.
442    stream_window: Option<u32>,
443    /// Auto-grant accumulator: chunks delivered since the last
444    /// emitted grant. Flushed at the `window / 2` threshold (see
445    /// the doc on [`Self::stream_window`]).
446    grant_pending: u32,
447    /// Observer-fire bookkeeping. Latched on terminal observation
448    /// in `poll_next`; fired once from `Drop` so the Deck NRPC
449    /// tab + every other `RpcObserver` consumer sees one event
450    /// per streaming-response call.
451    observer: StreamingObserverState,
452    /// v3 cancel-watcher keep-alive (C-S1). Dropping this field
453    /// (on stream Drop) resolves the matching watcher task's
454    /// oneshot receiver with `Err`, telling the watcher to exit
455    /// cleanly + release the registry entry. When the call was
456    /// opened without `cancel_token`, this is a placeholder sender
457    /// with no watcher behind it — drop has no observable effect.
458    _cancel_keep_alive: StreamCancelKeepAlive,
459}
460
461impl RpcStream {
462    /// Server-assigned `call_id`. Useful for trace correlation /
463    /// custom logging at the call site.
464    pub fn call_id(&self) -> u64 {
465        self.call_id
466    }
467
468    /// Whether this stream is flow-controlled (caller set
469    /// `CallOptions::stream_window_initial`). Useful for tests +
470    /// diagnostics; user code typically doesn't need to inspect
471    /// this.
472    pub fn flow_controlled(&self) -> bool {
473        self.stream_window.is_some()
474    }
475
476    /// Explicitly grant `amount` more credits to the server's
477    /// pump. Spawns a fire-and-forget publish; doesn't await
478    /// acknowledgement. **No-op when flow control was not enabled
479    /// for this stream** — the server would silently drop the
480    /// grant anyway, and emitting wire traffic with no purpose
481    /// would just burn bandwidth.
482    ///
483    /// Auto-grant (1 credit per delivered chunk) covers the
484    /// common case; use this for batched cadence (e.g. grant
485    /// `window/2` after every `window/2` chunks consumed) when
486    /// `auto_grant`-style amortization isn't enough.
487    pub fn grant(&self, amount: u32) {
488        if !self.flow_controlled() || amount == 0 {
489            return;
490        }
491        spawn_grant_publish(
492            Arc::clone(&self.mesh),
493            self.target_node_id,
494            self.request_channel_hash,
495            self.request_stream_id,
496            self.self_origin,
497            self.call_id,
498            amount,
499        );
500    }
501}
502
/// PERF_AUDIT §3.3 — decides when [`RpcStream::poll_next`] should
/// flush its coalesced auto-grant. Each call records one credit
/// (for the chunk just delivered to the consumer) in `pending`.
/// Once the running total reaches the flush threshold, `window / 2`
/// clamped to at least 1, the total is returned as `Some(amount)`
/// and `pending` is reset to zero; otherwise `None` is returned. A
/// window of 1 therefore degenerates to the pre-fix per-chunk
/// cadence.
///
/// Liveness invariant (the reason no flush-on-drop / timer backstop
/// is required): the credits left pending never exceed
/// `threshold - 1 < window`. The server starts with `window`
/// credits and `credits = window - (sent - delivered) - pending`.
/// So whenever the consumer has polled everything that was sent —
/// the only state in which it could block waiting on the server —
/// `credits = window - pending >= window - threshold + 1 >= 1`, and
/// the server can always make progress. A consumer that stops
/// polling stalls the pump by design (that is flow control), and
/// the chunks already buffered in the stream's mpsc suffice to
/// carry `pending` across the threshold as soon as it resumes.
fn accumulate_auto_grant(pending: &mut u32, window: u32) -> Option<u32> {
    let flush_at = std::cmp::max(window / 2, 1);
    let accumulated = pending.saturating_add(1);

    // Below the threshold: remember the credit and keep coalescing.
    if accumulated < flush_at {
        *pending = accumulated;
        return None;
    }

    // Threshold reached: hand out the whole batch and start over.
    *pending = 0;
    Some(accumulated)
}
532
533/// Shared fire-and-forget GRANT-publish helper. Used by
534/// [`RpcStream::grant`] (explicit) and the auto-grant in
535/// [`RpcStream::poll_next`]. Same direct-unicast publish path as
536/// [`spawn_cancel_publish`], just with a different dispatch byte
537/// + a 4-byte u32 payload.
538///
539/// PERF_AUDIT §3.10 — takes `request_channel_hash` and
540/// `request_stream_id` as pre-computed inputs (cached on
541/// `RpcStream`) so the per-chunk grant path doesn't re-run
542/// `ChannelId::new` + xxh3 on every call.
543fn spawn_grant_publish(
544    mesh: Arc<MeshNode>,
545    target: u64,
546    request_channel_hash: ChannelHash,
547    request_stream_id: u64,
548    self_origin: u64,
549    call_id: u64,
550    amount: u32,
551) {
552    tokio::spawn(async move {
553        let meta = EventMeta::new(DISPATCH_RPC_STREAM_GRANT, 0, self_origin, call_id, 0);
554        let mut buf = Vec::with_capacity(EVENT_META_SIZE + 4);
555        buf.extend_from_slice(&meta.to_bytes());
556        buf.extend_from_slice(&encode_stream_grant(amount));
557        let payload = Bytes::from(buf);
558        let _ = mesh
559            .publish_to_peer(
560                target,
561                request_channel_hash,
562                request_stream_id,
563                /* reliable */ true,
564                std::slice::from_ref(&payload),
565            )
566            .await;
567    });
568}
569
570impl futures::Stream for RpcStream {
571    type Item = Result<Bytes, RpcError>;
572
573    fn poll_next(
574        mut self: std::pin::Pin<&mut Self>,
575        cx: &mut std::task::Context<'_>,
576    ) -> std::task::Poll<Option<Self::Item>> {
577        if self.done {
578            return std::task::Poll::Ready(None);
579        }
580        match self.inner.poll_recv(cx) {
581            std::task::Poll::Ready(Some(StreamItem::Chunk(body))) => {
582                // Auto-grant: accumulate 1 credit per delivered
583                // chunk and fire a batched `spawn_grant_publish`
584                // only when the accumulator reaches `window / 2`
585                // (or 1 for tiny windows). Per PERF_AUDIT §3.3 —
586                // pre-fix this spawned one task + one reliable
587                // AEAD packet per chunk, a spawn-storm + AEAD-
588                // storm under bursting; the server side already
589                // fixed the identical shape via
590                // `build_request_grant_emitter` (§3.3 audit text).
591                // Callers needing finer cadence still have
592                // `RpcStream::grant` for explicit batches.
593                if let Some(window) = self.stream_window {
594                    let mut pending = self.grant_pending;
595                    if let Some(amount) = accumulate_auto_grant(&mut pending, window) {
596                        spawn_grant_publish(
597                            Arc::clone(&self.mesh),
598                            self.target_node_id,
599                            self.request_channel_hash,
600                            self.request_stream_id,
601                            self.self_origin,
602                            self.call_id,
603                            amount,
604                        );
605                    }
606                    self.grant_pending = pending;
607                }
608                self.observer.add_response_bytes(body.len() as u32);
609                std::task::Poll::Ready(Some(Ok(body)))
610            }
611            std::task::Poll::Ready(Some(StreamItem::End)) => {
612                self.done = true;
613                self.observer.latch_ok();
614                std::task::Poll::Ready(None)
615            }
616            std::task::Poll::Ready(Some(StreamItem::Error(resp))) => {
617                self.done = true;
618                let status = resp.status.to_wire();
619                let message = String::from_utf8(resp.body.to_vec()).unwrap_or_else(|e| {
620                    format!("<{} bytes of non-utf8 body>", e.into_bytes().len())
621                });
622                self.observer
623                    .latch_error(format!("server returned status {status:#06x}: {message}"));
624                std::task::Poll::Ready(Some(Err(RpcError::ServerError { status, message })))
625            }
626            std::task::Poll::Ready(None) => {
627                self.done = true;
628                std::task::Poll::Ready(None)
629            }
630            std::task::Poll::Pending => std::task::Poll::Pending,
631        }
632    }
633}
634
635impl Drop for RpcStream {
636    fn drop(&mut self) {
637        // Best-effort CANCEL to the server. Spawn a task because
638        // Drop can't be async; the publish happens off-thread.
639        // Also clear our pending entry so any in-flight chunks
640        // are dropped on arrival.
641        self.mesh.rpc_client_pending_arc().cancel(self.call_id);
642        spawn_cancel_publish(
643            Arc::clone(&self.mesh),
644            self.target_node_id,
645            self.request_channel.clone(),
646            self.self_origin,
647            self.call_id,
648        );
649        // Fire the observer with the latched status (Ok / Error /
650        // Canceled). Idempotent — only the first fire emits.
651        self.observer.fire();
652    }
653}
654
655// ============================================================================
656// Phase C — caller-side client-streaming / duplex primitive.
657// ============================================================================
658
659/// Shared REQUEST_CHUNK-publish helper. Builds the wire frame and
660/// fires through `publish_to_peer` direct-unicast (same routing
661/// pattern as the initial REQUEST — caller knows the target).
662/// PERF_AUDIT §3.10 — accepts pre-computed
663/// `request_channel_hash` and `request_stream_id` (cached on
664/// `ClientStreamCallRaw`) so the per-chunk client-stream send path
665/// doesn't re-run `ChannelId::new` + xxh3 on every chunk.
666async fn publish_request_chunk(
667    mesh: &Arc<MeshNode>,
668    target: u64,
669    request_channel_hash: ChannelHash,
670    request_stream_id: u64,
671    self_origin: u64,
672    chunk: &RpcRequestChunkPayload,
673) -> Result<(), RpcError> {
674    let meta = EventMeta::new(DISPATCH_RPC_REQUEST_CHUNK, 0, self_origin, chunk.call_id, 0);
675    let mut buf = Vec::with_capacity(EVENT_META_SIZE + chunk.encoded_len());
676    buf.extend_from_slice(&meta.to_bytes());
677    chunk.encode_into(&mut buf);
678    let payload = Bytes::from(buf);
679    mesh.publish_to_peer(
680        target,
681        request_channel_hash,
682        request_stream_id,
683        /* reliable */ true,
684        std::slice::from_ref(&payload),
685    )
686    .await
687    .map_err(RpcError::Transport)
688}
689
/// Internal state of a [`ClientStreamCallRaw`]. The state machine
/// is small: open the call (initial REQUEST not yet sent), then
/// send N items (the first becomes the initial REQUEST, subsequent
/// become REQUEST_CHUNKs), then finish (terminal REQUEST_END
/// frame). After finish, no further sends are accepted.
///
/// [`DuplexSink`] tracks its upload half with the same states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClientStreamState {
    /// Pending entry registered, reply subscription ensured, but
    /// the initial REQUEST has NOT been published to the wire yet.
    /// First `send` flips this to `Sending`. Dropping a
    /// `ClientStreamCallRaw` in this state clears the pending entry
    /// but publishes no CANCEL.
    JustOpened,
    /// Initial REQUEST has been published; subsequent sends ride
    /// as REQUEST_CHUNKs.
    Sending,
    /// `finish` has been called; the terminal REQUEST_END frame
    /// (or the initial REQUEST with FLAG_END for the degenerate
    /// zero-send path) has been published. The terminal RESPONSE
    /// has not necessarily arrived yet — that's awaited on the
    /// caller's terminal_rx.
    Finishing,
    /// Terminal RESPONSE has been delivered. `ClientStreamCallRaw`'s
    /// Drop sends no CANCEL in this state (it still aborts the grant
    /// pump and fires the observer). `DuplexCallRaw::finish_sending`
    /// also uses this state for the placeholder sink it leaves
    /// behind, so later sends are rejected.
    Done,
}
713
/// Caller-side handle for a client-streaming (or duplex Phase D)
/// RPC. Push N items via [`ClientStreamCallRaw::send`], then
/// [`ClientStreamCallRaw::finish`] to await the terminal RESPONSE.
///
/// **Lazy initial REQUEST.** The initial REQUEST is published on
/// the FIRST `send()` (or on `finish()` if the caller sends nothing
/// — that's the "zero-item upload" degenerate path that opens and
/// closes the call in one frame). Constructing the handle does
/// NOT yet emit any wire traffic beyond the reply-channel
/// subscription setup.
///
/// **Flow control.** When the caller set
/// [`CallOptions::request_window_initial`] to `Some(n)`, the
/// handle holds an `n`-permit `Semaphore` that gates `send`. The
/// server's [`DISPATCH_RPC_REQUEST_GRANT`] events refill the
/// semaphore. When `None`, `send` doesn't block (caller is on the
/// unbounded-credit fast path).
///
/// **Cancellation.** Dropping the handle before a terminal
/// RESPONSE has been delivered clears the pending entry and, if
/// the initial REQUEST was already published, fires a best-effort
/// CANCEL to the server. `finish` consumes the handle, so this
/// also covers its error exits (timeout, dropped sender). Once the
/// terminal RESPONSE has been delivered, the drop only aborts the
/// grant pump and fires the observer.
///
/// Part of the bidi streaming plan (Phase C).
pub struct ClientStreamCallRaw {
    /// Mesh node used to publish frames and to reach the client
    /// pending table (`rpc_client_pending_arc`).
    mesh: Arc<MeshNode>,
    /// Peer that every frame of this call is unicast to.
    target_node_id: u64,
    /// Request channel name; handed to `spawn_cancel_publish` when
    /// Drop has to send a CANCEL.
    request_channel: ChannelName,
    /// PERF_AUDIT §3.10 — cached `ChannelId::new(request_channel).hash()`
    /// so per-chunk REQUEST_CHUNK publishes don't re-run xxh3.
    request_channel_hash: ChannelHash,
    /// PERF_AUDIT §3.10 — cached
    /// `MeshNode::publish_stream_id(&request_channel_id)`.
    request_stream_id: u64,
    /// Origin stamped into the `EventMeta` of every outbound frame.
    self_origin: u64,
    /// Identifier of this call: stamped into every outbound frame
    /// and used to clear the pending entry on Drop.
    call_id: u64,
    /// Service name copied into the initial REQUEST.
    service: String,
    /// Header set queued for the initial REQUEST. Drained on the
    /// first publish (either `send` or `finish`).
    initial_headers: Vec<(String, Vec<u8>)>,
    /// Flag bits queued for the initial REQUEST. Always carries
    /// `FLAG_RPC_CLIENT_STREAMING_REQUEST`; may also carry
    /// `FLAG_RPC_PROPAGATE_TRACE` when the caller supplied a
    /// trace context.
    initial_flags: u16,
    /// `deadline_ns` from `CallOptions::deadline`. Embedded in the
    /// initial REQUEST. `finish` compares it against wall-clock
    /// nanoseconds since the Unix epoch to bound its wait; `0`
    /// means "no deadline".
    deadline_ns: u64,
    /// Per-call semaphore for upload credits. `None` when the
    /// caller didn't opt into flow control (`request_window_initial`
    /// was `None` on the `CallOptions`).
    credit_sem: Option<Arc<tokio::sync::Semaphore>>,
    /// Background task that drains REQUEST_GRANT credits from the
    /// pending entry's grant mpsc into `credit_sem`. Aborted on
    /// Drop. `None` when flow control is off.
    grant_pump: Option<JoinHandle<()>>,
    /// Single-shot terminal-RESPONSE receiver. Taken by `finish`;
    /// after that `Drop` doesn't attempt to await again.
    terminal_rx: Option<tokio::sync::oneshot::Receiver<RpcResponsePayload>>,
    /// State machine. See [`ClientStreamState`].
    state: ClientStreamState,
    /// Monotonic start time (for `RpcReply::latency_ns` and
    /// `RpcError::Timeout::elapsed_ms` reporting).
    started: Instant,
    /// Observer-fire bookkeeping. Latched on terminal observation
    /// in `finish`; fired once from `Drop` so the Deck NRPC tab +
    /// every `RpcObserver` consumer sees one event per
    /// client-streaming call.
    observer: StreamingObserverState,
    /// v3 cancel-watcher keep-alive (C-S1). Dropping this field
    /// (on call Drop) tells the watcher task to exit cleanly and
    /// release the registry entry. See
    /// [`spawn_stream_cancel_watcher`] for the lifecycle.
    _cancel_keep_alive: StreamCancelKeepAlive,
}
788
789impl ClientStreamCallRaw {
790    /// Server-assigned `call_id`. Useful for trace correlation /
791    /// custom logging.
792    pub fn call_id(&self) -> u64 {
793        self.call_id
794    }
795
796    /// Whether this call is flow-controlled (caller set
797    /// `CallOptions::request_window_initial`).
798    pub fn flow_controlled(&self) -> bool {
799        self.credit_sem.is_some()
800    }
801
802    /// Push one body chunk to the server. Encodes as the initial
803    /// REQUEST (first call) or as a REQUEST_CHUNK (subsequent
804    /// calls). When flow control is opted into, awaits one credit
805    /// before publishing.
806    ///
807    /// Returns `Err(RpcError::Codec)` if called after [`Self::finish`].
808    pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
809        match self.state {
810            ClientStreamState::Finishing | ClientStreamState::Done => {
811                return Err(RpcError::Codec {
812                    direction: CodecDirection::Encode,
813                    message: "send() called after finish()".to_string(),
814                });
815            }
816            _ => {}
817        }
818        // Gate on credit when flow control is opted into.
819        if let Some(sem) = self.credit_sem.as_ref() {
820            let permit = sem.clone().acquire_owned().await.map_err(|_| {
821                RpcError::Transport(AdapterError::Connection("credit semaphore closed".into()))
822            })?;
823            permit.forget();
824        }
825        self.observer.add_request_bytes(body.len() as u32);
826        match self.state {
827            ClientStreamState::JustOpened => {
828                // First send → initial REQUEST.
829                let req = RpcRequestPayload {
830                    service: self.service.clone(),
831                    deadline_ns: self.deadline_ns,
832                    flags: self.initial_flags,
833                    headers: std::mem::take(&mut self.initial_headers),
834                    body: body.clone(),
835                };
836                self.publish_initial_request(&req).await?;
837                self.state = ClientStreamState::Sending;
838            }
839            ClientStreamState::Sending => {
840                let chunk = RpcRequestChunkPayload {
841                    call_id: self.call_id,
842                    flags: 0,
843                    headers: vec![],
844                    body: body.clone(),
845                };
846                publish_request_chunk(
847                    &self.mesh,
848                    self.target_node_id,
849                    self.request_channel_hash,
850                    self.request_stream_id,
851                    self.self_origin,
852                    &chunk,
853                )
854                .await?;
855            }
856            ClientStreamState::Finishing | ClientStreamState::Done => unreachable!(),
857        }
858        Ok(())
859    }
860
861    /// Close the upload direction and await the server's terminal
862    /// RESPONSE. Emits a REQUEST_CHUNK with `FLAG_RPC_REQUEST_END`
863    /// (empty body) if the call has already published its initial
864    /// REQUEST, or an initial REQUEST with both
865    /// `FLAG_RPC_CLIENT_STREAMING_REQUEST` and
866    /// `FLAG_RPC_REQUEST_END` set (the degenerate "zero-item
867    /// upload" path) if nothing was sent.
868    ///
869    /// Consumes the handle — Drop after `finish` is a no-op.
870    pub async fn finish(mut self) -> Result<RpcReply, RpcError> {
871        match self.state {
872            ClientStreamState::JustOpened => {
873                let req = RpcRequestPayload {
874                    service: self.service.clone(),
875                    deadline_ns: self.deadline_ns,
876                    flags: self.initial_flags | FLAG_RPC_REQUEST_END,
877                    headers: std::mem::take(&mut self.initial_headers),
878                    body: Bytes::new(),
879                };
880                self.publish_initial_request(&req).await?;
881            }
882            ClientStreamState::Sending => {
883                let chunk = RpcRequestChunkPayload {
884                    call_id: self.call_id,
885                    flags: FLAG_RPC_REQUEST_END,
886                    headers: vec![],
887                    body: Bytes::new(),
888                };
889                publish_request_chunk(
890                    &self.mesh,
891                    self.target_node_id,
892                    self.request_channel_hash,
893                    self.request_stream_id,
894                    self.self_origin,
895                    &chunk,
896                )
897                .await?;
898            }
899            ClientStreamState::Finishing | ClientStreamState::Done => {
900                return Err(RpcError::Codec {
901                    direction: CodecDirection::Encode,
902                    message: "finish() called twice".to_string(),
903                });
904            }
905        }
906        self.state = ClientStreamState::Finishing;
907        let terminal_rx = self.terminal_rx.take().ok_or_else(|| {
908            RpcError::Transport(AdapterError::Connection(
909                "terminal receiver already consumed".into(),
910            ))
911        })?;
912        // Honor the deadline if the caller set one.
913        let resp = if self.deadline_ns > 0 {
914            let now = std::time::SystemTime::now()
915                .duration_since(std::time::UNIX_EPOCH)
916                .map(|d| d.as_nanos() as u64)
917                .unwrap_or(0);
918            let remaining = self.deadline_ns.saturating_sub(now);
919            match tokio::time::timeout(std::time::Duration::from_nanos(remaining), terminal_rx)
920                .await
921            {
922                Ok(Ok(r)) => r,
923                Ok(Err(_)) => {
924                    let msg = "terminal sender dropped before response arrived";
925                    self.observer.latch_error(msg);
926                    return Err(RpcError::Transport(AdapterError::Connection(msg.into())));
927                }
928                Err(_elapsed) => {
929                    let elapsed_ms = self.started.elapsed().as_millis() as u64;
930                    self.observer.latch_timeout();
931                    return Err(RpcError::Timeout { elapsed_ms });
932                }
933            }
934        } else {
935            match terminal_rx.await {
936                Ok(r) => r,
937                Err(_) => {
938                    let msg = "terminal sender dropped before response arrived";
939                    self.observer.latch_error(msg);
940                    return Err(RpcError::Transport(AdapterError::Connection(msg.into())));
941                }
942            }
943        };
944        self.state = ClientStreamState::Done;
945        self.observer.add_response_bytes(resp.body.len() as u32);
946        if !resp.status.is_ok() {
947            // String::from_utf8 takes `Vec<u8>`. `Bytes::to_vec()`
948            // matches the prior `resp.body.clone()` semantics (full
949            // copy of the body for the error-formatting path);
950            // bulk-throughput improvement lives on the decode side,
951            // not here.
952            let message = String::from_utf8(resp.body.to_vec())
953                .unwrap_or_else(|e| format!("<{} bytes of non-utf8 body>", e.into_bytes().len()));
954            self.observer.latch_error(format!(
955                "server returned status {:#06x}: {message}",
956                resp.status.to_wire()
957            ));
958            return Err(RpcError::ServerError {
959                status: resp.status.to_wire(),
960                message,
961            });
962        }
963        self.observer.latch_ok();
964        let latency_ns = self.started.elapsed().as_nanos() as u64;
965        Ok(RpcReply {
966            body: resp.body,
967            headers: resp.headers,
968            latency_ns,
969        })
970    }
971
972    async fn publish_initial_request(&self, req: &RpcRequestPayload) -> Result<(), RpcError> {
973        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self.self_origin, self.call_id, 0);
974        let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.encoded_len());
975        buf.extend_from_slice(&meta.to_bytes());
976        req.encode_into(&mut buf);
977        let payload = Bytes::from(buf);
978        // PERF_AUDIT §3.10 — use the cached hash + stream_id from
979        // construction; no per-publish `ChannelId::new` + xxh3.
980        self.mesh
981            .publish_to_peer(
982                self.target_node_id,
983                self.request_channel_hash,
984                self.request_stream_id,
985                /* reliable */ true,
986                std::slice::from_ref(&payload),
987            )
988            .await
989            .map_err(RpcError::Transport)
990    }
991}
992
993impl Drop for ClientStreamCallRaw {
994    fn drop(&mut self) {
995        if let Some(task) = self.grant_pump.take() {
996            task.abort();
997        }
998        // Fire the observer with whatever status was latched
999        // (Ok / Error / Timeout / Canceled). Idempotent — only
1000        // the first call emits.
1001        self.observer.fire();
1002        if matches!(self.state, ClientStreamState::Done) {
1003            // Successful completion — pending entry already gone,
1004            // no CANCEL needed.
1005            return;
1006        }
1007        self.mesh.rpc_client_pending_arc().cancel(self.call_id);
1008        // Only fire CANCEL on the wire if the server has actually
1009        // seen the initial REQUEST. A `JustOpened` Drop means we
1010        // never published anything; no need to CANCEL a call the
1011        // server doesn't know about.
1012        if !matches!(self.state, ClientStreamState::JustOpened) {
1013            spawn_cancel_publish(
1014                Arc::clone(&self.mesh),
1015                self.target_node_id,
1016                self.request_channel.clone(),
1017                self.self_origin,
1018                self.call_id,
1019            );
1020        }
1021    }
1022}
1023
1024// ============================================================================
1025// Phase D — caller-side duplex primitive.
1026// ============================================================================
1027
/// Shared state between a `DuplexSink` and its sibling
/// `DuplexStream`. Both halves hold an `Arc<DuplexInner>`; when
/// the refcount hits zero (i.e. both halves dropped) the Drop
/// clears the pending entry, fires the observer, and publishes a
/// CANCEL to the server — unless the call was cleanly closed
/// (`clean_close = true`) or the initial REQUEST was never
/// published (`initial_sent = false`).
struct DuplexInner {
    /// Mesh node used to publish frames and to reach the client
    /// pending table (`rpc_client_pending_arc`).
    mesh: Arc<MeshNode>,
    /// Peer that every frame of this call is unicast to.
    target_node_id: u64,
    /// Request channel name; handed to `spawn_cancel_publish` when
    /// Drop has to send a CANCEL.
    request_channel: ChannelName,
    /// PERF_AUDIT §3.10 — cached channel-id hash + stream id so
    /// per-chunk publishes from the upload side don't re-run
    /// `ChannelId::new` + xxh3.
    request_channel_hash: ChannelHash,
    request_stream_id: u64,
    /// Origin stamped into the `EventMeta` of every outbound frame.
    self_origin: u64,
    /// Identifier of this call, shared by both halves.
    call_id: u64,
    /// Whether the initial REQUEST was successfully published.
    /// `false` means we never reached the wire — no CANCEL needed
    /// (server doesn't know about the call). Set by
    /// `DuplexSink::send` / `DuplexSink::finish_sending`.
    initial_sent: std::sync::atomic::AtomicBool,
    /// Set true when the call closes cleanly — terminal RESPONSE
    /// (or terminal Error) was observed on the response stream.
    /// Suppresses CANCEL-on-drop.
    clean_close: std::sync::atomic::AtomicBool,
    /// Observer-fire bookkeeping. The terminal status is latched
    /// in `DuplexStream::poll_next` when it yields End or Error;
    /// the upload half adds request-byte counts. Fired once on
    /// Drop. The DuplexCallRaw / DuplexSink / DuplexStream each
    /// share access via the surrounding Arc<DuplexInner>.
    observer: StreamingObserverState,
    /// v3 cancel-watcher keep-alive (C-S1). Lives on
    /// `Arc<DuplexInner>` so it survives `into_split` — both
    /// halves of the duplex hold the same Arc, so the watcher
    /// task exits only when BOTH halves drop (matching the
    /// Drop-fires-CANCEL semantics above). Wrapped in `Option`
    /// for `mem::take`-style construction patterns; populated
    /// once at `call_duplex` time and never cleared.
    _cancel_keep_alive: Option<StreamCancelKeepAlive>,
}
1067
1068impl Drop for DuplexInner {
1069    fn drop(&mut self) {
1070        self.mesh.rpc_client_pending_arc().cancel(self.call_id);
1071        // Fire the observer with the latched status (Ok / Error /
1072        // Canceled). Idempotent — only the first call emits.
1073        self.observer.fire();
1074        if self.clean_close.load(Ordering::SeqCst) {
1075            return;
1076        }
1077        if !self.initial_sent.load(Ordering::SeqCst) {
1078            return;
1079        }
1080        spawn_cancel_publish(
1081            Arc::clone(&self.mesh),
1082            self.target_node_id,
1083            self.request_channel.clone(),
1084            self.self_origin,
1085            self.call_id,
1086        );
1087    }
1088}
1089
/// Send half of a duplex call. Push items via `send`; emit the
/// terminal REQUEST_END frame via `finish_sending`. After
/// `finish_sending` the upload side is closed but the sibling
/// `DuplexStream` continues yielding response chunks until the
/// server's terminal frame arrives.
///
/// Dropping the sink only aborts its grant pump; CANCEL handling
/// lives on the shared `DuplexInner`.
///
/// Part of the bidi streaming plan (Phase D).
pub struct DuplexSink {
    /// State shared with the sibling `DuplexStream`.
    inner: Arc<DuplexInner>,
    /// Service name copied into the initial REQUEST.
    service: String,
    /// Header set queued for the initial REQUEST; drained when the
    /// initial REQUEST is built.
    initial_headers: Vec<(String, Vec<u8>)>,
    /// Flag bits for the initial REQUEST. `finish_sending` ORs in
    /// `FLAG_RPC_REQUEST_END` on the zero-send path.
    initial_flags: u16,
    /// Deadline value embedded in the initial REQUEST.
    deadline_ns: u64,
    /// Upload-credit semaphore gating `send`; `None` means the
    /// upload side is not flow-controlled.
    credit_sem: Option<Arc<tokio::sync::Semaphore>>,
    /// Background task aborted on Drop — presumably the same
    /// REQUEST_GRANT-draining pump as on `ClientStreamCallRaw`
    /// (NOTE(review): confirm against `call_duplex`).
    grant_pump: Option<JoinHandle<()>>,
    /// Upload-side state machine. See [`ClientStreamState`].
    state: ClientStreamState,
}
1107
1108impl DuplexSink {
1109    /// Push one body chunk to the server. Same semantics as
1110    /// [`ClientStreamCallRaw::send`].
1111    pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
1112        match self.state {
1113            ClientStreamState::Finishing | ClientStreamState::Done => {
1114                return Err(RpcError::Codec {
1115                    direction: CodecDirection::Encode,
1116                    message: "send() called after finish_sending()".to_string(),
1117                });
1118            }
1119            _ => {}
1120        }
1121        if let Some(sem) = self.credit_sem.as_ref() {
1122            let permit = sem.clone().acquire_owned().await.map_err(|_| {
1123                RpcError::Transport(AdapterError::Connection("credit semaphore closed".into()))
1124            })?;
1125            permit.forget();
1126        }
1127        self.inner.observer.add_request_bytes(body.len() as u32);
1128        match self.state {
1129            ClientStreamState::JustOpened => {
1130                let req = RpcRequestPayload {
1131                    service: self.service.clone(),
1132                    deadline_ns: self.deadline_ns,
1133                    flags: self.initial_flags,
1134                    headers: std::mem::take(&mut self.initial_headers),
1135                    body: body.clone(),
1136                };
1137                self.publish_initial_request(&req).await?;
1138                self.inner.initial_sent.store(true, Ordering::SeqCst);
1139                self.state = ClientStreamState::Sending;
1140            }
1141            ClientStreamState::Sending => {
1142                let chunk = RpcRequestChunkPayload {
1143                    call_id: self.inner.call_id,
1144                    flags: 0,
1145                    headers: vec![],
1146                    body: body.clone(),
1147                };
1148                publish_request_chunk(
1149                    &self.inner.mesh,
1150                    self.inner.target_node_id,
1151                    self.inner.request_channel_hash,
1152                    self.inner.request_stream_id,
1153                    self.inner.self_origin,
1154                    &chunk,
1155                )
1156                .await?;
1157            }
1158            ClientStreamState::Finishing | ClientStreamState::Done => unreachable!(),
1159        }
1160        Ok(())
1161    }
1162
1163    /// Close the upload direction. Emits the terminal REQUEST_END
1164    /// frame. The response stream continues until the server's
1165    /// terminal RESPONSE arrives (use the sibling `DuplexStream`).
1166    pub async fn finish_sending(mut self) -> Result<(), RpcError> {
1167        match self.state {
1168            ClientStreamState::JustOpened => {
1169                let req = RpcRequestPayload {
1170                    service: self.service.clone(),
1171                    deadline_ns: self.deadline_ns,
1172                    flags: self.initial_flags | FLAG_RPC_REQUEST_END,
1173                    headers: std::mem::take(&mut self.initial_headers),
1174                    body: Bytes::new(),
1175                };
1176                self.publish_initial_request(&req).await?;
1177                self.inner.initial_sent.store(true, Ordering::SeqCst);
1178            }
1179            ClientStreamState::Sending => {
1180                let chunk = RpcRequestChunkPayload {
1181                    call_id: self.inner.call_id,
1182                    flags: FLAG_RPC_REQUEST_END,
1183                    headers: vec![],
1184                    body: Bytes::new(),
1185                };
1186                publish_request_chunk(
1187                    &self.inner.mesh,
1188                    self.inner.target_node_id,
1189                    self.inner.request_channel_hash,
1190                    self.inner.request_stream_id,
1191                    self.inner.self_origin,
1192                    &chunk,
1193                )
1194                .await?;
1195            }
1196            ClientStreamState::Finishing | ClientStreamState::Done => {
1197                return Err(RpcError::Codec {
1198                    direction: CodecDirection::Encode,
1199                    message: "finish_sending() called twice".to_string(),
1200                });
1201            }
1202        }
1203        self.state = ClientStreamState::Finishing;
1204        Ok(())
1205    }
1206
1207    /// Server-assigned `call_id`. Same value on the sibling
1208    /// `DuplexStream`.
1209    pub fn call_id(&self) -> u64 {
1210        self.inner.call_id
1211    }
1212
1213    /// Whether this call is flow-controlled on the upload side.
1214    pub fn flow_controlled(&self) -> bool {
1215        self.credit_sem.is_some()
1216    }
1217
1218    async fn publish_initial_request(&self, req: &RpcRequestPayload) -> Result<(), RpcError> {
1219        let meta = EventMeta::new(
1220            DISPATCH_RPC_REQUEST,
1221            0,
1222            self.inner.self_origin,
1223            self.inner.call_id,
1224            0,
1225        );
1226        let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.encoded_len());
1227        buf.extend_from_slice(&meta.to_bytes());
1228        req.encode_into(&mut buf);
1229        let payload = Bytes::from(buf);
1230        // PERF_AUDIT §3.10 — cached hash + stream_id from the
1231        // inner `ClientStreamCallRaw`.
1232        self.inner
1233            .mesh
1234            .publish_to_peer(
1235                self.inner.target_node_id,
1236                self.inner.request_channel_hash,
1237                self.inner.request_stream_id,
1238                /* reliable */ true,
1239                std::slice::from_ref(&payload),
1240            )
1241            .await
1242            .map_err(RpcError::Transport)
1243    }
1244}
1245
1246impl Drop for DuplexSink {
1247    fn drop(&mut self) {
1248        if let Some(task) = self.grant_pump.take() {
1249            task.abort();
1250        }
1251        // The shared DuplexInner's Drop (when refcount hits 0)
1252        // does the CANCEL — nothing to do here beyond aborting
1253        // the grant pump.
1254    }
1255}
1256
/// Receive half of a duplex call. Implements `futures::Stream`
/// yielding `Result<Bytes, RpcError>` per inbound RESPONSE chunk.
/// EOF on terminal Ok; one final `Err(RpcError::ServerError)` on
/// terminal non-Ok. If the chunk channel closes without a terminal
/// item the stream also ends, but the call is not marked cleanly
/// closed.
///
/// Part of the bidi streaming plan (Phase D).
pub struct DuplexStream {
    /// State shared with the sibling `DuplexSink`.
    inner: Arc<DuplexInner>,
    /// Inbound queue of response items (`StreamItem::Chunk`, `End`,
    /// `Error`) polled by `poll_next`.
    chunks_rx: tokio::sync::mpsc::UnboundedReceiver<StreamItem>,
    /// Latched once a terminal item or channel closure has been
    /// seen; every later poll returns `None` immediately.
    done: bool,
}
1268
1269impl DuplexStream {
1270    /// Server-assigned `call_id`. Same value on the sibling
1271    /// `DuplexSink`.
1272    pub fn call_id(&self) -> u64 {
1273        self.inner.call_id
1274    }
1275}
1276
1277impl futures::Stream for DuplexStream {
1278    type Item = Result<Bytes, RpcError>;
1279
1280    fn poll_next(
1281        mut self: std::pin::Pin<&mut Self>,
1282        cx: &mut std::task::Context<'_>,
1283    ) -> std::task::Poll<Option<Self::Item>> {
1284        if self.done {
1285            return std::task::Poll::Ready(None);
1286        }
1287        match self.chunks_rx.poll_recv(cx) {
1288            std::task::Poll::Ready(Some(StreamItem::Chunk(body))) => {
1289                self.inner.observer.add_response_bytes(body.len() as u32);
1290                std::task::Poll::Ready(Some(Ok(body)))
1291            }
1292            std::task::Poll::Ready(Some(StreamItem::End)) => {
1293                self.done = true;
1294                self.inner.clean_close.store(true, Ordering::SeqCst);
1295                self.inner.observer.latch_ok();
1296                std::task::Poll::Ready(None)
1297            }
1298            std::task::Poll::Ready(Some(StreamItem::Error(resp))) => {
1299                self.done = true;
1300                self.inner.clean_close.store(true, Ordering::SeqCst);
1301                let status = resp.status.to_wire();
1302                let message = String::from_utf8(resp.body.to_vec()).unwrap_or_else(|e| {
1303                    format!("<{} bytes of non-utf8 body>", e.into_bytes().len())
1304                });
1305                self.inner
1306                    .observer
1307                    .latch_error(format!("server returned status {status:#06x}: {message}"));
1308                std::task::Poll::Ready(Some(Err(RpcError::ServerError { status, message })))
1309            }
1310            std::task::Poll::Ready(None) => {
1311                self.done = true;
1312                std::task::Poll::Ready(None)
1313            }
1314            std::task::Poll::Pending => std::task::Poll::Pending,
1315        }
1316    }
1317}
1318
/// Caller-side handle for a duplex RPC. Combines a `DuplexSink`
/// (upload) and `DuplexStream` (download). For application code
/// that wants to encode requests in one task and decode responses
/// in another, use [`Self::into_split`] to peel off the two halves.
///
/// Part of the bidi streaming plan (Phase D).
pub struct DuplexCallRaw {
    /// Upload half. Swapped for an inert placeholder (state `Done`)
    /// by `finish_sending`.
    sink: DuplexSink,
    /// Download half; `next` and the `futures::Stream` impl
    /// delegate to it.
    stream: DuplexStream,
}
1329
1330impl DuplexCallRaw {
1331    /// Server-assigned `call_id`.
1332    pub fn call_id(&self) -> u64 {
1333        self.sink.call_id()
1334    }
1335
1336    /// Whether the upload side is flow-controlled.
1337    pub fn flow_controlled(&self) -> bool {
1338        self.sink.flow_controlled()
1339    }
1340
1341    /// Push one body chunk to the server. Delegates to the inner
1342    /// `DuplexSink::send`.
1343    pub async fn send(&mut self, body: Bytes) -> Result<(), RpcError> {
1344        self.sink.send(body).await
1345    }
1346
1347    /// Close the upload direction. Delegates to the inner
1348    /// `DuplexSink::finish_sending` but keeps the receive side
1349    /// alive so the caller can keep polling response chunks.
1350    ///
1351    /// NOTE: consumes the sink half but not the stream half.
1352    /// Internally, we replace `self.sink` with a no-op
1353    /// placeholder so subsequent send() / finish_sending()
1354    /// surface a clear error (`send() after finish_sending()`).
1355    pub async fn finish_sending(&mut self) -> Result<(), RpcError> {
1356        // Take the sink out by swapping in a placeholder whose
1357        // state is `Done` so subsequent sends error cleanly.
1358        let placeholder = DuplexSink {
1359            inner: Arc::clone(&self.sink.inner),
1360            service: String::new(),
1361            initial_headers: Vec::new(),
1362            initial_flags: 0,
1363            deadline_ns: 0,
1364            credit_sem: None,
1365            grant_pump: None,
1366            state: ClientStreamState::Done,
1367        };
1368        let sink = std::mem::replace(&mut self.sink, placeholder);
1369        sink.finish_sending().await
1370    }
1371
1372    /// Pull the next response chunk. `None` on terminal Ok;
1373    /// `Some(Err)` then `None` on terminal non-Ok. Same shape as
1374    /// `futures::StreamExt::next`.
1375    pub async fn next(&mut self) -> Option<Result<Bytes, RpcError>> {
1376        use futures::StreamExt;
1377        self.stream.next().await
1378    }
1379
1380    /// Split into independent send / receive halves. Both halves
1381    /// hold an `Arc<DuplexInner>`; CANCEL fires only when BOTH
1382    /// halves drop without a clean close.
1383    pub fn into_split(self) -> (DuplexSink, DuplexStream) {
1384        (self.sink, self.stream)
1385    }
1386}
1387
1388impl futures::Stream for DuplexCallRaw {
1389    type Item = Result<Bytes, RpcError>;
1390
1391    fn poll_next(
1392        mut self: std::pin::Pin<&mut Self>,
1393        cx: &mut std::task::Context<'_>,
1394    ) -> std::task::Poll<Option<Self::Item>> {
1395        std::pin::Pin::new(&mut self.stream).poll_next(cx)
1396    }
1397}
1398
1399// ============================================================================
1400// Unary call: CANCEL-on-drop guard.
1401// ============================================================================
1402
/// RAII guard that fires CANCEL to the server if the unary call
/// future is dropped before a response arrives. Without this, a
/// `select!`-loser future (e.g. hedge runner-up) would leave the
/// server-side handler running to completion — wasting CPU on a
/// reply nobody will read.
///
/// The guard is built *after* the REQUEST has been successfully
/// published — if the publish fails, no guard is constructed and
/// no CANCEL is sent. On the success path the call function flips
/// `completed = true` so Drop sends no CANCEL (the server already
/// finished and removed its in-flight entry); Drop still clears
/// the caller-side pending entry in either case.
struct UnaryCallGuard {
    /// Client pending table; Drop calls `cancel(call_id)` on it
    /// unconditionally.
    pending: Arc<super::cortex::RpcClientPending>,
    /// Mesh node handed to `spawn_cancel_publish` for the CANCEL.
    mesh: Arc<MeshNode>,
    /// Peer the CANCEL is addressed to.
    target_node_id: u64,
    /// Request channel the CANCEL is published on.
    request_channel: ChannelName,
    /// Caller origin passed along with the CANCEL.
    self_origin: u64,
    /// Identifier of the guarded call.
    call_id: u64,
    /// True after the call resolved Ok or got a definitive
    /// non-cancellable Err. Drop checks this — `false` fires
    /// CANCEL, `true` skips it (the pending entry is still
    /// removed).
    completed: bool,
}
1427
1428impl Drop for UnaryCallGuard {
1429    fn drop(&mut self) {
1430        self.pending.cancel(self.call_id);
1431        if !self.completed {
1432            spawn_cancel_publish(
1433                Arc::clone(&self.mesh),
1434                self.target_node_id,
1435                self.request_channel.clone(),
1436                self.self_origin,
1437                self.call_id,
1438            );
1439        }
1440    }
1441}
1442
1443// ============================================================================
1444// Streaming/duplex observer-fire bookkeeping.
1445//
// The unary `MeshNode::call` fires `RpcObserver::on_call` at each
// terminal return path (see the body of `MeshNode::call`). The streaming /
1448// client-streaming / duplex paths have multiple terminal points
1449// (poll_next sees End / Error; finish() returns; Drop without
1450// terminal observation). To avoid sprinkling `fire_rpc_observer_outbound`
1451// at every terminal site, each handle holds a
1452// `StreamingObserverState` that latches the terminal status on
1453// observation and fires exactly once on Drop. The Deck NRPC tab
1454// + every consumer of `RpcObserver` get one event per streaming
1455// / duplex call, same as for unary today.
1456// ============================================================================
1457
1458/// Per-call observer-fire bookkeeping shared between the
1459/// streaming + client-streaming + duplex caller-side handles.
1460/// Latches terminal status on observation; `fire()` (called from
1461/// the handle's Drop) emits one `RpcCallEvent` with the latched
1462/// status (or `Canceled` if nothing latched — i.e. the handle
1463/// was dropped before observing its terminator).
1464///
1465/// Status discriminator:
1466///   0 = none latched (Drop → Canceled)
1467///   1 = Ok
1468///   2 = Error (message in `observer_msg`)
1469///   3 = Timeout
1470pub(crate) struct StreamingObserverState {
1471    mesh: Arc<MeshNode>,
1472    target_node_id: u64,
1473    service: String,
1474    started: Instant,
1475    request_bytes: AtomicU32,
1476    response_bytes: AtomicU32,
1477    observer_status: AtomicU8,
1478    observer_msg: parking_lot::Mutex<Option<String>>,
1479    fired: AtomicBool,
1480}
1481
1482impl StreamingObserverState {
1483    pub(crate) fn new(
1484        mesh: Arc<MeshNode>,
1485        target_node_id: u64,
1486        service: impl Into<String>,
1487        request_bytes: u32,
1488    ) -> Self {
1489        Self {
1490            mesh,
1491            target_node_id,
1492            service: service.into(),
1493            started: Instant::now(),
1494            request_bytes: AtomicU32::new(request_bytes),
1495            response_bytes: AtomicU32::new(0),
1496            observer_status: AtomicU8::new(0),
1497            observer_msg: parking_lot::Mutex::new(None),
1498            fired: AtomicBool::new(false),
1499        }
1500    }
1501
1502    pub(crate) fn add_request_bytes(&self, n: u32) {
1503        self.request_bytes.fetch_add(n, Ordering::Relaxed);
1504    }
1505
1506    pub(crate) fn add_response_bytes(&self, n: u32) {
1507        self.response_bytes.fetch_add(n, Ordering::Relaxed);
1508    }
1509
1510    pub(crate) fn latch_ok(&self) {
1511        self.observer_status.store(1, Ordering::Relaxed);
1512    }
1513
1514    pub(crate) fn latch_error(&self, msg: impl Into<String>) {
1515        *self.observer_msg.lock() = Some(msg.into());
1516        self.observer_status.store(2, Ordering::Relaxed);
1517    }
1518
1519    pub(crate) fn latch_timeout(&self) {
1520        self.observer_status.store(3, Ordering::Relaxed);
1521    }
1522
1523    /// Fire the observer event. Idempotent — only the first call
1524    /// actually emits; subsequent are no-ops. Called from each
1525    /// streaming handle's Drop.
1526    pub(crate) fn fire(&self) {
1527        if self.fired.swap(true, Ordering::SeqCst) {
1528            return;
1529        }
1530        let status_code = self.observer_status.load(Ordering::Relaxed);
1531        let status = match status_code {
1532            1 => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Ok,
1533            2 => {
1534                let msg = self.observer_msg.lock().clone().unwrap_or_default();
1535                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(msg)
1536            }
1537            3 => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Timeout,
1538            _ => crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Canceled,
1539        };
1540        self.mesh.fire_rpc_observer_outbound(
1541            self.target_node_id,
1542            &self.service,
1543            self.started.elapsed().as_millis() as u32,
1544            status,
1545            self.request_bytes.load(Ordering::Relaxed),
1546            self.response_bytes.load(Ordering::Relaxed),
1547        );
1548    }
1549}
1550
/// Upper bound on the request-direction credits a single call may
/// accumulate on the caller side.
///
/// Tokio's `Semaphore::MAX_PERMITS` is `usize::MAX >> 3`; the
/// caller-side accumulator is capped far below that, at the value of
/// this constant, so a misbehaving server cannot make the caller hold
/// an unbounded outstanding window. One million credits is already
/// orders of magnitude beyond any sane request burst — a caller
/// sitting on that many unconsumed credits is either misconfigured
/// or under attack.
const REQUEST_GRANT_PER_CALL_CAP: usize = 1_000_000;
1559
1560/// Add `credits` to a caller-side request-direction credit
1561/// semaphore, capped so the accumulator never exceeds
1562/// [`REQUEST_GRANT_PER_CALL_CAP`]. Per-frame cap of `usize::MAX >> 4`
1563/// remains as a second line of defense against pathological frame
1564/// values.
1565fn add_request_grant_credits(sem: &tokio::sync::Semaphore, credits: u32) {
1566    if credits == 0 {
1567        return;
1568    }
1569    let current = sem.available_permits();
1570    let remaining = REQUEST_GRANT_PER_CALL_CAP.saturating_sub(current);
1571    let safe = (credits as usize).min(usize::MAX >> 4).min(remaining);
1572    if safe > 0 {
1573        sem.add_permits(safe);
1574    }
1575}
1576
1577/// Build a coalescing REQUEST_GRANT emitter.
1578///
1579/// Naive emitters `tokio::spawn` one publish task per consumed
1580/// chunk, which becomes a spawn-storm + AEAD-storm under bursting.
1581/// This helper hands back an emitter that pushes `(caller_origin,
1582/// call_id, credits)` into an unbounded mpsc; a single dedicated
1583/// drainer task `try_recv`s the queue to drain whatever is
1584/// immediately available, coalesces credits per call_id, and
1585/// publishes ONE batched REQUEST_GRANT per call per drain cycle.
1586///
1587/// Lifecycle: the drainer task lives as long as any clone of the
1588/// returned emitter (mpsc sender count > 0). When the fold and all
1589/// in-flight handlers release the emitter, `rx.recv` returns `None`
1590/// and the drainer exits naturally.
1591fn build_request_grant_emitter(
1592    mesh: Arc<MeshNode>,
1593    service: String,
1594    server_origin: u64,
1595    diag_tag: &'static str,
1596) -> RpcRequestGrantEmitter {
1597    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(u64, u64, u32)>();
1598    tokio::spawn(async move {
1599        while let Some(first) = rx.recv().await {
1600            let mut summed: std::collections::HashMap<(u64, u64), u32> =
1601                std::collections::HashMap::new();
1602            let (caller, call_id, credits) = first;
1603            summed.insert((caller, call_id), credits);
1604            // Coalesce anything immediately queued behind the first
1605            // wake. Bounded by what the substrate has produced so
1606            // far; doesn't add latency since `try_recv` returns
1607            // immediately when the queue is empty.
1608            while let Ok((caller, call_id, credits)) = rx.try_recv() {
1609                let entry = summed.entry((caller, call_id)).or_insert(0);
1610                *entry = entry.saturating_add(credits);
1611            }
1612            for ((caller, call_id), credits) in summed {
1613                let reply_channel_name = format!("{service}.replies.{caller:016x}");
1614                let reply_channel = match ChannelName::new(&reply_channel_name) {
1615                    Ok(c) => c,
1616                    Err(e) => {
1617                        tracing::warn!(
1618                            error = %e,
1619                            channel = %reply_channel_name,
1620                            tag = diag_tag,
1621                            "rpc grant drainer: invalid reply channel name");
1622                        continue;
1623                    }
1624                };
1625                let meta = EventMeta::new(DISPATCH_RPC_REQUEST_GRANT, 0, server_origin, call_id, 0);
1626                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 12);
1627                buf.extend_from_slice(&meta.to_bytes());
1628                buf.extend_from_slice(&encode_request_grant(call_id, credits));
1629                let publisher = ChannelPublisher::new(reply_channel, PublishConfig::default());
1630                if let Err(e) = mesh.publish(&publisher, Bytes::from(buf)).await {
1631                    tracing::warn!(
1632                        error = %e,
1633                        caller_origin = format!("{:#x}", caller),
1634                        call_id,
1635                        tag = diag_tag,
1636                        "rpc grant drainer: REQUEST_GRANT publish failed");
1637                }
1638            }
1639        }
1640    });
1641    Arc::new(move |caller_origin, call_id, credits| {
1642        // Send failure means the drainer has exited (all sender
1643        // clones dropped, then we somehow cloned a stale one).
1644        // Treat as a no-op — the call is tearing down anyway.
1645        let _ = tx.send((caller_origin, call_id, credits));
1646    })
1647}
1648
1649/// Per-service map from a caller's `origin_hash` (wire field) to the
1650/// AEAD-verified `from_node` of the session that delivered their
1651/// inbound REQUEST. Populated by the serve_rpc bridge tasks at
1652/// REQUEST-receipt time; consulted by [`publish_response_to_caller`]
1653/// to skip the roster fan-out on the response leg.
1654///
1655/// Lives per `serve_rpc*` registration rather than mesh-wide because
1656/// the source-of-truth `MeshNode::origin_hash_to_node` is only safe
1657/// to populate from *signed* capability announcements — populating
1658/// it from unsigned wire `origin_hash` fields would let any session
1659/// peer pre-claim arbitrary origins. This map is bridge-local and
1660/// only used by the matching service's response emit, so a malicious
1661/// peer can at most misdirect responses for THEIR own request — they
1662/// already could.
1663///
1664/// **Bounded** ([`OriginKeyedLru`]): the key is the wire-claimed
1665/// `origin_hash` and the bridge inserts it *before* the capability gate,
1666/// so an unbounded map would let one authed peer spray distinct origins and
1667/// amplify server memory. The LRU caps the footprint; eviction costs only a
1668/// response-path cache miss (roster fallback), never correctness.
1669type RpcOriginNodeCache = Arc<OriginKeyedLru<u64>>;
1670
/// Maximum number of entries in each per-`serve_rpc` caller-keyed
/// cache ([`RpcOriginNodeCache`] and the §8b reply-channel cache).
///
/// The figure is sized for the legitimate active-caller working set
/// of a single service. Beyond it the LRU evicts cold origins instead
/// of growing without limit under a crafted-origin flood. Entries are
/// tiny (a `u64` and, for the reply cache, an `Arc<str>` channel
/// name), so the whole bound amounts to a few hundred KB per service.
const RPC_CALLER_CACHE_CAP: usize = 4096;
1679
1680/// Non-zero form of [`RPC_CALLER_CACHE_CAP`], validated at compile time so
1681/// `OriginKeyedLru::new` carries no runtime `unwrap`/`expect`. A zero cap
1682/// would fail the build here rather than panic at startup.
1683const RPC_CALLER_CACHE_CAP_NZ: std::num::NonZeroUsize =
1684    match std::num::NonZeroUsize::new(RPC_CALLER_CACHE_CAP) {
1685        Some(n) => n,
1686        None => panic!("RPC_CALLER_CACHE_CAP must be non-zero"),
1687    };
1688
1689/// Thread-safe, bounded LRU keyed by the wire-claimed caller `origin_hash`.
1690///
1691/// Backs both [`RpcOriginNodeCache`] and the §8b reply-channel cache. Wraps
1692/// `lru::LruCache` (which needs `&mut` even to read, to bump the entry to
1693/// most-recently-used) in a `parking_lot::Mutex`. The per-response lock is
1694/// uncontended in the common case — one fold drives a given service — and is
1695/// far cheaper than the `format!` + `ChannelName` allocation / roster fan-out
1696/// the caches exist to avoid. Eviction is always safe: a miss just recomputes
1697/// the value (channel name) or falls back to the roster lookup.
1698struct OriginKeyedLru<V>(Mutex<lru::LruCache<u64, V>>);
1699
1700impl<V: Clone> OriginKeyedLru<V> {
1701    fn new() -> Self {
1702        Self(Mutex::new(lru::LruCache::new(RPC_CALLER_CACHE_CAP_NZ)))
1703    }
1704
1705    /// Look up `origin`, promoting it to most-recently-used on a hit.
1706    fn get(&self, origin: u64) -> Option<V> {
1707        self.0.lock().get(&origin).cloned()
1708    }
1709
1710    /// Insert / refresh `origin`, evicting the least-recently-used entry
1711    /// when at capacity.
1712    fn insert(&self, origin: u64, value: V) {
1713        self.0.lock().put(origin, value);
1714    }
1715}
1716
1717/// Direct-send a built RESPONSE (or streaming chunk) packet to the
1718/// caller's reply channel, bypassing the roster fan-out path
1719/// [`MeshNode::publish`] uses.
1720///
1721/// **Fast path:** when the bridge has cached the caller's
1722/// `from_node` (i.e. the server processed an inbound REQUEST from
1723/// this caller via an AEAD-authenticated session), or when the
1724/// caller's capability announcement has reached us, the response
1725/// rides `publish_to_peer` — one DashMap lookup instead of roster
1726/// lookup + ACL check + subnet filter + per-recipient `Vec<Bytes>`
1727/// allocation.
1728///
1729/// **Fallback:** when neither lookup resolves — pathological cases
1730/// like a test harness where the caller never announces and the
1731/// bridge cache is empty — fall back to [`MeshNode::publish`] via
1732/// the roster, matching the pre-T1.2 behavior verbatim.
1733/// PERF_AUDIT §3.10 — accepts pre-computed
1734/// `reply_channel_hash` and `reply_stream_id` so the per-response
1735/// path doesn't re-run `ChannelId::new` + xxh3 + `publish_stream_id`
1736/// on every send. The emit closure's `OriginKeyedLru<CachedReplyChannel>`
1737/// caches the triple per caller_origin.
1738async fn publish_response_to_caller(
1739    mesh: &MeshNode,
1740    caller_origin: u64,
1741    target_hint: Option<u64>,
1742    reply_channel: &ChannelName,
1743    reply_channel_hash: ChannelHash,
1744    reply_stream_id: u64,
1745    payload: Bytes,
1746) -> Result<(), AdapterError> {
1747    let resolved = target_hint.or_else(|| mesh.get_node_by_origin_hash(caller_origin));
1748    if let Some(target_node_id) = resolved {
1749        return mesh
1750            .publish_to_peer(
1751                target_node_id,
1752                reply_channel_hash,
1753                reply_stream_id,
1754                /* reliable */ true,
1755                std::slice::from_ref(&payload),
1756            )
1757            .await;
1758    }
1759    // Fallback: roster fan-out. Reached when the caller's origin is
1760    // unknown to both the bridge cache AND the global reverse index.
1761    let publisher = ChannelPublisher::new(reply_channel.clone(), PublishConfig::default());
1762    mesh.publish(&publisher, payload).await.map(|_| ())
1763}
1764
1765/// Shared CANCEL-publish helper: spawn a task that fires a
1766/// CANCEL event for `call_id` to `target` on the request channel.
1767/// Both [`RpcStream::Drop`] and [`UnaryCallGuard::Drop`] use it.
1768fn spawn_cancel_publish(
1769    mesh: Arc<MeshNode>,
1770    target: u64,
1771    request_channel: ChannelName,
1772    self_origin: u64,
1773    call_id: u64,
1774) {
1775    tokio::spawn(async move {
1776        let meta = EventMeta::new(DISPATCH_RPC_CANCEL, 0, self_origin, call_id, 0);
1777        let request_channel_id = ChannelId::new(request_channel);
1778        let request_channel_hash = request_channel_id.hash();
1779        let stream_id = MeshNode::publish_stream_id(&request_channel_id);
1780        let payload = Bytes::from(meta.to_bytes().to_vec());
1781        let _ = mesh
1782            .publish_to_peer(
1783                target,
1784                request_channel_hash,
1785                stream_id,
1786                /* reliable */ true,
1787                std::slice::from_ref(&payload),
1788            )
1789            .await;
1790    });
1791}
1792
1793/// Type alias for the keep-alive sender that streaming-call handles
1794/// store. Its purpose is *only* to signal "stream done" when the
1795/// handle drops: the cancel-watcher task `select!`s on the matching
1796/// receiver, and dropping the sender (which happens on handle Drop)
1797/// resolves the receiver with an `Err` so the watcher exits cleanly.
1798///
1799/// `()` payload because the signal IS the resolution; no data is
1800/// transmitted.
1801type StreamCancelKeepAlive = tokio::sync::oneshot::Sender<()>;
1802
1803/// Spawn a cancel-watcher task for a streaming call (call_streaming,
1804/// call_client_stream, call_duplex). The watcher races
1805/// `cancel_notify.notified()` against the keep-alive oneshot — first
1806/// to fire wins. On cancel, the watcher drops the pending-streaming
1807/// entry (which closes the receiver's mpsc, letting the stream's
1808/// poll_next observe EOF), then releases the registry entry. On
1809/// handle Drop, the keep-alive sender drops, the oneshot resolves
1810/// `Err`, and the watcher exits via the done arm with a registry
1811/// release.
1812///
1813/// When `cancel_token == 0` (the "no token" sentinel), this is a
1814/// no-op: the returned sender is a placeholder whose drop has no
1815/// observable effect, and no task is spawned. Lets the streaming
1816/// call shapes always store a keep-alive on the returned handle
1817/// without branching on whether a token was set.
1818fn spawn_stream_cancel_watcher(
1819    cancel_notify: Arc<tokio::sync::Notify>,
1820    cancel_token: u64,
1821    cancel_registry: Arc<crate::adapter::net::cancel_registry::CancelRegistry>,
1822    pending: Arc<crate::adapter::net::cortex::RpcClientPending>,
1823    call_id: u64,
1824) -> StreamCancelKeepAlive {
1825    let (done_tx, done_rx) = tokio::sync::oneshot::channel();
1826    if cancel_token == 0 {
1827        // No-op fast path. The returned sender is held by the
1828        // handle but never paired with a watcher, so its eventual
1829        // drop has no effect. Avoids spawning a task per
1830        // cancel-less stream.
1831        return done_tx;
1832    }
1833    tokio::spawn(async move {
1834        tokio::select! {
1835            biased;
1836            _ = cancel_notify.notified() => {
1837                // Cancel fired. Drop the pending-stream entry so
1838                // the receiver's mpsc closes (causing the stream's
1839                // poll_next to observe EOF via Ready(None)). The
1840                // handle's Drop will then fire CANCEL on the wire
1841                // via its existing per-shape Drop impl.
1842                pending.cancel(call_id);
1843                cancel_registry.release(cancel_token);
1844            }
1845            _ = done_rx => {
1846                // Stream completed normally — sender dropped on
1847                // handle Drop, recv returns Err. Just release the
1848                // registry entry; no CANCEL emission needed (the
1849                // handle's Drop handles that path itself if it
1850                // wasn't a clean close).
1851                cancel_registry.release(cancel_token);
1852            }
1853        }
1854    });
1855    done_tx
1856}
1857
1858/// One-call helper that registers a cancel-notify against the
1859/// caller's `opts.cancel_token` and spawns the stream cancel
1860/// watcher. Used by every streaming call shape (`call_streaming`,
1861/// `call_client_stream`, `call_duplex`) to keep their bodies free
1862/// of the three-step token/notify/spawn boilerplate.
1863///
1864/// When `opts.cancel_token` is `None` (or `Some(0)`), this is the
1865/// same no-op fast path as [`spawn_stream_cancel_watcher`].
1866fn arm_stream_cancel(
1867    mesh: &Arc<MeshNode>,
1868    opts: &CallOptions,
1869    pending: &Arc<crate::adapter::net::cortex::RpcClientPending>,
1870    call_id: u64,
1871) -> StreamCancelKeepAlive {
1872    let cancel_token = opts.cancel_token.unwrap_or(0);
1873    let cancel_notify = mesh.cancel_registry().register_notify(cancel_token);
1874    spawn_stream_cancel_watcher(
1875        cancel_notify,
1876        cancel_token,
1877        Arc::clone(mesh.cancel_registry()),
1878        Arc::clone(pending),
1879        call_id,
1880    )
1881}
1882
1883/// Side-effects + return value for the unary `call`'s cancel
1884/// branch. Releases the registry entry, records the Transport
1885/// outcome on the metrics guard, fires the Canceled observer
1886/// event, and returns `RpcError::Cancelled`. Both the
1887/// no-deadline and with-deadline `select!` arms invoke this so a
1888/// shape change to the cancel outcome (extra metric, new field
1889/// on the observer event) lands in exactly one place.
1890fn fire_unary_cancel_outcome(
1891    mesh: &Arc<MeshNode>,
1892    metrics_guard: &mut crate::adapter::net::mesh_rpc_metrics::CallMetricsGuard,
1893    cancel_token: u64,
1894    target_node_id: u64,
1895    service: &str,
1896    started_total: Instant,
1897    request_bytes_len: u32,
1898) -> RpcError {
1899    mesh.cancel_registry().release(cancel_token);
1900    metrics_guard.record(crate::adapter::net::mesh_rpc_metrics::CallOutcome::Transport);
1901    mesh.fire_rpc_observer_outbound(
1902        target_node_id,
1903        service,
1904        started_total.elapsed().as_millis() as u32,
1905        crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Canceled,
1906        request_bytes_len,
1907        0,
1908    );
1909    RpcError::Cancelled
1910}
1911
1912// ============================================================================
1913// MeshNode extensions.
1914// ============================================================================
1915
1916impl MeshNode {
1917    /// Register an nRPC handler for `service` on this node.
1918    ///
1919    /// Subscribes this node to `<service>.requests` (so the local
1920    /// `register_rpc_inbound` dispatcher feeds inbound REQUEST
1921    /// events into the [`RpcServerFold`]) and wires the fold's
1922    /// RESPONSE-emit callback to publish on
1923    /// `<service>.replies.<caller_origin>` via the existing
1924    /// pub/sub path.
1925    ///
1926    /// **Local-only registration** (Phase 1). Multi-instance
1927    /// services that load-balance via `SubscriptionMode::QueueGroup`
1928    /// require each replica to call `serve_rpc` on its own node;
1929    /// the mesh-level subscriber roster + `dispatch_recipients`
1930    /// then routes one-of-N as designed. Each replica's local
1931    /// `serve_rpc` must use the same service name (which becomes
1932    /// the queue-group identifier).
1933    ///
1934    /// Returns a [`ServeHandle`] whose Drop tears down the
1935    /// registration. Concurrent registrations for the same service
1936    /// on one node return `Err(ServeError::AlreadyServing)`.
1937    pub fn serve_rpc<H: RpcHandler>(
1938        self: &Arc<Self>,
1939        service: &str,
1940        handler: Arc<H>,
1941    ) -> Result<ServeHandle, ServeError> {
1942        let request_channel = ChannelName::new(&format!("{service}.requests"))
1943            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
1944        let channel_hash = request_channel.hash();
1945
1946        // Bridge: a tokio mpsc the inbound dispatcher pushes into.
1947        // The bridge task drains it and runs each event through
1948        // the fold. Bounded so a runaway publisher can't OOM the
1949        // server; over-cap pushes drop the inbound event (which
1950        // surfaces to the caller as a timeout).
1951        let (tx, mut rx) = mpsc::channel::<RpcInboundEvent>(1024);
1952
1953        // T1.2 cache: maps each caller's wire `origin_hash` to the
1954        // AEAD-verified `from_node` of the session that delivered
1955        // its REQUEST. Populated by the bridge below; consumed by
1956        // the emit closure so [`publish_response_to_caller`] can
1957        // skip the roster fan-out on the response leg.
1958        let origin_node_cache: RpcOriginNodeCache = Arc::new(OriginKeyedLru::new());
1959
1960        // Build the emit closure. When the handler completes, the
1961        // fold calls this (synchronously) with `(caller_origin, call_id,
1962        // response)`. §8a: instead of `tokio::spawn`ing a task per response,
1963        // the closure builds the wire payload (cheap, no await) and hands a
1964        // job to a single per-service response drainer task (below), which
1965        // does the `.await` publish. A `tokio::spawn` per response cost
1966        // ~1–2 µs of scheduling on a wake-bound path; a channel send is a
1967        // fraction of that, and the drainer amortizes the wakeup.
1968        let service_for_emit = service.to_string();
1969        let server_origin = self.identity_origin_hash();
1970        let origin_node_cache_for_emit = Arc::clone(&origin_node_cache);
1971        // §8b reply-channel cache: the reply channel name is
1972        // `<service>.replies.<caller_origin:016x>` — deterministic from
1973        // `(service, caller_origin)`, and `service` is fixed for this
1974        // `serve_rpc`, so it varies only by `caller_origin`. `ChannelName` is
1975        // `Arc<str>`, so a cache hit is an Arc bump; this removes the per-
1976        // response `format!` String + `ChannelName::new` (`Arc<str>`) allocation
1977        // (and the per-call `service.clone()`) the emit closure used to pay on
1978        // every response. Keyed by the wire-claimed `caller_origin` and so
1979        // bounded the same way as `origin_node_cache` above — an
1980        // `OriginKeyedLru`, not an unbounded map, so a crafted-origin flood
1981        // can't amplify server memory (a miss just rebuilds the name).
1982        // PERF_AUDIT §3.10 — cache the triple (name, hash, stream_id)
1983        // per caller_origin so the per-response drainer doesn't
1984        // recompute xxh3 + publish_stream_id on every send.
1985        let reply_channel_cache: Arc<OriginKeyedLru<CachedReplyChannel>> =
1986            Arc::new(OriginKeyedLru::new());
1987        // §8a response drainer channel. Bounded like the inbound channel; a
1988        // full channel means the drainer can't keep up, so we drop (the
1989        // caller times out) rather than block the fold.
1990        let (resp_tx, mut resp_rx) = mpsc::channel::<RpcResponseJob>(1024);
1991        let emit: RpcResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
1992            let target_hint = origin_node_cache_for_emit.get(caller_origin);
1993            // Resolve the reply channel from cache (Arc bump on hit; one
1994            // `format!` + `ChannelName::new` the first time we see a caller).
1995            let cached = match reply_channel_cache.get(caller_origin) {
1996                Some(c) => c,
1997                None => {
1998                    let name = format!("{service_for_emit}.replies.{caller_origin:016x}");
1999                    match ChannelName::new(&name) {
2000                        Ok(channel_name) => {
2001                            // Compute hash + stream_id ONCE per caller_origin
2002                            // and stash them alongside the name.
2003                            let channel_id = ChannelId::new(channel_name.clone());
2004                            let triple = CachedReplyChannel {
2005                                hash: channel_id.hash(),
2006                                stream_id: MeshNode::publish_stream_id(&channel_id),
2007                                name: channel_name,
2008                            };
2009                            reply_channel_cache.insert(caller_origin, triple.clone());
2010                            triple
2011                        }
2012                        Err(e) => {
2013                            tracing::warn!(error = %e, channel = %name,
2014                                "rpc serve_rpc: invalid reply channel name");
2015                            return;
2016                        }
2017                    }
2018                }
2019            };
2020            // Build the RESPONSE event envelope (24-byte meta + encoded
2021            // payload) synchronously — pure CPU, no await — then hand it to
2022            // the drainer.
2023            let meta = EventMeta::new(
2024                super::cortex::DISPATCH_RPC_RESPONSE,
2025                0,
2026                server_origin,
2027                call_id,
2028                0,
2029            );
2030            let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2031            buf.extend_from_slice(&meta.to_bytes());
2032            resp.encode_into(&mut buf);
2033            if resp_tx
2034                .try_send(RpcResponseJob {
2035                    caller_origin,
2036                    call_id,
2037                    target_hint,
2038                    reply_channel: cached.name,
2039                    reply_channel_hash: cached.hash,
2040                    reply_stream_id: cached.stream_id,
2041                    payload: Bytes::from(buf),
2042                })
2043                .is_err()
2044            {
2045                tracing::debug!(
2046                    caller_origin = format!("{:#x}", caller_origin),
2047                    call_id,
2048                    "rpc serve_rpc: response drainer at capacity; dropping response"
2049                );
2050            }
2051        });
2052
2053        // Build the server fold and wrap it in an Arc<Mutex<...>>
2054        // so the bridge task can drive it (the trait takes
2055        // `&mut self`). Attach the per-service metrics handle so
2056        // the spawned handler tasks bump server-side counters.
2057        let metrics_handle = self.rpc_metrics_arc().for_service(service);
2058        // Keep a clone of the emit closure for the callee-side
2059        // capability-auth defense-in-depth path in the bridge
2060        // below — the fold owns its own clone, this one only
2061        // emits the `CapabilityDenied` rejection before the fold
2062        // sees the event.
2063        let emit_for_bridge = Arc::clone(&emit);
2064        // Clone the per-service metrics handle so the bridge can
2065        // bump `capability_denied_total` on gate rejection. The
2066        // fold's own clone (passed via `with_metrics`) handles the
2067        // handler-side counters; this one covers the path BEFORE
2068        // the handler runs, which the fold-side metrics never see.
2069        let metrics_for_bridge = Arc::clone(&metrics_handle);
2070        let fold = Arc::new(Mutex::new(
2071            RpcServerFold::new(handler as Arc<dyn RpcHandler>, emit).with_metrics(metrics_handle),
2072        ));
2073
2074        // Register the inbound dispatcher. Push into the mpsc;
2075        // the bridge task does the actual fold work.
2076        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2077            // Best-effort send — over-cap means the bridge can't
2078            // keep up; drop and let the caller time out. Logging
2079            // here would spam.
2080            let _ = tx.try_send(ev);
2081        });
2082        // Register the service in `rpc_local_services` and refresh
2083        // the self-indexed announcement BEFORE installing the
2084        // dispatcher so the callee-side gate (in the bridge below)
2085        // sees a self-announcement carrying `nrpc:<service>` the
2086        // moment the first inbound event lands. Without this, the
2087        // gate was either silently permissive (no self-ann) or
2088        // silently denying (self-ann from a prior
2089        // `announce_capabilities` that pre-dated this service's
2090        // registration). See `docs/misc/CODE_REVIEW_2026_05_19_CAPABILITY_AUTH.md`
2091        // H1 + H2.
2092        self.rpc_local_services_arc().insert(service.to_string());
2093        self.index_self_with_local_services();
2094
2095        if self
2096            .register_rpc_inbound(channel_hash, dispatcher)
2097            .is_some()
2098        {
2099            return Err(ServeError::AlreadyServing(service.to_string()));
2100        }
2101
2102        // Spawn the bridge task. It reads inbound events, runs
2103        // the v0.4 capability-auth callee-side gate (defense in
2104        // depth — the caller-side gate inside `call_service`
2105        // covers the well-behaved client path), and on accept
2106        // feeds them to the fold.
2107        let mesh_for_bridge = Arc::clone(self);
2108        let service_for_bridge = service.to_string();
2109        let origin_node_cache_for_bridge = Arc::clone(&origin_node_cache);
2110        let bridge = tokio::spawn(async move {
2111            let tag = format!("nrpc:{}", service_for_bridge);
2112            use crate::adapter::net::behavior::fold::capability_bridge;
2113            while let Some(inbound) = rx.recv().await {
2114                // T1.2 cache populate. `from_node` is the
2115                // AEAD-verified session peer; `origin_hash` is the
2116                // wire-claimed entity (untrusted on its own but
2117                // bound here to a session peer we just authed).
2118                // `from_node == 0` is the loopback/test sentinel —
2119                // skip the insert so the response path falls back
2120                // to the roster lookup instead of trying to send to
2121                // node 0.
2122                if inbound.from_node != 0 {
2123                    origin_node_cache_for_bridge.insert(inbound.origin_hash, inbound.from_node);
2124                }
2125                // Defense-in-depth check. Skip only when the wire
2126                // session resolved no NodeId (`from_node == 0` is
2127                // the loopback / test sentinel per
2128                // `RpcInboundEvent::from_node` — production wire
2129                // delivery drops events that fail NodeId
2130                // resolution rather than passing 0). The cold-
2131                // start "no self-ann" skip the original
2132                // implementation carried was a permissive hole;
2133                // `index_self_with_local_services` above
2134                // guarantees a self-ann exists before the
2135                // dispatcher is wired, so denying when the gate
2136                // says no is now the safe failure mode.
2137                let self_node = mesh_for_bridge.node_id();
2138                let from_node = inbound.from_node;
2139                if from_node != 0
2140                    && !capability_bridge::may_execute(
2141                        mesh_for_bridge.capability_fold(),
2142                        self_node,
2143                        &tag,
2144                        from_node,
2145                    )
2146                {
2147                    // Decode the EventMeta so we can address the
2148                    // caller's reply channel (keyed on
2149                    // `caller_origin`) and tag the response with
2150                    // the correct `call_id`. A garbled meta means
2151                    // the request would have been rejected by the
2152                    // fold's own decode path too; drop silently
2153                    // to match the existing skip-on-malformed
2154                    // behavior there.
2155                    let Some(meta) = (if inbound.payload.len() >= EVENT_META_SIZE {
2156                        EventMeta::from_bytes(&inbound.payload[..EVENT_META_SIZE])
2157                    } else {
2158                        None
2159                    }) else {
2160                        continue;
2161                    };
2162                    let resp = super::cortex::RpcResponsePayload {
2163                        status: RpcStatus::CapabilityDenied,
2164                        headers: vec![],
2165                        body: Bytes::from(format!(
2166                            "callee-side capability-auth gate denied nrpc:{}",
2167                            service_for_bridge
2168                        )),
2169                    };
2170                    // Server-side metrics: bump `capability_denied_total`
2171                    // on the per-service counter. The fold-side
2172                    // metrics never see this path (the handler isn't
2173                    // invoked), so without this bump a noisy
2174                    // unauthorized caller is invisible to operators
2175                    // watching `nrpc_handler_invocations_total` —
2176                    // the dashboard sees "0 requests" while the
2177                    // caller sees `CapabilityDenied`.
2178                    metrics_for_bridge
2179                        .capability_denied_total
2180                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2181                    (emit_for_bridge)(meta.origin_hash, meta.seq_or_ts, resp);
2182                    continue;
2183                }
2184                let payload = inbound.payload;
2185                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2186                let ev = RedexEvent { entry, payload };
2187                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2188                    tracing::warn!(error = %e, "rpc serve_rpc: fold apply error");
2189                }
2190            }
2191        });
2192
2193        // §8a response drainer. Drains `resp_rx` and does the `.await`
2194        // publish that the emit closure used to `tokio::spawn` per response.
2195        // Exits on its own when `resp_tx` (held only by the emit closure,
2196        // which the fold owns) is dropped — i.e. when the bridge task ends
2197        // and the fold drops, the same teardown that stops `_bridge`.
2198        let response_drain_mesh = Arc::clone(self);
2199        let response_drain = tokio::spawn(async move {
2200            while let Some(job) = resp_rx.recv().await {
2201                if let Err(e) = publish_response_to_caller(
2202                    &response_drain_mesh,
2203                    job.caller_origin,
2204                    job.target_hint,
2205                    &job.reply_channel,
2206                    job.reply_channel_hash,
2207                    job.reply_stream_id,
2208                    job.payload,
2209                )
2210                .await
2211                {
2212                    tracing::warn!(
2213                        error = %e,
2214                        caller_origin = format!("{:#x}", job.caller_origin),
2215                        call_id = job.call_id,
2216                        "rpc serve_rpc: response publish failed"
2217                    );
2218                }
2219            }
2220        });
2221
2222        // Spawn an async re-announce so peers also learn about
2223        // the new service without the operator having to call
2224        // `announce_capabilities` manually. The local self-index
2225        // already happened above; this is purely for peer
2226        // visibility (the broadcast path also re-runs the
2227        // self-index, which is a cheap version bump).
2228        let mesh_for_announce = Arc::clone(self);
2229        let service_for_log = service.to_string();
2230        tokio::spawn(async move {
2231            let baseline = mesh_for_announce.user_caps_snapshot();
2232            if let Err(e) = mesh_for_announce.announce_capabilities(baseline).await {
2233                tracing::warn!(
2234                    error = %e,
2235                    service = %service_for_log,
2236                    "serve_rpc: auto re-announce failed",
2237                );
2238            }
2239        });
2240
2241        Ok(ServeHandle {
2242            channel_hash,
2243            service: service.to_string(),
2244            _bridge: bridge,
2245            _response_drain: Some(response_drain),
2246            mesh: Arc::clone(self),
2247        })
2248    }
2249
2250    /// Streaming variant of [`Self::serve_rpc`]. The handler
2251    /// receives an [`RpcResponseSink`](super::cortex::RpcResponseSink)
2252    /// it writes chunks to via `sink.send(body)`; returning
2253    /// `Ok(())` closes the stream cleanly, `Err(_)` closes with
2254    /// an error frame.
2255    ///
2256    /// Wire-level identical to the unary path apart from the
2257    /// per-chunk `nrpc-streaming` header markers
2258    /// (`continue` / `end`). Same auto-registration of
2259    /// `<service>.requests` + `<service>.replies.` prefix.
2260    pub fn serve_rpc_streaming<H: RpcStreamingHandler>(
2261        self: &Arc<Self>,
2262        service: &str,
2263        handler: Arc<H>,
2264    ) -> Result<ServeHandle, ServeError> {
2265        let request_channel = ChannelName::new(&format!("{service}.requests"))
2266            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
2267        let channel_hash = request_channel.hash();
2268        let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
2269
2270        // T1.2 cache: bridge populates from inbound.from_node, emit
2271        // closure consults to skip roster fan-out. See the unary
2272        // serve_rpc above for the full rationale.
2273        let origin_node_cache: RpcOriginNodeCache = Arc::new(OriginKeyedLru::new());
2274
2275        let mesh_for_emit = Arc::clone(self);
2276        let service_for_emit = service.to_string();
2277        let server_origin = self.identity_origin_hash();
2278        let origin_node_cache_for_emit = Arc::clone(&origin_node_cache);
2279        // Async emit so the streaming fold's pump can `.await` each
2280        // publish — guarantees per-call chunk ordering on the wire.
2281        let emit: RpcAsyncResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
2282            let mesh = Arc::clone(&mesh_for_emit);
2283            let service = service_for_emit.clone();
2284            let target_hint = origin_node_cache_for_emit.get(caller_origin);
2285            Box::pin(async move {
2286                let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
2287                let reply_channel = match ChannelName::new(&reply_channel_name) {
2288                    Ok(c) => c,
2289                    Err(e) => {
2290                        tracing::warn!(error = %e, channel = %reply_channel_name,
2291                                "rpc serve_rpc_streaming: invalid reply channel name");
2292                        return;
2293                    }
2294                };
2295                let meta = EventMeta::new(
2296                    super::cortex::DISPATCH_RPC_RESPONSE,
2297                    0,
2298                    server_origin,
2299                    call_id,
2300                    0,
2301                );
2302                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2303                buf.extend_from_slice(&meta.to_bytes());
2304                resp.encode_into(&mut buf);
2305                // PERF_AUDIT §3.10: compute hash + stream_id at the
2306                // call site. These legacy streaming paths don't yet
2307                // cache the triple via `OriginKeyedLru<CachedReplyChannel>`;
2308                // wiring them up is a follow-up — for now the
2309                // compute happens here per response, same as the
2310                // pre-fix in-function shape.
2311                let reply_channel_id = ChannelId::new(reply_channel.clone());
2312                let reply_channel_hash = reply_channel_id.hash();
2313                let reply_stream_id = MeshNode::publish_stream_id(&reply_channel_id);
2314                if let Err(e) = publish_response_to_caller(
2315                    &mesh,
2316                    caller_origin,
2317                    target_hint,
2318                    &reply_channel,
2319                    reply_channel_hash,
2320                    reply_stream_id,
2321                    Bytes::from(buf),
2322                )
2323                .await
2324                {
2325                    tracing::warn!(error = %e,
2326                            caller_origin = format!("{:#x}", caller_origin),
2327                            call_id,
2328                            "rpc serve_rpc_streaming: chunk publish failed");
2329                }
2330            })
2331        });
2332
2333        // Attach per-service metrics so the spawned handler tasks
2334        // + pump task bump server-side counters (including the
2335        // streaming-only `streaming_chunks_emitted_total`).
2336        let metrics_handle = self.rpc_metrics_arc().for_service(service);
2337        let fold = Arc::new(Mutex::new(
2338            RpcServerStreamingFold::new(handler as Arc<dyn RpcStreamingHandler>, emit)
2339                .with_metrics(metrics_handle),
2340        ));
2341        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2342            let _ = tx.try_send(ev);
2343        });
2344        if self
2345            .register_rpc_inbound(channel_hash, dispatcher)
2346            .is_some()
2347        {
2348            return Err(ServeError::AlreadyServing(service.to_string()));
2349        }
2350        let origin_node_cache_for_bridge = Arc::clone(&origin_node_cache);
2351        let bridge = tokio::spawn(async move {
2352            while let Some(inbound) = rx.recv().await {
2353                if inbound.from_node != 0 {
2354                    origin_node_cache_for_bridge.insert(inbound.origin_hash, inbound.from_node);
2355                }
2356                let payload = inbound.payload;
2357                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2358                let ev = RedexEvent { entry, payload };
2359                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2360                    tracing::warn!(error = %e, "rpc serve_rpc_streaming: fold apply error");
2361                }
2362            }
2363        });
2364        self.rpc_local_services_arc().insert(service.to_string());
2365        Ok(ServeHandle {
2366            channel_hash,
2367            service: service.to_string(),
2368            _bridge: bridge,
2369            // Streaming/duplex variants still spawn per emit (§8a covers the
2370            // unary hot path); no drainer.
2371            _response_drain: None,
2372            mesh: Arc::clone(self),
2373        })
2374    }
2375
2376    /// Register a client-streaming nRPC handler for `service`.
2377    /// Mirror of [`Self::serve_rpc_streaming`] but using the
2378    /// request-side fold ([`RpcStreamingRequestFold`]) — the
2379    /// handler receives one stream of REQUEST_CHUNK bodies and
2380    /// emits one terminal RESPONSE.
2381    ///
2382    /// Wires two emit callbacks:
2383    /// - A sync [`RpcResponseEmitter`] for the terminal RESPONSE
2384    ///   (single emit per call, no ordering concern).
2385    /// - An [`RpcRequestGrantEmitter`] for upload-direction
2386    ///   credit grants, which publishes [`DISPATCH_RPC_REQUEST_GRANT`]
2387    ///   events on the caller's reply channel.
2388    ///
2389    /// Bidi streaming plan (Phase C).
2390    pub fn serve_rpc_client_stream<H: RpcClientStreamingHandler>(
2391        self: &Arc<Self>,
2392        service: &str,
2393        handler: Arc<H>,
2394    ) -> Result<ServeHandle, ServeError> {
2395        let request_channel = ChannelName::new(&format!("{service}.requests"))
2396            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
2397        let channel_hash = request_channel.hash();
2398        let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
2399
2400        // T1.2 cache — see serve_rpc above for full rationale.
2401        let origin_node_cache: RpcOriginNodeCache = Arc::new(OriginKeyedLru::new());
2402
2403        let mesh_for_emit = Arc::clone(self);
2404        let service_for_emit = service.to_string();
2405        let server_origin = self.identity_origin_hash();
2406
2407        // Terminal RESPONSE emitter — sync because there's only
2408        // one RESPONSE per call (no per-call ordering concern that
2409        // would require an async-await between chunks).
2410        let emit_resp_mesh = Arc::clone(&mesh_for_emit);
2411        let emit_resp_service = service_for_emit.clone();
2412        let origin_node_cache_for_emit = Arc::clone(&origin_node_cache);
2413        let emit_resp: RpcResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
2414            let mesh = Arc::clone(&emit_resp_mesh);
2415            let service = emit_resp_service.clone();
2416            let target_hint = origin_node_cache_for_emit.get(caller_origin);
2417            tokio::spawn(async move {
2418                let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
2419                let reply_channel = match ChannelName::new(&reply_channel_name) {
2420                    Ok(c) => c,
2421                    Err(e) => {
2422                        tracing::warn!(error = %e, channel = %reply_channel_name,
2423                                "rpc serve_rpc_client_stream: invalid reply channel name");
2424                        return;
2425                    }
2426                };
2427                let meta = EventMeta::new(
2428                    super::cortex::DISPATCH_RPC_RESPONSE,
2429                    0,
2430                    server_origin,
2431                    call_id,
2432                    0,
2433                );
2434                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2435                buf.extend_from_slice(&meta.to_bytes());
2436                resp.encode_into(&mut buf);
2437                // PERF_AUDIT §3.10: compute hash + stream_id at the
2438                // call site. These legacy streaming paths don't yet
2439                // cache the triple via `OriginKeyedLru<CachedReplyChannel>`;
2440                // wiring them up is a follow-up — for now the
2441                // compute happens here per response, same as the
2442                // pre-fix in-function shape.
2443                let reply_channel_id = ChannelId::new(reply_channel.clone());
2444                let reply_channel_hash = reply_channel_id.hash();
2445                let reply_stream_id = MeshNode::publish_stream_id(&reply_channel_id);
2446                if let Err(e) = publish_response_to_caller(
2447                    &mesh,
2448                    caller_origin,
2449                    target_hint,
2450                    &reply_channel,
2451                    reply_channel_hash,
2452                    reply_stream_id,
2453                    Bytes::from(buf),
2454                )
2455                .await
2456                {
2457                    tracing::warn!(error = %e,
2458                            caller_origin = format!("{:#x}", caller_origin),
2459                            call_id,
2460                            "rpc serve_rpc_client_stream: terminal RESPONSE publish failed");
2461                }
2462            });
2463        });
2464
2465        // REQUEST_GRANT emitter — coalesces per-chunk credits into
2466        // a single drainer task that batches by call_id. Avoids the
2467        // tokio::spawn-per-emit storm under bursting.
2468        let emit_grant = build_request_grant_emitter(
2469            Arc::clone(&mesh_for_emit),
2470            service_for_emit.clone(),
2471            server_origin,
2472            "serve_rpc_client_stream",
2473        );
2474
2475        let metrics_handle = self.rpc_metrics_arc().for_service(service);
2476        let fold = Arc::new(Mutex::new(
2477            RpcStreamingRequestFold::new(handler as Arc<dyn RpcClientStreamingHandler>, emit_resp)
2478                .with_grant_emitter(emit_grant)
2479                .with_metrics(metrics_handle),
2480        ));
2481        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2482            let _ = tx.try_send(ev);
2483        });
2484        if self
2485            .register_rpc_inbound(channel_hash, dispatcher)
2486            .is_some()
2487        {
2488            return Err(ServeError::AlreadyServing(service.to_string()));
2489        }
2490        let origin_node_cache_for_bridge = Arc::clone(&origin_node_cache);
2491        let bridge = tokio::spawn(async move {
2492            while let Some(inbound) = rx.recv().await {
2493                if inbound.from_node != 0 {
2494                    origin_node_cache_for_bridge.insert(inbound.origin_hash, inbound.from_node);
2495                }
2496                let payload = inbound.payload;
2497                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2498                let ev = RedexEvent { entry, payload };
2499                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2500                    tracing::warn!(error = %e,
2501                        "rpc serve_rpc_client_stream: fold apply error");
2502                }
2503            }
2504        });
2505        self.rpc_local_services_arc().insert(service.to_string());
2506        Ok(ServeHandle {
2507            channel_hash,
2508            service: service.to_string(),
2509            _bridge: bridge,
2510            // Streaming/duplex variants still spawn per emit (§8a covers the
2511            // unary hot path); no drainer.
2512            _response_drain: None,
2513            mesh: Arc::clone(self),
2514        })
2515    }
2516
2517    /// Client-streaming variant of [`Self::call`]. Returns a
2518    /// [`ClientStreamCallRaw`] handle the caller pushes N items
2519    /// into via `send`, then `finish` to await the terminal
2520    /// RESPONSE.
2521    ///
2522    /// **Lazy initial REQUEST.** This method does NOT publish a
2523    /// REQUEST to the wire. It only ensures the caller's reply
2524    /// subscription is set up and registers the pending entry; the
2525    /// initial REQUEST is emitted by the first `send` (or by
2526    /// `finish` for the zero-item degenerate path).
2527    ///
2528    /// Sets `FLAG_RPC_CLIENT_STREAMING_REQUEST` on the initial
2529    /// REQUEST so the server's request-streaming fold knows to
2530    /// open a request-side stream. Optional `request_window_initial`
2531    /// header opts into upload-direction flow control.
2532    ///
2533    /// Bidi streaming plan (Phase C).
2534    pub async fn call_client_stream(
2535        self: &Arc<Self>,
2536        target_node_id: u64,
2537        service: &str,
2538        opts: CallOptions,
2539    ) -> Result<ClientStreamCallRaw, RpcError> {
2540        // `request_window_initial = Some(0)` would deadlock the
2541        // caller: every `send` awaits a credit, but the initial
2542        // REQUEST is lazy (not emitted until the first send), so
2543        // the server never sees the call and never publishes a
2544        // GRANT. Reject up front — `None` means "unbounded credit",
2545        // any positive value opts into flow control.
2546        if matches!(opts.request_window_initial, Some(0)) {
2547            return Err(RpcError::Codec {
2548                direction: CodecDirection::Encode,
2549                message: "request_window_initial must be None or >= 1; Some(0) deadlocks send"
2550                    .to_string(),
2551            });
2552        }
2553        // T1.3: per-service route cache (see PERF_AUDIT
2554        // 2026-05-19). One DashMap::get + Arc::clone instead of
2555        // 2 format! + 2 ChannelName::new + xxhash per call.
2556        let route = self.rpc_route_or_no_route(target_node_id, service)?;
2557        let self_origin = self.identity_origin_hash();
2558        self.ensure_reply_subscription(
2559            target_node_id,
2560            service,
2561            route.reply_channel.clone(),
2562            route.reply_hash,
2563        )
2564        .await?;
2565
2566        let call_id = mint_random_call_id();
2567        let pending = self.rpc_client_pending();
2568        let (terminal_rx, mut grant_rx) =
2569            pending.register_client_streaming(call_id, target_node_id);
2570
2571        // Build the header set + flags we'll queue for the initial
2572        // REQUEST (deferred to the first send / finish).
2573        let mut initial_flags = FLAG_RPC_CLIENT_STREAMING_REQUEST;
2574        let mut initial_headers: Vec<(String, Vec<u8>)> = Vec::new();
2575        if let Some(tc) = opts.trace_context.as_ref() {
2576            initial_flags |= FLAG_RPC_PROPAGATE_TRACE;
2577            initial_headers.extend(build_trace_headers(tc));
2578        }
2579        if let Some(window) = opts.request_window_initial {
2580            initial_headers.push((
2581                HEADER_NRPC_REQUEST_WINDOW_INITIAL.to_string(),
2582                window.to_string().into_bytes(),
2583            ));
2584        }
2585        initial_headers.extend(opts.request_headers.iter().cloned());
2586
2587        // Per-call credit semaphore when flow control is opted in.
2588        // Initial permits = the caller's declared window. Refilled
2589        // by REQUEST_GRANT events arriving on the reply channel,
2590        // pumped through `grant_rx` by the spawned `grant_pump`.
2591        let credit_sem = opts
2592            .request_window_initial
2593            .map(|n| Arc::new(tokio::sync::Semaphore::new(n as usize)));
2594        let grant_pump = credit_sem.as_ref().map(|sem| {
2595            let sem = Arc::clone(sem);
2596            tokio::spawn(async move {
2597                while let Some(credits) = grant_rx.recv().await {
2598                    add_request_grant_credits(&sem, credits);
2599                }
2600            })
2601        });
2602
2603        let deadline_ns = opts.deadline.map(instant_to_unix_nanos).unwrap_or(0);
2604        let observer = StreamingObserverState::new(Arc::clone(self), target_node_id, service, 0);
2605        let cancel_keep_alive = arm_stream_cancel(self, &opts, &pending, call_id);
2606        Ok(ClientStreamCallRaw {
2607            mesh: Arc::clone(self),
2608            target_node_id,
2609            request_channel: route.request_channel.clone(),
2610            request_channel_hash: route.request_channel_hash,
2611            request_stream_id: route.request_stream_id,
2612            self_origin,
2613            call_id,
2614            service: service.to_string(),
2615            initial_headers,
2616            initial_flags,
2617            deadline_ns,
2618            credit_sem,
2619            grant_pump,
2620            terminal_rx: Some(terminal_rx),
2621            state: ClientStreamState::JustOpened,
2622            started: Instant::now(),
2623            observer,
2624            _cancel_keep_alive: cancel_keep_alive,
2625        })
2626    }
2627
2628    /// Register a duplex nRPC handler for `service`. Composes
2629    /// [`Self::serve_rpc_client_stream`] (request-side stream)
2630    /// with [`Self::serve_rpc_streaming`] (response-side multi-
2631    /// fire emit) via [`RpcDuplexFold`].
2632    ///
2633    /// Wires TWO emit callbacks:
2634    /// - Async [`RpcAsyncResponseEmitter`] for response chunks +
2635    ///   the terminal frame (per-call ordering required because
2636    ///   the response side is multi-fire).
2637    /// - [`RpcRequestGrantEmitter`] for upload-direction credit
2638    ///   grants (one per consumed request chunk when flow
2639    ///   control is opted into).
2640    ///
2641    /// Bidi streaming plan (Phase D).
2642    pub fn serve_rpc_duplex<H: RpcDuplexHandler>(
2643        self: &Arc<Self>,
2644        service: &str,
2645        handler: Arc<H>,
2646    ) -> Result<ServeHandle, ServeError> {
2647        let request_channel = ChannelName::new(&format!("{service}.requests"))
2648            .map_err(|e| ServeError::InvalidServiceName(e.to_string()))?;
2649        let channel_hash = request_channel.hash();
2650        let (tx, mut rx) = tokio::sync::mpsc::channel::<RpcInboundEvent>(1024);
2651
2652        // T1.2 cache — see serve_rpc above for full rationale.
2653        let origin_node_cache: RpcOriginNodeCache = Arc::new(OriginKeyedLru::new());
2654
2655        let mesh_for_emit = Arc::clone(self);
2656        let service_for_emit = service.to_string();
2657        let server_origin = self.identity_origin_hash();
2658
2659        // Async response emitter — per-call ordering matters here
2660        // because the response side is multi-fire (same rationale
2661        // as serve_rpc_streaming).
2662        let emit_resp_mesh = Arc::clone(&mesh_for_emit);
2663        let emit_resp_service = service_for_emit.clone();
2664        let origin_node_cache_for_emit = Arc::clone(&origin_node_cache);
2665        let emit_resp: RpcAsyncResponseEmitter = Arc::new(move |caller_origin, call_id, resp| {
2666            let mesh = Arc::clone(&emit_resp_mesh);
2667            let service = emit_resp_service.clone();
2668            let target_hint = origin_node_cache_for_emit.get(caller_origin);
2669            Box::pin(async move {
2670                let reply_channel_name = format!("{service}.replies.{caller_origin:016x}");
2671                let reply_channel = match ChannelName::new(&reply_channel_name) {
2672                    Ok(c) => c,
2673                    Err(e) => {
2674                        tracing::warn!(error = %e, channel = %reply_channel_name,
2675                                "rpc serve_rpc_duplex: invalid reply channel name");
2676                        return;
2677                    }
2678                };
2679                let meta = EventMeta::new(
2680                    super::cortex::DISPATCH_RPC_RESPONSE,
2681                    0,
2682                    server_origin,
2683                    call_id,
2684                    0,
2685                );
2686                let mut buf = Vec::with_capacity(EVENT_META_SIZE + 64);
2687                buf.extend_from_slice(&meta.to_bytes());
2688                resp.encode_into(&mut buf);
2689                // PERF_AUDIT §3.10: compute hash + stream_id at the
2690                // call site. These legacy streaming paths don't yet
2691                // cache the triple via `OriginKeyedLru<CachedReplyChannel>`;
2692                // wiring them up is a follow-up — for now the
2693                // compute happens here per response, same as the
2694                // pre-fix in-function shape.
2695                let reply_channel_id = ChannelId::new(reply_channel.clone());
2696                let reply_channel_hash = reply_channel_id.hash();
2697                let reply_stream_id = MeshNode::publish_stream_id(&reply_channel_id);
2698                if let Err(e) = publish_response_to_caller(
2699                    &mesh,
2700                    caller_origin,
2701                    target_hint,
2702                    &reply_channel,
2703                    reply_channel_hash,
2704                    reply_stream_id,
2705                    Bytes::from(buf),
2706                )
2707                .await
2708                {
2709                    tracing::warn!(error = %e,
2710                            caller_origin = format!("{:#x}", caller_origin),
2711                            call_id,
2712                            "rpc serve_rpc_duplex: chunk publish failed");
2713                }
2714            })
2715        });
2716
2717        // Request-direction grant emitter — same coalescing
2718        // drainer shape as serve_rpc_client_stream.
2719        let emit_grant = build_request_grant_emitter(
2720            Arc::clone(&mesh_for_emit),
2721            service_for_emit.clone(),
2722            server_origin,
2723            "serve_rpc_duplex",
2724        );
2725
2726        let metrics_handle = self.rpc_metrics_arc().for_service(service);
2727        let fold = Arc::new(Mutex::new(
2728            RpcDuplexFold::new(handler as Arc<dyn RpcDuplexHandler>, emit_resp)
2729                .with_grant_emitter(emit_grant)
2730                .with_metrics(metrics_handle),
2731        ));
2732        let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
2733            let _ = tx.try_send(ev);
2734        });
2735        if self
2736            .register_rpc_inbound(channel_hash, dispatcher)
2737            .is_some()
2738        {
2739            return Err(ServeError::AlreadyServing(service.to_string()));
2740        }
2741        let origin_node_cache_for_bridge = Arc::clone(&origin_node_cache);
2742        let bridge = tokio::spawn(async move {
2743            while let Some(inbound) = rx.recv().await {
2744                if inbound.from_node != 0 {
2745                    origin_node_cache_for_bridge.insert(inbound.origin_hash, inbound.from_node);
2746                }
2747                let payload = inbound.payload;
2748                let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, 0);
2749                let ev = RedexEvent { entry, payload };
2750                if let Err(e) = fold.lock().apply(&ev, &mut ()) {
2751                    tracing::warn!(error = %e,
2752                        "rpc serve_rpc_duplex: fold apply error");
2753                }
2754            }
2755        });
2756        self.rpc_local_services_arc().insert(service.to_string());
2757        Ok(ServeHandle {
2758            channel_hash,
2759            service: service.to_string(),
2760            _bridge: bridge,
2761            // Streaming/duplex variants still spawn per emit (§8a covers the
2762            // unary hot path); no drainer.
2763            _response_drain: None,
2764            mesh: Arc::clone(self),
2765        })
2766    }
2767
2768    /// Duplex variant of [`Self::call`]. Returns a
2769    /// [`DuplexCallRaw`] handle with both upload (`send`,
2770    /// `finish_sending`) and download (`next`, or impl
2771    /// `futures::Stream`) surfaces. Use `into_split` to peel off
2772    /// the two halves for the "encoder task + decoder task"
2773    /// shape.
2774    ///
2775    /// Initial REQUEST sets BOTH `FLAG_RPC_CLIENT_STREAMING_REQUEST`
2776    /// AND `FLAG_RPC_STREAMING_RESPONSE`. Lazy publish — the
2777    /// initial REQUEST flies on the first `send` (or on
2778    /// `finish_sending` for the zero-item degenerate path).
2779    ///
2780    /// Bidi streaming plan (Phase D).
2781    pub async fn call_duplex(
2782        self: &Arc<Self>,
2783        target_node_id: u64,
2784        service: &str,
2785        opts: CallOptions,
2786    ) -> Result<DuplexCallRaw, RpcError> {
2787        // Same deadlock guard as `call_client_stream`: Some(0)
2788        // means "send must await a credit that can never arrive"
2789        // because the initial REQUEST is lazy.
2790        if matches!(opts.request_window_initial, Some(0)) {
2791            return Err(RpcError::Codec {
2792                direction: CodecDirection::Encode,
2793                message: "request_window_initial must be None or >= 1; Some(0) deadlocks send"
2794                    .to_string(),
2795            });
2796        }
2797        // T1.3: per-service route cache (see PERF_AUDIT
2798        // 2026-05-19). One DashMap::get + Arc::clone instead of
2799        // 2 format! + 2 ChannelName::new + xxhash per call.
2800        let route = self.rpc_route_or_no_route(target_node_id, service)?;
2801        let self_origin = self.identity_origin_hash();
2802        self.ensure_reply_subscription(
2803            target_node_id,
2804            service,
2805            route.reply_channel.clone(),
2806            route.reply_hash,
2807        )
2808        .await?;
2809
2810        let call_id = mint_random_call_id();
2811        let pending = self.rpc_client_pending();
2812        let (chunks_rx, mut grant_rx) = pending.register_duplex(call_id, target_node_id);
2813
2814        let mut initial_flags = FLAG_RPC_CLIENT_STREAMING_REQUEST | FLAG_RPC_STREAMING_RESPONSE;
2815        let mut initial_headers: Vec<(String, Vec<u8>)> = Vec::new();
2816        if let Some(tc) = opts.trace_context.as_ref() {
2817            initial_flags |= FLAG_RPC_PROPAGATE_TRACE;
2818            initial_headers.extend(build_trace_headers(tc));
2819        }
2820        if let Some(window) = opts.request_window_initial {
2821            initial_headers.push((
2822                HEADER_NRPC_REQUEST_WINDOW_INITIAL.to_string(),
2823                window.to_string().into_bytes(),
2824            ));
2825        }
2826        if let Some(window) = opts.stream_window_initial {
2827            initial_headers.push((
2828                HEADER_NRPC_STREAM_WINDOW_INITIAL.to_string(),
2829                window.to_string().into_bytes(),
2830            ));
2831        }
2832        initial_headers.extend(opts.request_headers.iter().cloned());
2833
2834        let credit_sem = opts
2835            .request_window_initial
2836            .map(|n| Arc::new(tokio::sync::Semaphore::new(n as usize)));
2837        let grant_pump = credit_sem.as_ref().map(|sem| {
2838            let sem = Arc::clone(sem);
2839            tokio::spawn(async move {
2840                while let Some(credits) = grant_rx.recv().await {
2841                    add_request_grant_credits(&sem, credits);
2842                }
2843            })
2844        });
2845
2846        let deadline_ns = opts.deadline.map(instant_to_unix_nanos).unwrap_or(0);
2847        let observer = StreamingObserverState::new(Arc::clone(self), target_node_id, service, 0);
2848        // Cancel keep-alive lives on the shared Arc<DuplexInner>
2849        // so it survives into_split — the watcher exits only when
2850        // BOTH the sink AND stream halves drop, matching the
2851        // existing CANCEL-on-drop semantics.
2852        let cancel_keep_alive = arm_stream_cancel(self, &opts, &pending, call_id);
2853        let inner = Arc::new(DuplexInner {
2854            mesh: Arc::clone(self),
2855            target_node_id,
2856            request_channel: route.request_channel.clone(),
2857            request_channel_hash: route.request_channel_hash,
2858            request_stream_id: route.request_stream_id,
2859            self_origin,
2860            call_id,
2861            initial_sent: std::sync::atomic::AtomicBool::new(false),
2862            clean_close: std::sync::atomic::AtomicBool::new(false),
2863            observer,
2864            _cancel_keep_alive: Some(cancel_keep_alive),
2865        });
2866        let sink = DuplexSink {
2867            inner: Arc::clone(&inner),
2868            service: service.to_string(),
2869            initial_headers,
2870            initial_flags,
2871            deadline_ns,
2872            credit_sem,
2873            grant_pump,
2874            state: ClientStreamState::JustOpened,
2875        };
2876        let stream = DuplexStream {
2877            inner,
2878            chunks_rx,
2879            done: false,
2880        };
2881        Ok(DuplexCallRaw { sink, stream })
2882    }
2883
2884    /// Streaming variant of [`Self::call`]. Returns an
2885    /// [`RpcStream`] that yields chunks (as `Result<Bytes, RpcError>`)
2886    /// until the server closes the stream.
2887    ///
2888    /// Sets `FLAG_RPC_STREAMING_RESPONSE` on the request so the
2889    /// server's streaming fold knows to expect multi-fire emits.
2890    /// Same lazy reply-subscription + direct-unicast REQUEST
2891    /// as the unary `call` path.
2892    ///
2893    /// Cancellation: dropping the returned `RpcStream` emits a
2894    /// CANCEL to the server (best-effort) and discards any
2895    /// in-flight chunks.
2896    pub async fn call_streaming(
2897        self: &Arc<Self>,
2898        target_node_id: u64,
2899        service: &str,
2900        payload: Bytes,
2901        opts: CallOptions,
2902    ) -> Result<RpcStream, RpcError> {
2903        // `stream_window_initial = Some(0)` would deadlock the
2904        // RESPONSE direction by default: server's pump awaits one
2905        // credit per chunk, the caller's auto-grant only fires on
2906        // consumed chunks, and the first chunk can never be
2907        // delivered. `None` means "unbounded credit"; any positive
2908        // value opts into flow control. Reject up front — symmetric
2909        // with the request-direction guard in `call_client_stream`.
2910        if matches!(opts.stream_window_initial, Some(0)) {
2911            return Err(RpcError::Codec {
2912                direction: CodecDirection::Encode,
2913                message: "stream_window_initial must be None or >= 1; Some(0) deadlocks the response pump"
2914                    .to_string(),
2915            });
2916        }
2917        // T1.3: per-service route cache. One DashMap::get + Arc::clone
2918        // on the hot path instead of 2 format! + 2 ChannelName::new +
2919        // xxhash per call.
2920        let route = self.rpc_route_or_no_route(target_node_id, service)?;
2921        let self_origin = self.identity_origin_hash();
2922        self.ensure_reply_subscription(
2923            target_node_id,
2924            service,
2925            route.reply_channel.clone(),
2926            route.reply_hash,
2927        )
2928        .await?;
2929
2930        let call_id = mint_random_call_id();
2931        let pending = self.rpc_client_pending();
2932        // S-4 part 2: bind the pending entry to the wire-session
2933        // peer the request is dispatched to. The fold's deliver
2934        // gate rejects RESPONSE frames whose from_node doesn't
2935        // match, so a leaked call_id alone can't spoof a reply.
2936        let rx = pending.register_streaming(call_id, target_node_id);
2937
2938        // Build the REQUEST: STREAMING_RESPONSE flag plus optional
2939        // trace-context headers / propagate-trace flag, same as
2940        // unary `call`. Plus the optional flow-control header
2941        // (`nrpc-stream-window-initial`) when the caller opted in
2942        // via `CallOptions::stream_window_initial`.
2943        let mut flags = FLAG_RPC_STREAMING_RESPONSE;
2944        let mut headers = Vec::new();
2945        if let Some(tc) = opts.trace_context.as_ref() {
2946            flags |= FLAG_RPC_PROPAGATE_TRACE;
2947            headers.extend(build_trace_headers(tc));
2948        }
2949        if let Some(window) = opts.stream_window_initial {
2950            headers.push((
2951                HEADER_NRPC_STREAM_WINDOW_INITIAL.to_string(),
2952                window.to_string().into_bytes(),
2953            ));
2954        }
2955        // Append caller-supplied request headers (Phase 9b — same
2956        // semantics as the unary `call` path).
2957        headers.extend(opts.request_headers.iter().cloned());
2958        let req = RpcRequestPayload {
2959            service: service.to_string(),
2960            deadline_ns: opts.deadline.map(instant_to_unix_nanos).unwrap_or(0),
2961            flags,
2962            headers,
2963            body: payload.clone(),
2964        };
2965        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self_origin, call_id, 0);
2966        let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.body.len() + 32);
2967        buf.extend_from_slice(&meta.to_bytes());
2968        req.encode_into(&mut buf);
2969
2970        let payload_bytes = Bytes::from(buf);
2971        if let Err(e) = self
2972            .publish_to_peer(
2973                target_node_id,
2974                route.request_channel_hash,
2975                route.request_stream_id,
2976                /* reliable */ true,
2977                std::slice::from_ref(&payload_bytes),
2978            )
2979            .await
2980        {
2981            pending.cancel(call_id);
2982            return Err(RpcError::Transport(e));
2983        }
2984
2985        let request_bytes_len = payload_bytes.len() as u32;
2986        // Cancel keep-alive lives on the returned RpcStream so the
2987        // watcher exits cleanly when the stream drops without cancel.
2988        let cancel_keep_alive = arm_stream_cancel(self, &opts, &pending, call_id);
2989        Ok(RpcStream {
2990            mesh: Arc::clone(self),
2991            target_node_id,
2992            request_channel: route.request_channel.clone(),
2993            // PERF_AUDIT §3.10 — cache the channel hash + stream
2994            // id from `route` so per-chunk grants in `poll_next`
2995            // don't re-run `ChannelId::new` + xxh3.
2996            request_channel_hash: route.request_channel_hash,
2997            request_stream_id: route.request_stream_id,
2998            self_origin,
2999            call_id,
3000            inner: rx,
3001            done: false,
3002            stream_window: opts.stream_window_initial,
3003            grant_pending: 0,
3004            _cancel_keep_alive: cancel_keep_alive,
3005            observer: StreamingObserverState::new(
3006                Arc::clone(self),
3007                target_node_id,
3008                service,
3009                request_bytes_len,
3010            ),
3011        })
3012    }
3013
3014    /// Find every node currently advertising `service` via the
3015    /// `nrpc:<service>` capability tag. Returns node IDs in
3016    /// roster order; the caller picks one (or use [`Self::call_service`]
3017    /// for the round-robin shortcut).
3018    ///
3019    /// Pre-Phase 2: requires the target nodes to have called
3020    /// `serve_rpc` AND `announce_capabilities` so the
3021    /// `nrpc:<service>` tag has propagated through capability
3022    /// announcements. The local node's own services are NOT
3023    /// automatically included (callers don't typically invoke
3024    /// themselves via the network — for in-process invocation,
3025    /// the user has the handler directly).
3026    pub fn find_service_nodes(&self, service: &str) -> Vec<u64> {
3027        use crate::adapter::net::behavior::capability::CapabilityFilter;
3028        use crate::adapter::net::behavior::fold::capability_bridge;
3029        let tag = format!("nrpc:{service}");
3030        let filter = CapabilityFilter::default().require_tag(tag);
3031        capability_bridge::find_nodes_matching(self.capability_fold(), &filter)
3032    }
3033
3034    /// Issue an RPC call to `service`, picking one node from
3035    /// those advertising the `nrpc:<service>` tag in the local
3036    /// capability index according to `opts.routing_policy`.
3037    ///
3038    /// Returns `RpcError::NoRoute` if no nodes advertise the
3039    /// service (or if `opts.filter_unhealthy` is set and every
3040    /// candidate is unavailable per the local `ProximityGraph`).
3041    pub async fn call_service(
3042        self: &Arc<Self>,
3043        service: &str,
3044        payload: Bytes,
3045        opts: CallOptions,
3046    ) -> Result<RpcReply, RpcError> {
3047        let mut candidates = self.find_service_nodes(service);
3048        if candidates.is_empty() {
3049            return Err(RpcError::NoRoute {
3050                target: 0,
3051                reason: format!(
3052                    "no nodes advertise `nrpc:{service}` (have any servers \
3053                     for this service called serve_rpc + announce_capabilities?)"
3054                ),
3055            });
3056        }
3057
3058        // Health filtering. Skip candidates the proximity graph
3059        // marks unhealthy (`!is_available()`). Candidates with no
3060        // proximity entry at all are KEPT — absence of evidence
3061        // is not evidence of unhealth, and a freshly-announced
3062        // service shouldn't be filtered just because pingwaves
3063        // haven't propagated yet.
3064        //
3065        // The bridge: each candidate's session-layer `node_id: u64`
3066        // is mapped to the entity-layer `[u8; 32]` via
3067        // `MeshNode::entity_id_for_node`. The proximity graph is
3068        // keyed on the entity id.
3069        if opts.filter_unhealthy {
3070            let proximity = self.proximity_graph();
3071            candidates.retain(|node_id| match self.entity_id_for_node(*node_id) {
3072                Some(entity_id) => match proximity.get_node(&entity_id) {
3073                    Some(node) => node.is_available(),
3074                    None => true, // no proximity data → keep
3075                },
3076                None => true, // no entity-id mapping → keep
3077            });
3078            if candidates.is_empty() {
3079                return Err(RpcError::NoRoute {
3080                    target: 0,
3081                    reason: format!(
3082                        "every node advertising `nrpc:{service}` is marked \
3083                         unhealthy by the local proximity graph",
3084                    ),
3085                });
3086            }
3087        }
3088
3089        // Sort once so consistent-hash policies (Sticky) produce
3090        // a stable ordering across calls regardless of how the
3091        // capability index returned the candidates, and so the
3092        // LowestLatency-with-no-proximity-data fallback is
3093        // deterministic. Cheap — the candidate set is typically
3094        // small.
3095        candidates.sort_unstable();
3096
3097        // v0.4 capability-auth caller-side gate. Filter the
3098        // candidate set BEFORE target selection so the routing
3099        // policy never picks a peer the caller can't actually
3100        // reach. Pre-fix `select_target` could pick a denied
3101        // candidate even when authorized peers existed in the
3102        // set, and the resulting `CapabilityDenied` masked the
3103        // fact that the call would have succeeded against B or
3104        // C. Each candidate's own announcement lists
3105        // `nrpc:<service>` (otherwise it wouldn't be a
3106        // `find_service_nodes` candidate), so the gate's
3107        // `has_tag` arm short-circuits in the common case; the
3108        // new work is the allow-list scan. Permissive
3109        // announcements (all three lists empty) admit any
3110        // caller — the byte-identity wire-compat tests pin that
3111        // an unmodified peer's announcement stays unrestricted.
3112        // See `docs/plans/CAPABILITY_AUTH_PLAN.md` §3.
3113        let tag = format!("nrpc:{service}");
3114        use crate::adapter::net::behavior::fold::capability_bridge;
3115        let self_id = self.node_id();
3116        let any_candidate = candidates[0];
3117        let fold = self.capability_fold();
3118        // PERF_AUDIT §4.2 — batch the per-candidate gate so the
3119        // fold read lock is taken once and the caller's subnet +
3120        // groups are parsed once, not N times.
3121        let verdicts = capability_bridge::may_execute_batch(fold, &candidates, &tag, self_id);
3122        let mut iter = verdicts.into_iter();
3123        candidates.retain(|_| iter.next().unwrap_or(false));
3124        if candidates.is_empty() {
3125            return Err(RpcError::CapabilityDenied {
3126                // No authorized target; surface one of the
3127                // originally-advertised candidates so the caller
3128                // can correlate the denial with a real peer. The
3129                // semantic is "no peer advertising `nrpc:<service>`
3130                // authorizes this caller" — `any_candidate` is a
3131                // representative, not necessarily the strictest.
3132                target: any_candidate,
3133                capability: service.to_string(),
3134            });
3135        }
3136
3137        let target = self.select_target(&candidates, &opts.routing_policy);
3138        self.call(target, service, payload, opts).await
3139    }
3140
3141    /// Capability-routed server-streaming call. Same routing as
3142    /// [`call_service`] — capability-fold lookup, health filter,
3143    /// routing-policy sort, capability-auth gate, target selection —
3144    /// but the terminal step is [`call_streaming`] instead of
3145    /// [`call`]. Returns the substrate's `RpcStream` so callers can
3146    /// drive an `async for chunk in stream:` loop.
3147    ///
3148    /// Use cases: an agent invoking a long-running tool that emits
3149    /// progress + a terminal result, a fan-out subscriber that wants
3150    /// streaming chunks from whatever node currently advertises the
3151    /// service, any consumer that today reaches for
3152    /// `find_service_nodes` → manual target selection → `call_streaming`
3153    /// and ends up re-implementing the cap-auth gate `call_service`
3154    /// already enforces.
3155    ///
3156    /// Honors `CallOptions::cancel_token` (v3) and
3157    /// `CallOptions::deadline` exactly like `call_streaming`.
3158    ///
3159    /// [`call_service`]: Self::call_service
3160    /// [`call_streaming`]: Self::call_streaming
3161    /// [`call`]: Self::call
3162    pub async fn call_service_streaming(
3163        self: &Arc<Self>,
3164        service: &str,
3165        payload: Bytes,
3166        opts: CallOptions,
3167    ) -> Result<RpcStream, RpcError> {
3168        let mut candidates = self.find_service_nodes(service);
3169        if candidates.is_empty() {
3170            return Err(RpcError::NoRoute {
3171                target: 0,
3172                reason: format!(
3173                    "no nodes advertise `nrpc:{service}` (have any servers \
3174                     for this service called serve_rpc + announce_capabilities?)"
3175                ),
3176            });
3177        }
3178
3179        // Health filter — mirrors `call_service`. Candidates with no
3180        // proximity entry are kept (absence of evidence ≠ evidence of
3181        // unhealth); only candidates the proximity graph marks
3182        // explicitly unavailable get dropped.
3183        if opts.filter_unhealthy {
3184            let proximity = self.proximity_graph();
3185            candidates.retain(|node_id| match self.entity_id_for_node(*node_id) {
3186                Some(entity_id) => match proximity.get_node(&entity_id) {
3187                    Some(node) => node.is_available(),
3188                    None => true,
3189                },
3190                None => true,
3191            });
3192            if candidates.is_empty() {
3193                return Err(RpcError::NoRoute {
3194                    target: 0,
3195                    reason: format!(
3196                        "every node advertising `nrpc:{service}` is marked \
3197                         unhealthy by the local proximity graph",
3198                    ),
3199                });
3200            }
3201        }
3202
3203        // Deterministic ordering so Sticky / LowestLatency-fallback
3204        // pick stably across calls — mirrors `call_service`.
3205        candidates.sort_unstable();
3206
3207        // v0.4 capability-auth caller-side gate. Same as `call_service`:
3208        // filter the candidate set BEFORE target selection so the
3209        // routing policy never picks a peer the caller can't reach.
3210        let tag = format!("nrpc:{service}");
3211        use crate::adapter::net::behavior::fold::capability_bridge;
3212        let self_id = self.node_id();
3213        let any_candidate = candidates[0];
3214        let fold = self.capability_fold();
3215        // PERF_AUDIT §4.2 — batch the per-candidate gate. See the
3216        // mirror site at `:3093`.
3217        let verdicts = capability_bridge::may_execute_batch(fold, &candidates, &tag, self_id);
3218        let mut iter = verdicts.into_iter();
3219        candidates.retain(|_| iter.next().unwrap_or(false));
3220        if candidates.is_empty() {
3221            return Err(RpcError::CapabilityDenied {
3222                target: any_candidate,
3223                capability: service.to_string(),
3224            });
3225        }
3226
3227        let target = self.select_target(&candidates, &opts.routing_policy);
3228        self.call_streaming(target, service, payload, opts).await
3229    }
3230
3231    /// Select a single target from `candidates` according to
3232    /// `policy`. Caller has already ensured `candidates` is
3233    /// non-empty and sorted (so `Sticky` is consistent across
3234    /// calls).
3235    fn select_target(&self, candidates: &[u64], policy: &RoutingPolicy) -> u64 {
3236        match policy {
3237            RoutingPolicy::RoundRobin => {
3238                // `fetch_add(1)` on a dedicated cursor — NOT a
3239                // `load(call_id)` — so two concurrent
3240                // `call_service` invocations always observe
3241                // distinct values and pick distinct targets.
3242                let n = self
3243                    .rpc_round_robin_cursor_arc()
3244                    .fetch_add(1, Ordering::Relaxed);
3245                candidates[(n as usize) % candidates.len()]
3246            }
3247            RoutingPolicy::Random => {
3248                // Lightweight RNG via a fresh fetch_add (same
3249                // counter, separate per-call value) mixed through
3250                // xxh3. Sufficient for load distribution;
3251                // not cryptographically random.
3252                let n = self
3253                    .rpc_round_robin_cursor_arc()
3254                    .fetch_add(1, Ordering::Relaxed);
3255                let mixed = xxhash_rust::xxh3::xxh3_64(&n.to_le_bytes());
3256                candidates[(mixed as usize) % candidates.len()]
3257            }
3258            RoutingPolicy::Sticky { key } => {
3259                // Consistent-hash to a position in the (sorted)
3260                // candidate list. Same key + same candidate set =
3261                // same target. A change to the candidate set
3262                // (server failover) reshuffles roughly 1/N of keys.
3263                let h = xxhash_rust::xxh3::xxh3_64(&key.to_le_bytes());
3264                candidates[(h as usize) % candidates.len()]
3265            }
3266            RoutingPolicy::LowestLatency => {
3267                // Walk candidates, look up each via the bridge
3268                // → proximity graph, pick the smallest
3269                // `latency_us`. Candidates without a proximity
3270                // entry (no observed pingwave or no entity-id
3271                // mapping yet) are treated as `u64::MAX` so they
3272                // sort to the bottom — a known-fast node beats an
3273                // unknown one.
3274                //
3275                // Determinism on tie / no-data: `best_node` starts
3276                // at `candidates[0]` (the lexicographically first
3277                // sorted candidate), so all-ties or all-unknown
3278                // collapse to that consistent fallback.
3279                let proximity = self.proximity_graph();
3280                let mut best_node = candidates[0];
3281                let mut best_latency = u64::MAX;
3282                for &node_id in candidates {
3283                    let lat = self
3284                        .entity_id_for_node(node_id)
3285                        .and_then(|eid| proximity.get_node(&eid))
3286                        .map(|n| n.latency_us)
3287                        .unwrap_or(u64::MAX);
3288                    if lat < best_latency {
3289                        best_latency = lat;
3290                        best_node = node_id;
3291                    }
3292                }
3293                best_node
3294            }
3295        }
3296    }
3297
3298    /// Issue an RPC call to `target_node_id` for `service`.
3299    ///
3300    /// Phase 1 — direct entity-to-entity addressing. The caller
3301    /// specifies which target to send to; service discovery (the
3302    /// "find me a healthy instance of X" lookup) is Phase 2.
3303    ///
3304    /// Lazily subscribes the local node's `RpcClientFold` to
3305    /// `<service>.replies.<self_origin>` from `target_node_id` on
3306    /// the first call to that (target, service) pair. The
3307    /// subscription is reused across subsequent calls.
3308    ///
3309    /// On `opts.deadline` expiring OR the future being dropped,
3310    /// emits a CANCEL event so the server can drop the in-flight
3311    /// handler.
3312    pub async fn call(
3313        self: &Arc<Self>,
3314        target_node_id: u64,
3315        service: &str,
3316        payload: Bytes,
3317        mut opts: CallOptions,
3318    ) -> Result<RpcReply, RpcError> {
3319        // `started_total` brackets the entire call for the
3320        // `RpcObserver` latency field; the substrate-internal
3321        // `started` further down (set after the subscription
3322        // setup) drives the existing `RpcReply::latency_ns`
3323        // accounting so observers and Prometheus metrics
3324        // measure slightly different spans but stay consistent
3325        // within their own surface.
3326        let started_total = Instant::now();
3327        let request_bytes_len = payload.len() as u32;
3328        // Per-service route cache: one `DashMap::get(&str)` +
3329        // `Arc::clone` on the hot path instead of 2 `format!` +
3330        // 2 `ChannelName::new` + xxhash per call (T1.3 perf audit
3331        // — `docs/misc/PERF_AUDIT_2026_05_19_NRPC.md`).
3332        let route = self.rpc_route_or_no_route(target_node_id, service)?;
3333        let self_origin = self.identity_origin_hash();
3334
3335        // Caller-side metrics guard. Bumps `in_flight` immediately;
3336        // each early-return path calls `metrics_guard.record(...)`
3337        // with the outcome, and Drop records the latency + bumps
3338        // the matching counter. A future dropped before any
3339        // `record(...)` call (e.g. a hedge loser) leaves the guard
3340        // with `outcome = None` so `in_flight` decrements but no
3341        // outcome is double-counted.
3342        let metrics_registry = self.rpc_metrics_arc();
3343        let mut metrics_guard = CallMetricsGuard::new(metrics_registry.for_service(service));
3344
3345        // Lazy reply-channel subscription. Once per (target, service).
3346        // Reply channel + hash come from the cached `RpcRoute`; we
3347        // only `.clone()` the `ChannelName` (cheap — internally an
3348        // Arc<str>) instead of building it from scratch.
3349        if let Err(e) = self
3350            .ensure_reply_subscription(
3351                target_node_id,
3352                service,
3353                route.reply_channel.clone(),
3354                route.reply_hash,
3355            )
3356            .await
3357        {
3358            metrics_guard.record(CallOutcome::NoRoute);
3359            self.fire_rpc_observer_outbound(
3360                target_node_id,
3361                service,
3362                started_total.elapsed().as_millis() as u32,
3363                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(e.to_string()),
3364                request_bytes_len,
3365                0,
3366            );
3367            return Err(e);
3368        }
3369
3370        // Allocate a fresh call_id. Random u64 from getrandom; a
3371        // sequential counter would let any session peer that
3372        // observed one of their own call_ids predict the next-
3373        // allocated ids and ship spoofed RESPONSE frames on the
3374        // victim's reply channel. Random u64 collides with
3375        // probability 2^-64 per call and is unguessable from
3376        // another peer's perspective.
3377        let call_id = mint_random_call_id();
3378
3379        // Register the oneshot before publishing the REQUEST so a
3380        // very-fast RESPONSE doesn't arrive before we're ready.
3381        // S-4 part 2: bind the pending entry to target_node_id so
3382        // the fold's deliver gate rejects spoofed RESPONSE frames
3383        // arriving from any other session peer.
3384        let pending = self.rpc_client_pending();
3385        let rx = pending.register(call_id, target_node_id);
3386
3387        // Build the REQUEST envelope. If a trace context is set,
3388        // emit `traceparent` / `tracestate` headers and signal
3389        // via `FLAG_RPC_PROPAGATE_TRACE` so the server's fold
3390        // populates `RpcContext::trace_context`.
3391        let (flags, mut headers) = match opts.trace_context.as_ref() {
3392            Some(tc) => (FLAG_RPC_PROPAGATE_TRACE, build_trace_headers(tc)),
3393            None => (0u16, Vec::new()),
3394        };
3395        // Append caller-supplied request headers (e.g. the
3396        // `net-where` predicate header for Phase 9b
3397        // predicate-pushdown). Auto-generated headers come first
3398        // so name collisions resolve to caller-overrides via the
3399        // server-side `predicate_from_rpc_headers` first-match
3400        // semantics.
3401        // PERF_AUDIT §3.11 — `opts` is owned by this function and
3402        // its `request_headers` are unused after this point;
3403        // `Vec::append(&mut other)` drains `other` into `headers`
3404        // with zero allocation, vs the pre-fix
3405        // `.iter().cloned()` which deep-cloned each
3406        // `(String, Vec<u8>)` pair into a fresh entry.
3407        headers.append(&mut opts.request_headers);
3408        let req = RpcRequestPayload {
3409            service: service.to_string(),
3410            deadline_ns: opts.deadline.map(instant_to_unix_nanos).unwrap_or(0),
3411            flags,
3412            headers,
3413            body: payload.clone(),
3414        };
3415        let meta = EventMeta::new(DISPATCH_RPC_REQUEST, 0, self_origin, call_id, 0);
3416        let mut buf = Vec::with_capacity(EVENT_META_SIZE + req.body.len() + 32);
3417        buf.extend_from_slice(&meta.to_bytes());
3418        req.encode_into(&mut buf);
3419
3420        // Send the REQUEST directly to `target_node_id` via
3421        // `publish_to_peer`, bypassing the local subscriber roster
3422        // lookup. The roster-based `Mesh::publish` would consult
3423        // `dispatch_recipients(channel)` against the caller's local
3424        // roster, which has no knowledge of who serves this service
3425        // (no Subscribe message ever propagated from the server back
3426        // to the caller — `serve_rpc` is local-only). For Phase 1
3427        // direct addressing we know the target, so direct-send is
3428        // the right primitive.
3429        //
3430        // The receiver routes via the per-channel-hash dispatcher
3431        // hook (channel_hash is stamped on the wire by
3432        // publish_to_peer).
3433        let started = Instant::now();
3434        // Request channel hash + stream_id come from the cached
3435        // route — no `ChannelId::new` clone + xxhash per call.
3436        let payload_bytes = Bytes::from(buf);
3437        if let Err(e) = self
3438            .publish_to_peer(
3439                target_node_id,
3440                route.request_channel_hash,
3441                route.request_stream_id,
3442                /* reliable */ true,
3443                std::slice::from_ref(&payload_bytes),
3444            )
3445            .await
3446        {
3447            pending.cancel(call_id);
3448            // Distinguish "I don't know how to reach this peer"
3449            // from a generic transport blip: when the publish path
3450            // surfaces a no-session error, that's NoRoute (the
3451            // routing layer's job, retry won't help). Other
3452            // transport errors stay as Transport so retry is
3453            // applicable.
3454            let err = if classify_publish_no_session(&e) {
3455                metrics_guard.record(CallOutcome::NoRoute);
3456                RpcError::NoRoute {
3457                    target: target_node_id,
3458                    reason: e.to_string(),
3459                }
3460            } else {
3461                metrics_guard.record(CallOutcome::Transport);
3462                RpcError::Transport(e)
3463            };
3464            self.fire_rpc_observer_outbound(
3465                target_node_id,
3466                service,
3467                started_total.elapsed().as_millis() as u32,
3468                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(err.to_string()),
3469                request_bytes_len,
3470                0,
3471            );
3472            return Err(err);
3473        }
3474
3475        // From here on, the REQUEST is in flight on the server.
3476        // Wrap the rest of the call in an RAII guard whose Drop
3477        // fires CANCEL if `guard.completed` isn't set — covering:
3478        //  - the call future being dropped mid-flight (e.g. hedge
3479        //    loser, select!-cancelled future, caller awaiting a
3480        //    `JoinHandle` that gets cancelled).
3481        //  - the timeout path (we leave `completed=false` so Drop
3482        //    handles CANCEL emission; no need for a separate
3483        //    `send_rpc_cancel` call).
3484        //  - the cancel_token path (same: leave completed=false,
3485        //    Drop emits CANCEL).
3486        let mut guard = UnaryCallGuard {
3487            pending: Arc::clone(&pending),
3488            mesh: Arc::clone(self),
3489            target_node_id,
3490            request_channel: route.request_channel.clone(),
3491            self_origin,
3492            call_id,
3493            completed: false,
3494        };
3495
3496        // Substrate cancel-token plumbing (v3 / C-S1). When the
3497        // caller set `opts.cancel_token`, register a Notify against
3498        // the per-mesh cancel_registry. The select! arm below
3499        // observes the cancel signal and short-circuits to
3500        // RpcError::Cancelled, leaving guard.completed = false so
3501        // Drop fires CANCEL on the wire. Release the registry
3502        // entry once the call resolves so the registry doesn't
3503        // grow unboundedly.
3504        let cancel_token = opts.cancel_token.unwrap_or(0);
3505        let cancel_notify = self.cancel_registry().register_notify(cancel_token);
3506
3507        // Race the receiver against the deadline AND the cancel
3508        // signal. Each branch lifts to the same outcome shape
3509        // (Result<Result<RpcResponsePayload, _>, Elapsed>) so the
3510        // existing post-match logic stays unchanged for the ok /
3511        // timeout paths; the cancel arm returns early via
3512        // fire_unary_cancel_outcome — leaves guard.completed=false
3513        // so Drop emits CANCEL on the wire.
3514        let outcome: Result<Result<RpcResponsePayload, _>, tokio::time::error::Elapsed> =
3515            match opts.deadline {
3516                None => {
3517                    tokio::select! {
3518                        biased;
3519                        _ = cancel_notify.notified() => {
3520                            return Err(fire_unary_cancel_outcome(
3521                                self,
3522                                &mut metrics_guard,
3523                                cancel_token,
3524                                target_node_id,
3525                                service,
3526                                started_total,
3527                                request_bytes_len,
3528                            ));
3529                        }
3530                        r = rx => Ok(r),
3531                    }
3532                }
3533                Some(deadline) => {
3534                    let timeout_at = deadline.saturating_duration_since(Instant::now());
3535                    tokio::select! {
3536                        biased;
3537                        _ = cancel_notify.notified() => {
3538                            return Err(fire_unary_cancel_outcome(
3539                                self,
3540                                &mut metrics_guard,
3541                                cancel_token,
3542                                target_node_id,
3543                                service,
3544                                started_total,
3545                                request_bytes_len,
3546                            ));
3547                        }
3548                        r = tokio::time::timeout(timeout_at, rx) => r,
3549                    }
3550                }
3551            };
3552
3553        // Whichever non-cancel path won, release the registry
3554        // entry. Idempotent if the cancel arm already released.
3555        self.cancel_registry().release(cancel_token);
3556
3557        let resp = match outcome {
3558            Ok(Ok(resp)) => {
3559                guard.completed = true;
3560                resp
3561            }
3562            Ok(Err(_recv_err)) => {
3563                // Sender dropped externally — pending entry is
3564                // already gone (someone else removed it). Mark
3565                // completed so Drop doesn't fire a useless CANCEL
3566                // for a server that's no longer tracking this id.
3567                guard.completed = true;
3568                metrics_guard.record(CallOutcome::Transport);
3569                let err = RpcError::Transport(AdapterError::Connection(
3570                    "rpc client pending sender dropped (no response will arrive)".into(),
3571                ));
3572                self.fire_rpc_observer_outbound(
3573                    target_node_id,
3574                    service,
3575                    started_total.elapsed().as_millis() as u32,
3576                    crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(
3577                        err.to_string(),
3578                    ),
3579                    request_bytes_len,
3580                    0,
3581                );
3582                return Err(err);
3583            }
3584            Err(_elapsed) => {
3585                // Timeout: leave `completed=false` so Drop emits
3586                // CANCEL automatically; surface Timeout to caller.
3587                metrics_guard.record(CallOutcome::Timeout);
3588                self.fire_rpc_observer_outbound(
3589                    target_node_id,
3590                    service,
3591                    started_total.elapsed().as_millis() as u32,
3592                    crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Timeout,
3593                    request_bytes_len,
3594                    0,
3595                );
3596                return Err(RpcError::Timeout {
3597                    elapsed_ms: started.elapsed().as_millis() as u64,
3598                });
3599            }
3600        };
3601
3602        // Map the wire status onto the public Result type.
3603        if resp.status.is_ok() {
3604            metrics_guard.record(CallOutcome::Ok);
3605            let response_bytes_len = resp.body.len() as u32;
3606            self.fire_rpc_observer_outbound(
3607                target_node_id,
3608                service,
3609                started_total.elapsed().as_millis() as u32,
3610                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Ok,
3611                request_bytes_len,
3612                response_bytes_len,
3613            );
3614            Ok(RpcReply {
3615                body: resp.body,
3616                headers: resp.headers,
3617                latency_ns: started.elapsed().as_nanos() as u64,
3618            })
3619        } else {
3620            metrics_guard.record(CallOutcome::ServerError);
3621            let status = resp.status.to_wire();
3622            let response_bytes_len = resp.body.len() as u32;
3623            let message = String::from_utf8(resp.body.to_vec())
3624                .unwrap_or_else(|e| format!("<{} bytes of non-utf8 body>", e.into_bytes().len()));
3625            self.fire_rpc_observer_outbound(
3626                target_node_id,
3627                service,
3628                started_total.elapsed().as_millis() as u32,
3629                crate::adapter::net::cortex::rpc_observer::RpcCallStatus::Error(message.clone()),
3630                request_bytes_len,
3631                response_bytes_len,
3632            );
3633            // v0.4 capability-auth: callee-side defense-in-depth
3634            // surfaces as a wire `CapabilityDenied` status. Map it
3635            // back to the typed `RpcError::CapabilityDenied` so
3636            // application code sees the same variant regardless of
3637            // which side of the gate fired.
3638            if matches!(resp.status, RpcStatus::CapabilityDenied) {
3639                return Err(RpcError::CapabilityDenied {
3640                    target: target_node_id,
3641                    capability: service.to_string(),
3642                });
3643            }
3644            Err(RpcError::ServerError { status, message })
3645        }
3646    }
3647
3648    // ----------------------------------------------------------------
3649    // Internal helpers.
3650    // ----------------------------------------------------------------
3651
3652    /// Lazy-subscribe `reply_channel` from `target_node_id` and
3653    /// register an inbound dispatcher that drives the per-Mesh
3654    /// `RpcClientFold`. Idempotent — subsequent calls for the
3655    /// same (target, service) pair are no-ops.
3656    ///
3657    /// **Bounded** at [`MAX_REPLY_SUBSCRIPTIONS`]: a caller talking
3658    /// to many short-lived (target, service) pairs would otherwise
3659    /// grow the registry indefinitely. Past the cap we refuse the
3660    /// new subscription with `NoRoute` rather than evict an
3661    /// existing one (eviction could rip out a healthy in-flight
3662    /// reply path).
3663    ///
3664    /// **Dispatcher reuse**: the reply-channel name embeds the
3665    /// CALLER's `self_origin`, NOT the target's, so a single
3666    /// caller talking to multiple servers for the same service
3667    /// reuses the same reply channel (same hash). We register the
3668    /// dispatcher only if the slot is unoccupied; subsequent
3669    /// (target, service) pairs that hash to the same slot are
3670    /// allowed to share the existing dispatcher (which routes to
3671    /// the same per-Mesh `pending` map regardless of target). A
3672    /// genuine cross-service hash collision is detected at
3673    /// `serve_rpc` time (the AlreadyServing path) for the server
3674    /// side; on the caller side here, sharing the dispatcher is
3675    /// the correct behavior because all RESPONSE events route
3676    /// through the same `RpcClientPending` keyed by `call_id`.
3677    async fn ensure_reply_subscription(
3678        self: &Arc<Self>,
3679        target_node_id: u64,
3680        service: &str,
3681        reply_channel: ChannelName,
3682        reply_hash: ChannelHash,
3683    ) -> Result<(), RpcError> {
3684        let registry = self.rpc_reply_subscriptions_arc();
3685        // PERF_AUDIT §3.5 — DashMap keyed by
3686        // `(target, xxh3_64(service))`. Pre-fix this was a global
3687        // `Mutex<Vec<(u64, String)>>` that every concurrent RPC
3688        // caller took on every call to scan the Vec with a String
3689        // compare per entry — all callers serialized on it. Now the
3690        // hot path is one shard-local read with a single String
3691        // compare against the slot's stored service name (xxh3 is
3692        // not collision-free; see `reply_subscription_covers`).
3693        let service_hash = xxhash_rust::xxh3::xxh3_64(service.as_bytes());
3694        if reply_subscription_covers(&registry, target_node_id, service_hash, service) {
3695            return Ok(());
3696        }
3697        // Cap the registry. `len()` on DashMap is approximate under
3698        // concurrent churn (it sums shard counts under shard reads,
3699        // not a global lock), which is exactly the semantics we
3700        // want here — the cap is a soft guard against a runaway
3701        // caller, not a precise invariant. Past the cap, new
3702        // entries are refused.
3703        if registry.len() >= MAX_REPLY_SUBSCRIPTIONS {
3704            return Err(RpcError::NoRoute {
3705                target: target_node_id,
3706                reason: format!(
3707                    "reply-subscription registry at cap ({} entries); refusing new \
3708                     (target={target_node_id:#x}, service={service:?}). Caller should \
3709                     reuse an existing target+service pair or shrink the active set.",
3710                    MAX_REPLY_SUBSCRIPTIONS,
3711                ),
3712            });
3713        }
3714
3715        // Subscribe to our own reply channel from the target so the
3716        // target's roster has us as a subscriber when the server's
3717        // emit closure publishes the RESPONSE.
3718        self.subscribe_channel(target_node_id, reply_channel.clone())
3719            .await
3720            .map_err(|e| RpcError::NoRoute {
3721                target: target_node_id,
3722                reason: e.to_string(),
3723            })?;
3724
3725        // Register the inbound dispatcher only if the slot is
3726        // unoccupied. The reply-channel name embeds *self_origin*,
3727        // not the target, so multiple targets serving the same
3728        // service share one reply channel + one dispatcher. The
3729        // existing dispatcher routes to the same per-Mesh
3730        // `RpcClientPending` keyed by call_id, so reuse is safe.
3731        if !self.rpc_inbound_dispatcher_registered(reply_hash) {
3732            let pending = self.rpc_client_pending();
3733            let fold = Arc::new(Mutex::new(RpcClientFold::new(pending)));
3734            // S-4 part 2: use `apply_inbound` so the wire-session
3735            // peer's NodeId (resolved in mesh.rs's dispatch site)
3736            // flows into the fold's deliver gate. The legacy
3737            // `RedexFold::apply` shim delivers with from_node=0,
3738            // which would defeat the binding.
3739            let dispatcher: RpcInboundDispatcher = Arc::new(move |ev| {
3740                fold.lock().apply_inbound(&ev);
3741            });
3742            // Race-safe: a concurrent caller might have just
3743            // registered between our check and our insert. In that
3744            // case `register_rpc_inbound` returns the prior
3745            // dispatcher; our new fresh fold is dropped here, and
3746            // the prior dispatcher (which routes to the same
3747            // shared `pending`) keeps doing the job. No collision
3748            // — both folds are functionally equivalent.
3749            if let Some(prev) = self.register_rpc_inbound(reply_hash, dispatcher) {
3750                // Roll back: keep the prior dispatcher (it's
3751                // already wired to the same shared pending map).
3752                let _ = self.register_rpc_inbound(reply_hash, prev);
3753            }
3754        }
3755
3756        let _ = reply_hash; // captured into the dispatcher above; surfaced for debug
3757                            // `insert` is idempotent — a concurrent caller that beat us
3758                            // to it just overwrote the slot with the identical value.
3759                            // On a genuine xxh3 collision between two service names on
3760                            // the same target, the slot flips to whichever service
3761                            // subscribed last and the other re-subscribes on its next
3762                            // call (idempotent, correct, merely un-cached). Cap drift
3763                            // past MAX_REPLY_SUBSCRIPTIONS during a concurrent insert
3764                            // race is bounded by the number of concurrent callers,
3765                            // which operators tune separately.
3766        registry.insert((target_node_id, service_hash), Arc::from(service));
3767        Ok(())
3768    }
3769}
3770
3771/// PERF_AUDIT §3.5 — hot-path membership check for the
3772/// reply-subscription registry. Returns `true` only when the slot
3773/// for `(target, xxh3(service))` exists AND the stored service
3774/// name matches exactly. xxh3_64 is neither collision-free nor
3775/// cryptographic; a hash-only hit that skipped the subscribe for
3776/// a *different* service would silently drop that service's
3777/// replies — the reply channel name embeds the service, so being
3778/// in the target's roster for the colliding service's channel
3779/// does nothing for this one. Verifying the stored name turns a
3780/// collision into a per-call re-subscribe (idempotent, harmless)
3781/// instead of a correctness bug.
3782fn reply_subscription_covers(
3783    registry: &dashmap::DashMap<(u64, u64), Arc<str>>,
3784    target_node_id: u64,
3785    service_hash: u64,
3786    service: &str,
3787) -> bool {
3788    registry
3789        .get(&(target_node_id, service_hash))
3790        .is_some_and(|entry| entry.value().as_ref() == service)
3791}
3792
/// Cap on the number of distinct (target_node_id, service)
/// pairs the caller-side reply-subscription registry will hold.
/// Past the cap, the lazy-subscribe path inside [`MeshNode::call`]
/// refuses new entries with [`RpcError::NoRoute`] rather than
/// evicting an existing one.
///
/// Enforcement is a `len() >= MAX_REPLY_SUBSCRIPTIONS` check made
/// before the subscribe + insert, and `DashMap::len()` is
/// approximate under concurrent churn, so concurrent first-time
/// callers can push the registry slightly past this value. Treat
/// it as a guard against runaway growth, not an exact invariant.
///
/// 1024 is generous for any realistic deployment — a caller that
/// needs more should reuse existing reply paths.
pub const MAX_REPLY_SUBSCRIPTIONS: usize = 1024;
3800
3801/// Mint a random 64-bit call_id. Used as the correlation token
3802/// for REQUEST/RESPONSE pairing. The fold keys pending oneshots on
3803/// this value; any session peer with publish access to the reply
3804/// channel could ship a forged RESPONSE if it could guess the
3805/// value. Sequential u64s are predictable from any peer that
3806/// observes a single allocation; random u64s collide with 2^-64
3807/// probability per call and are unpredictable to observing peers.
3808///
3809/// **PERF_AUDIT §3.8** — pre-fix this called `getrandom::fill` for
3810/// 8 bytes per RPC — one OS entropy syscall per call
3811/// (BCryptGenRandom on Windows, ~200-400 ns; somewhat cheaper on
3812/// Linux). Now each thread refills a small pool of raw OS entropy
3813/// ([`CALL_ID_ENTROPY_POOL_BYTES`]) with a single `getrandom`
3814/// syscall and hands out 8 bytes per call, amortizing the syscall
3815/// across [`CALL_ID_ENTROPY_POOL_BYTES`]/8 mints.
3816///
3817/// Every minted id is still raw OS entropy — NOT the output of a
3818/// userspace PRNG — so the unpredictability-to-peers property is
3819/// byte-for-byte identical to the pre-§3.8 per-call fill. (An
3820/// earlier draft of this fix streamed ids from a thread-local
3821/// SplitMix64; that was unsound for this threat model: call_ids
3822/// are sent to callees by design, and SplitMix64's output
3823/// finalizer is a public bijection, so a single observed id
3824/// reveals the generator state and with it every FUTURE call_id
3825/// minted on that thread — letting one callee forge responses to
3826/// races on calls addressed to other peers. Raw pooled entropy
3827/// has no such state to recover.)
3828///
3829/// If the pool refill fails, falls back to a process-global
3830/// monotonic counter rather than returning `0`: two concurrent
3831/// callers that both minted `0` would `register(0, …)` over each
3832/// other, so the first caller's oneshot closes with
3833/// `RecvError::Closed` (a spurious `Transport` error, not the clean
3834/// timeout the all-distinct path yields). The counter keeps ids
3835/// distinct (predictable on entropy failure, but the S-4
3836/// `from_node` gate still blocks cross-peer forgery, and such calls
3837/// time out anyway). `getrandom::fill` failure is a fatal-
3838/// environment signal (no `/dev/urandom`, broken syscall) and the
3839/// broader stack won't be functional anyway; the pool cursor is
3840/// left exhausted so the next mint retries the refill. `0` is
3841/// reserved as a sentinel and never returned.
3842fn mint_random_call_id() -> u64 {
3843    thread_local! {
3844        // (pool, cursor). Cursor starts exhausted so the first
3845        // mint on each thread performs the initial refill.
3846        static CALL_ID_ENTROPY_POOL: std::cell::RefCell<([u8; CALL_ID_ENTROPY_POOL_BYTES], usize)> = const {
3847            std::cell::RefCell::new(([0u8; CALL_ID_ENTROPY_POOL_BYTES], CALL_ID_ENTROPY_POOL_BYTES))
3848        };
3849    }
3850    CALL_ID_ENTROPY_POOL.with(|cell| {
3851        let mut pool = cell.borrow_mut();
3852        let (buf, cursor) = &mut *pool;
3853        if *cursor >= CALL_ID_ENTROPY_POOL_BYTES {
3854            if getrandom::fill(buf).is_err() {
3855                // Entropy unavailable. Do NOT return 0 — concurrent
3856                // callers would all mint 0 and clobber each other's
3857                // pending entries. A process-global counter keeps ids
3858                // distinct (starts at 1, so it is non-zero until it
3859                // wraps the full u64 range, at which point the 0 is
3860                // mapped to 1 below).
3861                static CALL_ID_FALLBACK: std::sync::atomic::AtomicU64 =
3862                    std::sync::atomic::AtomicU64::new(1);
3863                let id = CALL_ID_FALLBACK.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3864                return if id == 0 { 1 } else { id };
3865            }
3866            *cursor = 0;
3867        }
3868        let mut id = [0u8; 8];
3869        id.copy_from_slice(&buf[*cursor..*cursor + 8]);
3870        *cursor += 8;
3871        // Reserve 0 as the "no correlation" sentinel: on the ~1-in-2^64
3872        // chance the pool yields all-zero bytes, remap to a fixed non-zero.
3873        match u64::from_le_bytes(id) {
3874            0 => 1,
3875            id => id,
3876        }
3877    })
3878}
3879
/// Size, in bytes, of the per-thread OS-entropy pool that
/// [`mint_random_call_id`] draws from.
///
/// Sized for 64 ids of one `u64` each (512 bytes) per `getrandom`
/// syscall. The syscall cost is dominated by the fixed kernel
/// round-trip, so batching 64 mints recovers ~98% of the per-call
/// overhead while keeping the amount of buffered future-id entropy
/// per thread small.
const CALL_ID_ENTROPY_POOL_BYTES: usize = 64 * std::mem::size_of::<u64>();
3886
3887// ============================================================================
3888// Internal: tiny shims so the `serve_rpc` / `call` impls stay
3889// readable. The underlying state lives on `MeshNode`; these just
3890// rename the accessor methods locally.
3891// ============================================================================
3892
3893impl MeshNode {
3894    fn rpc_client_pending(&self) -> Arc<super::cortex::RpcClientPending> {
3895        self.rpc_client_pending_arc()
3896    }
3897    fn identity_origin_hash(&self) -> u64 {
3898        self.public_key_origin_hash()
3899    }
3900
3901    /// Caller-side helper that pairs `rpc_route_for_service` with
3902    /// the `RpcError::NoRoute { target, reason }` mapping every
3903    /// `Mesh::call*` entry point needs. Returning `Arc<RpcRoute>`
3904    /// keeps the hot-path allocation profile of the cache intact
3905    /// (one refcount bump per caller).
3906    fn rpc_route_or_no_route(
3907        &self,
3908        target_node_id: u64,
3909        service: &str,
3910    ) -> Result<Arc<super::mesh::RpcRoute>, RpcError> {
3911        self.rpc_route_for_service(service)
3912            .map_err(|reason| RpcError::NoRoute {
3913                target: target_node_id,
3914                reason,
3915            })
3916    }
3917}
3918
3919// `proximity_graph()` is already a public accessor on MeshNode
3920// (see the existing `pub fn proximity_graph(&self) -> &Arc<...>`).
3921// `select_target` uses it directly; no shim needed.
3922
3923// ============================================================================
3924// Errors.
3925// ============================================================================
3926
/// Errors returned by [`MeshNode::serve_rpc`].
///
/// Each variant carries a `String` that is interpolated into the
/// `Display` message generated by `thiserror`.
#[derive(Debug, thiserror::Error)]
pub enum ServeError {
    /// The service name fails channel-name validation.
    // NOTE(review): the payload is presumably the rejected name or
    // the validator's reason — confirm at the construction site.
    #[error("invalid service name: {0}")]
    InvalidServiceName(String),
    /// A handler for this service is already registered on this
    /// node. Drop the prior `ServeHandle` to free the slot. The
    /// payload is rendered as the service name in the message.
    #[error("already serving service `{0}` on this node")]
    AlreadyServing(String),
}
3938
3939// ============================================================================
3940// Typed-call helper.
3941// ============================================================================
3942
/// Wire-shape failures from [`typed_call`]. Distinct variants
/// for transport (no route, timeout, etc.) vs codec (serde /
/// postcard) so service-specific client error enums can wrap
/// each independently. Server-level (application) errors are
/// decoded into `Resp` itself — the client matches on the
/// resulting `Resp::Error(...)` variant.
#[derive(Debug, thiserror::Error)]
pub enum TypedCallError {
    /// Transport-level failure surfaced by [`MeshNode::call`].
    /// The `#[from]` attribute generates `From<RpcError>`, so any
    /// `RpcError` can be lifted into this variant with `?`.
    #[error("transport: {0}")]
    Transport(#[from] RpcError),
    /// Request serialization or response deserialization failed.
    /// Holds the codec error rendered as text rather than the
    /// original error value.
    #[error("codec: {0}")]
    Codec(String),
}
3958
3959impl From<postcard::Error> for TypedCallError {
3960    fn from(e: postcard::Error) -> Self {
3961        Self::Codec(e.to_string())
3962    }
3963}
3964
3965/// Send a postcard-encoded request to a remote RPC service and
3966/// decode the postcard-encoded reply. The shared shape every
3967/// substrate-internal RPC client wants:
3968///
3969/// 1. `postcard::to_allocvec(request)` → wire body.
3970/// 2. `MeshNode::call(target, service, body, opts{deadline})`.
3971/// 3. `postcard::from_bytes::<Resp>(reply.body)`.
3972///
3973/// Caller wraps the returned `Resp` in its own typed-error
3974/// surface (typically a `Server` variant that holds the
3975/// service-specific error enum decoded from `Resp`). Returning
3976/// `TypedCallError` here keeps the wrapper code to a one-line
3977/// `From<TypedCallError>` impl per client.
3978pub async fn typed_call<Req, Resp>(
3979    mesh: &std::sync::Arc<crate::adapter::net::MeshNode>,
3980    target_node_id: u64,
3981    service: &str,
3982    request: &Req,
3983    deadline: std::time::Duration,
3984) -> Result<Resp, TypedCallError>
3985where
3986    Req: serde::Serialize,
3987    Resp: serde::de::DeserializeOwned,
3988{
3989    let body = postcard::to_allocvec(request)?;
3990    let opts = CallOptions {
3991        deadline: Some(std::time::Instant::now() + deadline),
3992        ..Default::default()
3993    };
3994    let reply = mesh
3995        .call(target_node_id, service, Bytes::from(body), opts)
3996        .await?;
3997    Ok(postcard::from_bytes(&reply.body)?)
3998}
3999
4000// ============================================================================
4001// Helpers.
4002// ============================================================================
4003
4004/// Detect the "no session to the target node id" sub-case of
4005/// [`AdapterError::Connection`]. The publish path can surface
4006/// this through one of two messages depending on which inner
4007/// helper landed it:
4008///
4009///   - `"publish: no session for subscriber {hash}"` — emitted
4010///     by `mesh.rs::publish_to_peer` when the subscriber-roster
4011///     path can't find an active session.
4012///   - `"no session to publisher {hash}"` — emitted by the lower
4013///     mesh.rs send path when there's no active session to the
4014///     target's publisher record at all.
4015///
4016/// Both mean "I can't reach this peer". When we observe either,
4017/// we surface as [`RpcError::NoRoute`] rather than `Transport`
4018/// because retrying the same target without a session is
4019/// pointless and the right behavior for a routing helper is to
4020/// try a different target.
4021fn classify_publish_no_session(err: &AdapterError) -> bool {
4022    match err {
4023        AdapterError::Connection(msg) => {
4024            msg.contains("no session for subscriber") || msg.contains("no session to publisher")
4025        }
4026        _ => false,
4027    }
4028}
4029
/// Convert a monotonic [`Instant`] into an approximate wall-clock
/// timestamp, in nanoseconds since the Unix epoch.
///
/// `Instant` is monotonic and carries no wall-clock meaning, so the
/// conversion takes the signed distance between `instant` and "now"
/// and applies it to the current `SystemTime`. The result drifts
/// marginally with wall-clock skew but is good enough for
/// server-side deadline-already-passed short-circuits (which are
/// the only consumer of `deadline_ns`).
///
/// Every step saturates rather than wraps:
///
/// - an instant further ahead than `u64` nanoseconds can express
///   yields `u64::MAX`;
/// - an instant further back than the epoch yields `0`;
/// - a system clock set before the Unix epoch is treated as `0`.
fn instant_to_unix_nanos(instant: Instant) -> u64 {
    /// Nanoseconds in `duration`, clamped to `u64::MAX`.
    ///
    /// `Duration::as_nanos` returns `u128`; a bare `as u64` would
    /// silently drop the high bits, turning a very distant deadline
    /// (more than ~584 years of nanoseconds) into a small, near-now
    /// value. Clamping first makes the final cast lossless.
    fn clamp_nanos(duration: std::time::Duration) -> u64 {
        duration.as_nanos().min(u128::from(u64::MAX)) as u64
    }

    let now_instant = Instant::now();
    let now_wall = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(clamp_nanos)
        .unwrap_or(0);

    if instant >= now_instant {
        // Deadline is now or in the future: push the wall clock forward.
        now_wall.saturating_add(clamp_nanos(instant.duration_since(now_instant)))
    } else {
        // Deadline already elapsed: pull the wall clock back.
        now_wall.saturating_sub(clamp_nanos(now_instant.duration_since(instant)))
    }
}
4049
4050#[allow(dead_code)]
4051fn _ensure_send_sync() {
4052    fn assert_send_sync<T: Send + Sync>() {}
4053    assert_send_sync::<ServeHandle>();
4054    assert_send_sync::<RpcCancellationToken>();
4055    assert_send_sync::<RpcContext>();
4056    assert_send_sync::<RpcHandlerError>();
4057    assert_send_sync::<RpcStatus>();
4058    assert_send_sync::<RpcReply>();
4059    assert_send_sync::<CallOptions>();
4060}
4061
#[cfg(test)]
mod origin_cache_tests {
    use super::*;

    /// Crafted-origin memory-amplification guard (cubic P2).
    ///
    /// The reply-channel / origin-node caches are keyed by the
    /// *wire-claimed* `caller_origin`, which a single authed peer can vary
    /// freely. Inserting four times as many distinct origins as the cache
    /// holds must leave it pinned at `RPC_CALLER_CACHE_CAP`, with the coldest
    /// entries evicted and the newest retained.
    #[test]
    fn origin_keyed_lru_bounds_under_crafted_origin_flood() {
        let lru: OriginKeyedLru<u64> = OriginKeyedLru::new();
        let sprayed = 4 * (RPC_CALLER_CACHE_CAP as u64);
        (0..sprayed).for_each(|origin| {
            lru.insert(origin, origin);
        });

        let resident = lru.0.lock().len();
        assert_eq!(
            resident, RPC_CALLER_CACHE_CAP,
            "cache must stay at its capacity bound under a crafted-origin flood"
        );

        // Newest origin is still resident; the very first one is long gone.
        let newest = sprayed - 1;
        assert_eq!(lru.get(newest), Some(newest));
        assert_eq!(lru.get(0), None);
    }

    /// PERF_AUDIT §3.8 — ids minted by `mint_random_call_id` from the
    /// pooled-entropy path must be distinct in practice: a duplicate
    /// would let two in-flight calls collide on the per-Mesh pending
    /// map. 100k samples sits far below the 2^32 birthday-paradox
    /// boundary, so a regression that recycled pool bytes (cursor
    /// mis-advance, missed refill) fails loudly. The loop also crosses
    /// the pool-refill boundary thousands of times (pool holds 64
    /// ids), which pins the refill/cursor arithmetic.
    #[test]
    fn mint_random_call_id_produces_distinct_values_across_thousands_of_calls() {
        const SAMPLES: usize = 100_000;
        let mut minted = std::collections::HashSet::with_capacity(SAMPLES);
        for _ in 0..SAMPLES {
            let call_id = super::mint_random_call_id();
            // Zero is the fallback sentinel; a working `getrandom`
            // refill should never hand it out.
            assert_ne!(call_id, 0, "fallback-zero path triggered unexpectedly");
            let is_fresh = minted.insert(call_id);
            assert!(is_fresh, "duplicate call_id minted: {:#x}", call_id);
        }
    }

    /// PERF_AUDIT §3.8 — minted ids are raw OS entropy, so across 10k
    /// mints the fraction of set bits should sit near 0.5. A
    /// pool-management bug that handed out the zeroed initial buffer
    /// (or re-served a stale window) would skew that fraction hard.
    #[test]
    fn mint_random_call_id_set_bit_density_is_balanced() {
        let samples = 10_000u64;
        let ones: u64 = (0..samples)
            .map(|_| u64::from(super::mint_random_call_id().count_ones()))
            .sum();
        let density = ones as f64 / (samples * 64) as f64;
        // Over 640k fair bits the standard deviation of the fraction is
        // about 0.0006, so ±0.02 is a very loose band: it will not flake
        // on genuine randomness yet still trips on a collapsed pool.
        assert!(
            (density - 0.5).abs() < 0.02,
            "set-bit density {} is too far from 0.5 — pool may be mismanaged",
            density
        );
    }

    /// PERF_AUDIT §3.5 — the reply-subscription registry's hot path
    /// must be a `(target, xxh3(service))` lookup rather than a
    /// `Mutex<Vec<(u64, String)>>` linear scan. The contract is pinned
    /// through `reply_subscription_covers`, the exact hot-path check:
    ///
    /// 1. distinct (target, service) pairs are distinct keys — one
    ///    service against two targets, or two services against one
    ///    target, never alias;
    /// 2. re-inserting the same pair is idempotent and the fast path
    ///    keeps answering `true`;
    /// 3. an xxh3 COLLISION (same hash, different service name) must
    ///    NOT count as covered — a false positive would skip a needed
    ///    subscribe and silently drop the colliding service's replies.
    ///    Verifying the stored name turns it into a re-subscribe;
    /// 4. the cap is enforced via `len()`, not a separate counter
    ///    that could drift.
    #[test]
    fn reply_subscriptions_keyed_by_target_and_service_hash() {
        use dashmap::DashMap;
        use xxhash_rust::xxh3::xxh3_64;

        let subs: DashMap<(u64, u64), Arc<str>> = DashMap::new();
        let hash_a = xxh3_64(b"svc-a");
        let hash_b = xxh3_64(b"svc-b");

        // One target, two services: two independent slots.
        subs.insert((0xAA, hash_a), Arc::from("svc-a"));
        subs.insert((0xAA, hash_b), Arc::from("svc-b"));
        let a_on_aa = super::reply_subscription_covers(&subs, 0xAA, hash_a, "svc-a");
        assert!(a_on_aa);
        let b_on_aa = super::reply_subscription_covers(&subs, 0xAA, hash_b, "svc-b");
        assert!(b_on_aa);

        // One service, two targets: the second target starts uncovered
        // and only becomes covered once it gets its own entry.
        let a_on_bb_before = super::reply_subscription_covers(&subs, 0xBB, hash_a, "svc-a");
        assert!(!a_on_bb_before);
        subs.insert((0xBB, hash_a), Arc::from("svc-a"));
        let a_on_bb_after = super::reply_subscription_covers(&subs, 0xBB, hash_a, "svc-a");
        assert!(a_on_bb_after);

        // Re-inserting an existing pair overwrites it with an identical
        // value: still covered, and the entry count does not grow.
        subs.insert((0xAA, hash_a), Arc::from("svc-a"));
        let a_on_aa_again = super::reply_subscription_covers(&subs, 0xAA, hash_a, "svc-a");
        assert!(a_on_aa_again);
        assert_eq!(subs.len(), 3);

        // Forced xxh3 collision: pretend "svc-evil" hashes to `hash_a`
        // (xxh3_64 is not cryptographic, so such collisions are
        // computable offline). It must NOT be treated as covered by
        // "svc-a"'s slot. The hash-only DashSet shape this replaced
        // answered `true` here and silently skipped the subscribe.
        let evil_on_a_slot = super::reply_subscription_covers(&subs, 0xAA, hash_a, "svc-evil");
        assert!(
            !evil_on_a_slot,
            "hash collision must not satisfy the membership check for a \
             different service name"
        );

        // Once the colliding service legitimately takes over the slot,
        // the original service degrades to "not covered" (re-subscribe)
        // — the check is never silently true for both names.
        subs.insert((0xAA, hash_a), Arc::from("svc-evil"));
        let evil_covered = super::reply_subscription_covers(&subs, 0xAA, hash_a, "svc-evil");
        assert!(evil_covered);
        let a_displaced = super::reply_subscription_covers(&subs, 0xAA, hash_a, "svc-a");
        assert!(!a_displaced);
    }

    /// PERF_AUDIT §3.3 — grant-stall backstop check for the window/2
    /// auto-grant coalescing. Runs the whole credit loop for every
    /// window in 1..=64: the server begins with `window` credits and
    /// spends one per chunk, while the client accumulates through
    /// `accumulate_auto_grant` and only flushes at the threshold.
    ///
    /// Checked properties:
    /// 1. liveness — an actively-polling consumer never sees the
    ///    server starved (credits exhausted with nothing left to
    ///    poll), i.e. withholding sub-threshold credits cannot
    ///    deadlock the stream and no timer/drop backstop is needed;
    /// 2. coalescing — the grant-packet count stays at
    ///    ~chunks / (window/2), and is strictly below one grant per
    ///    chunk once window ≥ 4 (the integration suite only exercises
    ///    window=2, whose threshold degenerates to per-chunk).
    #[test]
    fn auto_grant_coalescing_never_starves_the_server_pump() {
        const TOTAL_CHUNKS: u32 = 1_000;

        for window in 1u32..=64 {
            let mut server_credits = u64::from(window);
            let (mut pending, mut sent, mut delivered, mut grants) = (0u32, 0u32, 0u32, 0u32);

            while delivered < TOTAL_CHUNKS {
                // Server side: keep sending for as long as there are
                // both credits and chunks left.
                while server_credits > 0 && sent < TOTAL_CHUNKS {
                    sent += 1;
                    server_credits -= 1;
                }
                assert!(
                    sent > delivered,
                    "window {window}: server starved while the consumer is actively \
                     polling (credits {server_credits}, pending {pending}, \
                     sent {sent}, delivered {delivered})"
                );

                // Client side: consume exactly one chunk, then flush a
                // grant only when the accumulator says so.
                delivered += 1;
                if let Some(amount) = super::accumulate_auto_grant(&mut pending, window) {
                    server_credits += amount as u64;
                    grants += 1;
                }
            }

            let flush_threshold = (window / 2).max(1);
            let cadence_bound = TOTAL_CHUNKS / flush_threshold + 1;
            assert!(
                grants <= cadence_bound,
                "window {window}: {grants} grant packets exceeds the \
                 coalesced cadence bound of {}",
                cadence_bound
            );
            if window >= 4 {
                assert!(
                    grants < TOTAL_CHUNKS,
                    "window {window}: coalescing must emit fewer grants than chunks"
                );
            }
        }
    }

    /// `get` promotes an entry to most-recently-used. After filling the cache
    /// to capacity, touching the oldest entry and overflowing by one must
    /// evict the *next* oldest instead — i.e. the wrapper has true LRU
    /// semantics, so a hot caller isn't evicted out from under an in-flight
    /// exchange.
    #[test]
    fn origin_keyed_lru_get_promotes_to_mru() {
        let lru: OriginKeyedLru<u64> = OriginKeyedLru::new();
        let capacity = RPC_CALLER_CACHE_CAP as u64;
        (0..capacity).for_each(|origin| {
            lru.insert(origin, origin);
        });

        // Origin 0 would be next out; touching it moves it to the front.
        assert_eq!(lru.get(0), Some(0));
        // One extra entry forces exactly one eviction.
        lru.insert(u64::MAX, 1);
        assert_eq!(lru.get(0), Some(0), "touched entry must survive eviction");
        assert_eq!(lru.get(1), None, "the now-LRU entry (1) must be evicted");
    }
}