running_process/broker/server/handoff/
fallback.rs1use std::collections::HashMap;
9use std::time::{Duration, Instant};
10
11use super::super::backend_registry::BackendKey;
12
/// Default failed-attempt budget per backend: once this many handoff attempts have failed
/// within one [`DEFAULT_HANDOFF_FAILED_ATTEMPT_WINDOW`], further attempts fall back to a
/// backend reconnect until the window runs out.
pub const DEFAULT_HANDOFF_FAILED_ATTEMPTS_PER_WINDOW: usize = 8;

/// Default length of the fixed window over which failed handoff attempts are counted.
pub const DEFAULT_HANDOFF_FAILED_ATTEMPT_WINDOW: Duration = Duration::from_secs(30);

/// Rate-limit settings for failed handoff attempts, applied independently to each backend.
///
/// Prefer [`HandoffFallbackPolicy::new`], which clamps degenerate values. The fields are public,
/// so a struct literal bypasses that clamping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HandoffFallbackPolicy {
    /// Failed attempts allowed per window; reaching this count rate-limits the backend.
    pub max_failed_attempts_per_window: usize,
    /// Length of the fixed window over which failed attempts are counted.
    pub failed_attempt_window: Duration,
}

28impl HandoffFallbackPolicy {
29 pub fn new(max_failed_attempts_per_window: usize, failed_attempt_window: Duration) -> Self {
31 Self {
32 max_failed_attempts_per_window: max_failed_attempts_per_window.max(1),
33 failed_attempt_window: if failed_attempt_window.is_zero() {
34 Duration::from_millis(1)
35 } else {
36 failed_attempt_window
37 },
38 }
39 }
40}
41
42impl Default for HandoffFallbackPolicy {
43 fn default() -> Self {
44 Self {
45 max_failed_attempts_per_window: DEFAULT_HANDOFF_FAILED_ATTEMPTS_PER_WINDOW,
46 failed_attempt_window: DEFAULT_HANDOFF_FAILED_ATTEMPT_WINDOW,
47 }
48 }
49}
50
/// Per-request gates checked by `HandoffFallbackState::should_attempt` before rate limiting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HandoffAttemptInputs {
    /// Whether the client supports handoff; `false` yields a `ClientUnsupported` fallback.
    pub client_supports_handoff: bool,
    /// Whether the service policy allows handoff; `false` yields a `ServicePolicyDisabled` fallback.
    pub service_policy_enabled: bool,
    /// When `true`, yields an `FdPressureDisabled` fallback.
    pub fd_pressure_disabled: bool,
    /// Whether the backend is an adopted existing one; `true` yields an `AdoptedBackend` fallback.
    pub backend_adopted_existing: bool,
}

64impl HandoffAttemptInputs {
65 pub fn new(
67 client_supports_handoff: bool,
68 service_policy_enabled: bool,
69 fd_pressure_disabled: bool,
70 ) -> Self {
71 Self {
72 client_supports_handoff,
73 service_policy_enabled,
74 fd_pressure_disabled,
75 backend_adopted_existing: false,
76 }
77 }
78
79 pub fn enabled() -> Self {
81 Self::new(true, true, false)
82 }
83
84 pub fn adopted_backend(client_supports_handoff: bool) -> Self {
86 Self {
87 client_supports_handoff,
88 service_policy_enabled: true,
89 fd_pressure_disabled: false,
90 backend_adopted_existing: true,
91 }
92 }
93}
94
95#[derive(Clone, Debug, PartialEq, Eq)]
97pub enum HandoffAttemptDecision {
98 Attempt,
100 FallbackToReconnect(HandoffFallbackDecision),
102}
103
104impl HandoffAttemptDecision {
105 pub fn fallback(&self) -> Option<&HandoffFallbackDecision> {
107 match self {
108 Self::Attempt => None,
109 Self::FallbackToReconnect(decision) => Some(decision),
110 }
111 }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq)]
116pub struct HandoffFallbackDecision {
117 pub reason: HandoffFallbackReason,
119 pub retry_after: Option<Duration>,
121}
122
123impl HandoffFallbackDecision {
124 pub fn new(reason: HandoffFallbackReason) -> Self {
126 Self {
127 reason,
128 retry_after: None,
129 }
130 }
131
132 pub fn with_retry_after(reason: HandoffFallbackReason, retry_after: Duration) -> Self {
134 Self {
135 reason,
136 retry_after: Some(retry_after),
137 }
138 }
139
140 pub fn uses_backend_reconnect(&self) -> bool {
142 true
143 }
144
145 pub fn sends_client_error(&self) -> bool {
147 false
148 }
149}
150
/// Why a handoff was skipped in favour of reconnecting to the backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandoffFallbackReason {
    /// `HandoffAttemptInputs::client_supports_handoff` was `false`.
    ClientUnsupported,
    /// `HandoffAttemptInputs::service_policy_enabled` was `false`.
    ServicePolicyDisabled,
    /// `HandoffAttemptInputs::fd_pressure_disabled` was `true`.
    FdPressureDisabled,
    /// The backend used up its failed-attempt budget for the current window.
    FailedAttemptRateLimited,
    /// Mirrors `HandoffAttemptFailure::PermissionDenied` from a recorded failure.
    PermissionDenied,
    /// Mirrors `HandoffAttemptFailure::IntegrityMismatch` from a recorded failure.
    IntegrityMismatch,
    /// Mirrors `HandoffAttemptFailure::BackendAckTimeout` from a recorded failure.
    BackendAckTimeout,
    /// `HandoffAttemptInputs::backend_adopted_existing` was `true`.
    AdoptedBackend,
}

