Skip to main content

running_process/broker/server/
recovery.rs

1//! Backend crash recovery policy for broker-managed services.
2//!
3//! This module only models state transitions. The caller remains responsible
4//! for removing dead registry entries, spawning processes, and mapping refusal
5//! decisions into wire-level `Refused` replies.
6
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
10use super::backend_registry::BackendKey;
11use super::spawn_coordinator::DEFAULT_SPAWN_BUDGET_WINDOW;
12
/// Backoff reported alongside the single retry granted after a backend's first
/// crash in a budget window.
pub const DEFAULT_RECOVERY_RETRY_BACKOFF: Duration = Duration::from_millis(250);
15
16/// Default window before a backend-unavailable refusal can retry recovery.
17pub const DEFAULT_RECOVERY_BUDGET_WINDOW: Duration = DEFAULT_SPAWN_BUDGET_WINDOW;
18
/// Crash-recovery tuning knobs.
///
/// Prefer [`BackendRecoveryPolicy::new`], which raises zero durations to a 1 ms
/// floor. The fields are public, so a struct literal skips that clamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackendRecoveryPolicy {
    /// Backoff handed back with the one retry allowed per budget window.
    pub retry_backoff: Duration,
    /// Span in which a repeat crash is refused instead of retried; also the
    /// ceiling on the retry-after hint of a backend-unavailable refusal.
    pub budget_window: Duration,
}
27
28impl BackendRecoveryPolicy {
29    /// Build a policy, clamping zero durations to a non-zero floor.
30    pub fn new(retry_backoff: Duration, budget_window: Duration) -> Self {
31        Self {
32            retry_backoff: non_zero_duration(retry_backoff),
33            budget_window: non_zero_duration(budget_window),
34        }
35    }
36}
37
38impl Default for BackendRecoveryPolicy {
39    fn default() -> Self {
40        Self {
41            retry_backoff: DEFAULT_RECOVERY_RETRY_BACKOFF,
42            budget_window: DEFAULT_RECOVERY_BUDGET_WINDOW,
43        }
44    }
45}
46
47/// Per-backend recovery state keyed by broker instance, service, and version.
48#[derive(Debug)]
49pub struct BackendRecoveryState {
50    policy: BackendRecoveryPolicy,
51    entries: HashMap<BackendKey, BackendRecoveryEntry>,
52}
53
54impl BackendRecoveryState {
55    /// Create empty recovery state with the default policy.
56    pub fn new() -> Self {
57        Self::with_policy(BackendRecoveryPolicy::default())
58    }
59
60    /// Create empty recovery state with explicit policy settings.
61    pub fn with_policy(policy: BackendRecoveryPolicy) -> Self {
62        Self {
63            policy,
64            entries: HashMap::new(),
65        }
66    }
67
68    /// Record one observed backend crash and return the recovery decision.
69    ///
70    /// The first crash in a budget window permits one retry after the configured
71    /// backoff. A second crash in the same window is treated as backend
72    /// unavailable and returns a retry-after hint for the rest of the window.
73    pub fn record_crash(&mut self, key: BackendKey, now: Instant) -> BackendRecoveryDecision {
74        let entry = self
75            .entries
76            .entry(key)
77            .or_insert_with(|| BackendRecoveryEntry::new(now));
78        entry.refresh(now, self.policy.budget_window);
79        entry.crashes_in_window = entry.crashes_in_window.saturating_add(1);
80
81        if entry.crashes_in_window == 1 {
82            return BackendRecoveryDecision::Retry {
83                retry_after: self.policy.retry_backoff,
84                attempt: 1,
85            };
86        }
87
88        BackendRecoveryDecision::Refuse {
89            reason: BackendRecoveryRefusalReason::BackendUnavailable,
90            retry_after: retry_after(entry.window_started_at, now, self.policy.budget_window),
91        }
92    }
93
94    /// Reset crash recovery state after the backend is verified healthy again.
95    pub fn record_success(&mut self, key: &BackendKey) {
96        self.entries.remove(key);
97    }
98}
99
100impl Default for BackendRecoveryState {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106/// Decision returned after an observed backend crash.
107#[derive(Clone, Debug, PartialEq, Eq)]
108pub enum BackendRecoveryDecision {
109    /// The broker may retry this backend once after `retry_after`.
110    Retry {
111        /// Delay before the retry should begin.
112        retry_after: Duration,
113        /// 1-based recovery attempt number.
114        attempt: u32,
115    },
116    /// The backend should be refused as unavailable.
117    Refuse {
118        /// Refusal reason suitable for mapping to a Hello `Refused` reason.
119        reason: BackendRecoveryRefusalReason,
120        /// Retry-after hint for clients.
121        retry_after: Duration,
122    },
123}
124
/// Reasons crash recovery can give for refusing a backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendRecoveryRefusalReason {
    /// The backend crashed again inside a budget window whose one retry was
    /// already spent.
    BackendUnavailable,
}
131
/// Crash-window bookkeeping for a single backend key.
#[derive(Debug, Clone)]
struct BackendRecoveryEntry {
    /// Instant at which the current budget window opened.
    window_started_at: Instant,
    /// Crashes recorded since `window_started_at`.
    crashes_in_window: u32,
}
137
138impl BackendRecoveryEntry {
139    fn new(now: Instant) -> Self {
140        Self {
141            window_started_at: now,
142            crashes_in_window: 0,
143        }
144    }
145
146    fn refresh(&mut self, now: Instant, budget_window: Duration) {
147        if elapsed_since(self.window_started_at, now) >= budget_window {
148            self.window_started_at = now;
149            self.crashes_in_window = 0;
150        }
151    }
152}
153
/// Return `duration` unchanged unless it is zero, in which case return 1 ms.
fn non_zero_duration(duration: Duration) -> Duration {
    // Smallest value substituted for a zero input.
    const FLOOR: Duration = Duration::from_millis(1);
    if !duration.is_zero() {
        return duration;
    }
    FLOOR
}
161
162fn retry_after(window_started_at: Instant, now: Instant, budget_window: Duration) -> Duration {
163    budget_window.saturating_sub(elapsed_since(window_started_at, now))
164}
165
/// Time from `started_at` to `now`; zero if `started_at` is later than `now`.
fn elapsed_since(started_at: Instant, now: Instant) -> Duration {
    now.saturating_duration_since(started_at)
}