net-mesh 0.23.0

High-performance, schema-agnostic, backend-agnostic event bus
# nRPC — request/response as a CortEX fold convention

Plan for a first-class request/response primitive on Net. **Architectural anchor: nRPC is not a new subsystem. It is a convention layer over CortEX folds plus one missing channel-layer primitive.** Every piece of the request/response state machine — correlation, idempotency, snapshot/restore, replay-debugging, causal-chain integration, capability-token authz — already exists in CortEX with different names. nRPC is a typed `dispatch` enum on `EventMeta`, a channel-naming convention, and small caller-side / server-side helpers.

## Build status

What's landed in-tree (post-Phase-1 prerequisites):

- ✅ **`SubscriptionMode::QueueGroup`** on the channel roster (`channel/roster.rs`) — work-distribution dispatch alongside the existing `Broadcast` mode. `add_with_mode` / `dispatch_recipients` / `subscriber_mode` API; back-compat shims preserve every existing call site. 8 regression tests.
- ✅ **`MembershipMsg::Subscribe.queue_group: Option<String>`** wire field (`channel/membership.rs`) — `u8` length + UTF-8 bytes after the existing token field. Forward-compat: pre-queue-group senders (zero remaining bytes after token) decode as `Broadcast`. 5 regression tests.
- ✅ **`Mesh::subscribe_channel_in_queue_group[_with_token]`** public APIs and the inbound-Subscribe handler routes mode through to `roster.add_with_mode`. The publisher (`mesh.rs:5164`) consumes `dispatch_recipients` instead of `members`, so queue-group subscribers actually load-balance.
- ✅ **`cortex::rpc` codec** (`cortex/rpc.rs`) — dispatch constants (`DISPATCH_RPC_REQUEST/RESPONSE/CANCEL/DEADLINE_EXCEEDED`), flag bits (`FLAG_RPC_IDEMPOTENT/STREAMING_RESPONSE/PROPAGATE_TRACE`), `RpcStatus` enum (Net-native with documented gRPC equivalence), `RpcRequestPayload` / `RpcResponsePayload` round-trip codec with `MAX_RPC_*` caps. 15 regression tests pin wire stability + decode-rejection of malformed payloads.
- ✅ **`RpcServerFold`** (`cortex/rpc.rs`) — `RedexFold<()>` that decodes REQUEST events, dispatches the handler in tokio, emits RESPONSE via a `RpcResponseEmitter` callback. `RpcCancellationToken` (Notify+AtomicBool wrapper, race-safe), `RpcContext` (caller_origin + decoded payload + cancellation), `RpcHandler` async-trait, `RpcHandlerError` (Application/Internal). Handler panic caught via `catch_unwind` and surfaced as `RpcStatus::Internal`. Fast deadline-already-passed short-circuit. CANCEL flips the in-flight token. Malformed payloads emit `UnknownVersion` and continue (do not kill the cortex adapter). 10 regression tests.
- ✅ **`RpcClientFold`** + **`RpcClientPending`** (`cortex/rpc.rs`) — symmetric caller side. `RpcClientPending::register(call_id) -> oneshot::Receiver`; the fold's `apply` decodes RESPONSE events and routes them to the matching pending sender. Re-register of the same call_id closes the prior receiver (misuse detection). 5 regression tests.
- ✅ **End-to-end loopback integration test** (`tests/integration_nrpc_loopback.rs`) — proves the server + client folds compose into a working request/response round trip without going through the real Mesh publish path (uses synthesized `RedexEvent`s). 6 tests: round-trip, multiplexed concurrent calls, exactly-once handler invocation, cancellation flowing into the handler, application error round-trip, panic surfacing as Internal.
- ✅ **Per-channel-hash inbound dispatch hook on `MeshNode`** — `register_rpc_inbound(channel_hash, dispatcher)` / `unregister_rpc_inbound(channel_hash)` API. The mesh's inbound packet path consults the dispatcher map per packet (one DashMap get); registered channel hashes route directly to the dispatcher and skip the per-shard `inbound` queue. **Wire-format change**: the publish path (`publish_to_peer`) now stamps `channel_hash` on the outgoing packet header (was always `0` pre-fix). New `ThreadLocalPooledBuilder::set_channel_hash` exposes the underlying builder method. End-to-end network test (`tests/nrpc_inbound_dispatcher.rs`, 3 tests) proves: register/unregister round-trip, registered dispatchers receive published events through real network, unregister restores the shard-inbound path.
- ✅ **`Mesh::serve_rpc(service, handler)` / `Mesh::call(target_node_id, service, payload, opts)` glue** (`adapter/net/mesh_rpc.rs`). The wire-up:
  - **`serve_rpc`** registers an inbound dispatcher for `<service>.requests`'s channel hash. The dispatcher pushes events into a tokio mpsc; a bridge task drains it and runs each event through the `RpcServerFold` (which spawns the handler). The fold's emit closure publishes RESPONSE events on `<service>.replies.<caller_origin>` via the standard roster-based `Mesh::publish` (which works because the caller pre-subscribes to its reply channel from the server).
  - **`call`** lazy-subscribes the caller to its own reply channel from `target_node_id` (one round-trip per (target, service) pair, cached). Allocates a `call_id`, registers a oneshot in the per-Mesh `RpcClientPending`, **direct-sends** the REQUEST to `target_node_id` via `publish_to_peer` (now `pub(super)`) bypassing the local subscriber roster — RPC's caller-knows-target model doesn't fit the publisher-led pub/sub roster. Awaits the receiver; deadline timer fires CANCEL on timeout. Returns `RpcReply` on `Ok`, `RpcError` on any failure.
  - **`ServeHandle`** (RAII) unregisters the dispatcher and aborts the bridge task on Drop.
  - **Per-Mesh state additions** on `MeshNode`: `rpc_client_pending: Arc<RpcClientPending>`, `rpc_next_call_id: Arc<AtomicU64>`, `rpc_reply_subscriptions: Arc<Mutex<Vec<(u64, String)>>>`. All initialized in the constructor, exposed via `pub(super)` accessors.
- ✅ **End-to-end Mesh integration test** (`tests/integration_nrpc_mesh.rs`, 4 tests through real network handshake): round-trip echo, multiple sequential calls reusing the lazy reply subscription with exactly-once handler invocation, server panic surfaces as `Internal`, deadline emits CANCEL and surfaces as `Timeout` to the caller.
- ✅ **Real-network queue-group coverage** (`tests/queue_group_dispatch.rs`, 2 tests): two `QueueGroup` subscribers on different nodes divide a stream of 100 events between them with exactly-once delivery; broadcast subscriber + queue-group pool coexist on one channel ("audit logger + worker pool" pattern from the design doc).
- ✅ **Phase 2 first chunk: service discovery via capability announcements**. `Mesh::serve_rpc` auto-registers the service in a per-Mesh `rpc_local_services` set; `announce_capabilities[_with]` auto-merges `nrpc:<service>` tags onto the announced `CapabilitySet`, propagating through the existing capability-broadcast machinery. Two new public APIs:
  - `Mesh::find_service_nodes(service) -> Vec<u64>` queries the local capability index for nodes carrying the `nrpc:<service>` tag.
  - `Mesh::call_service(service, payload, opts) -> Result<RpcReply, RpcError>` shortcut: finds candidates, picks one via naive `call_id % len()` round-robin, dispatches via the existing direct-addressed `call(target, ...)`. Returns `RpcError::NoRoute` if no servers advertise the tag.
  
  `ServeHandle::Drop` removes the service from the local registry so subsequent announcements stop emitting the tag.
- ✅ **Phase 2 end-to-end test** (`tests/integration_nrpc_service_discovery.rs`, 4 tests): three nodes, two serve "echo", one caller uses `call_service` — `call_service_discovers_servers_via_capability_announcements` asserts both servers are exercised by round-robin distribution; `sticky_routing_pins_a_key_to_one_server` pins consistency under `RoutingPolicy::Sticky`; `random_routing_distributes_across_servers` validates `RoutingPolicy::Random`; `call_service_with_no_servers_returns_no_route` returns `RpcError::NoRoute` with a diagnostic naming the missing tag.
- ✅ **`RoutingPolicy` enum** plumbed onto `CallOptions` (default `RoundRobin`):
  - `RoundRobin` — naive `call_id % len`. Even distribution.
  - `Random` — xxh3 of `call_id`, modulo. Stateless, even.
  - `Sticky { key: u64 }` — xxh3 of key, modulo a sorted candidate list. Same key → same target while the candidate set is stable. Useful for session affinity / shard routing / conversation pinning.
  - `LowestLatency` — picks the candidate with smallest `latency_us` per the local `ProximityGraph`. Candidates with no proximity entry fall to `u64::MAX` and lose; if every candidate lacks proximity data, falls back deterministically to the lexicographically-first sorted node id (so a freshly-discovered service still routes consistently).
- ✅ **`filter_unhealthy: bool` on `CallOptions`** (default `true`) — skips candidates whose `ProximityGraph` entry reports `!is_available()` (i.e. `Unhealthy` / `Unknown`). Pin: candidates with NO proximity entry are KEPT (absence of evidence ≠ evidence of unhealth), so a freshly-announced server isn't falsely filtered just because pingwaves haven't propagated yet.
- ✅ **EntityId ↔ node_id bridge** — `MeshNode::entity_id_for_node(u64) -> Option<[u8; 32]>` accessor consults `peer_entity_ids` to map session-layer node ids to entity-layer keys. This is the single piece that was missing; `LowestLatency` and `filter_unhealthy` both flow through it.
- ✅ **Two new bridge tests** in `tests/integration_nrpc_service_discovery.rs`:
  - `lowest_latency_falls_back_to_first_when_no_proximity_data` — 20 calls under `LowestLatency` with no pingwaves exchanged. All 20 land on the lexicographically-first sorted candidate (deterministic fallback).
  - `filter_unhealthy_keeps_candidates_with_no_proximity_data` — 20 calls with `filter_unhealthy=true` against two fresh servers (no proximity data); both servers receive a non-zero share. Pins the "absence of evidence ≠ unhealth" semantic.
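The selection rules for the four routing policies and the health filter can be sketched in a few lines. This is a minimal illustration, not the in-tree code: `pick_target`, `Health`, and the plain `HashMap` inputs are hypothetical stand-ins for the `ProximityGraph` lookups, `DefaultHasher` stands in for xxh3, and sorting the candidates for every policy is an assumption (the text above only states it for `Sticky` and the `LowestLatency` fallback).

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Mirrors the four policies listed above.
#[derive(Clone, Copy, Debug)]
pub enum RoutingPolicy {
    RoundRobin,
    Random,
    Sticky { key: u64 },
    LowestLatency,
}

/// Hypothetical health status as reported by a proximity entry.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Health {
    Healthy,
    Unhealthy,
    Unknown,
}

/// Stand-in for xxh3 (assumption: any deterministic 64-bit hash shows the idea).
fn hash64(value: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Drops candidates whose entry is not available. Candidates with NO entry
/// are kept: absence of evidence is not evidence of unhealth.
pub fn filter_unhealthy(candidates: &[u64], health: &HashMap<u64, Health>) -> Vec<u64> {
    candidates
        .iter()
        .copied()
        .filter(|id| match health.get(id) {
            None => true,
            Some(status) => *status == Health::Healthy,
        })
        .collect()
}

/// Picks one node id, or `None` where the real API would report `NoRoute`.
pub fn pick_target(
    policy: RoutingPolicy,
    call_id: u64,
    candidates: &[u64],
    latency_us: &HashMap<u64, u64>,
) -> Option<u64> {
    if candidates.is_empty() {
        return None;
    }
    let mut sorted = candidates.to_vec();
    sorted.sort_unstable();
    let len = sorted.len() as u64;
    let index = match policy {
        RoutingPolicy::RoundRobin => (call_id % len) as usize,
        RoutingPolicy::Random => (hash64(call_id) % len) as usize,
        RoutingPolicy::Sticky { key } => (hash64(key) % len) as usize,
        RoutingPolicy::LowestLatency => {
            // Missing entries count as u64::MAX. `min_by_key` returns the
            // first minimum, so "no data at all" resolves to the first sorted id.
            return sorted
                .iter()
                .copied()
                .min_by_key(|id| latency_us.get(id).copied().unwrap_or(u64::MAX));
        }
    };
    Some(sorted[index])
}
```

Because `Sticky` hashes only the key, the chosen target changes only when the candidate set itself changes, which is the "stable while the candidate set is stable" property described above.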

Phase 1 + Phase 2 are functionally complete. The asymmetric routing pattern (REQUESTs direct-unicast, RESPONSEs roster-based) is what Phase 1 settled on and remains in Phase 2 — the discovery layer just removes the need for the caller to specify `target_node_id` explicitly, and the four routing policies + health filter let the caller hint at session affinity, even distribution, latency-driven selection, or unhealthy exclusion.

- ✅ **Rust SDK typed wrappers** (`sdk/src/mesh_rpc.rs`):
  - **Raw passthroughs** (`Mesh::serve_rpc`, `Mesh::call`, `Mesh::call_service`, `Mesh::find_service_nodes`) — thin delegates to the underlying `MeshNode` API.
  - **Typed wrappers** (`Mesh::serve_rpc_typed`, `Mesh::call_typed`, `Mesh::call_service_typed`) — auto serde via a per-call selectable `Codec` (default `Json`, `JsonPretty` for diagnostic dumps). The handler signature is `Fn(Req) -> Future<Output = Result<Resp, String>>` — `Err(String)` surfaces as `RpcError::ServerError` with `RpcStatus::Application(0x4001)` and the message as the body. Malformed request bodies short-circuit to `Application(0x4000)` before the user closure runs.
  - **`Codec` enum** with `encode<T>` / `decode<T>` helpers; round-trips primitive and struct types via `serde_json`.
  - **Re-exports** of `RpcError`, `RpcReply`, `CallOptions`, `RoutingPolicy`, `ServeHandle`, `RpcContext`, `RpcHandler`, `RpcHandlerError`, `RpcStatus`, `ServeError` from the SDK so users have one place to import from.
  - **4 unit tests** (`sdk/tests/mesh_rpc_typed.rs`) pinning the typed-handler trait round-trip, application-error mapping, malformed-body short-circuit (user closure NOT invoked), and codec round-trip semantics.
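The status mapping of the typed wrappers can be illustrated with a synchronous sketch. Everything here is an assumption made for illustration: `run_typed` and `TypedOutcome` are hypothetical names, the codec is a hand-rolled decimal-integer codec rather than the SDK's `Codec` (so the example needs no serde), and the `u16` width of the application code is a guess. Only the two code values and the "decode fails before the closure runs" ordering come from the text above.

```rust
/// Status codes named above for the typed wrappers.
pub const STATUS_MALFORMED_REQUEST: u16 = 0x4000;
pub const STATUS_HANDLER_ERROR: u16 = 0x4001;

/// Hypothetical flattened result of one typed call on the server side.
#[derive(Debug, PartialEq)]
pub enum TypedOutcome {
    Ok(Vec<u8>),
    Application { code: u16, message: String },
}

/// Stand-in codec: a request body is a UTF-8 decimal `u32`.
fn decode_u32(body: &[u8]) -> Result<u32, String> {
    let text = std::str::from_utf8(body).map_err(|e| e.to_string())?;
    text.trim().parse::<u32>().map_err(|e| e.to_string())
}

fn encode_u32(value: u32) -> Vec<u8> {
    value.to_string().into_bytes()
}

/// Decodes the request, runs the user handler, and maps both failure
/// modes onto the application status codes.
pub fn run_typed<F>(body: &[u8], handler: F) -> TypedOutcome
where
    F: FnOnce(u32) -> Result<u32, String>,
{
    // A malformed body short-circuits before the user closure runs.
    let request = match decode_u32(body) {
        Ok(request) => request,
        Err(message) => {
            return TypedOutcome::Application {
                code: STATUS_MALFORMED_REQUEST,
                message,
            }
        }
    };
    match handler(request) {
        Ok(response) => TypedOutcome::Ok(encode_u32(response)),
        Err(message) => TypedOutcome::Application {
            code: STATUS_HANDLER_ERROR,
            message,
        },
    }
}
```

Keeping the two codes distinct lets a caller tell "my request was rejected as malformed" apart from "the handler ran and refused".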

- ✅ **`ChannelConfigRegistry` prefix-match** — new `insert_prefix(prefix, config)` / `remove_prefix(prefix)` API. `get_by_name(name)` falls back to a prefix walk when no exact match exists; the first prefix `name` starts with wins. The exact-match hot path (DashMap get) is unaffected; prefix lookups are O(num_prefixes) on the slow path. Documented as "use sparingly — one prefix per service is fine, hundreds is not."
- ✅ **SDK auto-registration** in `Mesh::serve_rpc` and `Mesh::serve_rpc_typed` — registers two `ChannelConfig` entries per service:
  - Exact: `<service>.requests` (channel callers publish REQUESTs onto).
  - Prefix: `<service>.replies.` (admits every per-caller `<service>.replies.<caller_origin>` subscribe without pre-registration).
  Both default to permissive (no `publish_caps`, no `require_token`); operators who want RPC ACLs can call `register_channel` / `register_channel_prefix` themselves before `serve_rpc` to override. Resolves the SDK channel-registry friction noted in the prior follow-up.
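The exact-then-prefix lookup and the two per-service registrations can be sketched as follows. This is an assumption-laden illustration: `Registry` and `register_rpc_service` are hypothetical, a `HashMap` plus a `Vec` stand in for the real `DashMap`-backed registry, and `ChannelConfig` is reduced to a single field.

```rust
use std::collections::HashMap;

/// Reduced stand-in for the real channel config.
#[derive(Clone, Debug, PartialEq)]
pub struct ChannelConfig {
    pub require_token: bool,
}

#[derive(Default)]
pub struct Registry {
    exact: HashMap<String, ChannelConfig>,
    // Walked in insertion order; the first prefix that matches wins.
    prefixes: Vec<(String, ChannelConfig)>,
}

impl Registry {
    pub fn insert(&mut self, name: &str, config: ChannelConfig) {
        self.exact.insert(name.to_string(), config);
    }

    pub fn insert_prefix(&mut self, prefix: &str, config: ChannelConfig) {
        self.prefixes.push((prefix.to_string(), config));
    }

    /// Hot path: exact lookup. Slow path: O(num_prefixes) walk.
    pub fn get_by_name(&self, name: &str) -> Option<&ChannelConfig> {
        if let Some(config) = self.exact.get(name) {
            return Some(config);
        }
        self.prefixes
            .iter()
            .find(|(prefix, _)| name.starts_with(prefix.as_str()))
            .map(|(_, config)| config)
    }

    /// Registers the permissive defaults for one service: an exact entry
    /// for the request channel and a prefix entry for per-caller replies.
    pub fn register_rpc_service(&mut self, service: &str) {
        let permissive = ChannelConfig { require_token: false };
        self.insert(&format!("{}.requests", service), permissive.clone());
        self.insert_prefix(&format!("{}.replies.", service), permissive);
    }
}
```

An exact entry always shadows a prefix entry, so an operator can still tighten one specific reply channel without touching the prefix default.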
- ✅ **End-to-end SDK nRPC tests** in `sdk/tests/mesh_rpc_typed.rs` (4 tests, real network handshake): typed `call_typed` round-trip, handler `Err(String)` mapping, `call_service_typed` discovers the server via capability announcements, codec round-trip semantics. **All four tests pass over the SDK's default `MeshBuilder::build` path** — no special opt-in required.
- ✅ **W3C Trace Context propagation** (`cortex::rpc::TraceContext` + `extract_trace_context` / `build_trace_headers` helpers). New `CallOptions::trace_context: Option<TraceContext>` and `RpcContext::trace_context: Option<TraceContext>` fields. When the caller sets `CallOptions::trace_context`, the SDK emits `traceparent` / `tracestate` headers and sets `FLAG_RPC_PROPAGATE_TRACE`; the server's fold extracts the headers and populates `RpcContext::trace_context`. nRPC is **transport-only** — application code on both sides reads/writes via whatever tracing backend it has wired up (`tracing-opentelemetry`, Datadog, etc.). Empty `tracestate` is omitted on the wire (W3C convention). 4 unit tests + 1 end-to-end test (`integration_nrpc_mesh::rpc_trace_context_propagates_to_server`) prove the round-trip via real network publish.
- ✅ **Phase 3 first chunk: streaming responses.** Multi-fire `DISPATCH_RPC_RESPONSE` events for one `call_id` marked non-terminal vs. terminal via the `nrpc-streaming` header (`continue` / `end`). New surface:
  - **Wire markers**: `HEADER_NRPC_STREAMING` header with `continue` (non-terminal) and `end` (terminal-Ok) values; non-`Ok` status is implicitly terminal regardless of header. `classify_streaming_chunk(&resp) -> StreamingChunkKind` is the single decision point. Caller sets `FLAG_RPC_STREAMING_RESPONSE` on the REQUEST to signal "expect multi-fire".
  - **Server side**: `RpcResponseSink` (unbounded mpsc, `sink.send(body)` is non-blocking), `RpcStreamingHandler` async-trait taking `(ctx, sink)`, and `RpcServerStreamingFold` (parallel to `RpcServerFold` but spawns a pump task draining the sink and emitting per-chunk `nrpc-streaming: continue` frames; handler return → terminal `end` frame, handler `Err` → terminal non-`Ok` frame, handler panic caught by `catch_unwind` → terminal `Internal`). `Mesh::serve_rpc_streaming` is the public glue.
  - **Per-call ordering guarantee**: the streaming fold takes an `RpcAsyncResponseEmitter` (`Arc<dyn Fn(...) -> BoxFuture<()>>`) instead of the unary fold's sync `RpcResponseEmitter`, and the pump task `.await`s each emit before reading the next sink chunk. Without this, two chunks emitted in tight succession would race into the publish path via independent `tokio::spawn`s and arrive at the caller out of order, or be eclipsed by the terminal frame and lost entirely (the caller stops reading once the terminal frame arrives). Pinned by the SDK streaming test. The unary fold keeps the cheaper sync emitter: it emits exactly one RESPONSE per call, so there is no ordering dependency.
  - **Client side**: `RpcClientPending` refactored from oneshot-only to a `PendingEntry::{Unary | Streaming}` enum so a single `RpcClientFold` demuxes both call kinds. `register_streaming(call_id) -> mpsc::UnboundedReceiver<StreamItem>` is the streaming counterpart of `register`. `Mesh::call_streaming` returns an `RpcStream: futures::Stream<Item = Result<Bytes, RpcError>>`; terminal-Ok closes the stream, terminal-error yields one final `Err(RpcError::ServerError)` then closes. `RpcStream::Drop` clears the pending entry and best-effort emits CANCEL via direct unicast so the server's handler observes `ctx.cancellation`.
  - **Tests** (`tests/integration_nrpc_streaming.rs`, 3 tests through real network): `rpc_streaming_collects_all_chunks` (server emits 5 chunks, caller collects all 5 in order, sees clean EOF), `rpc_streaming_drop_cancels_handler` (caller drops mid-stream, handler observes `ctx.cancellation` cooperatively), `rpc_streaming_terminal_error_after_partial_stream` (server emits 2 chunks then `Err` → caller sees both chunks then `RpcError::ServerError` with `Internal` status).
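The terminal/non-terminal decision described in the streaming entry can be sketched as a single function. The types here (`Response`, `Status`, `ChunkKind`) are simplified, hypothetical stand-ins for the real payload and `StreamingChunkKind`, and treating a missing header on an `Ok` response as terminal is an assumption.

```rust
pub const HEADER_NRPC_STREAMING: &str = "nrpc-streaming";

/// Reduced status enum; the real `RpcStatus` has more variants.
#[derive(Debug, PartialEq)]
pub enum Status {
    Ok,
    Internal,
}

#[derive(Debug, PartialEq)]
pub enum ChunkKind {
    Continue,
    TerminalOk,
    TerminalError,
}

pub struct Response {
    pub status: Status,
    pub headers: Vec<(String, String)>,
}

/// Single decision point: a non-Ok status is terminal regardless of the
/// header; otherwise the header value decides.
pub fn classify(resp: &Response) -> ChunkKind {
    if resp.status != Status::Ok {
        return ChunkKind::TerminalError;
    }
    let marker = resp
        .headers
        .iter()
        .find(|(name, _)| name.as_str() == HEADER_NRPC_STREAMING)
        .map(|(_, value)| value.as_str());
    match marker {
        Some("continue") => ChunkKind::Continue,
        // Assumption: "end" or a missing header both close the stream.
        _ => ChunkKind::TerminalOk,
    }
}

/// Counts the data chunks a caller would see before the stream closes.
pub fn chunks_before_terminal(responses: &[Response]) -> (usize, Option<ChunkKind>) {
    let mut chunks = 0;
    for resp in responses {
        match classify(resp) {
            ChunkKind::Continue => chunks += 1,
            terminal => return (chunks, Some(terminal)),
        }
    }
    (chunks, None)
}
```

Checking the status before the header is what makes an error frame terminal even if a buggy server stamps it `continue`.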
- ✅ **SDK typed streaming surface** (`sdk/src/mesh_rpc.rs`):
  - **Raw passthroughs** (`Mesh::serve_rpc_streaming`, `Mesh::call_streaming`) — thin delegates plus the same `auto_register_rpc_channels` as the unary path.
  - **Typed wrappers** (`Mesh::serve_rpc_streaming_typed`, `Mesh::call_streaming_typed`) auto serde via the per-call `Codec`. Handler signature is `Fn(Req, ResponseSinkTyped<Resp>) -> Future<Output = Result<(), String>>`. `Err(String)` surfaces as `RpcError::ServerError` with `RpcStatus::Application(0x4001)` carrying the message; malformed request bodies short-circuit to `Application(0x4000)` before the user closure runs.
  - **`ResponseSinkTyped<Resp>`** wraps `RpcResponseSink` + `Codec` and `send(&value) -> Result<(), String>` encodes per send (encode-failure surfaced to the handler so it can choose to abort the stream).
  - **`RpcStreamTyped<Resp>`** wraps `RpcStream` and decodes each chunk; decode failure terminates the stream with one `Err(RpcError::ServerError(Internal))` carrying the decode diagnostic.
  - **Re-exports** of `RpcResponseSink`, `RpcStreamingHandler`, `StreamItem`, `RpcStream` from the SDK module.
  - **3 unit tests** (`sdk/tests/mesh_rpc_streaming_typed.rs`, all over real network handshake): `typed_streaming_collects_all_chunks`, `typed_streaming_handler_error_after_partial_stream`, `typed_streaming_chunk_decode_failure_terminates_stream` (server emits a JSON shape the caller's `Resp` can't decode → caller sees one `Err` then EOF, never silently swallows).
- ✅ **Phase 3 caller-side resilience: retry helper** (`sdk/src/mesh_rpc_resilience.rs`):
  - **`RetryPolicy`** with half jitter (each backoff scaled by a uniform random factor in `[0.5, 1.0]`), exponential growth (`backoff_multiplier`, default `2.0`), an upper-bound cap (`max_backoff`), and a swappable `retryable: Arc<dyn Fn(&RpcError) -> bool>` predicate. Default policy: 3 attempts, 50ms initial → 1s cap.
  - **`default_retryable`** — retries `Timeout`, `Transport`, and `ServerError` for canonical transient statuses (`Internal`, `Backpressure`, server-observed `Timeout`); does NOT retry `NoRoute`, application errors, `NotFound`, `Unauthorized`, `UnknownVersion`, or `Cancelled` (caller-fixable / terminal).
  - **Four wrappers on `Mesh`**: `call_with_retry`, `call_service_with_retry`, `call_typed_with_retry`, `call_service_typed_with_retry`. Typed variants encode once and reuse the bytes across attempts; service variants re-resolve the candidate set per attempt so failover is automatic.
  - **No new dependencies** — jitter source is a tiny inline mix of `SystemTime::now()` nanos with the attempt counter (good enough to decorrelate retry storms; the goal is not unpredictability, it is independence across callers).
  - **4 tests** (`sdk/tests/mesh_rpc_retry.rs`): `retry_eventually_succeeds_after_transient_failures` (server fails first 2, succeeds 3rd; wrapper observes single `Ok` reply and exactly 3 handler invocations), `retry_does_not_retry_application_errors` (typed handler `Err(String)` surfaces as `Application(0x4001)` after 1 attempt — no retry), `retry_exhaustion_surfaces_last_error` (3-attempt cap, server always Internal, last error round-trips with original diagnostic), `default_retryable_classifies_canonical_errors` (pure-function unit pin on the predicate's classification).
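The backoff arithmetic and the retry loop can be sketched synchronously. This is an illustration under stated assumptions: `max_attempts`, `initial_backoff`, `backoff_for`, and `retry_with` are hypothetical names, the jitter source and the sleep are injected as closures so the example stays deterministic, and applying the cap before the jitter is a guess about ordering.

```rust
use std::time::Duration;

pub struct RetryPolicy {
    pub max_attempts: u32,
    pub initial_backoff: Duration,
    pub max_backoff: Duration,
    pub backoff_multiplier: f64,
}

impl Default for RetryPolicy {
    /// Defaults from the text above: 3 attempts, 50ms initial, 1s cap, x2 growth.
    fn default() -> Self {
        RetryPolicy {
            max_attempts: 3,
            initial_backoff: Duration::from_millis(50),
            max_backoff: Duration::from_secs(1),
            backoff_multiplier: 2.0,
        }
    }
}

/// Backoff before retry number `attempt` (0-based). `unit` is a random
/// value in [0, 1]; the delay is scaled by a factor in [0.5, 1.0].
pub fn backoff_for(policy: &RetryPolicy, attempt: u32, unit: f64) -> Duration {
    let grown = policy.initial_backoff.as_secs_f64()
        * policy.backoff_multiplier.powi(attempt as i32);
    let capped = grown.min(policy.max_backoff.as_secs_f64());
    let jitter = 0.5 + 0.5 * unit.max(0.0).min(1.0);
    Duration::from_secs_f64(capped * jitter)
}

/// Runs `op` until it succeeds, the error is not retryable, or the
/// attempt budget is spent. The last error is returned unchanged.
pub fn retry_with<T, E>(
    policy: &RetryPolicy,
    retryable: impl Fn(&E) -> bool,
    mut next_unit: impl FnMut() -> f64,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut failures = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(error) => {
                failures += 1;
                if failures >= policy.max_attempts || !retryable(&error) {
                    return Err(error);
                }
                sleep(backoff_for(policy, failures - 1, next_unit()));
            }
        }
    }
}
```

With the default policy the upper bounds are 50ms and then 100ms, and the jitter can shorten each of them by at most half.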
- ✅ **Phase 3 caller-side resilience: hedge helper** (`sdk/src/mesh_rpc_resilience.rs`):
  - **`HedgePolicy { delay, hedges }`**: fire-then-race. The primary fires at `t=0` and each additional hedge at `t=delay*idx`. The first `Ok` reply wins; if the first finisher is `Err`, the wrapper waits for the remaining hedges before surfacing the last error. Defaults: 50ms delay, 1 hedge.
  - **Four wrappers on `Mesh`**: `call_with_hedge_to(targets, ...)` / `call_typed_with_hedge_to` for explicit-target hedging (e.g. primary + warm-standby), `call_service_with_hedge` / `call_service_typed_with_hedge` for capability-index-driven hedging across replicas. Service variants sort the candidate set so the prefix taken is deterministic.
  - **Why service-only and explicit-targets-only, not direct-to-one-target**: hedging to the same target is always wrong (same backlog, same GC pause, doubles your load for nothing). Hedging only buys p99 reduction across distinct replicas / endpoints.
  - **Why no `filter_unhealthy` on the service variant**: hedge's whole premise is "be robust to per-node slowness" — including unhealthy-but-still-responsive nodes. Filtering them out reduces the redundancy hedging buys you. Documented; users who want health-aware single-target dispatch use `call_service` with a routing policy directly.
  - **Cancellation tradeoff (RESOLVED — see CANCEL-on-drop entry below)**: ~~loser hedges are NOT explicitly CANCELled today.~~ Now wired via `UnaryCallGuard`: hedge losers' `UnaryCallGuard::Drop` fires CANCEL to the server, which observes it on `ctx.cancellation`. Pinned by `hedge_loser_handler_observes_cancellation`.
  - **3 tests** (`sdk/tests/mesh_rpc_hedge.rs`): `hedge_backup_wins_when_primary_is_slow` (primary sleeps 800ms, backup is instant; with 50ms hedge delay the backup's body wins under 600ms wall-clock), `hedge_zero_degrades_to_single_call` (`hedges=0` falls back to a single straight call against `targets[0]`; second target ignored), `hedge_empty_targets_returns_no_route` (empty `targets` slice → immediate `RpcError::NoRoute` with diagnostic).
- ✅ **Phase 3 caller-side metrics** (`adapter/net/mesh_rpc_metrics.rs`):
  - **`RpcMetricsRegistry`** — per-Mesh `DashMap<String, Arc<ServiceMetricsAtomic>>`. One entry per service that's been called. Thread-safe atomic counters.
  - **Per-service counters**: `calls_total`, `errors_no_route` / `errors_timeout` / `errors_server` / `errors_transport`, `in_flight` (gauge), `latency_sum_ns` / `latency_count`, and a Prometheus-default cumulative bucketed histogram (`[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]` seconds + `+Inf` terminal bucket).
  - **`CallMetricsGuard`** — RAII shim built at the top of `Mesh::call` (BEFORE any potential early-return): bumps `in_flight` on construction, balances on Drop. Each return path calls `guard.record(CallOutcome::*)`; Drop reads the recorded outcome and bumps the appropriate counter + records the latency. A future dropped without a `record(...)` call (hedge loser) leaves `in_flight` correctly decremented but does not synthesize a phantom call resolution.
  - **Snapshot + Prometheus formatter**: `MeshNode::rpc_metrics_snapshot() -> RpcMetricsSnapshot` (cheap one-DashMap-pass copy; suitable for per-scrape pull). `RpcMetricsSnapshot::prometheus_text() -> String` emits `text/plain; version=0.0.4` straight into a `/metrics` HTTP handler. Five canonical metric families per service: `nrpc_calls_total`, `nrpc_errors_total{kind=no_route|timeout|server|transport}`, `nrpc_in_flight_calls`, `nrpc_call_latency_seconds_{bucket,sum,count}`. Service names are escaped per Prometheus exposition convention (backslash, double-quote, newline).
  - ~~**Caller-side only for v1**~~ **Server-side counters now wired** (see "Server-side metrics" entry below). Caller-side surface is unchanged; the same `ServiceMetricsAtomic` carries both directions, so a node that both calls and serves a service has complete observability for that service in one record.
  - **No new dependencies** — atomics + DashMap (already in core).
  - **5 unit tests** in `mesh_rpc_metrics.rs`: registry round-trip with cumulative-bucket arithmetic, Prometheus text contains canonical names with `+Inf` bucket, label escaping, `CallMetricsGuard` records on `record(...)` path, `CallMetricsGuard` dropped without `record(...)` only balances `in_flight` (no phantom outcome).
  - **2 SDK integration tests** (`sdk/tests/mesh_rpc_metrics.rs`): mixed-outcome (3 ok + 1 server-error + 1 timeout) lands in the right counters with `+Inf` bucket equal to count and `in_flight = 0` at end; Prometheus text contains all five metric families plus `service="prom_test"` label and `le="+Inf"` terminal bucket. SDK re-exports `RpcMetricsSnapshot`, `ServiceMetrics`, `DEFAULT_LATENCY_BUCKETS_SECS`, plus `Mesh::rpc_metrics_snapshot()` shortcut.
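The cumulative-bucket arithmetic and the label escaping behind the latency histogram can be sketched with plain atomics. `LatencyHistogram` and `escape_label` are hypothetical names, and the sketch covers only the `nrpc_call_latency_seconds` family; the bucket bounds and metric names are the ones listed above.

```rust
use std::fmt::Write;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

pub const DEFAULT_LATENCY_BUCKETS_SECS: [f64; 11] =
    [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];

/// Cumulative histogram: bucket `i` counts observations <= bound `i`.
/// The `+Inf` bucket is not stored because it always equals `count`.
#[derive(Default)]
pub struct LatencyHistogram {
    buckets: [AtomicU64; 11],
    sum_ns: AtomicU64,
    count: AtomicU64,
}

/// Escapes backslash, double quote, and newline in a label value.
fn escape_label(value: &str) -> String {
    value
        .replace('\\', "\\\\")
        .replace('"', "\\\"")
        .replace('\n', "\\n")
}

impl LatencyHistogram {
    pub fn observe(&self, latency: Duration) {
        let secs = latency.as_secs_f64();
        for (bucket, le) in self.buckets.iter().zip(DEFAULT_LATENCY_BUCKETS_SECS.iter()) {
            if secs <= *le {
                bucket.fetch_add(1, Ordering::Relaxed);
            }
        }
        self.sum_ns.fetch_add(latency.as_nanos() as u64, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    pub fn prometheus_text(&self, service: &str) -> String {
        let label = escape_label(service);
        let count = self.count.load(Ordering::Relaxed);
        let name = "nrpc_call_latency_seconds";
        let mut out = String::new();
        for (bucket, le) in self.buckets.iter().zip(DEFAULT_LATENCY_BUCKETS_SECS.iter()) {
            let value = bucket.load(Ordering::Relaxed);
            let _ = writeln!(out, "{}_bucket{{service=\"{}\",le=\"{}\"}} {}", name, label, le, value);
        }
        let _ = writeln!(out, "{}_bucket{{service=\"{}\",le=\"+Inf\"}} {}", name, label, count);
        let sum_secs = self.sum_ns.load(Ordering::Relaxed) as f64 / 1e9;
        let _ = writeln!(out, "{}_sum{{service=\"{}\"}} {}", name, label, sum_secs);
        let _ = writeln!(out, "{}_count{{service=\"{}\"}} {}", name, label, count);
        out
    }
}
```

Escaping the backslash first matters: doing it last would re-escape the backslashes introduced for quotes and newlines.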
- ✅ **Phase 3 per-streaming-response window grants** (closes the Phase 3 backlog):
  - **Wire additions**: `DISPATCH_RPC_STREAM_GRANT: u8 = 0x14` (caller → server, payload is 4-byte big-endian `u32` credit count) + `HEADER_NRPC_STREAM_WINDOW_INITIAL: &str = "nrpc-stream-window-initial"` (REQUEST header, ASCII-decimal `u32` initial window). Helpers: `encode_stream_grant`, `decode_stream_grant`, `parse_stream_window_initial`.
  - **Server side**: `RpcServerStreamingFold` keeps a per-call `Arc<tokio::sync::Semaphore>` map (`FlowControlMap`). On REQUEST, if the initial-window header is present, a semaphore is created with that many permits and stashed under `(caller_origin, call_id)`. For each chunk, the pump task calls `acquire_owned().await` and then `forget()`s the permit, so a consumed credit is never returned. STREAM_GRANT events call `add_permits(n)`. CANCEL and terminal frames both drop the entry. Header absent → no entry → pump skips the await (back-compat path is unchanged; existing 3 streaming tests still pass).
  - **Caller side**: `CallOptions::stream_window_initial: Option<u32>`. `Mesh::call_streaming` emits the header on REQUEST and stashes the window on the returned `RpcStream`. `RpcStream::poll_next` auto-grants 1 credit per delivered chunk (so the in-flight credit holds near the initial window). `RpcStream::grant(n)` is the explicit API for batched cadence (e.g. grant `window/2` after every `window/2` chunks consumed); no-op when flow control isn't enabled. `RpcStream::flow_controlled() -> bool` exposes the state for tests / diagnostics. Both auto-grant and explicit-grant route through the new shared `spawn_grant_publish` helper, parallel to `spawn_cancel_publish`.
  - **Defensive caps**: tokio's `Semaphore::add_permits` is bounded by `MAX_PERMITS = usize::MAX >> 3`. Server applies a `>>4`-conservative cap on incoming GRANT amounts so a misbehaving caller can't overflow. STREAM_GRANT for an unknown / non-flow-controlled call_id is silently dropped (CANCEL race / wrong-stream confusion are both harmless).
  - **3 new integration tests** (`tests/integration_nrpc_streaming.rs`, real network):
    - `rpc_streaming_window_throttles_pump_until_grants` — server emits 20 chunks back-to-back, caller sets `window=3` and DOESN'T poll; after 300ms the server's `streaming_chunks_emitted_total` metric is exactly 3 (pump genuinely stalled on the semaphore, not just network-bound).
    - `rpc_streaming_auto_grant_drains_full_stream_under_small_window` — server emits 25 chunks, caller `window=2` + normal drain via `Stream::next`; auto-grant lets the full stream flow through with all 25 received in order.
    - `rpc_streaming_explicit_grant_unblocks_pump` — server emits 20, `window=2`, no consumption → metric stays at 2; explicit `stream.grant(5)` → metric advances to 7. Pin: explicit grants work in addition to auto-grant.
  - **Existing 3 streaming tests continue to pass** without changes — they don't set `stream_window_initial`, so they get the unbounded-credit path.
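The grant wire format and the credit accounting can be sketched without tokio. The helper names match the ones listed above, but their signatures here are assumptions, and `CreditWindow` / `pump` are hypothetical synchronous stand-ins for the semaphore and the pump task.

```rust
pub const DISPATCH_RPC_STREAM_GRANT: u8 = 0x14;
pub const HEADER_NRPC_STREAM_WINDOW_INITIAL: &str = "nrpc-stream-window-initial";

/// GRANT payload: a 4-byte big-endian `u32` credit count.
pub fn encode_stream_grant(credits: u32) -> [u8; 4] {
    credits.to_be_bytes()
}

/// Rejects any payload that is not exactly 4 bytes long.
pub fn decode_stream_grant(payload: &[u8]) -> Option<u32> {
    if payload.len() != 4 {
        return None;
    }
    Some(u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]]))
}

/// Header value: ASCII-decimal `u32`, digits only (no sign, no spaces).
pub fn parse_stream_window_initial(value: &str) -> Option<u32> {
    if value.is_empty() || !value.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    value.parse().ok()
}

/// Synchronous stand-in for the per-call semaphore.
pub struct CreditWindow {
    permits: u64,
}

impl CreditWindow {
    pub fn new(initial: u32) -> Self {
        CreditWindow { permits: u64::from(initial) }
    }

    /// Consumes one credit if available; consumed credits never come back.
    pub fn try_acquire(&mut self) -> bool {
        if self.permits == 0 {
            return false;
        }
        self.permits -= 1;
        true
    }

    /// Applies a GRANT; saturating so a misbehaving caller cannot overflow.
    pub fn grant(&mut self, credits: u32) {
        self.permits = self.permits.saturating_add(u64::from(credits));
    }
}

/// Emits chunks while credits last and returns how many were emitted.
pub fn pump(window: &mut CreditWindow, pending: &mut u32) -> u32 {
    let mut emitted = 0;
    while *pending > 0 && window.try_acquire() {
        *pending -= 1;
        emitted += 1;
    }
    emitted
}
```

The test-shaped usage mirrors `rpc_streaming_explicit_grant_unblocks_pump`: with `window=2` the pump stalls after 2 chunks, and a grant of 5 lets exactly 5 more through.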
- ✅ **Phase 3 server-side metrics** (extends `mesh_rpc_metrics.rs` + hooks in `cortex/rpc.rs`):
  - **New `ServiceMetricsAtomic` fields**: `handler_invocations_total`, `handler_panics_total`, `handler_in_flight` (gauge), `handler_duration_sum_ns` / `handler_duration_count` / `handler_duration_buckets` (Prometheus-default cumulative histogram, separate from the caller-side latency histogram so users can compare network vs handler time), `streaming_chunks_emitted_total`. All on the same per-service record as the caller-side counters — one snapshot covers both directions.
  - **`RpcServerFold::with_metrics(...)`** and **`RpcServerStreamingFold::with_metrics(...)`** — opt-in builder methods that attach a per-service metrics handle. The fold's spawned handler task hooks: bump `handler_invocations_total` + `handler_in_flight` at start, record `handler_duration_*` + decrement `handler_in_flight` at end, bump `handler_panics_total` from the `catch_unwind` Err arm. The streaming fold's pump task additionally bumps `streaming_chunks_emitted_total` per `sink.send(...)`.
  - **`Mesh::serve_rpc` + `Mesh::serve_rpc_streaming`** automatically attach the metrics handle (`self.rpc_metrics_arc().for_service(service).with_metrics(...)`); user code doesn't need to opt in. Tests that construct folds directly skip metrics by using the bare `new(...)` constructor.
  - **Prometheus output** (extends `prometheus_text()`): five new metric families per service — `nrpc_handler_invocations_total`, `nrpc_handler_panics_total`, `nrpc_handler_in_flight`, `nrpc_handler_duration_seconds_{bucket,sum,count}`, `nrpc_streaming_chunks_emitted_total`. Service label + `+Inf` bucket conventions identical to the caller-side path.
  - **2 new module-level tests** in `mesh_rpc_metrics.rs`: `prometheus_text_emits_canonical_metric_names` extended to assert all five server-side families, plus `record_handler_duration_lands_in_buckets` pinning bucket arithmetic for the new histogram.
  - **1 new SDK integration test** (`sdk/tests/mesh_rpc_metrics.rs::server_side_metrics_increment_for_unary_and_streaming`): server with both unary `Echo` (4 calls, 2ms sleep) and streaming `Counter` (1 call, 3 chunks); asserts `handler_invocations_total = 4` for unary + `=1` for streaming + `streaming_chunks_emitted_total = 3` + `handler_in_flight = 0` after settling + `≤5ms` bucket has the 2ms-sleep observations + caller-side mesh has zero handler invocations (different mesh, separate registry).
- ✅ **Unary-call CANCEL-on-drop** (`adapter/net/mesh_rpc.rs`): closes the gap that hedging exposed. New `UnaryCallGuard` is constructed inside `Mesh::call` immediately after the REQUEST is published; if the call future is dropped before resolving (hedge loser, `tokio::select!` losing arm, caller-side `JoinHandle::abort`, etc.), the guard's Drop runs `pending.cancel(call_id)` AND spawns a CANCEL publish to the server via the new `spawn_cancel_publish` helper (shared with `RpcStream::Drop`). The success path flips `guard.completed = true` so a happy call doesn't fire a useless CANCEL. The deadline-timeout path leaves `completed=false` and lets Drop emit CANCEL automatically — the previous explicit `send_rpc_cancel` call is removed (deduplication). Pinned by `rpc_dropped_call_future_fires_cancel_to_server` (raw integration) and `hedge_loser_handler_observes_cancellation` (SDK hedge), both asserting the server-side handler's `ctx.cancellation` actually fires after the caller drops the future.
- ✅ **Phase 3 caller-side resilience: circuit breaker** (`sdk/src/mesh_rpc_resilience.rs`):
  - **`CircuitBreaker`** with `CircuitBreakerConfig` — three-state machine `Closed → Open → HalfOpen → Closed/Open`. Defaults: 5 consecutive failures to trip, 30s open cooldown, 1 successful probe to close.
  - **Different shape from retry/hedge**: a long-lived stateful guard the user instantiates once (typically per logical downstream — one per service, or one per `(service, target)` pair) and shares via `Arc<CircuitBreaker>`. The wrapper takes a closure: `breaker.call(|| async { mesh.call_typed::<Req,Resp>(...).await }).await`. Generic over the inner result type so it composes around raw, typed, retried, OR hedged calls without specialized variants.
  - **`BreakerError::{Open | Inner(RpcError)}`** — pattern-match `Open` to fall back, `Inner` to handle the underlying error. `into_rpc_error()` flattens to `RpcError` for callers that don't care about the distinction.
  - **`default_breaker_failure`** predicate matches `default_retryable` — counts transient infra failures (`Timeout`, `Transport`, server `Internal` / `Backpressure` / `Timeout`); does NOT count application errors (caller-fixable bugs aren't health signals). Swap via `CircuitBreakerConfig::failure_predicate`.
  - **HalfOpen semantics**: at most ONE concurrent probe; other calls during HalfOpen short-circuit. Probe success → `consecutive_successes++` (transition to `Closed` when `success_threshold` met); probe failure → straight back to `Open` with cooldown reset.
  - **Observability**: `state()` and `consecutive_failures()` are cheap snapshots; `reset()` is the operator override (clear all state to `Closed`, zero counters) for runbook scenarios.
  - **No new dependencies** — state lives in `std::sync::Mutex` (held briefly, never across `await`).
  - **5 tests** (`sdk/tests/mesh_rpc_breaker.rs`, real-network where state transitions matter): `breaker_full_state_machine_cycle` (3 failures trip → 5 short-circuit → fix downstream + wait cooldown → probe closes → normal flow resumes; pinned via handler-invocation counter that the cooldown phase never invokes the server), `breaker_failed_half_open_probe_reopens` (bad probe re-opens with fresh cooldown, doesn't slip into Closed), `breaker_application_errors_do_not_trip` (5 consecutive typed `Err(String)` keeps state Closed, counter at 0), `breaker_reset_clears_state`, `breaker_error_flatten`.
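
The three-state machine above can be sketched synchronously with std only. This is an illustration under stated assumptions, not the SDK type: `SketchBreaker`, `try_acquire`, and `on_result` are hypothetical names, time is passed in explicitly so transitions are testable, and a result that does not count as a failure is assumed to reset the consecutive-failure counter.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState { Closed, Open, HalfOpen }

struct Inner {
    state: BreakerState,
    consecutive_failures: u32,
    consecutive_successes: u32,
    opened_at: Option<Instant>,
    probe_in_flight: bool,
}

/// Hypothetical stand-in for `CircuitBreaker`; not the SDK type.
pub struct SketchBreaker {
    failure_threshold: u32,
    cooldown: Duration,
    success_threshold: u32,
    inner: Mutex<Inner>,
}

impl SketchBreaker {
    pub fn new(failure_threshold: u32, cooldown: Duration, success_threshold: u32) -> Self {
        let inner = Inner {
            state: BreakerState::Closed,
            consecutive_failures: 0,
            consecutive_successes: 0,
            opened_at: None,
            probe_in_flight: false,
        };
        Self { failure_threshold, cooldown, success_threshold, inner: Mutex::new(inner) }
    }

    pub fn state(&self) -> BreakerState {
        self.inner.lock().unwrap().state
    }

    /// Returns true if the call may proceed, false if it short-circuits.
    pub fn try_acquire(&self, now: Instant) -> bool {
        let mut g = self.inner.lock().unwrap();
        let state = g.state;
        match state {
            BreakerState::Closed => true,
            BreakerState::Open => {
                let cooldown = self.cooldown;
                let cooled = g.opened_at.map_or(true, |t| now.saturating_duration_since(t) >= cooldown);
                if cooled {
                    // Cooldown elapsed: admit exactly one probe.
                    g.state = BreakerState::HalfOpen;
                    g.consecutive_successes = 0;
                    g.probe_in_flight = true;
                }
                cooled
            }
            BreakerState::HalfOpen => {
                // At most one concurrent probe; every other call short-circuits.
                let admit = !g.probe_in_flight;
                g.probe_in_flight = true;
                admit
            }
        }
    }

    /// Record the outcome of a call that `try_acquire` admitted.
    pub fn on_result(&self, counts_as_failure: bool, now: Instant) {
        let mut g = self.inner.lock().unwrap();
        match (g.state, counts_as_failure) {
            (BreakerState::Closed, true) => {
                g.consecutive_failures += 1;
                if g.consecutive_failures >= self.failure_threshold {
                    g.state = BreakerState::Open;
                    g.opened_at = Some(now);
                }
            }
            (BreakerState::Closed, false) => g.consecutive_failures = 0,
            (BreakerState::HalfOpen, true) => {
                // Failed probe: straight back to Open with a fresh cooldown.
                g.probe_in_flight = false;
                g.state = BreakerState::Open;
                g.opened_at = Some(now);
            }
            (BreakerState::HalfOpen, false) => {
                g.probe_in_flight = false;
                g.consecutive_successes += 1;
                if g.consecutive_successes >= self.success_threshold {
                    g.state = BreakerState::Closed;
                    g.consecutive_failures = 0;
                }
            }
            // A result that arrives after the breaker tripped is ignored.
            (BreakerState::Open, _) => {}
        }
    }
}
```

With the documented defaults this would be `SketchBreaker::new(5, Duration::from_secs(30), 1)`. The mutex is only held inside the two synchronous methods, which mirrors the "never across `await`" note above.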

What's still pending:

- ⏳ **`Mesh::serve_rpc(service, handler)` / `Mesh::call(service, payload, opts)` glue**. The shape is locked; the implementation is the next concrete pickup.

  **Seam decision (locked): Option A — cortex adapter on the request channel.** `serve_rpc` opens a real `CortexAdapter` on `<service>.requests` with the `RpcServerFold` as its fold. Subscribed inbound events (delivered via the existing `inbound: DashMap<u16, SegQueue<StoredEvent>>` path) are ingested into the adapter's local redex log; the adapter's tail-and-fold task drives `RpcServerFold::apply` from there. The alternative (Option B — bypass the redex log, feed subscribed events directly into the fold via a tokio channel) was rejected for Phase 1 because (a) it requires a hot-path code change to the mesh's inbound delivery point, and (b) Option A reuses every existing piece of plumbing while gaining durability + replay + snapshot-restore of in-flight RPC state for free. The per-call redex-append latency cost (~microseconds) is acceptable when the network alone is hundreds of microseconds.

  Concrete steps:

  1. **`serve_rpc(service, handler)`**:
     - Open `CortexAdapter::open(redex, "<service>.requests", ..., RpcServerFold::new(handler, emit), ())`.
     - Where `emit: RpcResponseEmitter` builds an `EventEnvelope` (meta + `RpcResponsePayload::encode()`) and calls `Mesh::publish(channel_publisher_for("<service>.replies.<caller_origin>"), payload)`. Reply-channel naming uses the caller's `origin_hash` (8-byte hex), so each caller's reply channel is private and naturally subscribed only by them.
     - Self-subscribe via `Mesh::subscribe_channel_in_queue_group(self_node_id, "<service>.requests", "<service>")`. This is a local-only roster mutation; the queue-group dispatch then routes one-of-N requests across replicas that all subscribe with the same group name.
     - Bridge inbound events into the cortex adapter's ingest path. The mesh already pushes subscribed events into `inbound[shard_id]`. The bridge spawns a task that polls `MeshNode::poll_shard(...)` for the relevant shard, filters to events for the channel, and calls `adapter.ingest(envelope)`. (A future optimization: hook directly at the inbound-delivery point to skip the poll loop. Phase 2 work.)
     - Return a `ServeHandle` whose Drop closes the adapter, unsubscribes, and stops the bridge task.

  2. **`call(service, payload, opts)`**:
     - Lazily ensure: (a) a subscription to `<service>.replies.<self_origin>` (Broadcast mode, sole subscriber by construction), (b) a `CortexAdapter` on that channel with `RpcClientFold` as its fold, (c) the same inbound bridge as above.
     - Allocate `call_id` from a per-Mesh `AtomicU64`.
     - `pending.register(call_id) -> oneshot::Receiver`.
     - Publish REQUEST envelope on `<service>.requests` via `Mesh::publish`.
     - Await the receiver under `opts.deadline` (race with `tokio::time::sleep_until`).
     - On future-drop OR deadline-fire: publish CANCEL envelope on `<service>.requests` and `pending.cancel(call_id)`.

  3. **Per-Mesh state** (a small extension to `MeshNode`):
     - `rpc_servers: DashMap<String /* service */, ServeHandle>` — active server registrations.
     - `rpc_client_pending: Arc<RpcClientPending>` — the singleton pending-calls store.
     - `rpc_next_call_id: AtomicU64`.
     - `rpc_reply_subscription: Mutex<Option<ReplySubscription>>` — lazily initialized on first `call`, torn down after `idle_reply_subscription_ttl` of no in-flight calls.

  4. **End-to-end integration test** (once the glue lands): two `MeshNode` instances in one process. Node A: `serve_rpc("echo", echo_handler)`. Node B: `call("echo", b"hi")`. Assert round-trip + queue-group load distribution across N>1 servers + cancellation + crash-recovery (kill node A mid-call, restart with the same redex; the request gets re-folded on rehydrate and the response lands).
- ⏳ **End-to-end integration test against real Mesh instances** — once the glue lands, two Mesh nodes in one process: one calls `serve_rpc("echo", ...)`, the other calls `call("echo", ...)`; assert round-trip + queue-group load distribution across N servers + cancellation flowing across the network.
- ⏳ **Phase 2** — service registry derived from existing capability announcements; routing policies; SDK typed wrappers for the four bindings.
- ⏳ **Phase 3** — streaming responses, tracing context propagation, retry/circuit-breaker/hedging helpers.
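
The caller-side steps in item 2 (allocate a `call_id`, register a pending entry, cancel on drop or deadline) can be sketched with std only. Everything named here is a hypothetical stand-in: `PendingCalls` plays the role of `RpcClientPending`, `std::sync::mpsc` replaces the tokio oneshot, and the `cancels` log stands in for publishing a CANCEL envelope on `<service>.requests`.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the per-Mesh pending-calls store.
#[derive(Default)]
pub struct PendingCalls {
    next_call_id: AtomicU64,
    waiting: Mutex<HashMap<u64, Sender<Vec<u8>>>>,
    /// Records the call_ids for which a CANCEL would have been published.
    pub cancels: Mutex<Vec<u64>>,
}

impl PendingCalls {
    /// Allocate a call_id and register a waiter for its RESPONSE.
    pub fn register(self: &Arc<Self>) -> (CallGuard, Receiver<Vec<u8>>) {
        let call_id = self.next_call_id.fetch_add(1, Ordering::Relaxed);
        let (tx, rx) = channel();
        self.waiting.lock().unwrap().insert(call_id, tx);
        let guard = CallGuard { pending: Arc::clone(self), call_id, completed: false };
        (guard, rx)
    }

    /// Route a RESPONSE body to its waiter. Returns false when the entry
    /// is already gone (cancelled or completed), so the body is dropped.
    pub fn complete(&self, call_id: u64, body: Vec<u8>) -> bool {
        let sender = self.waiting.lock().unwrap().remove(&call_id);
        match sender {
            Some(tx) => tx.send(body).is_ok(),
            None => false,
        }
    }
}

/// Dropping the guard before `completed` is set cancels the call.
pub struct CallGuard {
    pending: Arc<PendingCalls>,
    pub call_id: u64,
    pub completed: bool,
}

impl Drop for CallGuard {
    fn drop(&mut self) {
        if !self.completed {
            self.pending.waiting.lock().unwrap().remove(&self.call_id);
            self.pending.cancels.lock().unwrap().push(self.call_id);
        }
    }
}
```

Because cancellation lives in `Drop`, the deadline path and the dropped-future path share one code path; only the success path needs to flip `completed`.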

## The framing

An RPC server is a CortEX fold:

| RPC concept | CortEX equivalent |
|------|------|
| Server's accumulated state | `RedexFold::State` |
| A request | An `EventEnvelope` with `meta.dispatch = REQUEST` |
| Correlation ID | `EventMeta::seq_or_ts` (per-caller monotonic) |
| Caller identity | `EventMeta::origin_hash` (AEAD-verified upstream) |
| Response | An `EventEnvelope` with `meta.dispatch = RESPONSE`, same `seq_or_ts` |
| Awaiting a response | `wait_for_seq(call_id)` on the reply channel |
| Snapshot of in-flight RPC state | `CortexAdapter::snapshot()` |
| Mid-call crash recovery | Replay from log (the request was durable before processing) |
| Idempotency | Fold's natural state — replaying the same `seq_or_ts` is a no-op |
| Cancellation | A `CANCEL` event with the request's `seq_or_ts` |
| Distributed tracing | `FLAG_CAUSAL` + the existing causal-chain integration |
| Service authorization | Channel-level capability tokens (existing) |
| Replay debugging | "Which request caused the bad state" — replay the channel |

This is the same pattern that drives event-sourcing and CQRS architectures. We get all of it for the price of a `dispatch` enum extension.

## The one missing primitive: `SubscriptionMode::QueueGroup`

Channels today broadcast every published event to every subscriber. That's correct for events but wrong for request/response: N replica servers each running the request and racing on the reply is wasteful work and a synchronization headache (which response is canonical?). RPC needs **work-distribution semantics**: one-of-N delivery to a named group of co-equal subscribers.

JetStream / NATS / SQS all settled on the same shape. We adopt it:

```rust
pub enum SubscriptionMode {
    /// Existing behavior: every published event is delivered to
    /// this subscriber. Multiple subscribers in this mode receive
    /// independent copies. Right for events.
    Broadcast,

    /// Work-distribution: every published event is delivered to
    /// exactly one subscriber in the named group. Multiple
    /// subscribers in the same `QueueGroup(name)` divide the
    /// stream amongst themselves. Right for request/response.
    QueueGroup(String),
}
```

A subscriber's mode is set at `subscribe` time and is stable for the lifetime of the subscription. The roster bookkeeping changes from `subscribers: HashSet<EntityId>` to `subscribers: HashMap<EntityId, SubscriptionMode>`; the dispatch path picks one queue-group member (round-robin or P2C) per event, and broadcasts to all `Broadcast` subscribers.
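
A minimal sketch of that dispatch rule, assuming round-robin selection. `Roster`, `recipients`, and the `u64` alias for `EntityId` are placeholders for illustration; a `BTreeMap` is used instead of the `HashMap` named above only so that iteration order is deterministic in the example.

```rust
use std::collections::{BTreeMap, HashMap};

/// Placeholder for the real `EntityId` type.
pub type EntityId = u64;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscriptionMode {
    Broadcast,
    QueueGroup(String),
}

/// Hypothetical roster: subscriber -> mode, plus one cursor per group.
#[derive(Default)]
pub struct Roster {
    subscribers: BTreeMap<EntityId, SubscriptionMode>,
    cursors: HashMap<String, usize>,
}

impl Roster {
    pub fn subscribe(&mut self, id: EntityId, mode: SubscriptionMode) {
        self.subscribers.insert(id, mode);
    }

    /// Recipients for one published event: every Broadcast subscriber,
    /// plus exactly one member of each queue group (round-robin).
    pub fn recipients(&mut self) -> Vec<EntityId> {
        let mut out = Vec::new();
        let mut groups: BTreeMap<&str, Vec<EntityId>> = BTreeMap::new();
        for (id, mode) in &self.subscribers {
            match mode {
                SubscriptionMode::Broadcast => out.push(*id),
                SubscriptionMode::QueueGroup(name) => {
                    groups.entry(name.as_str()).or_default().push(*id)
                }
            }
        }
        for (name, members) in groups {
            // Each group has at least one member by construction.
            let cursor = self.cursors.entry(name.to_string()).or_insert(0);
            out.push(members[*cursor % members.len()]);
            *cursor = cursor.wrapping_add(1);
        }
        out
    }
}
```

A P2C policy would replace only the cursor lookup; the split between "all broadcast subscribers" and "one per group" stays the same.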

This primitive is useful beyond RPC: any work-queue pattern (background job processing, ETL pipeline shards, batched fetchers) wants the same shape today. RPC is the forcing function but the surface is general.

## What's already there (that we don't have to build)

- **Typed dispatch on `EventMeta`** — `dispatch: u8` with `0x00..0x7F` reserved for CortEX-internal and `0x80..0xFF` for application/vendor. nRPC consumes a small block of the cortex-internal range.
- **Per-event integrity** — `compute_checksum_with_meta` covers the meta header (audit #8), so a bit-flip in `dispatch: REQUEST → RESPONSE` is detected by the per-event check.
- **Per-origin monotonic counters** — `seq_or_ts` is documented as either per-origin monotonic OR unix nanos. RPC uses per-caller monotonic; that's the deterministic-fold-order option, no extra work.
- **`wait_for_seq` futures** — `CortexAdapter::wait_for_seq(seq).await` returns when the fold has applied `seq`. This is literally the response-await primitive.
- **Snapshot / restore** — `applied_through_seq` strict-prefix watermark snapshots cleanly; in-flight RPC state survives restart with the rest of the fold's state.
- **Causal chain** — `FLAG_CAUSAL` events carry a `parent_hash`. RPC requests in the same trace chain together for free.
- **Capability tokens** — `PermissionToken` with `TokenScope::PUBLISH` / `SUBSCRIBE` already gates channel access. Service-level authorization is a small extension (a per-token service allowlist).
- **Mesh-level channel routing** — `SubscriberRoster` + the existing dispatch path already routes published events to remote subscribers across the mesh. No transport changes.
- **Backpressure** — RedEX append is the natural rate-limiter; events that can't be appended fast enough surface to the publisher as a typed error.
- **Identity verification** — `origin_hash` on incoming events is set by the bus from the AEAD-verified peer; not self-claimable.

What's left to build is the *convention layer* on top.

## Conventions

### Channel naming

```text
<service>.requests                       — server(s) subscribe in QueueGroup(<service>)
<service>.replies.<caller_origin_hash>   — caller subscribes in Broadcast (sole subscriber)
```

Caller publishes the request to `<service>.requests`. Exactly one server in the queue group receives it (work-distribution). Server publishes the response to `<service>.replies.<origin_hash>` — a private channel scoped to the caller. Caller is already subscribed (subscription is established lazily on first call to a service and cached for reuse).

This naming matches the existing `ChannelName` shape (forward-slash-separated segments under `cortex::adapter::net::channel`). The reply-channel name encodes the caller's `origin_hash` so each caller subscribes only to their own replies — no cross-caller fan-out.
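
The naming convention can be captured in a pair of helpers. The function names below are hypothetical, and the lowercase, zero-padded 16-character hex rendering of the 8-byte `origin_hash` is an assumption for illustration rather than a confirmed wire format.

```rust
/// Hypothetical helper: the request channel for `service`.
pub fn requests_channel(service: &str) -> String {
    format!("{service}.requests")
}

/// Hypothetical helper: the caller-private reply channel.
/// Assumes the origin hash is rendered as 16 lowercase hex digits.
pub fn reply_channel(service: &str, caller_origin_hash: u64) -> String {
    format!("{service}.replies.{caller_origin_hash:016x}")
}

/// Inverse of `reply_channel`: recover (service, origin_hash), or None
/// if the name does not follow the reply-channel convention.
pub fn parse_reply_channel(name: &str) -> Option<(&str, u64)> {
    let (service, hex) = name.rsplit_once(".replies.")?;
    let well_formed = !service.is_empty()
        && hex.len() == 16
        && hex.bytes().all(|b| b.is_ascii_hexdigit());
    if !well_formed {
        return None;
    }
    u64::from_str_radix(hex, 16).ok().map(|hash| (service, hash))
}
```

For example, `reply_channel("echo", 0xdead_beef)` yields `echo.replies.00000000deadbeef`.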

### `EventMeta::dispatch` values

In the cortex-internal range (`0x00..0x7F`):

```rust
pub const DISPATCH_RPC_REQUEST: u8 = 0x10;
pub const DISPATCH_RPC_RESPONSE: u8 = 0x11;
pub const DISPATCH_RPC_CANCEL: u8 = 0x12;
pub const DISPATCH_RPC_DEADLINE_EXCEEDED: u8 = 0x13;
```

The rest of the dispatch space is unaffected. CortEX adapters that don't care about RPC ignore these dispatches as they ignore any other unknown dispatch.
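
A small sketch of how a fold might classify the `dispatch` byte. `RpcDispatch`, `classify`, and `is_cortex_internal` are hypothetical names, and the `0x00..0x7F` range is assumed to include `0x7F`.

```rust
pub const DISPATCH_RPC_REQUEST: u8 = 0x10;
pub const DISPATCH_RPC_RESPONSE: u8 = 0x11;
pub const DISPATCH_RPC_CANCEL: u8 = 0x12;
pub const DISPATCH_RPC_DEADLINE_EXCEEDED: u8 = 0x13;

/// Hypothetical typed view of the four nRPC dispatch values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcDispatch {
    Request,
    Response,
    Cancel,
    DeadlineExceeded,
}

/// True for the CortEX-internal half of the dispatch space
/// (assumes the documented range is inclusive of 0x7F).
pub fn is_cortex_internal(dispatch: u8) -> bool {
    dispatch <= 0x7F
}

/// Map a raw dispatch byte to an nRPC kind; None means "not RPC",
/// which a fold treats like any other unknown dispatch.
pub fn classify(dispatch: u8) -> Option<RpcDispatch> {
    match dispatch {
        DISPATCH_RPC_REQUEST => Some(RpcDispatch::Request),
        DISPATCH_RPC_RESPONSE => Some(RpcDispatch::Response),
        DISPATCH_RPC_CANCEL => Some(RpcDispatch::Cancel),
        DISPATCH_RPC_DEADLINE_EXCEEDED => Some(RpcDispatch::DeadlineExceeded),
        _ => None,
    }
}
```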

### Payload shape (after the 24-byte `EventMeta`)

```rust
struct RpcRequestPayload {
    service: String,                    // varint+bytes (max 256)
    deadline_ns: u64,                   // 0 = no deadline
    flags: u16,                         // IDEMPOTENT | STREAMING_RESPONSE | PROPAGATE_TRACE | ...
    headers: Vec<(String, Vec<u8>)>,    // varint count + name/value pairs
    body: Bytes,                        // application-defined
}

struct RpcResponsePayload {
    status: u16,                        // 0x0000 = Ok; see status table
    headers: Vec<(String, Vec<u8>)>,
    body: Bytes,                        // for Ok = app response; for errors = UTF-8 diagnostic
}

struct RpcCancelPayload {}              // empty; the seq_or_ts in EventMeta is the call_id
struct RpcDeadlineExceededPayload {}    // empty; same
```

Encoded with `postcard` for compactness (matches the rest of the cortex envelope conventions).

`status` codes (Net-native, with documented gRPC equivalence in this doc):

| status | meaning | gRPC analog |
|---|---|---|
| `0x0000` | `Ok` | OK |
| `0x0001` | `NotFound` (no service registered with that name) | NOT_FOUND |
| `0x0002` | `Unauthorized` (token doesn't include the requested service) | PERMISSION_DENIED |
| `0x0003` | `Timeout` (server observed deadline expired before starting) | DEADLINE_EXCEEDED |
| `0x0004` | `Backpressure` (server's per-service queue full) | RESOURCE_EXHAUSTED |
| `0x0005` | `Cancelled` (caller emitted CANCEL before server completed) | CANCELLED |
| `0x0006` | `Internal` (handler panicked or returned an error) | INTERNAL |
| `0x0007` | `UnknownVersion` (request payload version not supported) | UNIMPLEMENTED |
| `0x0008..0x7FFF` | reserved | — |
| `0x8000..0xFFFF` | application-defined | — |
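
The table translates directly into a decoder. The enum below is a sketch of how the wire value could be mapped, not the crate's actual `RpcStatus` definition; `from_wire` and `grpc_analog` are hypothetical method names.

```rust
/// Sketch of a typed view over the 16-bit wire status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcStatus {
    Ok,
    NotFound,
    Unauthorized,
    Timeout,
    Backpressure,
    Cancelled,
    Internal,
    UnknownVersion,
    /// 0x0008..=0x7FFF: reserved for future Net-native codes.
    Reserved(u16),
    /// 0x8000..=0xFFFF: application-defined.
    Application(u16),
}

impl RpcStatus {
    pub fn from_wire(code: u16) -> Self {
        match code {
            0x0000 => RpcStatus::Ok,
            0x0001 => RpcStatus::NotFound,
            0x0002 => RpcStatus::Unauthorized,
            0x0003 => RpcStatus::Timeout,
            0x0004 => RpcStatus::Backpressure,
            0x0005 => RpcStatus::Cancelled,
            0x0006 => RpcStatus::Internal,
            0x0007 => RpcStatus::UnknownVersion,
            0x0008..=0x7FFF => RpcStatus::Reserved(code),
            _ => RpcStatus::Application(code),
        }
    }

    /// The gRPC analog from the table, if one is documented.
    pub fn grpc_analog(self) -> Option<&'static str> {
        match self {
            RpcStatus::Ok => Some("OK"),
            RpcStatus::NotFound => Some("NOT_FOUND"),
            RpcStatus::Unauthorized => Some("PERMISSION_DENIED"),
            RpcStatus::Timeout => Some("DEADLINE_EXCEEDED"),
            RpcStatus::Backpressure => Some("RESOURCE_EXHAUSTED"),
            RpcStatus::Cancelled => Some("CANCELLED"),
            RpcStatus::Internal => Some("INTERNAL"),
            RpcStatus::UnknownVersion => Some("UNIMPLEMENTED"),
            RpcStatus::Reserved(_) | RpcStatus::Application(_) => None,
        }
    }
}
```

Keeping the raw code inside `Reserved` and `Application` lets a caller forward an unrecognized status without losing information.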

### Correlation

`EventMeta::seq_or_ts` is the `call_id`. Caller-generated, per-caller monotonic. Same value on the request, the response, and any associated CANCEL or DEADLINE_EXCEEDED events.

No separate UUID needed — `seq_or_ts` is already 8 bytes and it's already the deterministic-fold-order field.

## The fold pattern

### Server-side: `RpcServerFold`

```rust
pub trait RpcHandler<S>: Send + Sync + 'static {
    type Future: Future<Output = Result<RpcResponsePayload, RpcHandlerError>> + Send;
    fn call(&self, ctx: RpcContext, state: &mut S) -> Self::Future;
}

pub struct RpcServerFold<H, S> {
    handler: H,
    /// Per-caller in-flight set; entries cleared on RESPONSE emission
    /// or on CANCEL / DEADLINE_EXCEEDED.
    in_flight: DashMap<(u64, u64), CancellationToken>, // (origin_hash, call_id) -> token
    /// LRU of completed idempotent calls; key is (origin_hash,
    /// call_id), value is the cached RESPONSE payload. Bounded so a
    /// long-running fold doesn't grow without bound.
    completed_idempotent: lru::LruCache<(u64, u64), RpcResponsePayload>,
    _state: PhantomData<S>,
}

impl<H, S> RedexFold<S> for RpcServerFold<H, S>
where
    H: RpcHandler<S>,
    S: Send + Sync,
{
    fn apply(&mut self, ev: &RedexEvent, state: &mut S) -> Result<(), RedexError> {
        let meta = EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])?;
        let key = (meta.origin_hash, meta.seq_or_ts);
        match meta.dispatch {
            DISPATCH_RPC_REQUEST => {
                let req: RpcRequestPayload = postcard::from_bytes(&ev.payload[EVENT_META_SIZE..])?;
                // Idempotency: replay of a previously-completed call
                // returns the cached response without re-running.
                if req.flags & IDEMPOTENT != 0 {
                    if let Some(cached) = self.completed_idempotent.get(&key) {
                        self.emit_response(meta.origin_hash, meta.seq_or_ts, cached.clone());
                        return Ok(());
                    }
                }
                // Fast deadline-already-passed short-circuit: emit
                // Timeout without running the handler.
                if req.deadline_ns != 0 && now_ns() > req.deadline_ns {
                    self.emit_response(meta.origin_hash, meta.seq_or_ts, RpcResponsePayload::timeout());
                    return Ok(());
                }
                let cancel = CancellationToken::new();
                self.in_flight.insert(key, cancel.clone());
                let ctx = RpcContext { caller: meta.origin_hash, call_id: meta.seq_or_ts, request: req, cancel };
                // Spawn the handler off the fold thread. The fold
                // returns immediately so subsequent events
                // (including CANCEL for *this* call_id) can be
                // processed without head-of-line blocking.
                tokio::spawn(self.handler.call(ctx, state));
            }
            DISPATCH_RPC_CANCEL => {
                if let Some((_, token)) = self.in_flight.remove(&key) {
                    token.cancel();
                }
            }
            _ => {} // RESPONSE / DEADLINE_EXCEEDED: ignored on the server side
        }
        Ok(())
    }
}
```

The handler runs in a `tokio::spawn` so the fold doesn't block on application work. When the handler completes, it emits the RESPONSE event via `emit_response`, which publishes to `<service>.replies.<caller_origin_hash>`. The fold sees the RESPONSE indirectly when its `wait_for_seq` future resolves on the reply channel.


Note the head-of-line property: a long-running call doesn't block subsequent calls (or the CANCEL of itself). The fold itself never awaits.

### Caller-side: `RpcClientFold`

```rust
pub struct RpcClientFold {
    /// Pending calls awaiting a response. Each call owns a oneshot
    /// receiver; the fold completes the sender when the matching
    /// RESPONSE arrives.
    pending: DashMap<u64, oneshot::Sender<RpcResponsePayload>>, // call_id -> sender
}

impl RedexFold<()> for RpcClientFold {
    fn apply(&mut self, ev: &RedexEvent, _state: &mut ()) -> Result<(), RedexError> {
        let meta = EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])?;
        if meta.dispatch != DISPATCH_RPC_RESPONSE { return Ok(()); }
        let resp: RpcResponsePayload = postcard::from_bytes(&ev.payload[EVENT_META_SIZE..])?;
        if let Some((_, tx)) = self.pending.remove(&meta.seq_or_ts) {
            let _ = tx.send(resp);
        }
        Ok(())
    }
}
```

This fold has empty user state — it's purely a routing index from `call_id` → caller's awaiting future. The actual RPC state is on the server's fold.

## API surface

### Caller: `Mesh::call`

```rust
impl Mesh {
    pub async fn call(
        &self,
        service: &str,
        payload: Bytes,
        opts: CallOptions,
    ) -> Result<RpcReply, RpcError>;
}

#[derive(Debug, Clone, Default)]
pub struct CallOptions {
    pub deadline: Option<Instant>,
    pub idempotent: bool,
    pub trace_context: Option<TraceContext>,
    pub max_in_flight: u32, // caller-side semaphore (default 64)
}
```

Internals: `Mesh::call`
1. Allocates a fresh `call_id` from the per-caller monotonic counter.
2. Registers a oneshot in the local `RpcClientFold::pending` keyed on `call_id`.
3. Publishes a REQUEST event to `<service>.requests` with `meta.seq_or_ts = call_id`, `meta.origin_hash = self.identity.origin_hash()`, `dispatch = DISPATCH_RPC_REQUEST`.
4. Awaits the oneshot. If `opts.deadline` fires first → publishes a CANCEL event (so the server can drop the in-flight entry) and returns `RpcError::Timeout`. If the future is dropped before the response → publishes a CANCEL event.
5. On response: returns the decoded `RpcReply`.

**Subscription**: the first `Mesh::call(service, ...)` lazily subscribes to `<service>.replies.<origin_hash>` in `Broadcast` mode (it's the only subscriber by construction). Subsequent calls reuse the subscription. A background task tears down the reply subscription after `idle_reply_subscription_ttl` of no in-flight calls.

### Server: `Mesh::serve_rpc`

```rust
impl Mesh {
    /// Register a handler for `service`. Subscribes to
    /// `<service>.requests` in QueueGroup(<service>) mode; multiple
    /// nodes calling `serve_rpc` for the same service automatically
    /// form a load-balanced group. Returns a `ServeHandle` whose
    /// Drop deregisters and unsubscribes.
    pub fn serve_rpc<S, H>(
        &self,
        service: &str,
        initial_state: S,
        handler: H,
    ) -> Result<ServeHandle, ServeError>
    where
        S: Send + Sync + 'static,
        H: RpcHandler<S>;
}
```

Internals: `serve_rpc` opens a CortEX adapter on `<service>.requests` with an `RpcServerFold` wrapping the user's handler. The adapter subscribes to the channel in `QueueGroup(<service>)` mode. The `ServeHandle` carries a `Drop` that closes the adapter and unsubscribes.

Multi-instance is automatic: every node that calls `serve_rpc("foo", ...)` joins the `foo` queue group. The channel layer's queue-group dispatch picks one of them per request.

### SDK typed wrapper

```rust
impl RpcClient {
    pub async fn call<Req, Resp>(
        &self,
        service: &str,
        request: &Req,
    ) -> Result<Resp, RpcError>
    where
        Req: Serialize,
        Resp: DeserializeOwned;
}
```

Codec selectable per client (`serde_json` / `postcard`). Bindings (Node / Python / Go) get parallel typed surfaces.

## Service discovery (Phase 2)

Reuses the existing `CapabilityAnnouncement` machinery. Each receiving node already learns "node X subscribes to channel Y" from announcements; we just add a small derived index:

```rust
pub struct ServiceRegistry {
    /// service_name -> nodes serving it (derived from
    /// CapabilityAnnouncement subscriptions to <service>.requests)
    services: DashMap<String, BTreeSet<EntityId>>,
}
```

The registry is populated automatically from existing channel announcements — no new wire kind. A `Mesh::call_service(name, ...)` shortcut consults the registry to confirm at least one server is reachable; the actual routing decision (which of N servers) happens at the channel layer via the queue-group dispatcher.
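
A sketch of the derivation, under two assumptions that the text above does not confirm: each announcement is taken to carry the node's full subscription list, and the sketch uses a plain `HashMap` behind `&mut self` rather than the `DashMap` in the struct above. `observe`, `is_served`, and `servers` are hypothetical method names, and `EntityId` is aliased to `u64` for brevity.

```rust
use std::collections::{BTreeSet, HashMap};

/// Placeholder for the real `EntityId` type.
pub type EntityId = u64;

/// Hypothetical single-threaded variant of the derived index.
#[derive(Default)]
pub struct ServiceRegistry {
    services: HashMap<String, BTreeSet<EntityId>>,
}

impl ServiceRegistry {
    /// Apply one announcement: replace everything known about `node`
    /// with the services implied by its `<service>.requests` subscriptions.
    pub fn observe(&mut self, node: EntityId, subscribed_channels: &[&str]) {
        // Drop stale entries for this node first.
        for nodes in self.services.values_mut() {
            nodes.remove(&node);
        }
        self.services.retain(|_, nodes| !nodes.is_empty());
        for channel in subscribed_channels {
            if let Some(service) = channel.strip_suffix(".requests") {
                if !service.is_empty() {
                    self.services.entry(service.to_string()).or_default().insert(node);
                }
            }
        }
    }

    /// The check a `call_service` shortcut would make before publishing.
    pub fn is_served(&self, service: &str) -> bool {
        self.services.get(service).map_or(false, |nodes| !nodes.is_empty())
    }

    pub fn servers(&self, service: &str) -> Vec<EntityId> {
        self.services
            .get(service)
            .map(|nodes| nodes.iter().copied().collect())
            .unwrap_or_default()
    }
}
```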

For routing-policy pluggability (round-robin, P2C, sticky, lowest-latency), the policy is configured per `serve_rpc` call (server-side) AND per `Mesh::call` (caller can hint). The default is P2C against in-flight count, which is what `behavior::loadbalance.rs` already implements.
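
For readers unfamiliar with P2C (power of two choices), here is a std-only sketch of the selection rule against in-flight counts. It is not the code in the existing load-balancer module: `p2c_pick` and the xorshift sampler are illustrative, and the caller is assumed to supply a nonzero seed.

```rust
/// Advance a xorshift64 state and return the next value.
/// A zero state stays zero, so seed with a nonzero value.
fn next_u64(state: &mut u64) -> u64 {
    let mut x = *state;
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    *state = x;
    x
}

/// Hypothetical P2C selection: sample two distinct candidates and
/// return the index of the one with fewer in-flight calls.
pub fn p2c_pick(in_flight: &[u32], rng_state: &mut u64) -> Option<usize> {
    match in_flight.len() {
        0 => None,
        1 => Some(0),
        n => {
            let a = (next_u64(rng_state) % n as u64) as usize;
            // Draw the second candidate from the remaining n - 1 slots,
            // then shift past `a` so the two candidates always differ.
            let mut b = (next_u64(rng_state) % (n as u64 - 1)) as usize;
            if b >= a {
                b += 1;
            }
            Some(if in_flight[b] < in_flight[a] { b } else { a })
        }
    }
}
```

Since the two candidates are always distinct, the most loaded server is never chosen when its load is strictly the highest, which is the property that makes P2C degrade gracefully under heterogeneous capacity.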

## Authorization

Two layers, both load-bearing:

1. **Channel-level (existing).** Capability tokens already gate `subscribe` and `publish` per channel. Calling an RPC service requires `publish` on `<service>.requests` and `subscribe` on `<service>.replies.<self_origin_hash>`. The latter is naturally scoped to the caller's own origin (no other token has the right to subscribe to *your* reply channel).
2. **Service-level allowlist (new).** Add `rpc_services: Vec<String>` to `PermissionToken`. Server-side, the RPC fold rejects requests whose token doesn't list the service in scope; rejection is `RpcStatus::Unauthorized`. Empty list = no services allowed (defense-in-depth default; tokens predating the field don't authorize RPC).
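
The service-level check reduces to a membership test with a deny-by-default empty list. `TokenSketch` and `authorize` are hypothetical; only the proposed `rpc_services` field is modeled, and the status values come from the status table.

```rust
/// Hypothetical slice of `PermissionToken`: only the new field.
#[derive(Default)]
pub struct TokenSketch {
    pub rpc_services: Vec<String>,
}

pub const STATUS_OK: u16 = 0x0000;
pub const STATUS_UNAUTHORIZED: u16 = 0x0002;

/// Server-side check run before the handler is invoked.
/// An empty allowlist authorizes nothing, so tokens that predate
/// the field are rejected.
pub fn authorize(token: &TokenSketch, service: &str) -> u16 {
    if token.rpc_services.iter().any(|allowed| allowed == service) {
        STATUS_OK
    } else {
        STATUS_UNAUTHORIZED
    }
}
```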

End-to-end identity: `meta.origin_hash` is set by the bus from the AEAD-verified peer; not self-claimable. nRPC inherits the existing in-channel-identity-spoofability tradeoff (see `adapter/net/identity/origin.rs`).

## What naturally falls out of CortEX (free wins)

- **Crash recovery.** A request that was appended to the channel before the server crashed is replayed when the server's fold rehydrates from the log. The `applied_through_seq` strict-prefix watermark guarantees at-least-once handler execution. Pair with `IDEMPOTENT` flag for safe retry semantics.
- **Snapshot-based migration.** A server's in-flight RPC state migrates with the rest of its fold state (compute layer's snapshot/restore), so in-flight calls survive both a planned migration and a process restart.
- **Time-travel debugging.** "Which request caused the bad state?" — open the channel, replay events, see exactly which REQUEST flipped the fold into the broken state. Causal-chain integration shows the trace.
- **Audit trail.** Every RPC call is durable. Operators get a free per-service audit log without instrumenting handlers.
- **Backpressure.** RedEX append rate-limits naturally; over-cap publishers see `RedexError::Append` and surface it as `RpcError::Backpressure`.

## What we lose vs. a transport-level RPC (and why we're OK with it)

- **Per-call latency floor.** Each call goes through the redex append → fold dispatch → response publish → caller fold pipeline. Even with in-memory redex this is a few extra microseconds vs. a direct stream send/receive. Acceptable for any call where the network alone is hundreds of microseconds, which is every realistic microservice RPC.
- **Stream-level backpressure.** Stream-based RPC (gRPC's per-stream window) gives finer-grained flow control than channel-level append backpressure. For the streaming-response case (Phase 3) we may need to add channel-level credit grants — an extension of the existing `SUBPROTOCOL_STREAM_WINDOW` shape — but that's a small follow-up, not a blocker.
- **Direct unicast.** Every request goes through a channel even when the caller knows the target's `entity_id`. This is fine: the mesh's dispatcher already optimizes pub/sub to direct-deliver when there's a single subscriber. Queue-group dispatch is the same cost as broadcast-with-one-recipient.

## Phasing

| Phase | Release | Scope |
|------|---------|-------|
| **1** | v0.12 | `SubscriptionMode::QueueGroup(name)` lands on the channel layer; existing `Broadcast` semantics unchanged. `RpcServerFold` + `RpcClientFold` + the four `dispatch` constants. `Mesh::call` / `Mesh::serve_rpc` API. Channel naming convention enforced by helpers. Token-scope check (`rpc_services` allowlist on `PermissionToken`). Test suite covering: queue-group one-of-N delivery, correlation, deadline → CANCEL emission, idempotency replay, server panic, backpressure, token-scope rejection, identity guard. |
| **2** | v0.13 | `ServiceRegistry` derived from existing channel-subscription announcements. `Mesh::call_service` shortcut + routing-policy hooks (RoundRobin, P2C, Sticky, LowestLatency) wired into queue-group dispatch. Health-aware filtering against `proximity::node_health`. SDK typed wrappers for Rust / Node / Python / Go. |
| **3** | v0.14 | Streaming responses (`STREAMING_RESPONSE` flag → multiple `DISPATCH_RPC_RESPONSE` events with same `seq_or_ts` and an `is_terminal` payload bit). Per-streaming-response window grants. Caller-side helpers: `with_retry`, `with_circuit_breaker`, `with_hedge`. W3C Trace Context propagation hardened. Per-call latency / error-rate metrics on a Prometheus-compatible endpoint. |
| **deferred** | v0.15+ | Client-streaming, bidirectional streaming, schema registry / IDL codegen (`.nrpc` files → typed Rust/TS/Python clients). |

## Test surface

### Phase 1
- **Queue-group one-of-N delivery.** Spawn 4 servers in `QueueGroup("foo")`; publish 1000 requests; assert each request is processed by exactly one server, and load is approximately balanced (within 10% of even).
- **Queue-group + broadcast coexistence.** Same channel, mix of `Broadcast` subscribers (e.g., audit logger) and `QueueGroup("worker")` subscribers; assert broadcast subscribers see every event, queue-group sees one-of-N.
- **Correlation across concurrent calls.** Spawn N concurrent `call()` futures; assert each gets its own response keyed on the right `call_id`.
- **Deadline → CANCEL.** Caller's deadline fires before response; assert a CANCEL event is published; server's fold removes the in-flight entry; the response (if it was already mid-flight) lands on a non-existent oneshot and is dropped harmlessly.
- **Caller drop → CANCEL.** Caller drops the future; same CANCEL flow.
- **Idempotency replay.** Replay a request with `IDEMPOTENT` flag set after the original completed; assert the cached response is returned without re-running the handler.
- **Server panic.** Handler that panics surfaces as `RpcStatus::Internal` to the caller (caught by the spawn boundary's `JoinHandle` / `catch_unwind`).
- **Backpressure on overload.** Fill the redex append capacity; assert publishers see `RpcError::Backpressure`.
- **Token-scope rejection.** Token without `rpc_services` listing the service rejects the call with `Unauthorized`.
- **Identity guard.** `RpcContext::caller` is the AEAD-verified peer; not the value in the payload.
- **Crash recovery.** Append a request, kill the server before it processes, restart; assert the request is processed on rehydrate (at-least-once) and the response lands on the caller's reply channel.

### Phase 2
- **ServiceRegistry derivation.** Bring up N servers calling `serve_rpc("foo", ...)`; assert every node's local `ServiceRegistry` learns of "foo" within one capability-announcement interval.
- **Health-aware exclusion.** Mark one server unhealthy via `proximity`; assert subsequent calls don't route to it; assert recovery puts it back.
- **Routing-policy correctness.** `Sticky` is consistent across calls with the same key; `LowestLatency` picks the lowest-p50 instance.

### Phase 3
- **Streaming responses in order.** N RESPONSE events with same `seq_or_ts`; assert order; assert the terminal bit closes the call.
- **Stream cancellation mid-flight.** Caller cancels; assert subsequent RESPONSE events are dropped on arrival (the pending entry is gone).
- **Trace context propagation.** `traceparent` / `tracestate` round-trip through the headers block.

## Out of scope

- **Pub/sub replacement.** Channels-as-event-bus stay. RPC and events coexist on the same channel mechanism.
- **Service-mesh sidecar.** nRPC runs in-process.
- **Mutual TLS / cert rotation.** Net's existing AEAD + capability-token model is the substrate.
- **Schema-validated payloads in v1.** Payloads are `Bytes`; schema registry is deferred.
- **Sync RPC.** Async-only API.

## Open design questions

1. **Queue-group dispatch policy default.** P2C (against in-flight count, observed locally per channel publisher) is the recommended default; round-robin is the documented alternative. Either is fine for v1; pick one to ship and add the other later. Recommend P2C — it composes better with heterogeneous server capacity.

2. **Reply-channel naming with `origin_hash` vs `entity_id`.** `origin_hash` (8 bytes after the widening) is structurally fine for channel naming but has a known birthday-collision floor; using `entity_id` (32 bytes) eliminates collisions but produces a longer channel name. Per-caller reply channels are private to the caller anyway (capability tokens scope `subscribe` to your own `origin_hash`-named channel), so collisions don't cause cross-caller leakage — they just mean two callers share a channel. Recommend `origin_hash` for terseness; revisit if `entity_id`-keyed channels become a uniform convention elsewhere.

3. **Where does the queue-group selection happen — sender or receiver side?** Either works:
   - **Sender-side:** publisher consults the local roster, picks one queue-group member, sends to that one. Lower fan-out cost; biased view of who's healthy.
   - **Receiver-side:** publisher broadcasts to all queue-group members; each receiver picks a deterministic "should I take this one?" decision (consistent hash on `seq_or_ts`). Higher fan-out but unbiased.
   
   Recommend sender-side — it's what the existing dispatcher already does for unicast and it composes with the proximity-driven load metrics. Receiver-side is the fallback for cases where the sender doesn't have a complete roster view (e.g., partition healing).

4. **Idempotency cache eviction.** `completed_idempotent` LRU needs a sized bound. Default 10K entries per server fold? Per-caller? Per-(caller, service)? Recommend a single per-fold LRU sized at 10K with TTL of 5 minutes — covers reasonable retry windows without unbounded growth. Operator-tunable.

5. **Streaming-response ordering across queue-group failover.** If the server handling a streaming response dies mid-stream, queue-group dispatch reroutes subsequent events to a peer that has no context. Need either (a) sticky session affinity (queue group with `Sticky(call_id)` policy ensures all events for a call_id go to the same server) or (b) explicit takeover via the snapshot-restore path. Phase 3 problem; flagging early so the design accommodates it.
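
For question 4, a size bound plus TTL can be sketched with std only. This is an illustration, not a proposal for the final structure: `IdempotencyCache` is a hypothetical name, eviction is FIFO by insertion order rather than true LRU, responses are stored as opaque bytes, and time is passed in explicitly.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// (origin_hash, call_id), the same key shape as `completed_idempotent`.
pub type Key = (u64, u64);

/// Hypothetical bounded cache: FIFO eviction plus a TTL.
pub struct IdempotencyCache {
    capacity: usize,
    ttl: Duration,
    entries: HashMap<Key, (Instant, Vec<u8>)>,
    order: VecDeque<Key>, // insertion order, oldest at the front
}

impl IdempotencyCache {
    pub fn new(capacity: usize, ttl: Duration) -> Self {
        Self { capacity, ttl, entries: HashMap::new(), order: VecDeque::new() }
    }

    /// Cache the encoded RESPONSE for a completed idempotent call.
    pub fn insert(&mut self, key: Key, response: Vec<u8>, now: Instant) {
        if self.entries.insert(key, (now, response)).is_none() {
            self.order.push_back(key);
        }
        // Enforce the size bound by evicting the oldest insertions.
        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some(oldest) => {
                    self.entries.remove(&oldest);
                }
                None => break,
            }
        }
    }

    /// Return the cached response unless it is missing or past its TTL.
    pub fn get(&mut self, key: &Key, now: Instant) -> Option<&[u8]> {
        let expired = match self.entries.get(key) {
            Some((stored_at, _)) => now.saturating_duration_since(*stored_at) >= self.ttl,
            None => return None,
        };
        if expired {
            self.entries.remove(key);
            self.order.retain(|k| k != key);
            return None;
        }
        self.entries.get(key).map(|(_, body)| body.as_slice())
    }
}
```

The recommended defaults would correspond to `IdempotencyCache::new(10_000, Duration::from_secs(300))`. Scoping the cache per fold keeps the bound easy to reason about; per-caller or per-(caller, service) scoping would need one such cache per key.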