Skip to main content

net_sdk/
mesh_rpc_resilience.rs

1//! Caller-side resilience helpers for nRPC calls.
2//!
3//! These are thin wrappers around the underlying typed call APIs
4//! that add operational concerns the raw `call_typed` /
5//! `call_service_typed` paths leave to the user:
6//!
7//! - **`call_with_retry` / `call_typed_with_retry`** — re-issue
8//!   transient failures with exponential backoff + jitter.
9//! - **`call_with_hedge_to` / `call_service_with_hedge`** — fire
10//!   a backup request after a delay; race the responses; first
11//!   one wins. Bounds tail latency at the cost of duplicated
12//!   work on the loser.
13//! - **[`CircuitBreaker`]** — a long-lived stateful guard that
14//!   trips after N consecutive failures, short-circuits while
15//!   open, and probes for recovery via a half-open state.
16//!   Compose around any async call (raw, typed, retried,
17//!   hedged) via `breaker.call(|| async { ... }).await`.
18//!
19//! Each helper composes with the others (and with the underlying
20//! `CallOptions` deadline / routing policy) without special
21//! plumbing — they're regular async wrappers, not a separate
22//! pipeline. Use them when you need them; pay nothing when you
23//! don't.
24
25use parking_lot::Mutex;
26use std::sync::Arc;
27use std::sync::OnceLock;
28use std::time::{Duration, Instant};
29
30use bytes::Bytes;
31use serde::{de::DeserializeOwned, Serialize};
32
33use crate::mesh::Mesh;
34use crate::mesh_rpc::{
35    CallOptions, CallOptionsTyped, CodecDirection, RpcError, RpcReply, RpcStatus,
36};
37
38// ============================================================================
39// Retry policy.
40// ============================================================================
41
42/// What counts as "this should be retried" for a given [`RpcError`].
43/// Defaults to [`default_retryable`] which retries transient
44/// infrastructure failures (timeout, server-side internal /
45/// backpressure, transport) and skips terminal ones (no route,
46/// application errors, unknown-version).
47pub type RetryablePredicate = Arc<dyn Fn(&RpcError) -> bool + Send + Sync>;
48
49/// Backoff + retry policy for [`Mesh::call_with_retry`] and friends.
50/// Defaults: 3 attempts total, 50ms initial backoff, doubling per
51/// attempt, capped at 1s, full jitter on. Override the predicate
52/// via [`Self::with_retryable`] to retry application errors,
53/// non-transient failures, etc.
54#[derive(Clone)]
55pub struct RetryPolicy {
56    /// Total number of attempts (NOT additional retries). 1 means
57    /// "no retry"; 3 means "original + up to 2 retries". Must be
58    /// >= 1; values below 1 are treated as 1.
59    pub max_attempts: u32,
60    /// Backoff before the first retry. Subsequent backoffs scale
61    /// by `backoff_multiplier`, capped by `max_backoff`.
62    pub initial_backoff: Duration,
63    /// Upper bound on the per-attempt backoff (before jitter).
64    /// Stops exponential growth from blowing past the underlying
65    /// call's deadline budget.
66    pub max_backoff: Duration,
67    /// Multiplicative growth factor between attempts. `2.0` is the
68    /// canonical "exponential backoff" default; values < 1.0 are
69    /// clamped to 1.0 (so backoff never shrinks).
70    pub backoff_multiplier: f64,
71    /// When `true` (the default), each backoff is multiplied by a
72    /// uniform random factor in `[0.5, 1.0]` to decorrelate retry
73    /// storms across callers. When `false`, backoffs are
74    /// deterministic.
75    pub jitter: bool,
76    /// Decides whether a given error is retryable. Default:
77    /// [`default_retryable`].
78    pub retryable: RetryablePredicate,
79}
80
81impl std::fmt::Debug for RetryPolicy {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        f.debug_struct("RetryPolicy")
84            .field("max_attempts", &self.max_attempts)
85            .field("initial_backoff", &self.initial_backoff)
86            .field("max_backoff", &self.max_backoff)
87            .field("backoff_multiplier", &self.backoff_multiplier)
88            .field("jitter", &self.jitter)
89            .field("retryable", &"<fn>")
90            .finish()
91    }
92}
93
94impl Default for RetryPolicy {
95    fn default() -> Self {
96        Self {
97            max_attempts: 3,
98            initial_backoff: Duration::from_millis(50),
99            max_backoff: Duration::from_secs(1),
100            backoff_multiplier: 2.0,
101            jitter: true,
102            retryable: Arc::new(default_retryable),
103        }
104    }
105}
106
107impl RetryPolicy {
108    /// Replace the retryable predicate. Use this to extend (or
109    /// narrow) what's considered worth retrying — e.g. retry only
110    /// `Timeout`, or also retry a specific application error code.
111    pub fn with_retryable<F: Fn(&RpcError) -> bool + Send + Sync + 'static>(
112        mut self,
113        predicate: F,
114    ) -> Self {
115        self.retryable = Arc::new(predicate);
116        self
117    }
118}
119
120/// Default predicate for [`RetryPolicy::retryable`]. Retries:
121/// `Timeout`, `Transport`, and `ServerError` for the canonical
122/// transient statuses (`Internal`, `Backpressure`, server-observed
123/// `Timeout`). Does NOT retry: `NoRoute`, `Codec` (caller-fixable
124/// local bug), or `ServerError` for `Application` / `NotFound` /
125/// `Unauthorized` / `UnknownVersion` / `Cancelled` (those are
126/// caller-fixable or terminal).
127pub fn default_retryable(err: &RpcError) -> bool {
128    match err {
129        RpcError::NoRoute { .. } => false,
130        RpcError::Timeout { .. } => true,
131        RpcError::Transport(_) => true,
132        RpcError::ServerError { status, .. } => {
133            *status == RpcStatus::Internal.to_wire()
134                || *status == RpcStatus::Backpressure.to_wire()
135                || *status == RpcStatus::Timeout.to_wire()
136        }
137        // Codec failures are caller-fixable bugs (wrong codec,
138        // schema drift, malformed `Serialize`/`Deserialize` impl).
139        // Retrying burns the backoff budget on the same
140        // deterministic failure and risks tripping the circuit
141        // breaker on a local-only fault.
142        RpcError::Codec { .. } => false,
143        // v0.4 capability-auth: the target's signed policy denies
144        // this caller — retry can't change the verdict until the
145        // target publishes a new (more permissive) announcement.
146        // Treat as terminal so the retry budget isn't wasted on
147        // a deterministic deny.
148        RpcError::CapabilityDenied { .. } => false,
149        // v3 (NRPC_V3 C-S1): caller-driven cancellation via
150        // `Mesh::cancel(token)`. The caller explicitly asked to
151        // stop — retrying would defeat the cancel.
152        RpcError::Cancelled => false,
153    }
154}
155
156// ============================================================================
157// Mesh extensions.
158// ============================================================================
159
160impl Mesh {
161    /// Direct-addressed raw call with retry. Re-issues on transient
162    /// failures per `policy`; the last error from the final attempt
163    /// is returned on exhaustion. The underlying [`CallOptions`] is
164    /// re-used for every attempt — note that `opts.deadline` is an
165    /// absolute `Instant` and does NOT advance across retries, so
166    /// the total wall-clock window is bounded by the initial
167    /// deadline plus the sum of backoffs.
168    pub async fn call_with_retry(
169        &self,
170        target_node_id: u64,
171        service: &str,
172        payload: Bytes,
173        opts: CallOptions,
174        policy: &RetryPolicy,
175    ) -> std::result::Result<RpcReply, RpcError> {
176        retry_loop(policy, |attempt| {
177            let payload = payload.clone();
178            let opts = opts.clone();
179            let _ = attempt;
180            async move { self.call(target_node_id, service, payload, opts).await }
181        })
182        .await
183    }
184
185    /// Service-name raw call with retry. Each attempt re-runs the
186    /// capability-index lookup + routing-policy selection — useful
187    /// when a server failover happens mid-retry-window: the next
188    /// attempt naturally lands on a different node. To pin a single
189    /// target across retries, use [`Self::call_with_retry`].
190    pub async fn call_service_with_retry(
191        &self,
192        service: &str,
193        payload: Bytes,
194        opts: CallOptions,
195        policy: &RetryPolicy,
196    ) -> std::result::Result<RpcReply, RpcError> {
197        retry_loop(policy, |attempt| {
198            let payload = payload.clone();
199            let opts = opts.clone();
200            let _ = attempt;
201            async move { self.call_service(service, payload, opts).await }
202        })
203        .await
204    }
205
206    /// Direct-addressed typed call with retry. Encodes once
207    /// (the request bytes are reused across attempts), retries
208    /// per `policy`, decodes the final reply.
209    pub async fn call_typed_with_retry<Req, Resp>(
210        &self,
211        target_node_id: u64,
212        service: &str,
213        request: &Req,
214        opts: CallOptionsTyped,
215        policy: &RetryPolicy,
216    ) -> std::result::Result<Resp, RpcError>
217    where
218        Req: Serialize,
219        Resp: DeserializeOwned,
220    {
221        let codec = opts.codec;
222        let body = codec.encode(request).map_err(|e| RpcError::Codec {
223            direction: CodecDirection::Encode,
224            message: format!("client encode: {e}"),
225        })?;
226        let body = Bytes::from(body);
227        let reply = self
228            .call_with_retry(target_node_id, service, body, opts.raw, policy)
229            .await?;
230        codec.decode(&reply.body).map_err(|e| RpcError::Codec {
231            direction: CodecDirection::Decode,
232            message: format!("client decode: {e}"),
233        })
234    }
235
236    /// Service-name typed call with retry. Same caveat as
237    /// [`Self::call_service_with_retry`] — each attempt re-resolves
238    /// the candidate set, so failover is automatic.
239    pub async fn call_service_typed_with_retry<Req, Resp>(
240        &self,
241        service: &str,
242        request: &Req,
243        opts: CallOptionsTyped,
244        policy: &RetryPolicy,
245    ) -> std::result::Result<Resp, RpcError>
246    where
247        Req: Serialize,
248        Resp: DeserializeOwned,
249    {
250        let codec = opts.codec;
251        let body = codec.encode(request).map_err(|e| RpcError::Codec {
252            direction: CodecDirection::Encode,
253            message: format!("client encode: {e}"),
254        })?;
255        let body = Bytes::from(body);
256        let reply = self
257            .call_service_with_retry(service, body, opts.raw, policy)
258            .await?;
259        codec.decode(&reply.body).map_err(|e| RpcError::Codec {
260            direction: CodecDirection::Decode,
261            message: format!("client decode: {e}"),
262        })
263    }
264}
265
266// ============================================================================
267// Internals: retry loop + backoff.
268// ============================================================================
269
270/// Run `attempt_fn` up to `policy.max_attempts` times, sleeping
271/// per-attempt backoff between failed retryable attempts. Returns
272/// the first `Ok`, or the last `Err` on exhaustion / the first
273/// non-retryable `Err` immediately.
274async fn retry_loop<T, F, Fut>(
275    policy: &RetryPolicy,
276    mut attempt_fn: F,
277) -> std::result::Result<T, RpcError>
278where
279    F: FnMut(u32) -> Fut,
280    Fut: std::future::Future<Output = std::result::Result<T, RpcError>>,
281{
282    let max = policy.max_attempts.max(1);
283    let mut last_err: Option<RpcError> = None;
284    for attempt in 1..=max {
285        match attempt_fn(attempt).await {
286            Ok(value) => return Ok(value),
287            Err(e) => {
288                let retryable = (policy.retryable)(&e);
289                let is_last = attempt == max;
290                if !retryable || is_last {
291                    return Err(e);
292                }
293                let backoff = compute_backoff(policy, attempt);
294                last_err = Some(e);
295                if !backoff.is_zero() {
296                    tokio::time::sleep(backoff).await;
297                }
298            }
299        }
300    }
301    // Loop body always returns on the final iteration; the
302    // unreachable here is a safety net for a future refactor that
303    // changes the bounds.
304    Err(last_err.unwrap_or_else(|| {
305        RpcError::Transport(net::error::AdapterError::Connection(
306            "retry_loop: exhausted with no error captured (bug)".into(),
307        ))
308    }))
309}
310
311// ============================================================================
312// Hedge policy.
313// ============================================================================
314
315/// Hedge configuration for [`Mesh::call_with_hedge_to`] and
316/// friends. **Fire-then-race** semantics: issue the primary
317/// request immediately, then after `delay` issue one or more
318/// backup requests in parallel and return whichever finishes
319/// first.
320///
321/// **What hedging buys you.** Bounds tail latency. A single slow
322/// replica (GC pause, cold cache, hostile NIC) stops dominating
323/// the p99 — once `delay` elapses without an answer, a healthy
324/// peer gets a chance and the first reply back wins.
325///
326/// **What it costs you.** The losers' work is wasted: the server
327/// runs the handler, publishes a response, and the client
328/// silently discards it on arrival. Pick `delay` close to your
329/// observed p95 (so most calls don't trigger a hedge) and
330/// `hedges` small (1 covers the common slow-replica case;
331/// values >1 multiply your server-side load).
332///
333/// **Cancellation.** When the winner returns, loser futures are
334/// dropped — and the underlying [`Mesh::call`](crate::mesh::Mesh::call)
335/// future has a `UnaryCallGuard` whose `Drop` fires a CANCEL to
336/// the corresponding server. The server's handler observes the
337/// cancellation on its `RpcContext::cancellation` token and can
338/// short-circuit cooperatively. CANCEL publish is best-effort
339/// (fire-and-forget) so a momentary network blip doesn't leave
340/// loser handlers running indefinitely.
341#[derive(Debug, Clone)]
342pub struct HedgePolicy {
343    /// Wait this long after the primary call before firing the
344    /// first hedge. Subsequent hedges (if `hedges > 1`) fire at
345    /// `delay * idx` after the primary. Default: 50ms.
346    pub delay: Duration,
347    /// Number of hedge requests to fire IN ADDITION to the
348    /// primary. `1` is the canonical "primary + one backup"
349    /// shape; `0` disables hedging (the wrapper degrades to a
350    /// straight call). Default: 1.
351    pub hedges: u32,
352}
353
354impl Default for HedgePolicy {
355    fn default() -> Self {
356        Self {
357            delay: Duration::from_millis(50),
358            hedges: 1,
359        }
360    }
361}
362
363// ============================================================================
364// Mesh extensions: hedge.
365// ============================================================================
366
367impl Mesh {
368    /// Hedge across an explicit set of target node ids. The first
369    /// element of `targets` is the primary (fired immediately);
370    /// subsequent elements are hedges fired at `policy.delay * idx`.
371    /// Whichever call resolves first wins (Ok or Err). Losing
372    /// in-flight calls are dropped on the caller side.
373    ///
374    /// Returns `RpcError::NoRoute` if `targets` is empty. If every
375    /// candidate fails, returns the LAST observed error (after
376    /// all hedges have been awaited).
377    pub async fn call_with_hedge_to(
378        &self,
379        targets: &[u64],
380        service: &str,
381        payload: Bytes,
382        opts: CallOptions,
383        policy: &HedgePolicy,
384    ) -> std::result::Result<RpcReply, RpcError> {
385        if targets.is_empty() {
386            return Err(RpcError::NoRoute {
387                target: 0,
388                reason: "call_with_hedge_to: targets is empty".into(),
389            });
390        }
391        let total = (1 + policy.hedges as usize).min(targets.len());
392        let chosen: Vec<u64> = targets[..total].to_vec();
393        hedge_race(self, &chosen, service, payload, opts, policy.delay).await
394    }
395
396    /// Hedge across `1 + policy.hedges` candidates picked from the
397    /// service registry. Candidates are sorted (so the picks are
398    /// deterministic for a stable registry) and the prefix is
399    /// taken. If fewer candidates exist than requested, hedges
400    /// degrade to whatever's available (no error if `hedges=2` but
401    /// only 1 candidate exists — you just get a straight call).
402    ///
403    /// `opts.routing_policy` is ignored (hedge picks its own
404    /// candidates from the service registry).
405    /// `opts.filter_unhealthy` is also ignored: hedge's whole
406    /// premise is "be robust to per-node slowness" — filtering
407    /// unhealthy candidates reduces the redundancy that hedge
408    /// buys you. If you want health-aware single-target dispatch,
409    /// use `call_service` directly with a routing policy.
410    pub async fn call_service_with_hedge(
411        &self,
412        service: &str,
413        payload: Bytes,
414        opts: CallOptions,
415        policy: &HedgePolicy,
416    ) -> std::result::Result<RpcReply, RpcError> {
417        let candidates = self.resolve_hedge_candidates(service)?;
418        let total = (1 + policy.hedges as usize).min(candidates.len());
419        let chosen = &candidates[..total];
420        hedge_race(self, chosen, service, payload, opts, policy.delay).await
421    }
422
423    /// Typed counterpart of [`Self::call_with_hedge_to`]. Encodes
424    /// once, hedges, decodes the winner's reply.
425    pub async fn call_typed_with_hedge_to<Req, Resp>(
426        &self,
427        targets: &[u64],
428        service: &str,
429        request: &Req,
430        opts: CallOptionsTyped,
431        policy: &HedgePolicy,
432    ) -> std::result::Result<Resp, RpcError>
433    where
434        Req: Serialize,
435        Resp: DeserializeOwned,
436    {
437        let codec = opts.codec;
438        let body = codec.encode(request).map_err(|e| RpcError::Codec {
439            direction: CodecDirection::Encode,
440            message: format!("client encode: {e}"),
441        })?;
442        let reply = self
443            .call_with_hedge_to(targets, service, Bytes::from(body), opts.raw, policy)
444            .await?;
445        codec.decode(&reply.body).map_err(|e| RpcError::Codec {
446            direction: CodecDirection::Decode,
447            message: format!("client decode: {e}"),
448        })
449    }
450
451    /// Typed counterpart of [`Self::call_service_with_hedge`].
452    pub async fn call_service_typed_with_hedge<Req, Resp>(
453        &self,
454        service: &str,
455        request: &Req,
456        opts: CallOptionsTyped,
457        policy: &HedgePolicy,
458    ) -> std::result::Result<Resp, RpcError>
459    where
460        Req: Serialize,
461        Resp: DeserializeOwned,
462    {
463        let codec = opts.codec;
464        let body = codec.encode(request).map_err(|e| RpcError::Codec {
465            direction: CodecDirection::Encode,
466            message: format!("client encode: {e}"),
467        })?;
468        let reply = self
469            .call_service_with_hedge(service, Bytes::from(body), opts.raw, policy)
470            .await?;
471        codec.decode(&reply.body).map_err(|e| RpcError::Codec {
472            direction: CodecDirection::Decode,
473            message: format!("client decode: {e}"),
474        })
475    }
476
477    fn resolve_hedge_candidates(&self, service: &str) -> std::result::Result<Vec<u64>, RpcError> {
478        let mut candidates = self.find_service_nodes(service);
479        if candidates.is_empty() {
480            return Err(RpcError::NoRoute {
481                target: 0,
482                reason: format!("no nodes advertise `nrpc:{service}`"),
483            });
484        }
485        // Sort so the prefix taken by the hedge is deterministic
486        // for a stable registry. Composes with caller-side
487        // observability (the same call always picks the same
488        // primary unless the registry has churned).
489        candidates.sort_unstable();
490        Ok(candidates)
491    }
492}
493
494// ============================================================================
495// Internals: hedge race.
496// ============================================================================
497
498async fn hedge_race(
499    mesh: &Mesh,
500    targets: &[u64],
501    service: &str,
502    payload: Bytes,
503    opts: CallOptions,
504    delay: Duration,
505) -> std::result::Result<RpcReply, RpcError> {
506    use futures::future::FutureExt;
507
508    // Clone the underlying Arc<MeshNode> once; each spawned future
509    // owns a clone for its `call(...)` invocation. Cheap (Arc bump).
510    let node = mesh.node_arc();
511    let service_owned = service.to_string();
512
513    // Build one future per target. Each yields `(target_idx,
514    // result)` so the race below can attribute errors back to the
515    // original target position regardless of completion order.
516    // The first fires immediately; subsequent ones wait
517    // `delay * idx`.
518    let mut futures: Vec<
519        futures::future::BoxFuture<'static, (usize, std::result::Result<RpcReply, RpcError>)>,
520    > = targets
521        .iter()
522        .copied()
523        .enumerate()
524        .map(|(idx, target)| {
525            let node = Arc::clone(&node);
526            let service = service_owned.clone();
527            let payload = payload.clone();
528            let opts = opts.clone();
529            let wait = delay.saturating_mul(idx as u32);
530            async move {
531                if !wait.is_zero() {
532                    tokio::time::sleep(wait).await;
533                }
534                let r = node.call(target, &service, payload, opts).await;
535                (idx, r)
536            }
537            .boxed()
538        })
539        .collect();
540
541    // Race them. Drop losers as they're left in `remaining`. If the
542    // first to resolve is `Ok`, return immediately. If every hedge
543    // errors, surface the **primary's** error deterministically —
544    // `select_all` resolves in completion order, so a naive "last
545    // error wins" semantic flips across runs depending on which
546    // hedge happened to lose its race. Tracking errors by their
547    // original target index and returning the lowest-indexed one
548    // makes the diagnostic stable.
549    let mut errors: Vec<Option<RpcError>> = (0..targets.len()).map(|_| None).collect();
550    while !futures.is_empty() {
551        let ((target_idx, result), _select_idx, remaining) =
552            futures::future::select_all(futures).await;
553        match result {
554            Ok(reply) => return Ok(reply),
555            Err(e) => {
556                if target_idx < errors.len() {
557                    errors[target_idx] = Some(e);
558                }
559                futures = remaining;
560            }
561        }
562    }
563    // Prefer the primary's error (target_idx 0) when present;
564    // otherwise the lowest-indexed hedge's error. Either way the
565    // choice is deterministic across runs given the same target
566    // list.
567    let chosen = errors.into_iter().flatten().next();
568    Err(chosen.unwrap_or_else(|| {
569        RpcError::Transport(net::error::AdapterError::Connection(
570            "hedge_race: drained with no error captured (bug)".into(),
571        ))
572    }))
573}
574
575/// `min(max_backoff, initial * multiplier^(attempt-1))`, optionally
576/// scaled by a uniform random factor in `[0.5, 1.0]` (full-half
577/// jitter — bounded enough to keep p99 predictable, randomized
578/// enough to break thundering-herd correlation across callers).
579///
580/// **Ceiling semantic:** `max_backoff` is a HARD ceiling on the
581/// returned duration — applied *after* jitter — so callers who
582/// configure e.g. `max_backoff = 1s` never sleep longer than 1s
583/// regardless of jitter or exponential growth. The previous
584/// implementation applied the cap before jitter, which let an
585/// `(initial=1s, max=1s, jitter=true)` policy sleep up to 1s
586/// (the doc was honest but the surprise was real).
587///
588/// **Jitter source:** decorrelated across simultaneously-failing
589/// callers by mixing in `Instant::now()` ticks (high-resolution
590/// monotonic; not subject to NTP step-back), `thread::current()
591/// .id()` (separates threads even when their wall-clocks alias on
592/// low-resolution Windows clocks), and the address of a stack
593/// local (separates different call sites within one thread). The
594/// previous implementation used `SystemTime::now().nanos` only —
595/// on Windows the wall-clock has ~15 ms resolution, so two
596/// callers in the same tick produced identical jitter and retried
597/// in lockstep, defeating the whole point.
598fn compute_backoff(policy: &RetryPolicy, attempt: u32) -> Duration {
599    let mult = policy.backoff_multiplier.max(1.0);
600    // attempt is 1-indexed; the backoff applies AFTER the first
601    // failure, so the "exponent" is `attempt - 1`.
602    let exp = (attempt.saturating_sub(1)) as i32;
603    let scaled = policy.initial_backoff.as_secs_f64() * mult.powi(exp);
604    let max_secs = policy.max_backoff.as_secs_f64();
605    let pre_cap = scaled.min(max_secs);
606    let jittered = if policy.jitter {
607        // Mix three independent sources so two callers (a) in the
608        // same SystemTime nanosecond bucket on a low-res Windows
609        // clock, (b) on different threads, and (c) from different
610        // call sites within one thread all decorrelate.
611        //
612        // Pre-fix this read `Instant::now().elapsed().as_nanos()`
613        // — but `.elapsed()` on a freshly-constructed `Instant`
614        // measures only the gap between construction and the
615        // very next call (a few ns of single-digit jitter), so
616        // the `now_ns` term contributed essentially zero entropy
617        // and parallel retries from the same call site stayed
618        // correlated. Anchor on a process-epoch `Instant` so we
619        // measure elapsed from process start instead.
620        static PROCESS_EPOCH: OnceLock<Instant> = OnceLock::new();
621        let epoch = PROCESS_EPOCH.get_or_init(Instant::now);
622        let now_ns = epoch.elapsed().as_nanos() as u64;
623        let thread_id_bits = {
624            // ThreadId is opaque; hash its Debug print to extract
625            // bits cheaply. Stable within the process lifetime.
626            let mut s = std::collections::hash_map::DefaultHasher::new();
627            std::hash::Hash::hash(&std::thread::current().id(), &mut s);
628            std::hash::Hasher::finish(&s)
629        };
630        let stack_addr = (&attempt as *const u32) as usize as u64;
631        let seed = now_ns
632            ^ thread_id_bits
633            ^ stack_addr.rotate_left(17)
634            ^ (attempt as u64).wrapping_mul(0x9E3779B97F4A7C15);
635        let mixed = seed
636            .wrapping_mul(0x100000001B3)
637            .wrapping_add(0xCBF29CE484222325);
638        // Top 32 bits → [0, u32::MAX]; map to [0.5, 1.0].
639        let frac = ((mixed >> 32) as u32) as f64 / u32::MAX as f64;
640        pre_cap * (0.5 + 0.5 * frac)
641    } else {
642        pre_cap
643    };
644    // True ceiling: after jitter, clamp again so `max_backoff`
645    // is the absolute upper bound.
646    let final_secs = jittered.min(max_secs).max(0.0);
647    Duration::from_secs_f64(final_secs)
648}
649
650// ============================================================================
651// Circuit breaker.
652// ============================================================================
653
654/// Per-call decision: does this `RpcError` count as a failure for
655/// the breaker? Defaults to [`default_breaker_failure`], which
656/// treats the same set of "transient infrastructure" errors as
657/// [`default_retryable`] — `Timeout`, `Transport`, `Internal`,
658/// `Backpressure`, server-observed `Timeout`. Application errors
659/// don't trip the breaker (they're caller-fixable bugs, not server
660/// health signals).
661pub type BreakerFailurePredicate = Arc<dyn Fn(&RpcError) -> bool + Send + Sync>;
662
663/// Configuration for [`CircuitBreaker`].
664#[derive(Clone)]
665pub struct CircuitBreakerConfig {
666    /// Consecutive failures while `Closed` before tripping to
667    /// `Open`. Must be >= 1; values below 1 are treated as 1.
668    /// Default: 5.
669    pub failure_threshold: u32,
670    /// Consecutive `HalfOpen` probe successes before transitioning
671    /// back to `Closed`. Must be >= 1. Default: 1 (one good probe
672    /// is enough).
673    pub success_threshold: u32,
674    /// `Open` cooldown — how long the breaker rejects every call
675    /// before transitioning to `HalfOpen` and allowing one probe.
676    /// Default: 30 seconds.
677    pub reset_after: Duration,
678    /// Predicate deciding which errors count toward the failure
679    /// counter. Default: [`default_breaker_failure`].
680    pub failure_predicate: BreakerFailurePredicate,
681}
682
683impl std::fmt::Debug for CircuitBreakerConfig {
684    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
685        f.debug_struct("CircuitBreakerConfig")
686            .field("failure_threshold", &self.failure_threshold)
687            .field("success_threshold", &self.success_threshold)
688            .field("reset_after", &self.reset_after)
689            .field("failure_predicate", &"<fn>")
690            .finish()
691    }
692}
693
694impl Default for CircuitBreakerConfig {
695    fn default() -> Self {
696        Self {
697            failure_threshold: 5,
698            success_threshold: 1,
699            reset_after: Duration::from_secs(30),
700            failure_predicate: Arc::new(default_breaker_failure),
701        }
702    }
703}
704
705/// Default breaker-failure predicate. Returns `true` for the same
706/// "transient infrastructure" errors that [`default_retryable`]
707/// considers retryable — these are the signals that a downstream
708/// is unhealthy. Application errors and routing failures do NOT
709/// trip the breaker.
710pub fn default_breaker_failure(err: &RpcError) -> bool {
711    default_retryable(err)
712}
713
714/// Operational state of a [`CircuitBreaker`]. Exposed via
715/// [`CircuitBreaker::state`] for diagnostics / observability.
716#[derive(Debug, Clone, Copy, PartialEq, Eq)]
717pub enum BreakerState {
718    /// Normal operation — calls go through. Failures are counted;
719    /// once `failure_threshold` consecutive failures land,
720    /// transitions to `Open`.
721    Closed,
722    /// Tripped — calls are short-circuited with
723    /// [`BreakerError::Open`]. After `reset_after` elapsed since
724    /// the trip, the next call transitions to `HalfOpen`.
725    Open,
726    /// Probing — at most ONE call may pass through to test
727    /// recovery. Concurrent calls during `HalfOpen` short-circuit
728    /// with [`BreakerError::Open`]. The probe's outcome decides:
729    /// success → consecutive_successes++ (transition to `Closed`
730    /// when threshold met); failure → back to `Open` with
731    /// cooldown reset.
732    HalfOpen,
733}
734
735/// What [`CircuitBreaker::call`] returns on failure: either the
736/// breaker rejected the call (`Open`) or the underlying call
737/// returned an error (`Inner`). Pattern-match to distinguish
738/// "I should fall back" (Open) from "the actual call failed and
739/// I should handle the error" (Inner).
740#[derive(Debug, thiserror::Error)]
741pub enum BreakerError {
742    /// Breaker is currently `Open` (or `HalfOpen` with a probe
743    /// already in flight). The wrapped call did NOT execute.
744    #[error("circuit breaker is open")]
745    Open,
746    /// The wrapped call executed and returned this error. The
747    /// breaker recorded it (per `failure_predicate`) before
748    /// surfacing.
749    #[error("inner: {0}")]
750    Inner(#[from] RpcError),
751}
752
753impl BreakerError {
754    /// Convert to the underlying `RpcError`, mapping the `Open`
755    /// short-circuit to an `RpcError::NoRoute` so callers that
756    /// don't care about the breaker distinction can flatten.
757    pub fn into_rpc_error(self) -> RpcError {
758        match self {
759            BreakerError::Open => RpcError::NoRoute {
760                target: 0,
761                reason: "circuit breaker is open".into(),
762            },
763            BreakerError::Inner(e) => e,
764        }
765    }
766}
767
768/// Three-state circuit breaker: `Closed` → `Open` → `HalfOpen` →
769/// `Closed`. Long-lived; instantiate once per logical downstream
770/// (one per service, or one per (service, target) pair, depending
771/// on how granular you want failure isolation to be) and reuse
772/// across calls.
773///
774/// **Thread-safety**: the breaker is `Send + Sync`; share via
775/// `Arc<CircuitBreaker>` across tasks. State transitions take a
776/// brief blocking lock — never held across `await`.
777///
778/// **Composition**: pass any closure returning a `Future<Output =
779/// Result<T, RpcError>>`. The breaker is generic over `T` so it
780/// works with raw [`RpcReply`], typed `Resp`, `Vec<RpcReply>`
781/// (hedge results), etc.
782///
783/// ```ignore
784/// use std::sync::Arc;
785/// use net_sdk::mesh_rpc_resilience::{CircuitBreaker, CircuitBreakerConfig};
786///
787/// let breaker = Arc::new(CircuitBreaker::new(CircuitBreakerConfig::default()));
788/// let result = breaker.call(|| async {
789///     mesh.call_typed::<MyReq, MyResp>(target, "svc", &req, opts).await
790/// }).await;
791/// ```
792pub struct CircuitBreaker {
793    config: CircuitBreakerConfig,
794    inner: Mutex<BreakerInner>,
795}
796
797struct BreakerInner {
798    state: BreakerState,
799    consecutive_failures: u32,
800    consecutive_successes: u32,
801    /// When `state == Open`, the instant the trip happened;
802    /// transitions to `HalfOpen` after `reset_after` elapsed.
803    /// `None` outside of `Open`.
804    opened_at: Option<std::time::Instant>,
805    /// True while a `HalfOpen` probe is in flight. Other calls
806    /// arriving during HalfOpen short-circuit on this flag.
807    probe_in_flight: bool,
808}
809
810impl CircuitBreaker {
811    /// Construct a breaker in the `Closed` state.
812    pub fn new(config: CircuitBreakerConfig) -> Self {
813        Self {
814            config,
815            inner: Mutex::new(BreakerInner {
816                state: BreakerState::Closed,
817                consecutive_failures: 0,
818                consecutive_successes: 0,
819                opened_at: None,
820                probe_in_flight: false,
821            }),
822        }
823    }
824
825    /// Lock the inner state. parking_lot::Mutex has no poison concept,
826    /// so the previous std::sync poison-recovery dance is unnecessary.
827    fn lock_inner(&self) -> parking_lot::MutexGuard<'_, BreakerInner> {
828        self.inner.lock()
829    }
830
831    /// Current operational state. Cheap snapshot — useful for
832    /// metrics / logging. Note that `Open` may have actually
833    /// elapsed its cooldown; the next `call` will transition to
834    /// `HalfOpen` on entry.
835    pub fn state(&self) -> BreakerState {
836        self.lock_inner().state
837    }
838
839    /// Snapshot of the consecutive-failure counter (resets to 0
840    /// on success or transition out of `Closed`). Useful for
841    /// alerting "we're approaching the trip threshold".
842    pub fn consecutive_failures(&self) -> u32 {
843        self.lock_inner().consecutive_failures
844    }
845
846    /// Test-only / operator override: force the breaker back to
847    /// `Closed` and zero all counters. Useful for runbooks
848    /// ("we manually verified the downstream is healthy, reset")
849    /// or test setup.
850    pub fn reset(&self) {
851        let mut g = self.lock_inner();
852        g.state = BreakerState::Closed;
853        g.consecutive_failures = 0;
854        g.consecutive_successes = 0;
855        g.opened_at = None;
856        g.probe_in_flight = false;
857    }
858
859    /// Wrap an async call. Returns:
860    ///
861    /// - `Ok(T)` if the inner call succeeded (and the breaker
862    ///   recorded the success).
863    /// - `Err(BreakerError::Open)` if the breaker rejected the
864    ///   call without running it (state was `Open` within
865    ///   cooldown, OR `HalfOpen` with a probe in flight).
866    /// - `Err(BreakerError::Inner(e))` if the inner call returned
867    ///   an error (recorded per `failure_predicate`).
868    ///
869    /// Successes always reset `consecutive_failures` to 0 (in
870    /// `Closed`) or increment `consecutive_successes` (in
871    /// `HalfOpen`, transitioning to `Closed` when threshold met).
872    /// Failures matching the predicate increment counters per
873    /// state.
874    pub async fn call<F, Fut, T>(&self, f: F) -> std::result::Result<T, BreakerError>
875    where
876        F: FnOnce() -> Fut,
877        Fut: std::future::Future<Output = std::result::Result<T, RpcError>>,
878    {
879        // Admission decision — short, no awaits.
880        let admitted_as = self.try_admit();
881        let admitted_as = match admitted_as {
882            AdmissionOutcome::Closed => Admission::Closed,
883            AdmissionOutcome::HalfOpenProbe => Admission::HalfOpenProbe,
884            AdmissionOutcome::Reject => return Err(BreakerError::Open),
885        };
886
887        // RAII probe-in-flight guard. If `f().await` panics, the
888        // guard's Drop clears `probe_in_flight=false` AND re-opens
889        // the breaker with a fresh cooldown — without this, a
890        // panicking probe wedges the breaker in HalfOpen forever
891        // (every subsequent call short-circuits with `Open`,
892        // unrecoverable except via `reset()`).
893        let _probe_guard = ProbeGuard {
894            breaker: self,
895            admission: admitted_as,
896            disarmed: std::cell::Cell::new(false),
897        };
898
899        // Run the inner call.
900        let outcome = f().await;
901
902        // Outcome bookkeeping — short, no awaits. Disarm the
903        // probe guard before mutating state ourselves so we don't
904        // double-write `probe_in_flight=false`.
905        _probe_guard.disarmed.set(true);
906        let mut g = self.lock_inner();
907        match (&outcome, admitted_as) {
908            (Ok(_), Admission::Closed) => {
909                g.consecutive_failures = 0;
910            }
911            (Ok(_), Admission::HalfOpenProbe) => {
912                g.probe_in_flight = false;
913                g.consecutive_successes = g.consecutive_successes.saturating_add(1);
914                if g.consecutive_successes >= self.config.success_threshold.max(1) {
915                    g.state = BreakerState::Closed;
916                    g.consecutive_failures = 0;
917                    g.consecutive_successes = 0;
918                    g.opened_at = None;
919                }
920            }
921            (Err(e), admission) => {
922                let counts = (self.config.failure_predicate)(e);
923                if matches!(admission, Admission::HalfOpenProbe) {
924                    g.probe_in_flight = false;
925                }
926                if counts {
927                    match admission {
928                        Admission::Closed => {
929                            g.consecutive_failures = g.consecutive_failures.saturating_add(1);
930                            if g.consecutive_failures >= self.config.failure_threshold.max(1) {
931                                g.state = BreakerState::Open;
932                                g.opened_at = Some(std::time::Instant::now());
933                                g.consecutive_successes = 0;
934                            }
935                        }
936                        Admission::HalfOpenProbe => {
937                            // Single bad probe → re-open with a
938                            // fresh cooldown.
939                            g.state = BreakerState::Open;
940                            g.opened_at = Some(std::time::Instant::now());
941                            g.consecutive_failures = 0;
942                            g.consecutive_successes = 0;
943                        }
944                    }
945                }
946                // If the predicate didn't classify this as a
947                // failure (e.g. application error), leave counters
948                // unchanged — the breaker treats it as a no-op
949                // signal.
950            }
951        }
952        drop(g);
953
954        outcome.map_err(BreakerError::Inner)
955    }
956
957    /// Pure admission decision. Returns one of:
958    /// - `Closed` — call goes through, count successes/failures.
959    /// - `HalfOpenProbe` — this caller becomes the probe.
960    /// - `Reject` — short-circuit with `Open`.
961    fn try_admit(&self) -> AdmissionOutcome {
962        let mut g = self.lock_inner();
963        match g.state {
964            BreakerState::Closed => AdmissionOutcome::Closed,
965            BreakerState::Open => {
966                let elapsed = g.opened_at.map(|i| i.elapsed()).unwrap_or(Duration::ZERO);
967                if elapsed >= self.config.reset_after {
968                    g.state = BreakerState::HalfOpen;
969                    g.consecutive_successes = 0;
970                    g.probe_in_flight = true;
971                    AdmissionOutcome::HalfOpenProbe
972                } else {
973                    AdmissionOutcome::Reject
974                }
975            }
976            BreakerState::HalfOpen => {
977                if g.probe_in_flight {
978                    AdmissionOutcome::Reject
979                } else {
980                    g.probe_in_flight = true;
981                    AdmissionOutcome::HalfOpenProbe
982                }
983            }
984        }
985    }
986}
987
988/// RAII guard that ensures a HalfOpen probe correctly clears its
989/// `probe_in_flight` flag and re-opens the breaker if the inner
990/// future panics. Disarmed via `disarmed.set(true)` before the
991/// normal bookkeeping path so the success / inner-error paths
992/// don't double-write the flag.
993struct ProbeGuard<'a> {
994    breaker: &'a CircuitBreaker,
995    admission: Admission,
996    disarmed: std::cell::Cell<bool>,
997}
998
999impl Drop for ProbeGuard<'_> {
1000    fn drop(&mut self) {
1001        if self.disarmed.get() {
1002            return;
1003        }
1004        // The future panicked between admission and bookkeeping.
1005        // For Closed admissions there's nothing to clean up — we
1006        // just propagate the panic. For HalfOpenProbe admissions
1007        // we MUST clear `probe_in_flight=false` (otherwise every
1008        // subsequent caller short-circuits with `Open` forever)
1009        // AND re-open with a fresh cooldown (a panicking probe is
1010        // a failed probe — same semantic as a probe that returned
1011        // a counted error).
1012        if matches!(self.admission, Admission::HalfOpenProbe) {
1013            let mut g = self.breaker.lock_inner();
1014            g.probe_in_flight = false;
1015            g.state = BreakerState::Open;
1016            g.opened_at = Some(std::time::Instant::now());
1017            g.consecutive_failures = 0;
1018            g.consecutive_successes = 0;
1019        }
1020    }
1021}
1022
1023#[derive(Clone, Copy)]
1024enum AdmissionOutcome {
1025    Closed,
1026    HalfOpenProbe,
1027    Reject,
1028}
1029
1030#[derive(Clone, Copy)]
1031enum Admission {
1032    Closed,
1033    HalfOpenProbe,
1034}