/// Ways a handoff attempt can fail. Each converts into the same-named [`HandoffFallbackReason`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandoffAttemptFailure {
    /// Converts to [`HandoffFallbackReason::PermissionDenied`].
    PermissionDenied,
    /// Converts to [`HandoffFallbackReason::IntegrityMismatch`].
    IntegrityMismatch,
    /// Converts to [`HandoffFallbackReason::BackendAckTimeout`].
    BackendAckTimeout,
}

183impl From<HandoffAttemptFailure> for HandoffFallbackReason {
184 fn from(value: HandoffAttemptFailure) -> Self {
185 match value {
186 HandoffAttemptFailure::PermissionDenied => Self::PermissionDenied,
187 HandoffAttemptFailure::IntegrityMismatch => Self::IntegrityMismatch,
188 HandoffAttemptFailure::BackendAckTimeout => Self::BackendAckTimeout,
189 }
190 }
191}
192
193#[derive(Debug)]
195pub struct HandoffFallbackState {
196 policy: HandoffFallbackPolicy,
197 failed_attempts: HashMap<BackendKey, FailedAttemptWindow>,
198}
199
200impl HandoffFallbackState {
201 pub fn new() -> Self {
203 Self::with_policy(HandoffFallbackPolicy::default())
204 }
205
206 pub fn with_policy(policy: HandoffFallbackPolicy) -> Self {
208 Self {
209 policy,
210 failed_attempts: HashMap::new(),
211 }
212 }
213
214 pub fn policy(&self) -> HandoffFallbackPolicy {
216 self.policy
217 }
218
219 pub fn should_attempt(
221 &mut self,
222 backend: &BackendKey,
223 inputs: HandoffAttemptInputs,
224 now: Instant,
225 ) -> HandoffAttemptDecision {
226 if !inputs.client_supports_handoff {
227 return fallback(HandoffFallbackReason::ClientUnsupported);
228 }
229 if !inputs.service_policy_enabled {
230 return fallback(HandoffFallbackReason::ServicePolicyDisabled);
231 }
232 if inputs.fd_pressure_disabled {
233 return fallback(HandoffFallbackReason::FdPressureDisabled);
234 }
235 if inputs.backend_adopted_existing {
236 return fallback(HandoffFallbackReason::AdoptedBackend);
237 }
238
239 match self.rate_limit_for(backend, now) {
240 Some(retry_after) => HandoffAttemptDecision::FallbackToReconnect(
241 HandoffFallbackDecision::with_retry_after(
242 HandoffFallbackReason::FailedAttemptRateLimited,
243 retry_after,
244 ),
245 ),
246 None => HandoffAttemptDecision::Attempt,
247 }
248 }
249
250 pub fn record_failed_attempt(
252 &mut self,
253 backend: BackendKey,
254 failure: HandoffAttemptFailure,
255 now: Instant,
256 ) -> HandoffAttemptDecision {
257 let entry = self
258 .failed_attempts
259 .entry(backend)
260 .or_insert_with(|| FailedAttemptWindow::new(now));
261 entry.refresh_if_expired(now, self.policy.failed_attempt_window);
262 entry.count = entry
263 .count
264 .saturating_add(1)
265 .min(self.policy.max_failed_attempts_per_window);
266
267 fallback(failure.into())
268 }
269
270 pub fn record_success(&mut self, backend: &BackendKey) {
272 self.failed_attempts.remove(backend);
273 }
274
275 pub fn failed_attempt_count(&mut self, backend: &BackendKey, now: Instant) -> usize {
277 let policy_window = self.policy.failed_attempt_window;
278 let Some(entry) = self.failed_attempts.get_mut(backend) else {
279 return 0;
280 };
281 entry.refresh_if_expired(now, policy_window);
282 entry.count
283 }
284
285 fn rate_limit_for(&mut self, backend: &BackendKey, now: Instant) -> Option<Duration> {
286 let policy = self.policy;
287 let entry = self.failed_attempts.get_mut(backend)?;
288 entry.refresh_if_expired(now, policy.failed_attempt_window);
289 if entry.count < policy.max_failed_attempts_per_window {
290 return None;
291 }
292
293 Some(entry.retry_after(now, policy.failed_attempt_window))
294 }
295}
296
297impl Default for HandoffFallbackState {
298 fn default() -> Self {
299 Self::new()
300 }
301}
302
303#[derive(Clone, Debug)]
304struct FailedAttemptWindow {
305 started_at: Instant,
306 count: usize,
307}
308
309impl FailedAttemptWindow {
310 fn new(now: Instant) -> Self {
311 Self {
312 started_at: now,
313 count: 0,
314 }
315 }
316
317 fn refresh_if_expired(&mut self, now: Instant, window: Duration) {
318 if now
319 .checked_duration_since(self.started_at)
320 .is_some_and(|elapsed| elapsed >= window)
321 {
322 self.started_at = now;
323 self.count = 0;
324 }
325 }
326
327 fn retry_after(&self, now: Instant, window: Duration) -> Duration {
328 let elapsed = now
329 .checked_duration_since(self.started_at)
330 .unwrap_or(Duration::ZERO);
331 window.saturating_sub(elapsed)
332 }
333}
334
335fn fallback(reason: HandoffFallbackReason) -> HandoffAttemptDecision {
336 HandoffAttemptDecision::FallbackToReconnect(HandoffFallbackDecision::new(reason))
337}