Skip to main content

dynomite/net/
auto_eject.rs

1//! Consecutive-failure auto-eject decision state.
2//!
//! When a backend datastore (or peer) accumulates at least
//! `failure_limit` consecutive connection or operation failures,
5//! the engine ejects it: subsequent calls into the pool return
6//! [`AutoEjectState::Ejected`] until the eject window configured
7//! by `retry_after` has elapsed. After the window passes,
8//! [`AutoEject::record_attempt`] resumes returning
9//! [`AutoEjectState::Reachable`] (the next outbound connect attempt
10//! will then run, and a successful connect resets the failure
11//! counter through [`AutoEject::record_success`]).
12//!
13//! The same shared policy is reused by [`crate::net::pool::ConnPool`]
14//! and by the Stage 10 cluster layer; lifting the policy out of any
15//! one caller keeps the implementation single-sourced.
16//!
17//! # Examples
18//!
19//! ```
20//! use dynomite::net::auto_eject::{AutoEject, AutoEjectState};
21//! use std::time::{Duration, Instant};
22//!
23//! let mut ae = AutoEject::new(true, 2, Duration::from_millis(50));
24//! let now = Instant::now();
25//! assert_eq!(ae.record_attempt(now), AutoEjectState::Reachable);
26//!
27//! ae.record_failure(now);
28//! ae.record_failure(now);
29//! // After two consecutive failures the host is ejected for 50ms.
30//! assert_eq!(ae.record_attempt(now), AutoEjectState::Ejected);
31//!
32//! let later = now + Duration::from_millis(60);
33//! assert_eq!(ae.record_attempt(later), AutoEjectState::Reachable);
34//!
35//! ae.record_success(later);
36//! assert_eq!(ae.failure_count(), 0);
37//! ```
38
39use std::time::{Duration, Instant};
40
/// Verdict returned by [`AutoEject`] on whether a target may be used.
///
/// The two variants are exhaustive: a target is either usable right now
/// or sitting out an eject window.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum AutoEjectState {
    /// The target may be used; the caller proceeds with its request.
    Reachable,
    /// The target is auto-ejected and its eject window is still open;
    /// the caller must skip it.
    Ejected,
}
50
/// Consecutive-failure tracker deciding whether a target is currently
/// auto-ejected.
///
/// All state changes are plain synchronous bookkeeping: nothing here
/// schedules a timer or takes a lock. The tokio-driven dispatch layer
/// consults the tracker before each outbound request and reports the
/// outcome back through [`AutoEject::record_success`] or
/// [`AutoEject::record_failure`].
#[derive(Clone, Debug)]
pub struct AutoEject {
    /// Whether failures are allowed to eject the target at all.
    enabled: bool,
    /// Consecutive failures required before an eject window is armed.
    failure_limit: u32,
    /// Length of each eject window.
    retry_after: Duration,
    /// Consecutive failures recorded since the last success or reset.
    failures: u32,
    /// Deadline of the currently armed eject window, if any.
    next_retry: Option<Instant>,
}
67
68impl AutoEject {
69    /// Construct a fresh tracker. `enabled` mirrors the
70    /// `auto_eject_hosts` knob from the YAML config.
71    /// `failure_limit` mirrors `server_failure_limit`. `retry_after`
72    /// mirrors `server_retry_timeout_ms` rendered as a
73    /// [`Duration`].
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use dynomite::net::auto_eject::AutoEject;
79    /// use std::time::Duration;
80    ///
81    /// let ae = AutoEject::new(true, 3, Duration::from_secs(1));
82    /// assert!(ae.is_enabled());
83    /// assert_eq!(ae.failure_limit(), 3);
84    /// ```
85    #[must_use]
86    pub fn new(enabled: bool, failure_limit: u32, retry_after: Duration) -> Self {
87        Self {
88            enabled,
89            failure_limit,
90            retry_after,
91            failures: 0,
92            next_retry: None,
93        }
94    }
95
96    /// True when auto-eject is enabled.
97    ///
98    /// # Examples
99    ///
100    /// ```
101    /// use dynomite::net::auto_eject::AutoEject;
102    /// use std::time::Duration;
103    /// assert!(!AutoEject::new(false, 1, Duration::from_secs(1)).is_enabled());
104    /// ```
105    #[must_use]
106    pub fn is_enabled(&self) -> bool {
107        self.enabled
108    }
109
110    /// Configured failure limit before ejecting.
111    ///
112    /// # Examples
113    ///
114    /// ```
115    /// use dynomite::net::auto_eject::AutoEject;
116    /// use std::time::Duration;
117    /// assert_eq!(AutoEject::new(true, 5, Duration::from_secs(1)).failure_limit(), 5);
118    /// ```
119    #[must_use]
120    pub fn failure_limit(&self) -> u32 {
121        self.failure_limit
122    }
123
124    /// Eject window length.
125    #[must_use]
126    pub fn retry_after(&self) -> Duration {
127        self.retry_after
128    }
129
130    /// Current consecutive-failure count.
131    #[must_use]
132    pub fn failure_count(&self) -> u32 {
133        self.failures
134    }
135
136    /// Instant after which the target should be retried, when an
137    /// eject is currently active.
138    #[must_use]
139    pub fn next_retry(&self) -> Option<Instant> {
140        self.next_retry
141    }
142
143    /// Test whether the caller should proceed (`Reachable`) or skip
144    /// (`Ejected`) at the given instant.
145    ///
146    /// The caller passes `now` so the function stays deterministic
147    /// in tests.
148    ///
149    /// # Examples
150    ///
151    /// ```
152    /// use dynomite::net::auto_eject::{AutoEject, AutoEjectState};
153    /// use std::time::{Duration, Instant};
154    /// let mut ae = AutoEject::new(true, 1, Duration::from_millis(10));
155    /// let now = Instant::now();
156    /// ae.record_failure(now);
157    /// assert_eq!(ae.record_attempt(now), AutoEjectState::Ejected);
158    /// ```
159    pub fn record_attempt(&mut self, now: Instant) -> AutoEjectState {
160        if !self.enabled {
161            return AutoEjectState::Reachable;
162        }
163        match self.next_retry {
164            Some(eta) if now < eta => AutoEjectState::Ejected,
165            Some(_) => {
166                // Eject window has elapsed; clear the marker so the
167                // caller can retry. The failure counter stays at
168                // `failure_limit` so a single follow-up failure
169                // re-ejects immediately.
170                self.next_retry = None;
171                AutoEjectState::Reachable
172            }
173            None => AutoEjectState::Reachable,
174        }
175    }
176
177    /// Record a successful operation.
178    ///
179    /// Resets the consecutive-failure counter and clears any active
180    /// eject window. After a success, the next failure starts a
181    /// fresh streak from one (so the host has to fail
182    /// `failure_limit` more times before being re-ejected).
183    ///
184    /// `_now` is currently unused but accepted for parity with
185    /// [`record_attempt`](Self::record_attempt) so callers can
186    /// supply a deterministic clock in tests; future revisions may
187    /// use it to record time-to-recovery metrics.
188    pub fn record_success(&mut self, _now: Instant) {
189        self.failures = 0;
190        self.next_retry = None;
191    }
192
193    /// Record a failed operation. Returns the new state of the
194    /// tracker.
195    ///
196    /// When the consecutive-failure count reaches
197    /// `failure_limit`, the function arms the eject window starting
198    /// at `now + retry_after`.
199    ///
200    /// # Examples
201    ///
202    /// ```
203    /// use dynomite::net::auto_eject::{AutoEject, AutoEjectState};
204    /// use std::time::{Duration, Instant};
205    /// let mut ae = AutoEject::new(true, 2, Duration::from_secs(1));
206    /// let now = Instant::now();
207    /// assert_eq!(ae.record_failure(now), AutoEjectState::Reachable);
208    /// assert_eq!(ae.record_failure(now), AutoEjectState::Ejected);
209    /// ```
210    pub fn record_failure(&mut self, now: Instant) -> AutoEjectState {
211        self.failures = self.failures.saturating_add(1);
212        if self.enabled && self.failures >= self.failure_limit {
213            self.next_retry = Some(now + self.retry_after);
214            AutoEjectState::Ejected
215        } else {
216            AutoEjectState::Reachable
217        }
218    }
219
220    /// Reset the tracker to its post-construction state.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use dynomite::net::auto_eject::AutoEject;
226    /// use std::time::{Duration, Instant};
227    /// let mut ae = AutoEject::new(true, 1, Duration::from_millis(10));
228    /// ae.record_failure(Instant::now());
229    /// ae.reset();
230    /// assert_eq!(ae.failure_count(), 0);
231    /// ```
232    pub fn reset(&mut self) {
233        self.failures = 0;
234        self.next_retry = None;
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn disabled_never_ejects() {
        let mut ae = AutoEject::new(false, 1, Duration::from_secs(1));
        let now = Instant::now();
        for _ in 0..5 {
            assert_eq!(ae.record_failure(now), AutoEjectState::Reachable);
        }
        assert_eq!(ae.record_attempt(now), AutoEjectState::Reachable);
        // Failures are still counted even though they never eject.
        assert_eq!(ae.failure_count(), 5);
        assert_eq!(ae.next_retry(), None);
    }

    #[test]
    fn ejects_after_threshold_and_recovers_after_window() {
        let mut ae = AutoEject::new(true, 3, Duration::from_millis(50));
        let now = Instant::now();
        assert_eq!(ae.record_failure(now), AutoEjectState::Reachable);
        assert_eq!(ae.record_failure(now), AutoEjectState::Reachable);
        assert_eq!(ae.record_failure(now), AutoEjectState::Ejected);
        assert_eq!(ae.record_attempt(now), AutoEjectState::Ejected);
        let after = now + Duration::from_millis(51);
        assert_eq!(ae.record_attempt(after), AutoEjectState::Reachable);
    }

    #[test]
    fn record_success_clears_state() {
        let mut ae = AutoEject::new(true, 2, Duration::from_secs(1));
        let now = Instant::now();
        ae.record_failure(now);
        ae.record_failure(now);
        assert_eq!(ae.record_attempt(now), AutoEjectState::Ejected);
        ae.record_success(now);
        assert_eq!(ae.record_attempt(now), AutoEjectState::Reachable);
        assert_eq!(ae.failure_count(), 0);
    }

    #[test]
    fn single_failure_after_window_re_ejects() {
        let mut ae = AutoEject::new(true, 3, Duration::from_millis(50));
        let now = Instant::now();
        for _ in 0..3 {
            ae.record_failure(now);
        }
        let after = now + Duration::from_millis(51);
        assert_eq!(ae.record_attempt(after), AutoEjectState::Reachable);
        assert_eq!(ae.next_retry(), None);
        // The elapsed window does not reset the streak, so one more
        // failure is enough to eject again.
        assert_eq!(ae.failure_count(), 3);
        assert_eq!(ae.record_failure(after), AutoEjectState::Ejected);
        assert_eq!(ae.next_retry(), Some(after + Duration::from_millis(50)));
    }

    #[test]
    fn failure_while_ejected_rearms_window() {
        let mut ae = AutoEject::new(true, 1, Duration::from_millis(50));
        let now = Instant::now();
        assert_eq!(ae.record_failure(now), AutoEjectState::Ejected);
        let mid = now + Duration::from_millis(30);
        assert_eq!(ae.record_failure(mid), AutoEjectState::Ejected);
        // The first deadline (now + 50ms) has passed, but the re-armed
        // one (mid + 50ms) has not.
        assert_eq!(
            ae.record_attempt(now + Duration::from_millis(60)),
            AutoEjectState::Ejected
        );
        assert_eq!(
            ae.record_attempt(mid + Duration::from_millis(50)),
            AutoEjectState::Reachable
        );
    }

    #[test]
    fn reset_clears_active_eject() {
        let mut ae = AutoEject::new(true, 1, Duration::from_secs(1));
        let now = Instant::now();
        assert_eq!(ae.record_failure(now), AutoEjectState::Ejected);
        ae.reset();
        assert_eq!(ae.failure_count(), 0);
        assert_eq!(ae.next_retry(), None);
        assert_eq!(ae.record_attempt(now), AutoEjectState::Reachable);
    }
}