// running_process/broker/server/handoff/fallback.rs

//! Fallback policy for optional broker-to-backend connection handoff.
//!
//! This module does not transfer handles. It keeps the Phase 6 decision
//! surface small: decide whether a handoff attempt is allowed, record failed
//! attempts, and always translate handoff failures into a silent reconnect
//! fallback for the client.
7
8use std::collections::HashMap;
9use std::time::{Duration, Instant};
10
11use super::super::backend_registry::BackendKey;
12
/// Failed handoff attempts tolerated per backend within one counting window
/// before further attempts are suppressed.
pub const DEFAULT_HANDOFF_FAILED_ATTEMPTS_PER_WINDOW: usize = 8;

/// Length of the window over which failed handoff attempts are counted for
/// rate limiting.
pub const DEFAULT_HANDOFF_FAILED_ATTEMPT_WINDOW: Duration = Duration::from_secs(30);

/// Bounds on how many failed handoff attempts are tolerated, and over what
/// period, before handoff attempts for a backend are suppressed.
///
/// Prefer [`HandoffFallbackPolicy::new`]: the fields are public, and building
/// the struct with a literal skips the zero-value clamping that `new` applies.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct HandoffFallbackPolicy {
    /// Failed attempts within one window after which attempts are suppressed.
    pub max_failed_attempts_per_window: usize,
    /// Length of the window in which failed attempts are counted.
    pub failed_attempt_window: Duration,
}

impl HandoffFallbackPolicy {
    /// Smallest window `new` produces; a zero window is raised to this value.
    const MIN_FAILED_ATTEMPT_WINDOW: Duration = Duration::from_millis(1);

    /// Build a policy from explicit bounds.
    ///
    /// A zero attempt limit becomes 1 and a zero window becomes 1ms. Any
    /// non-zero value is kept exactly as given.
    pub fn new(max_failed_attempts_per_window: usize, failed_attempt_window: Duration) -> Self {
        let max_failed_attempts_per_window = std::cmp::max(max_failed_attempts_per_window, 1);
        let failed_attempt_window = Some(failed_attempt_window)
            .filter(|window| !window.is_zero())
            .unwrap_or(Self::MIN_FAILED_ATTEMPT_WINDOW);

        Self {
            max_failed_attempts_per_window,
            failed_attempt_window,
        }
    }
}

impl Default for HandoffFallbackPolicy {
    /// Uses [`DEFAULT_HANDOFF_FAILED_ATTEMPTS_PER_WINDOW`] and
    /// [`DEFAULT_HANDOFF_FAILED_ATTEMPT_WINDOW`]; both are non-zero, so `new`
    /// keeps them unchanged.
    fn default() -> Self {
        Self::new(
            DEFAULT_HANDOFF_FAILED_ATTEMPTS_PER_WINDOW,
            DEFAULT_HANDOFF_FAILED_ATTEMPT_WINDOW,
        )
    }
}
50
/// Per-request facts consulted when deciding whether handoff may be attempted.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct HandoffAttemptInputs {
    /// True when the client advertised it can stay on a handed-off pipe.
    pub client_supports_handoff: bool,
    /// True when the matched service allows the handoff optimization.
    pub service_policy_enabled: bool,
    /// True while fd/handle pressure has temporarily turned handoff off.
    pub fd_pressure_disabled: bool,
    /// True when the backend was adopted after a broker restart instead of
    /// being spawned by this broker.
    pub backend_adopted_existing: bool,
}

impl HandoffAttemptInputs {
    /// Build inputs for one handoff decision. `backend_adopted_existing` is
    /// always `false` here; use [`HandoffAttemptInputs::adopted_backend`] for
    /// adopted backends.
    pub fn new(
        client_supports_handoff: bool,
        service_policy_enabled: bool,
        fd_pressure_disabled: bool,
    ) -> Self {
        Self {
            backend_adopted_existing: false,
            client_supports_handoff,
            service_policy_enabled,
            fd_pressure_disabled,
        }
    }

    /// Inputs for the common case: client and service both allow handoff and
    /// no fd pressure is reported.
    pub fn enabled() -> Self {
        Self {
            client_supports_handoff: true,
            service_policy_enabled: true,
            fd_pressure_disabled: false,
            backend_adopted_existing: false,
        }
    }

