net_sdk/mesh_rpc_resilience.rs
1//! Caller-side resilience helpers for nRPC calls.
2//!
3//! These are thin wrappers around the underlying typed call APIs
4//! that add operational concerns the raw `call_typed` /
5//! `call_service_typed` paths leave to the user:
6//!
7//! - **`call_with_retry` / `call_typed_with_retry`** — re-issue
8//! transient failures with exponential backoff + jitter.
//! - **`call_with_hedge_to` / `call_service_with_hedge`** — fire
//!   backup requests after a delay and race the responses; the
//!   first *successful* reply wins. Bounds tail latency at the
//!   cost of duplicated work on the losers.
13//! - **[`CircuitBreaker`]** — a long-lived stateful guard that
14//! trips after N consecutive failures, short-circuits while
15//! open, and probes for recovery via a half-open state.
16//! Compose around any async call (raw, typed, retried,
17//! hedged) via `breaker.call(|| async { ... }).await`.
18//!
19//! Each helper composes with the others (and with the underlying
20//! `CallOptions` deadline / routing policy) without special
21//! plumbing — they're regular async wrappers, not a separate
22//! pipeline. Use them when you need them; pay nothing when you
23//! don't.
24
25use parking_lot::Mutex;
26use std::sync::Arc;
27use std::sync::OnceLock;
28use std::time::{Duration, Instant};
29
30use bytes::Bytes;
31use serde::{de::DeserializeOwned, Serialize};
32
33use crate::mesh::Mesh;
34use crate::mesh_rpc::{
35 CallOptions, CallOptionsTyped, CodecDirection, RpcError, RpcReply, RpcStatus,
36};
37
38// ============================================================================
39// Retry policy.
40// ============================================================================
41
42/// What counts as "this should be retried" for a given [`RpcError`].
43/// Defaults to [`default_retryable`] which retries transient
44/// infrastructure failures (timeout, server-side internal /
45/// backpressure, transport) and skips terminal ones (no route,
46/// application errors, unknown-version).
47pub type RetryablePredicate = Arc<dyn Fn(&RpcError) -> bool + Send + Sync>;
48
49/// Backoff + retry policy for [`Mesh::call_with_retry`] and friends.
50/// Defaults: 3 attempts total, 50ms initial backoff, doubling per
51/// attempt, capped at 1s, full jitter on. Override the predicate
52/// via [`Self::with_retryable`] to retry application errors,
53/// non-transient failures, etc.
54#[derive(Clone)]
55pub struct RetryPolicy {
56 /// Total number of attempts (NOT additional retries). 1 means
57 /// "no retry"; 3 means "original + up to 2 retries". Must be
58 /// >= 1; values below 1 are treated as 1.
59 pub max_attempts: u32,
60 /// Backoff before the first retry. Subsequent backoffs scale
61 /// by `backoff_multiplier`, capped by `max_backoff`.
62 pub initial_backoff: Duration,
63 /// Upper bound on the per-attempt backoff (before jitter).
64 /// Stops exponential growth from blowing past the underlying
65 /// call's deadline budget.
66 pub max_backoff: Duration,
67 /// Multiplicative growth factor between attempts. `2.0` is the
68 /// canonical "exponential backoff" default; values < 1.0 are
69 /// clamped to 1.0 (so backoff never shrinks).
70 pub backoff_multiplier: f64,
71 /// When `true` (the default), each backoff is multiplied by a
72 /// uniform random factor in `[0.5, 1.0]` to decorrelate retry
73 /// storms across callers. When `false`, backoffs are
74 /// deterministic.
75 pub jitter: bool,
76 /// Decides whether a given error is retryable. Default:
77 /// [`default_retryable`].
78 pub retryable: RetryablePredicate,
79}
80
81impl std::fmt::Debug for RetryPolicy {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 f.debug_struct("RetryPolicy")
84 .field("max_attempts", &self.max_attempts)
85 .field("initial_backoff", &self.initial_backoff)
86 .field("max_backoff", &self.max_backoff)
87 .field("backoff_multiplier", &self.backoff_multiplier)
88 .field("jitter", &self.jitter)
89 .field("retryable", &"<fn>")
90 .finish()
91 }
92}
93
94impl Default for RetryPolicy {
95 fn default() -> Self {
96 Self {
97 max_attempts: 3,
98 initial_backoff: Duration::from_millis(50),
99 max_backoff: Duration::from_secs(1),
100 backoff_multiplier: 2.0,
101 jitter: true,
102 retryable: Arc::new(default_retryable),
103 }
104 }
105}
106
107impl RetryPolicy {
108 /// Replace the retryable predicate. Use this to extend (or
109 /// narrow) what's considered worth retrying — e.g. retry only
110 /// `Timeout`, or also retry a specific application error code.
111 pub fn with_retryable<F: Fn(&RpcError) -> bool + Send + Sync + 'static>(
112 mut self,
113 predicate: F,
114 ) -> Self {
115 self.retryable = Arc::new(predicate);
116 self
117 }
118}
119
120/// Default predicate for [`RetryPolicy::retryable`]. Retries:
121/// `Timeout`, `Transport`, and `ServerError` for the canonical
122/// transient statuses (`Internal`, `Backpressure`, server-observed
123/// `Timeout`). Does NOT retry: `NoRoute`, `Codec` (caller-fixable
124/// local bug), or `ServerError` for `Application` / `NotFound` /
125/// `Unauthorized` / `UnknownVersion` / `Cancelled` (those are
126/// caller-fixable or terminal).
127pub fn default_retryable(err: &RpcError) -> bool {
128 match err {
129 RpcError::NoRoute { .. } => false,
130 RpcError::Timeout { .. } => true,
131 RpcError::Transport(_) => true,
132 RpcError::ServerError { status, .. } => {
133 *status == RpcStatus::Internal.to_wire()
134 || *status == RpcStatus::Backpressure.to_wire()
135 || *status == RpcStatus::Timeout.to_wire()
136 }
137 // Codec failures are caller-fixable bugs (wrong codec,
138 // schema drift, malformed `Serialize`/`Deserialize` impl).
139 // Retrying burns the backoff budget on the same
140 // deterministic failure and risks tripping the circuit
141 // breaker on a local-only fault.
142 RpcError::Codec { .. } => false,
143 // v0.4 capability-auth: the target's signed policy denies
144 // this caller — retry can't change the verdict until the
145 // target publishes a new (more permissive) announcement.
146 // Treat as terminal so the retry budget isn't wasted on
147 // a deterministic deny.
148 RpcError::CapabilityDenied { .. } => false,
149 // v3 (NRPC_V3 C-S1): caller-driven cancellation via
150 // `Mesh::cancel(token)`. The caller explicitly asked to
151 // stop — retrying would defeat the cancel.
152 RpcError::Cancelled => false,
153 }
154}
155
156// ============================================================================
157// Mesh extensions.
158// ============================================================================
159
160impl Mesh {
161 /// Direct-addressed raw call with retry. Re-issues on transient
162 /// failures per `policy`; the last error from the final attempt
163 /// is returned on exhaustion. The underlying [`CallOptions`] is
164 /// re-used for every attempt — note that `opts.deadline` is an
165 /// absolute `Instant` and does NOT advance across retries, so
166 /// the total wall-clock window is bounded by the initial
167 /// deadline plus the sum of backoffs.
168 pub async fn call_with_retry(
169 &self,
170 target_node_id: u64,
171 service: &str,
172 payload: Bytes,
173 opts: CallOptions,
174 policy: &RetryPolicy,
175 ) -> std::result::Result<RpcReply, RpcError> {
176 retry_loop(policy, |attempt| {
177 let payload = payload.clone();
178 let opts = opts.clone();
179 let _ = attempt;
180 async move { self.call(target_node_id, service, payload, opts).await }
181 })
182 .await
183 }
184
185 /// Service-name raw call with retry. Each attempt re-runs the
186 /// capability-index lookup + routing-policy selection — useful
187 /// when a server failover happens mid-retry-window: the next
188 /// attempt naturally lands on a different node. To pin a single
189 /// target across retries, use [`Self::call_with_retry`].
190 pub async fn call_service_with_retry(
191 &self,
192 service: &str,
193 payload: Bytes,
194 opts: CallOptions,
195 policy: &RetryPolicy,
196 ) -> std::result::Result<RpcReply, RpcError> {
197 retry_loop(policy, |attempt| {
198 let payload = payload.clone();
199 let opts = opts.clone();
200 let _ = attempt;
201 async move { self.call_service(service, payload, opts).await }
202 })
203 .await
204 }
205
206 /// Direct-addressed typed call with retry. Encodes once
207 /// (the request bytes are reused across attempts), retries
208 /// per `policy`, decodes the final reply.
209 pub async fn call_typed_with_retry<Req, Resp>(
210 &self,
211 target_node_id: u64,
212 service: &str,
213 request: &Req,
214 opts: CallOptionsTyped,
215 policy: &RetryPolicy,
216 ) -> std::result::Result<Resp, RpcError>
217 where
218 Req: Serialize,
219 Resp: DeserializeOwned,
220 {
221 let codec = opts.codec;
222 let body = codec.encode(request).map_err(|e| RpcError::Codec {
223 direction: CodecDirection::Encode,
224 message: format!("client encode: {e}"),
225 })?;
226 let body = Bytes::from(body);
227 let reply = self
228 .call_with_retry(target_node_id, service, body, opts.raw, policy)
229 .await?;
230 codec.decode(&reply.body).map_err(|e| RpcError::Codec {
231 direction: CodecDirection::Decode,
232 message: format!("client decode: {e}"),
233 })
234 }
235
236 /// Service-name typed call with retry. Same caveat as
237 /// [`Self::call_service_with_retry`] — each attempt re-resolves
238 /// the candidate set, so failover is automatic.
239 pub async fn call_service_typed_with_retry<Req, Resp>(
240 &self,
241 service: &str,
242 request: &Req,
243 opts: CallOptionsTyped,
244 policy: &RetryPolicy,
245 ) -> std::result::Result<Resp, RpcError>
246 where
247 Req: Serialize,
248 Resp: DeserializeOwned,
249 {
250 let codec = opts.codec;
251 let body = codec.encode(request).map_err(|e| RpcError::Codec {
252 direction: CodecDirection::Encode,
253 message: format!("client encode: {e}"),
254 })?;
255 let body = Bytes::from(body);
256 let reply = self
257 .call_service_with_retry(service, body, opts.raw, policy)
258 .await?;
259 codec.decode(&reply.body).map_err(|e| RpcError::Codec {
260 direction: CodecDirection::Decode,
261 message: format!("client decode: {e}"),
262 })
263 }
264}
265
266// ============================================================================
267// Internals: retry loop + backoff.
268// ============================================================================
269
270/// Run `attempt_fn` up to `policy.max_attempts` times, sleeping
271/// per-attempt backoff between failed retryable attempts. Returns
272/// the first `Ok`, or the last `Err` on exhaustion / the first
273/// non-retryable `Err` immediately.
274async fn retry_loop<T, F, Fut>(
275 policy: &RetryPolicy,
276 mut attempt_fn: F,
277) -> std::result::Result<T, RpcError>
278where
279 F: FnMut(u32) -> Fut,
280 Fut: std::future::Future<Output = std::result::Result<T, RpcError>>,
281{
282 let max = policy.max_attempts.max(1);
283 let mut last_err: Option<RpcError> = None;
284 for attempt in 1..=max {
285 match attempt_fn(attempt).await {
286 Ok(value) => return Ok(value),
287 Err(e) => {
288 let retryable = (policy.retryable)(&e);
289 let is_last = attempt == max;
290 if !retryable || is_last {
291 return Err(e);
292 }
293 let backoff = compute_backoff(policy, attempt);
294 last_err = Some(e);
295 if !backoff.is_zero() {
296 tokio::time::sleep(backoff).await;
297 }
298 }
299 }
300 }
301 // Loop body always returns on the final iteration; the
302 // unreachable here is a safety net for a future refactor that
303 // changes the bounds.
304 Err(last_err.unwrap_or_else(|| {
305 RpcError::Transport(net::error::AdapterError::Connection(
306 "retry_loop: exhausted with no error captured (bug)".into(),
307 ))
308 }))
309}
310
311// ============================================================================
312// Hedge policy.
313// ============================================================================
314
/// Hedge configuration for [`Mesh::call_with_hedge_to`] and
/// friends. **Fire-then-race** semantics: issue the primary
/// request immediately, fire each backup after its own delay, and
/// return the first *successful* reply. A failed call does not end
/// the race; an error is returned only once every participating
/// call has failed.
///
/// **What hedging buys you.** Bounds tail latency. A single slow
/// replica (GC pause, cold cache, hostile NIC) stops dominating
/// the p99 — once `delay` elapses without an answer, a healthy
/// peer gets a chance and the first good reply back wins.
///
/// **What it costs you.** The losers' work is wasted: the server
/// runs the handler, publishes a response, and the client
/// silently discards it on arrival. Pick `delay` close to your
/// observed p95 (so most calls don't trigger a hedge) and
/// `hedges` small (1 covers the common slow-replica case;
/// values >1 multiply your server-side load).
///
/// **Cancellation.** When a call succeeds, every still-pending call is
/// dropped — including hedges whose delay has not elapsed yet, which
/// therefore never fire. The underlying
/// [`Mesh::call`](crate::mesh::Mesh::call) future has a
/// `UnaryCallGuard` whose `Drop` fires a CANCEL to the corresponding
/// server. The server's handler observes the cancellation on its
/// `RpcContext::cancellation` token and can short-circuit
/// cooperatively. CANCEL publish is best-effort (fire-and-forget) so a
/// momentary network blip doesn't leave loser handlers running
/// indefinitely.
#[derive(Debug, Clone)]
pub struct HedgePolicy {
    /// Wait this long after the primary call before firing the
    /// first hedge. The participant at position `idx` fires
    /// `delay * idx` after the primary. Default: 50ms.
    pub delay: Duration,
    /// Number of hedge requests to fire IN ADDITION to the
    /// primary. `1` is the canonical "primary + one backup"
    /// shape; `0` disables hedging (the wrapper degrades to a
    /// straight call). Default: 1.
    pub hedges: u32,
}
353
354impl Default for HedgePolicy {
355 fn default() -> Self {
356 Self {
357 delay: Duration::from_millis(50),
358 hedges: 1,
359 }
360 }
361}
362
363// ============================================================================
364// Mesh extensions: hedge.
365// ============================================================================
366
367impl Mesh {
368 /// Hedge across an explicit set of target node ids. The first
369 /// element of `targets` is the primary (fired immediately);
370 /// subsequent elements are hedges fired at `policy.delay * idx`.
371 /// Whichever call resolves first wins (Ok or Err). Losing
372 /// in-flight calls are dropped on the caller side.
373 ///
374 /// Returns `RpcError::NoRoute` if `targets` is empty. If every
375 /// candidate fails, returns the LAST observed error (after
376 /// all hedges have been awaited).
377 pub async fn call_with_hedge_to(
378 &self,
379 targets: &[u64],
380 service: &str,
381 payload: Bytes,
382 opts: CallOptions,
383 policy: &HedgePolicy,
384 ) -> std::result::Result<RpcReply, RpcError> {
385 if targets.is_empty() {
386 return Err(RpcError::NoRoute {
387 target: 0,
388 reason: "call_with_hedge_to: targets is empty".into(),
389 });
390 }
391 let total = (1 + policy.hedges as usize).min(targets.len());
392 let chosen: Vec<u64> = targets[..total].to_vec();
393 hedge_race(self, &chosen, service, payload, opts, policy.delay).await
394 }
395
396 /// Hedge across `1 + policy.hedges` candidates picked from the
397 /// service registry. Candidates are sorted (so the picks are
398 /// deterministic for a stable registry) and the prefix is
399 /// taken. If fewer candidates exist than requested, hedges
400 /// degrade to whatever's available (no error if `hedges=2` but
401 /// only 1 candidate exists — you just get a straight call).
402 ///
403 /// `opts.routing_policy` is ignored (hedge picks its own
404 /// candidates from the service registry).
405 /// `opts.filter_unhealthy` is also ignored: hedge's whole
406 /// premise is "be robust to per-node slowness" — filtering
407 /// unhealthy candidates reduces the redundancy that hedge
408 /// buys you. If you want health-aware single-target dispatch,
409 /// use `call_service` directly with a routing policy.
410 pub async fn call_service_with_hedge(
411 &self,
412 service: &str,
413 payload: Bytes,
414 opts: CallOptions,
415 policy: &HedgePolicy,
416 ) -> std::result::Result<RpcReply, RpcError> {
417 let candidates = self.resolve_hedge_candidates(service)?;
418 let total = (1 + policy.hedges as usize).min(candidates.len());
419 let chosen = &candidates[..total];
420 hedge_race(self, chosen, service, payload, opts, policy.delay).await
421 }
422
423 /// Typed counterpart of [`Self::call_with_hedge_to`]. Encodes
424 /// once, hedges, decodes the winner's reply.
425 pub async fn call_typed_with_hedge_to<Req, Resp>(
426 &self,
427 targets: &[u64],
428 service: &str,
429 request: &Req,
430 opts: CallOptionsTyped,
431 policy: &HedgePolicy,
432 ) -> std::result::Result<Resp, RpcError>
433 where
434 Req: Serialize,
435 Resp: DeserializeOwned,
436 {
437 let codec = opts.codec;
438 let body = codec.encode(request).map_err(|e| RpcError::Codec {
439 direction: CodecDirection::Encode,
440 message: format!("client encode: {e}"),
441 })?;
442 let reply = self
443 .call_with_hedge_to(targets, service, Bytes::from(body), opts.raw, policy)
444 .await?;
445 codec.decode(&reply.body).map_err(|e| RpcError::Codec {
446 direction: CodecDirection::Decode,
447 message: format!("client decode: {e}"),
448 })
449 }
450
451 /// Typed counterpart of [`Self::call_service_with_hedge`].
452 pub async fn call_service_typed_with_hedge<Req, Resp>(
453 &self,
454 service: &str,
455 request: &Req,
456 opts: CallOptionsTyped,
457 policy: &HedgePolicy,
458 ) -> std::result::Result<Resp, RpcError>
459 where
460 Req: Serialize,
461 Resp: DeserializeOwned,
462 {
463 let codec = opts.codec;
464 let body = codec.encode(request).map_err(|e| RpcError::Codec {
465 direction: CodecDirection::Encode,
466 message: format!("client encode: {e}"),
467 })?;
468 let reply = self
469 .call_service_with_hedge(service, Bytes::from(body), opts.raw, policy)
470 .await?;
471 codec.decode(&reply.body).map_err(|e| RpcError::Codec {
472 direction: CodecDirection::Decode,
473 message: format!("client decode: {e}"),
474 })
475 }
476
477 fn resolve_hedge_candidates(&self, service: &str) -> std::result::Result<Vec<u64>, RpcError> {
478 let mut candidates = self.find_service_nodes(service);
479 if candidates.is_empty() {
480 return Err(RpcError::NoRoute {
481 target: 0,
482 reason: format!("no nodes advertise `nrpc:{service}`"),
483 });
484 }
485 // Sort so the prefix taken by the hedge is deterministic
486 // for a stable registry. Composes with caller-side
487 // observability (the same call always picks the same
488 // primary unless the registry has churned).
489 candidates.sort_unstable();
490 Ok(candidates)
491 }
492}
493
494// ============================================================================
495// Internals: hedge race.
496// ============================================================================
497
498async fn hedge_race(
499 mesh: &Mesh,
500 targets: &[u64],
501 service: &str,
502 payload: Bytes,
503 opts: CallOptions,
504 delay: Duration,
505) -> std::result::Result<RpcReply, RpcError> {
506 use futures::future::FutureExt;
507
508 // Clone the underlying Arc<MeshNode> once; each spawned future
509 // owns a clone for its `call(...)` invocation. Cheap (Arc bump).
510 let node = mesh.node_arc();
511 let service_owned = service.to_string();
512
513 // Build one future per target. Each yields `(target_idx,
514 // result)` so the race below can attribute errors back to the
515 // original target position regardless of completion order.
516 // The first fires immediately; subsequent ones wait
517 // `delay * idx`.
518 let mut futures: Vec<
519 futures::future::BoxFuture<'static, (usize, std::result::Result<RpcReply, RpcError>)>,
520 > = targets
521 .iter()
522 .copied()
523 .enumerate()
524 .map(|(idx, target)| {
525 let node = Arc::clone(&node);
526 let service = service_owned.clone();
527 let payload = payload.clone();
528 let opts = opts.clone();
529 let wait = delay.saturating_mul(idx as u32);
530 async move {
531 if !wait.is_zero() {
532 tokio::time::sleep(wait).await;
533 }
534 let r = node.call(target, &service, payload, opts).await;
535 (idx, r)
536 }
537 .boxed()
538 })
539 .collect();
540
541 // Race them. Drop losers as they're left in `remaining`. If the
542 // first to resolve is `Ok`, return immediately. If every hedge
543 // errors, surface the **primary's** error deterministically —
544 // `select_all` resolves in completion order, so a naive "last
545 // error wins" semantic flips across runs depending on which
546 // hedge happened to lose its race. Tracking errors by their
547 // original target index and returning the lowest-indexed one
548 // makes the diagnostic stable.
549 let mut errors: Vec<Option<RpcError>> = (0..targets.len()).map(|_| None).collect();
550 while !futures.is_empty() {
551 let ((target_idx, result), _select_idx, remaining) =
552 futures::future::select_all(futures).await;
553 match result {
554 Ok(reply) => return Ok(reply),
555 Err(e) => {
556 if target_idx < errors.len() {
557 errors[target_idx] = Some(e);
558 }
559 futures = remaining;
560 }
561 }
562 }
563 // Prefer the primary's error (target_idx 0) when present;
564 // otherwise the lowest-indexed hedge's error. Either way the
565 // choice is deterministic across runs given the same target
566 // list.
567 let chosen = errors.into_iter().flatten().next();
568 Err(chosen.unwrap_or_else(|| {
569 RpcError::Transport(net::error::AdapterError::Connection(
570 "hedge_race: drained with no error captured (bug)".into(),
571 ))
572 }))
573}
574
575/// `min(max_backoff, initial * multiplier^(attempt-1))`, optionally
576/// scaled by a uniform random factor in `[0.5, 1.0]` (full-half
577/// jitter — bounded enough to keep p99 predictable, randomized
578/// enough to break thundering-herd correlation across callers).
579///
580/// **Ceiling semantic:** `max_backoff` is a HARD ceiling on the
581/// returned duration — applied *after* jitter — so callers who
582/// configure e.g. `max_backoff = 1s` never sleep longer than 1s
583/// regardless of jitter or exponential growth. The previous
584/// implementation applied the cap before jitter, which let an
585/// `(initial=1s, max=1s, jitter=true)` policy sleep up to 1s
586/// (the doc was honest but the surprise was real).
587///
588/// **Jitter source:** decorrelated across simultaneously-failing
589/// callers by mixing in `Instant::now()` ticks (high-resolution
590/// monotonic; not subject to NTP step-back), `thread::current()
591/// .id()` (separates threads even when their wall-clocks alias on
592/// low-resolution Windows clocks), and the address of a stack
593/// local (separates different call sites within one thread). The
594/// previous implementation used `SystemTime::now().nanos` only —
595/// on Windows the wall-clock has ~15 ms resolution, so two
596/// callers in the same tick produced identical jitter and retried
597/// in lockstep, defeating the whole point.
598fn compute_backoff(policy: &RetryPolicy, attempt: u32) -> Duration {
599 let mult = policy.backoff_multiplier.max(1.0);
600 // attempt is 1-indexed; the backoff applies AFTER the first
601 // failure, so the "exponent" is `attempt - 1`.
602 let exp = (attempt.saturating_sub(1)) as i32;
603 let scaled = policy.initial_backoff.as_secs_f64() * mult.powi(exp);
604 let max_secs = policy.max_backoff.as_secs_f64();
605 let pre_cap = scaled.min(max_secs);
606 let jittered = if policy.jitter {
607 // Mix three independent sources so two callers (a) in the
608 // same SystemTime nanosecond bucket on a low-res Windows
609 // clock, (b) on different threads, and (c) from different
610 // call sites within one thread all decorrelate.
611 //
612 // Pre-fix this read `Instant::now().elapsed().as_nanos()`
613 // — but `.elapsed()` on a freshly-constructed `Instant`
614 // measures only the gap between construction and the
615 // very next call (a few ns of single-digit jitter), so
616 // the `now_ns` term contributed essentially zero entropy
617 // and parallel retries from the same call site stayed
618 // correlated. Anchor on a process-epoch `Instant` so we
619 // measure elapsed from process start instead.
620 static PROCESS_EPOCH: OnceLock<Instant> = OnceLock::new();
621 let epoch = PROCESS_EPOCH.get_or_init(Instant::now);
622 let now_ns = epoch.elapsed().as_nanos() as u64;
623 let thread_id_bits = {
624 // ThreadId is opaque; hash its Debug print to extract
625 // bits cheaply. Stable within the process lifetime.
626 let mut s = std::collections::hash_map::DefaultHasher::new();
627 std::hash::Hash::hash(&std::thread::current().id(), &mut s);
628 std::hash::Hasher::finish(&s)
629 };
630 let stack_addr = (&attempt as *const u32) as usize as u64;
631 let seed = now_ns
632 ^ thread_id_bits
633 ^ stack_addr.rotate_left(17)
634 ^ (attempt as u64).wrapping_mul(0x9E3779B97F4A7C15);
635 let mixed = seed
636 .wrapping_mul(0x100000001B3)
637 .wrapping_add(0xCBF29CE484222325);
638 // Top 32 bits → [0, u32::MAX]; map to [0.5, 1.0].
639 let frac = ((mixed >> 32) as u32) as f64 / u32::MAX as f64;
640 pre_cap * (0.5 + 0.5 * frac)
641 } else {
642 pre_cap
643 };
644 // True ceiling: after jitter, clamp again so `max_backoff`
645 // is the absolute upper bound.
646 let final_secs = jittered.min(max_secs).max(0.0);
647 Duration::from_secs_f64(final_secs)
648}
649
650// ============================================================================
651// Circuit breaker.
652// ============================================================================
653
654/// Per-call decision: does this `RpcError` count as a failure for
655/// the breaker? Defaults to [`default_breaker_failure`], which
656/// treats the same set of "transient infrastructure" errors as
657/// [`default_retryable`] — `Timeout`, `Transport`, `Internal`,
658/// `Backpressure`, server-observed `Timeout`. Application errors
659/// don't trip the breaker (they're caller-fixable bugs, not server
660/// health signals).
661pub type BreakerFailurePredicate = Arc<dyn Fn(&RpcError) -> bool + Send + Sync>;
662
/// Configuration for [`CircuitBreaker`].
///
/// Cloning is cheap: `failure_predicate` is an `Arc`, so clones share
/// the same predicate closure.
#[derive(Clone)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures while `Closed` before tripping to
    /// `Open`. Must be >= 1; values below 1 are treated as 1.
    /// Default: 5.
    pub failure_threshold: u32,
    /// Consecutive `HalfOpen` probe successes before transitioning
    /// back to `Closed`. Must be >= 1. Default: 1 (one good probe
    /// is enough).
    pub success_threshold: u32,
    /// `Open` cooldown — how long the breaker rejects every call
    /// before transitioning to `HalfOpen` and allowing one probe.
    /// Default: 30 seconds.
    pub reset_after: Duration,
    /// Predicate deciding which errors count toward the failure
    /// counter (`true` = counts as a failure). Default:
    /// [`default_breaker_failure`].
    pub failure_predicate: BreakerFailurePredicate,
}
682
683impl std::fmt::Debug for CircuitBreakerConfig {
684 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
685 f.debug_struct("CircuitBreakerConfig")
686 .field("failure_threshold", &self.failure_threshold)
687 .field("success_threshold", &self.success_threshold)
688 .field("reset_after", &self.reset_after)
689 .field("failure_predicate", &"<fn>")
690 .finish()
691 }
692}
693
694impl Default for CircuitBreakerConfig {
695 fn default() -> Self {
696 Self {
697 failure_threshold: 5,
698 success_threshold: 1,
699 reset_after: Duration::from_secs(30),
700 failure_predicate: Arc::new(default_breaker_failure),
701 }
702 }
703}
704
705/// Default breaker-failure predicate. Returns `true` for the same
706/// "transient infrastructure" errors that [`default_retryable`]
707/// considers retryable — these are the signals that a downstream
708/// is unhealthy. Application errors and routing failures do NOT
709/// trip the breaker.
710pub fn default_breaker_failure(err: &RpcError) -> bool {
711 default_retryable(err)
712}
713
/// Operational state of a [`CircuitBreaker`]. Exposed via
/// [`CircuitBreaker::state`] for diagnostics / observability.
///
/// `Copy + Eq`, so snapshots can be compared directly
/// (e.g. `breaker.state() == BreakerState::Open`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState {
    /// Normal operation — calls go through. Failures are counted;
    /// once `failure_threshold` consecutive failures land,
    /// transitions to `Open`.
    Closed,
    /// Tripped — calls are short-circuited with
    /// [`BreakerError::Open`]. After `reset_after` elapsed since
    /// the trip, the next call transitions to `HalfOpen`.
    Open,
    /// Probing — at most ONE call may pass through to test
    /// recovery. Concurrent calls during `HalfOpen` short-circuit
    /// with [`BreakerError::Open`]. The probe's outcome decides:
    /// success → consecutive_successes++ (transition to `Closed`
    /// when threshold met); failure → back to `Open` with
    /// cooldown reset.
    HalfOpen,
}
734
/// What [`CircuitBreaker::call`] returns on failure: either the
/// breaker rejected the call (`Open`) or the underlying call
/// returned an error (`Inner`). Pattern-match to distinguish
/// "I should fall back" (Open) from "the actual call failed and
/// I should handle the error" (Inner). Use
/// [`BreakerError::into_rpc_error`] to flatten both into an `RpcError`.
#[derive(Debug, thiserror::Error)]
pub enum BreakerError {
    /// Breaker is currently `Open` (or `HalfOpen` with a probe
    /// already in flight). The wrapped call did NOT execute.
    /// Displays as "circuit breaker is open".
    #[error("circuit breaker is open")]
    Open,
    /// The wrapped call executed and returned this error. The
    /// breaker recorded it (per `failure_predicate`) before
    /// surfacing. `#[from]` lets `?` convert an `RpcError` into
    /// this variant; displays as "inner: <error>".
    #[error("inner: {0}")]
    Inner(#[from] RpcError),
}
752
753impl BreakerError {
754 /// Convert to the underlying `RpcError`, mapping the `Open`
755 /// short-circuit to an `RpcError::NoRoute` so callers that
756 /// don't care about the breaker distinction can flatten.
757 pub fn into_rpc_error(self) -> RpcError {
758 match self {
759 BreakerError::Open => RpcError::NoRoute {
760 target: 0,
761 reason: "circuit breaker is open".into(),
762 },
763 BreakerError::Inner(e) => e,
764 }
765 }
766}
767
768/// Three-state circuit breaker: `Closed` → `Open` → `HalfOpen` →
769/// `Closed`. Long-lived; instantiate once per logical downstream
770/// (one per service, or one per (service, target) pair, depending
771/// on how granular you want failure isolation to be) and reuse
772/// across calls.
773///
774/// **Thread-safety**: the breaker is `Send + Sync`; share via
775/// `Arc<CircuitBreaker>` across tasks. State transitions take a
776/// brief blocking lock — never held across `await`.
777///
778/// **Composition**: pass any closure returning a `Future<Output =
779/// Result<T, RpcError>>`. The breaker is generic over `T` so it
780/// works with raw [`RpcReply`], typed `Resp`, `Vec<RpcReply>`
781/// (hedge results), etc.
782///
783/// ```ignore
784/// use std::sync::Arc;
785/// use net_sdk::mesh_rpc_resilience::{CircuitBreaker, CircuitBreakerConfig};
786///
787/// let breaker = Arc::new(CircuitBreaker::new(CircuitBreakerConfig::default()));
788/// let result = breaker.call(|| async {
789/// mesh.call_typed::<MyReq, MyResp>(target, "svc", &req, opts).await
790/// }).await;
791/// ```
792pub struct CircuitBreaker {
793 config: CircuitBreakerConfig,
794 inner: Mutex<BreakerInner>,
795}
796
/// Mutable state of a [`CircuitBreaker`], guarded by its `Mutex`.
/// [`CircuitBreaker::reset`] restores every field to its initial value.
struct BreakerInner {
    /// Current position in the Closed → Open → HalfOpen cycle.
    state: BreakerState,
    /// Failures in a row (as classified by `failure_predicate`);
    /// compared against `failure_threshold`.
    consecutive_failures: u32,
    /// Successful `HalfOpen` probes in a row; compared against
    /// `success_threshold`.
    consecutive_successes: u32,
    /// When `state == Open`, the instant the trip happened;
    /// transitions to `HalfOpen` after `reset_after` elapsed.
    /// `None` outside of `Open`.
    opened_at: Option<std::time::Instant>,
    /// True while a `HalfOpen` probe is in flight. Other calls
    /// arriving during HalfOpen short-circuit on this flag.
    probe_in_flight: bool,
}
809
810impl CircuitBreaker {
811 /// Construct a breaker in the `Closed` state.
812 pub fn new(config: CircuitBreakerConfig) -> Self {
813 Self {
814 config,
815 inner: Mutex::new(BreakerInner {
816 state: BreakerState::Closed,
817 consecutive_failures: 0,
818 consecutive_successes: 0,
819 opened_at: None,
820 probe_in_flight: false,
821 }),
822 }
823 }
824
825 /// Lock the inner state. parking_lot::Mutex has no poison concept,
826 /// so the previous std::sync poison-recovery dance is unnecessary.
827 fn lock_inner(&self) -> parking_lot::MutexGuard<'_, BreakerInner> {
828 self.inner.lock()
829 }
830
831 /// Current operational state. Cheap snapshot — useful for
832 /// metrics / logging. Note that `Open` may have actually
833 /// elapsed its cooldown; the next `call` will transition to
834 /// `HalfOpen` on entry.
835 pub fn state(&self) -> BreakerState {
836 self.lock_inner().state
837 }
838
839 /// Snapshot of the consecutive-failure counter (resets to 0
840 /// on success or transition out of `Closed`). Useful for
841 /// alerting "we're approaching the trip threshold".
842 pub fn consecutive_failures(&self) -> u32 {
843 self.lock_inner().consecutive_failures
844 }
845
846 /// Test-only / operator override: force the breaker back to
847 /// `Closed` and zero all counters. Useful for runbooks
848 /// ("we manually verified the downstream is healthy, reset")
849 /// or test setup.
850 pub fn reset(&self) {
851 let mut g = self.lock_inner();
852 g.state = BreakerState::Closed;
853 g.consecutive_failures = 0;
854 g.consecutive_successes = 0;
855 g.opened_at = None;
856 g.probe_in_flight = false;
857 }
858
859 /// Wrap an async call. Returns:
860 ///
861 /// - `Ok(T)` if the inner call succeeded (and the breaker
862 /// recorded the success).
863 /// - `Err(BreakerError::Open)` if the breaker rejected the
864 /// call without running it (state was `Open` within
865 /// cooldown, OR `HalfOpen` with a probe in flight).
866 /// - `Err(BreakerError::Inner(e))` if the inner call returned
867 /// an error (recorded per `failure_predicate`).
868 ///
869 /// Successes always reset `consecutive_failures` to 0 (in
870 /// `Closed`) or increment `consecutive_successes` (in
871 /// `HalfOpen`, transitioning to `Closed` when threshold met).
872 /// Failures matching the predicate increment counters per
873 /// state.
874 pub async fn call<F, Fut, T>(&self, f: F) -> std::result::Result<T, BreakerError>
875 where
876 F: FnOnce() -> Fut,
877 Fut: std::future::Future<Output = std::result::Result<T, RpcError>>,
878 {
879 // Admission decision — short, no awaits.
880 let admitted_as = self.try_admit();
881 let admitted_as = match admitted_as {
882 AdmissionOutcome::Closed => Admission::Closed,
883 AdmissionOutcome::HalfOpenProbe => Admission::HalfOpenProbe,
884 AdmissionOutcome::Reject => return Err(BreakerError::Open),
885 };
886
887 // RAII probe-in-flight guard. If `f().await` panics, the
888 // guard's Drop clears `probe_in_flight=false` AND re-opens
889 // the breaker with a fresh cooldown — without this, a
890 // panicking probe wedges the breaker in HalfOpen forever
891 // (every subsequent call short-circuits with `Open`,
892 // unrecoverable except via `reset()`).
893 let _probe_guard = ProbeGuard {
894 breaker: self,
895 admission: admitted_as,
896 disarmed: std::cell::Cell::new(false),
897 };
898
899 // Run the inner call.
900 let outcome = f().await;
901
902 // Outcome bookkeeping — short, no awaits. Disarm the
903 // probe guard before mutating state ourselves so we don't
904 // double-write `probe_in_flight=false`.
905 _probe_guard.disarmed.set(true);
906 let mut g = self.lock_inner();
907 match (&outcome, admitted_as) {
908 (Ok(_), Admission::Closed) => {
909 g.consecutive_failures = 0;
910 }
911 (Ok(_), Admission::HalfOpenProbe) => {
912 g.probe_in_flight = false;
913 g.consecutive_successes = g.consecutive_successes.saturating_add(1);
914 if g.consecutive_successes >= self.config.success_threshold.max(1) {
915 g.state = BreakerState::Closed;
916 g.consecutive_failures = 0;
917 g.consecutive_successes = 0;
918 g.opened_at = None;
919 }
920 }
921 (Err(e), admission) => {
922 let counts = (self.config.failure_predicate)(e);
923 if matches!(admission, Admission::HalfOpenProbe) {
924 g.probe_in_flight = false;
925 }
926 if counts {
927 match admission {
928 Admission::Closed => {
929 g.consecutive_failures = g.consecutive_failures.saturating_add(1);
930 if g.consecutive_failures >= self.config.failure_threshold.max(1) {
931 g.state = BreakerState::Open;
932 g.opened_at = Some(std::time::Instant::now());
933 g.consecutive_successes = 0;
934 }
935 }
936 Admission::HalfOpenProbe => {
937 // Single bad probe → re-open with a
938 // fresh cooldown.
939 g.state = BreakerState::Open;
940 g.opened_at = Some(std::time::Instant::now());
941 g.consecutive_failures = 0;
942 g.consecutive_successes = 0;
943 }
944 }
945 }
946 // If the predicate didn't classify this as a
947 // failure (e.g. application error), leave counters
948 // unchanged — the breaker treats it as a no-op
949 // signal.
950 }
951 }
952 drop(g);
953
954 outcome.map_err(BreakerError::Inner)
955 }
956
957 /// Pure admission decision. Returns one of:
958 /// - `Closed` — call goes through, count successes/failures.
959 /// - `HalfOpenProbe` — this caller becomes the probe.
960 /// - `Reject` — short-circuit with `Open`.
961 fn try_admit(&self) -> AdmissionOutcome {
962 let mut g = self.lock_inner();
963 match g.state {
964 BreakerState::Closed => AdmissionOutcome::Closed,
965 BreakerState::Open => {
966 let elapsed = g.opened_at.map(|i| i.elapsed()).unwrap_or(Duration::ZERO);
967 if elapsed >= self.config.reset_after {
968 g.state = BreakerState::HalfOpen;
969 g.consecutive_successes = 0;
970 g.probe_in_flight = true;
971 AdmissionOutcome::HalfOpenProbe
972 } else {
973 AdmissionOutcome::Reject
974 }
975 }
976 BreakerState::HalfOpen => {
977 if g.probe_in_flight {
978 AdmissionOutcome::Reject
979 } else {
980 g.probe_in_flight = true;
981 AdmissionOutcome::HalfOpenProbe
982 }
983 }
984 }
985 }
986}
987
988/// RAII guard that ensures a HalfOpen probe correctly clears its
989/// `probe_in_flight` flag and re-opens the breaker if the inner
990/// future panics. Disarmed via `disarmed.set(true)` before the
991/// normal bookkeeping path so the success / inner-error paths
992/// don't double-write the flag.
993struct ProbeGuard<'a> {
994 breaker: &'a CircuitBreaker,
995 admission: Admission,
996 disarmed: std::cell::Cell<bool>,
997}
998
999impl Drop for ProbeGuard<'_> {
1000 fn drop(&mut self) {
1001 if self.disarmed.get() {
1002 return;
1003 }
1004 // The future panicked between admission and bookkeeping.
1005 // For Closed admissions there's nothing to clean up — we
1006 // just propagate the panic. For HalfOpenProbe admissions
1007 // we MUST clear `probe_in_flight=false` (otherwise every
1008 // subsequent caller short-circuits with `Open` forever)
1009 // AND re-open with a fresh cooldown (a panicking probe is
1010 // a failed probe — same semantic as a probe that returned
1011 // a counted error).
1012 if matches!(self.admission, Admission::HalfOpenProbe) {
1013 let mut g = self.breaker.lock_inner();
1014 g.probe_in_flight = false;
1015 g.state = BreakerState::Open;
1016 g.opened_at = Some(std::time::Instant::now());
1017 g.consecutive_failures = 0;
1018 g.consecutive_successes = 0;
1019 }
1020 }
1021}
1022
/// Verdict returned by `CircuitBreaker::try_admit`.
#[derive(Copy, Clone)]
enum AdmissionOutcome {
    /// Breaker is `Closed`: run the call and count its outcome.
    Closed,
    /// This caller now holds the `HalfOpen` probe slot.
    HalfOpenProbe,
    /// Short-circuit with `BreakerError::Open` without running the call.
    Reject,
}
1029
/// How an admitted call entered the breaker; carried through to outcome
/// bookkeeping. Rejected calls never reach that point, so there is no
/// `Reject` variant here.
#[derive(Copy, Clone)]
enum Admission {
    /// Admitted while `Closed`.
    Closed,
    /// Admitted as the single `HalfOpen` probe.
    HalfOpenProbe,
}