running_process/broker/server/
recovery.rs1use std::collections::HashMap;
8use std::time::{Duration, Instant};
9
10use super::backend_registry::BackendKey;
11use super::spawn_coordinator::DEFAULT_SPAWN_BUDGET_WINDOW;
12
/// Retry delay used by `BackendRecoveryPolicy::default()`: 250 milliseconds.
pub const DEFAULT_RECOVERY_RETRY_BACKOFF: Duration = Duration::from_millis(250);

/// Crash-budget window used by `BackendRecoveryPolicy::default()`.
///
/// Defined as the spawn coordinator's `DEFAULT_SPAWN_BUDGET_WINDOW`, so the two
/// windows have the same length.
pub const DEFAULT_RECOVERY_BUDGET_WINDOW: Duration = DEFAULT_SPAWN_BUDGET_WINDOW;
18
19#[derive(Clone, Copy, Debug, PartialEq, Eq)]
21pub struct BackendRecoveryPolicy {
22 pub retry_backoff: Duration,
24 pub budget_window: Duration,
26}
27
28impl BackendRecoveryPolicy {
29 pub fn new(retry_backoff: Duration, budget_window: Duration) -> Self {
31 Self {
32 retry_backoff: non_zero_duration(retry_backoff),
33 budget_window: non_zero_duration(budget_window),
34 }
35 }
36}
37
38impl Default for BackendRecoveryPolicy {
39 fn default() -> Self {
40 Self {
41 retry_backoff: DEFAULT_RECOVERY_RETRY_BACKOFF,
42 budget_window: DEFAULT_RECOVERY_BUDGET_WINDOW,
43 }
44 }
45}
46
47#[derive(Debug)]
49pub struct BackendRecoveryState {
50 policy: BackendRecoveryPolicy,
51 entries: HashMap<BackendKey, BackendRecoveryEntry>,
52}
53
54impl BackendRecoveryState {
55 pub fn new() -> Self {
57 Self::with_policy(BackendRecoveryPolicy::default())
58 }
59
60 pub fn with_policy(policy: BackendRecoveryPolicy) -> Self {
62 Self {
63 policy,
64 entries: HashMap::new(),
65 }
66 }
67
68 pub fn record_crash(&mut self, key: BackendKey, now: Instant) -> BackendRecoveryDecision {
74 let entry = self
75 .entries
76 .entry(key)
77 .or_insert_with(|| BackendRecoveryEntry::new(now));
78 entry.refresh(now, self.policy.budget_window);
79 entry.crashes_in_window = entry.crashes_in_window.saturating_add(1);
80
81 if entry.crashes_in_window == 1 {
82 return BackendRecoveryDecision::Retry {
83 retry_after: self.policy.retry_backoff,
84 attempt: 1,
85 };
86 }
87
88 BackendRecoveryDecision::Refuse {
89 reason: BackendRecoveryRefusalReason::BackendUnavailable,
90 retry_after: retry_after(entry.window_started_at, now, self.policy.budget_window),
91 }
92 }
93
94 pub fn record_success(&mut self, key: &BackendKey) {
96 self.entries.remove(key);
97 }
98}
99
100impl Default for BackendRecoveryState {
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
/// Decision returned by `BackendRecoveryState::record_crash`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BackendRecoveryDecision {
    /// Returned for the first crash counted in the current budget window.
    Retry {
        /// Delay taken from the policy's `retry_backoff`.
        retry_after: Duration,
        /// Attempt number; `record_crash` in this file always reports 1.
        attempt: u32,
    },
    /// Returned for every further crash counted in the same budget window.
    Refuse {
        /// Why the retry was refused.
        reason: BackendRecoveryRefusalReason,
        /// Time remaining until the current budget window ends.
        retry_after: Duration,
    },
}
124
/// Reason attached to a `BackendRecoveryDecision::Refuse`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendRecoveryRefusalReason {
    /// Reported by `record_crash` when a backend crashes again inside a window
    /// that already counted a crash.
    BackendUnavailable,
}
131
/// Crash bookkeeping kept for a single backend.
#[derive(Clone, Debug)]
struct BackendRecoveryEntry {
    /// Instant at which the current counting window began.
    window_started_at: Instant,
    /// Number of crashes counted since `window_started_at`.
    crashes_in_window: u32,
}

impl BackendRecoveryEntry {
    /// Opens a window starting at `now` with no crashes counted yet.
    fn new(now: Instant) -> Self {
        Self {
            crashes_in_window: 0,
            window_started_at: now,
        }
    }

    /// Restarts the window at `now` once at least `budget_window` has elapsed.
    ///
    /// A `now` earlier than the window start counts as zero elapsed time, so
    /// the window is kept in that case.
    fn refresh(&mut self, now: Instant, budget_window: Duration) {
        let elapsed = now.saturating_duration_since(self.window_started_at);
        if elapsed < budget_window {
            return;
        }
        // Expired: begin a new window with a zeroed crash count.
        *self = Self::new(now);
    }
}
153
/// Returns `duration` unchanged unless it is zero, in which case one
/// millisecond is returned instead.
fn non_zero_duration(duration: Duration) -> Duration {
    /// Substitute used for a zero input.
    const MINIMUM: Duration = Duration::from_millis(1);

    if duration > Duration::ZERO {
        duration
    } else {
        MINIMUM
    }
}
161
162fn retry_after(window_started_at: Instant, now: Instant, budget_window: Duration) -> Duration {
163 budget_window.saturating_sub(elapsed_since(window_started_at, now))
164}
165
/// Time from `started_at` to `now`, or zero when `now` is earlier than
/// `started_at`.
fn elapsed_since(started_at: Instant, now: Instant) -> Duration {
    now.saturating_duration_since(started_at)
}