    /// Inputs for an adopted backend, which must use the reconnect fallback.
    /// The service policy is marked enabled and no fd pressure is reported.
    pub fn adopted_backend(client_supports_handoff: bool) -> Self {
        Self {
            backend_adopted_existing: true,
            ..Self::new(client_supports_handoff, true, false)
        }
    }
}
94
95/// Decision produced before or after an optional handoff attempt.
96#[derive(Clone, Debug, PartialEq, Eq)]
97pub enum HandoffAttemptDecision {
98    /// The broker may attempt platform-specific handoff.
99    Attempt,
100    /// The broker must reply with the backend endpoint and let the client reconnect.
101    FallbackToReconnect(HandoffFallbackDecision),
102}
103
104impl HandoffAttemptDecision {
105    /// Return the fallback decision when this is a reconnect fallback.
106    pub fn fallback(&self) -> Option<&HandoffFallbackDecision> {
107        match self {
108            Self::Attempt => None,
109            Self::FallbackToReconnect(decision) => Some(decision),
110        }
111    }
112}
113
114/// Client-visible behavior when broker handoff is skipped or fails.
115#[derive(Clone, Debug, PartialEq, Eq)]
116pub struct HandoffFallbackDecision {
117    /// Reason retained for broker logging and metrics.
118    pub reason: HandoffFallbackReason,
119    /// Time until handoff attempts should be retried for this backend, when rate-limited.
120    pub retry_after: Option<Duration>,
121}
122
123impl HandoffFallbackDecision {
124    /// Build a reconnect fallback with no retry-after hint.
125    pub fn new(reason: HandoffFallbackReason) -> Self {
126        Self {
127            reason,
128            retry_after: None,
129        }
130    }
131
132    /// Build a reconnect fallback that carries a rate-limit retry-after hint.
133    pub fn with_retry_after(reason: HandoffFallbackReason, retry_after: Duration) -> Self {
134        Self {
135            reason,
136            retry_after: Some(retry_after),
137        }
138    }
139
140    /// Return true because fallback must send `backend_pipe` for reconnect.
141    pub fn uses_backend_reconnect(&self) -> bool {
142        true
143    }
144
145    /// Return false because handoff fallback is an optimization failure, not a client error.
146    pub fn sends_client_error(&self) -> bool {
147        false
148    }
149}
150
/// Why the broker fell back from handoff to a client reconnect.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HandoffFallbackReason {
    /// The client did not advertise handoff support.
    ClientUnsupported,
    /// The matched service turned the handoff optimization off.
    ServicePolicyDisabled,
    /// High fd/handle pressure temporarily disabled handoff.
    FdPressureDisabled,
    /// This backend reached the policy's failed-attempt limit for the window.
    FailedAttemptRateLimited,
    /// The platform handoff API refused to duplicate or pass the handle.
    PermissionDenied,
    /// The broker/backend integrity boundary rejected the handoff.
    IntegrityMismatch,
    /// The backend did not acknowledge the handed-off connection in time.
    BackendAckTimeout,
    /// The backend was adopted from a previous broker, so its handles cannot
    /// be transferred from the old owner.
    AdoptedBackend,
}

/// Failure reported after a platform-specific handoff attempt has started.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HandoffAttemptFailure {
    /// Handle/fd passing was denied by the platform.
    PermissionDenied,
    /// Broker and backend integrity levels or trust domains did not match.
    IntegrityMismatch,
    /// The backend missed the broker's acknowledgement deadline.
    BackendAckTimeout,
}

impl From<HandoffAttemptFailure> for HandoffFallbackReason {
    /// Each post-attempt failure maps to the fallback reason of the same name.
    fn from(failure: HandoffAttemptFailure) -> Self {
        use HandoffAttemptFailure as Failure;

        match failure {
            Failure::PermissionDenied => HandoffFallbackReason::PermissionDenied,
            Failure::IntegrityMismatch => HandoffFallbackReason::IntegrityMismatch,
            Failure::BackendAckTimeout => HandoffFallbackReason::BackendAckTimeout,
        }
    }
}
192
193/// Per-backend state for suppressing repeatedly failing handoff attempts.
194#[derive(Debug)]
195pub struct HandoffFallbackState {
196    policy: HandoffFallbackPolicy,
197    failed_attempts: HashMap<BackendKey, FailedAttemptWindow>,
198}
199
200impl HandoffFallbackState {
201    /// Create state with default fallback bounds.
202    pub fn new() -> Self {
203        Self::with_policy(HandoffFallbackPolicy::default())
204    }
205
206    /// Create state with explicit fallback bounds.
207    pub fn with_policy(policy: HandoffFallbackPolicy) -> Self {
208        Self {
209            policy,
210            failed_attempts: HashMap::new(),
211        }
212    }
213
214    /// Return the active policy.
215    pub fn policy(&self) -> HandoffFallbackPolicy {
216        self.policy
217    }
218
219    /// Decide whether this request may attempt handoff.
220    pub fn should_attempt(
221        &mut self,
222        backend: &BackendKey,
223        inputs: HandoffAttemptInputs,
224        now: Instant,
225    ) -> HandoffAttemptDecision {
226        if !inputs.client_supports_handoff {
227            return fallback(HandoffFallbackReason::ClientUnsupported);
228        }
229        if !inputs.service_policy_enabled {
230            return fallback(HandoffFallbackReason::ServicePolicyDisabled);
231        }
232        if inputs.fd_pressure_disabled {
233            return fallback(HandoffFallbackReason::FdPressureDisabled);
234        }
235        if inputs.backend_adopted_existing {
236            return fallback(HandoffFallbackReason::AdoptedBackend);
237        }
238
239        match self.rate_limit_for(backend, now) {
240            Some(retry_after) => HandoffAttemptDecision::FallbackToReconnect(
241                HandoffFallbackDecision::with_retry_after(
242                    HandoffFallbackReason::FailedAttemptRateLimited,
243                    retry_after,
244                ),
245            ),
246            None => HandoffAttemptDecision::Attempt,
247        }
248    }
249
250    /// Record a failed handoff attempt and return the silent reconnect fallback.
251    pub fn record_failed_attempt(
252        &mut self,
253        backend: BackendKey,
254        failure: HandoffAttemptFailure,
255        now: Instant,
256    ) -> HandoffAttemptDecision {
257        let entry = self
258            .failed_attempts
259            .entry(backend)
260            .or_insert_with(|| FailedAttemptWindow::new(now));
261        entry.refresh_if_expired(now, self.policy.failed_attempt_window);
262        entry.count = entry
263            .count
264            .saturating_add(1)
265            .min(self.policy.max_failed_attempts_per_window);
266
267        fallback(failure.into())
268    }
269
270    /// Clear failed-attempt state after a successful handoff.
271    pub fn record_success(&mut self, backend: &BackendKey) {
272        self.failed_attempts.remove(backend);
273    }
274
275    /// Return the bounded failed-attempt count for a backend.
276    pub fn failed_attempt_count(&mut self, backend: &BackendKey, now: Instant) -> usize {
277        let policy_window = self.policy.failed_attempt_window;
278        let Some(entry) = self.failed_attempts.get_mut(backend) else {
279            return 0;
280        };
281        entry.refresh_if_expired(now, policy_window);
282        entry.count
283    }
284
285    fn rate_limit_for(&mut self, backend: &BackendKey, now: Instant) -> Option<Duration> {
286        let policy = self.policy;
287        let entry = self.failed_attempts.get_mut(backend)?;
288        entry.refresh_if_expired(now, policy.failed_attempt_window);
289        if entry.count < policy.max_failed_attempts_per_window {
290            return None;
291        }
292
293        Some(entry.retry_after(now, policy.failed_attempt_window))
294    }
295}
296
297impl Default for HandoffFallbackState {
298    fn default() -> Self {
299        Self::new()
300    }
301}
302
/// Failure counter for one backend within a fixed window that begins at
/// `started_at`.
#[derive(Clone, Debug)]
struct FailedAttemptWindow {
    /// Instant at which the current counting window began.
    started_at: Instant,
    /// Failures counted since `started_at`.
    count: usize,
}

impl FailedAttemptWindow {
    /// Open an empty window that begins at `now`.
    fn new(now: Instant) -> Self {
        Self {
            count: 0,
            started_at: now,
        }
    }

    /// Restart the window at `now` with a zero count once `window` has fully
    /// elapsed since `started_at`. A `now` earlier than `started_at` is never
    /// treated as expired.
    fn refresh_if_expired(&mut self, now: Instant, window: Duration) {
        let expired = matches!(
            now.checked_duration_since(self.started_at),
            Some(elapsed) if elapsed >= window
        );
        if expired {
            *self = Self::new(now);
        }
    }

    /// Time remaining in the window as of `now`. Returns zero once the window
    /// has elapsed, and the full `window` if `now` precedes `started_at`.
    fn retry_after(&self, now: Instant, window: Duration) -> Duration {
        let elapsed = now.saturating_duration_since(self.started_at);
        window.saturating_sub(elapsed)
    }
}
334
335fn fallback(reason: HandoffFallbackReason) -> HandoffAttemptDecision {
336    HandoffAttemptDecision::FallbackToReconnect(HandoffFallbackDecision::new(reason))
337}