dynomite/net/auto_eject.rs
1//! Consecutive-failure auto-eject decision state.
2//!
//! When a backend datastore (or peer) accumulates `failure_limit`
//! consecutive connection or operation failures,
5//! the engine ejects it: subsequent calls into the pool return
6//! [`AutoEjectState::Ejected`] until the eject window configured
7//! by `retry_after` has elapsed. After the window passes,
8//! [`AutoEject::record_attempt`] resumes returning
9//! [`AutoEjectState::Reachable`] (the next outbound connect attempt
10//! will then run, and a successful connect resets the failure
11//! counter through [`AutoEject::record_success`]).
12//!
13//! The same shared policy is reused by [`crate::net::pool::ConnPool`]
14//! and by the Stage 10 cluster layer; lifting the policy out of any
15//! one caller keeps the implementation single-sourced.
16//!
17//! # Examples
18//!
19//! ```
20//! use dynomite::net::auto_eject::{AutoEject, AutoEjectState};
21//! use std::time::{Duration, Instant};
22//!
23//! let mut ae = AutoEject::new(true, 2, Duration::from_millis(50));
24//! let now = Instant::now();
25//! assert_eq!(ae.record_attempt(now), AutoEjectState::Reachable);
26//!
27//! ae.record_failure(now);
28//! ae.record_failure(now);
29//! // After two consecutive failures the host is ejected for 50ms.
30//! assert_eq!(ae.record_attempt(now), AutoEjectState::Ejected);
31//!
32//! let later = now + Duration::from_millis(60);
33//! assert_eq!(ae.record_attempt(later), AutoEjectState::Reachable);
34//!
35//! ae.record_success(later);
36//! assert_eq!(ae.failure_count(), 0);
37//! ```
38
39use std::time::{Duration, Instant};
40
/// Verdict returned by [`AutoEject`] when it is asked about, or told
/// about, a target.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum AutoEjectState {
    /// The target is not currently ejected; the caller may go ahead
    /// and use it.
    Reachable,
    /// The target has been auto-ejected and its eject window is
    /// still running; the caller must skip it.
    Ejected,
}
50
/// Consecutive-failure bookkeeping used to decide whether a target
/// is currently auto-ejected.
///
/// The type is plain synchronous state: it schedules no timers and
/// holds no locks. The tokio-driven dispatch layer consults it
/// before every outbound request and reports the outcome back via
/// [`AutoEject::record_success`] or [`AutoEject::record_failure`].
#[derive(Clone, Debug)]
pub struct AutoEject {
    /// Whether auto-eject is switched on at all.
    enabled: bool,
    /// Consecutive failures at which the target gets ejected.
    failure_limit: u32,
    /// Length of the eject window.
    retry_after: Duration,
    /// Consecutive failures recorded since the last success/reset.
    failures: u32,
    /// End of the active eject window, or `None` when no window is
    /// armed.
    next_retry: Option<Instant>,
}
67
68impl AutoEject {
69 /// Construct a fresh tracker. `enabled` mirrors the
70 /// `auto_eject_hosts` knob from the YAML config.
71 /// `failure_limit` mirrors `server_failure_limit`. `retry_after`
72 /// mirrors `server_retry_timeout_ms` rendered as a
73 /// [`Duration`].
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// use dynomite::net::auto_eject::AutoEject;
79 /// use std::time::Duration;
80 ///
81 /// let ae = AutoEject::new(true, 3, Duration::from_secs(1));
82 /// assert!(ae.is_enabled());
83 /// assert_eq!(ae.failure_limit(), 3);
84 /// ```
85 #[must_use]
86 pub fn new(enabled: bool, failure_limit: u32, retry_after: Duration) -> Self {
87 Self {
88 enabled,
89 failure_limit,
90 retry_after,
91 failures: 0,
92 next_retry: None,
93 }
94 }
95
96 /// True when auto-eject is enabled.
97 ///
98 /// # Examples
99 ///
100 /// ```
101 /// use dynomite::net::auto_eject::AutoEject;
102 /// use std::time::Duration;
103 /// assert!(!AutoEject::new(false, 1, Duration::from_secs(1)).is_enabled());
104 /// ```
105 #[must_use]
106 pub fn is_enabled(&self) -> bool {
107 self.enabled
108 }
109
110 /// Configured failure limit before ejecting.
111 ///
112 /// # Examples
113 ///
114 /// ```
115 /// use dynomite::net::auto_eject::AutoEject;
116 /// use std::time::Duration;
117 /// assert_eq!(AutoEject::new(true, 5, Duration::from_secs(1)).failure_limit(), 5);
118 /// ```
119 #[must_use]
120 pub fn failure_limit(&self) -> u32 {
121 self.failure_limit
122 }
123
124 /// Eject window length.
125 #[must_use]
126 pub fn retry_after(&self) -> Duration {
127 self.retry_after
128 }
129
130 /// Current consecutive-failure count.
131 #[must_use]
132 pub fn failure_count(&self) -> u32 {
133 self.failures
134 }
135
136 /// Instant after which the target should be retried, when an
137 /// eject is currently active.
138 #[must_use]
139 pub fn next_retry(&self) -> Option<Instant> {
140 self.next_retry
141 }
142
143 /// Test whether the caller should proceed (`Reachable`) or skip
144 /// (`Ejected`) at the given instant.
145 ///
146 /// The caller passes `now` so the function stays deterministic
147 /// in tests.
148 ///
149 /// # Examples
150 ///
151 /// ```
152 /// use dynomite::net::auto_eject::{AutoEject, AutoEjectState};
153 /// use std::time::{Duration, Instant};
154 /// let mut ae = AutoEject::new(true, 1, Duration::from_millis(10));
155 /// let now = Instant::now();
156 /// ae.record_failure(now);
157 /// assert_eq!(ae.record_attempt(now), AutoEjectState::Ejected);
158 /// ```
159 pub fn record_attempt(&mut self, now: Instant) -> AutoEjectState {
160 if !self.enabled {
161 return AutoEjectState::Reachable;
162 }
163 match self.next_retry {
164 Some(eta) if now < eta => AutoEjectState::Ejected,
165 Some(_) => {
166 // Eject window has elapsed; clear the marker so the
167 // caller can retry. The failure counter stays at
168 // `failure_limit` so a single follow-up failure
169 // re-ejects immediately.
170 self.next_retry = None;
171 AutoEjectState::Reachable
172 }
173 None => AutoEjectState::Reachable,
174 }
175 }
176
177 /// Record a successful operation.
178 ///
179 /// Resets the consecutive-failure counter and clears any active
180 /// eject window. After a success, the next failure starts a
181 /// fresh streak from one (so the host has to fail
182 /// `failure_limit` more times before being re-ejected).
183 ///
184 /// `_now` is currently unused but accepted for parity with
185 /// [`record_attempt`](Self::record_attempt) so callers can
186 /// supply a deterministic clock in tests; future revisions may
187 /// use it to record time-to-recovery metrics.
188 pub fn record_success(&mut self, _now: Instant) {
189 self.failures = 0;
190 self.next_retry = None;
191 }
192
193 /// Record a failed operation. Returns the new state of the
194 /// tracker.
195 ///
196 /// When the consecutive-failure count reaches
197 /// `failure_limit`, the function arms the eject window starting
198 /// at `now + retry_after`.
199 ///
200 /// # Examples
201 ///
202 /// ```
203 /// use dynomite::net::auto_eject::{AutoEject, AutoEjectState};
204 /// use std::time::{Duration, Instant};
205 /// let mut ae = AutoEject::new(true, 2, Duration::from_secs(1));
206 /// let now = Instant::now();
207 /// assert_eq!(ae.record_failure(now), AutoEjectState::Reachable);
208 /// assert_eq!(ae.record_failure(now), AutoEjectState::Ejected);
209 /// ```
210 pub fn record_failure(&mut self, now: Instant) -> AutoEjectState {
211 self.failures = self.failures.saturating_add(1);
212 if self.enabled && self.failures >= self.failure_limit {
213 self.next_retry = Some(now + self.retry_after);
214 AutoEjectState::Ejected
215 } else {
216 AutoEjectState::Reachable
217 }
218 }
219
220 /// Reset the tracker to its post-construction state.
221 ///
222 /// # Examples
223 ///
224 /// ```
225 /// use dynomite::net::auto_eject::AutoEject;
226 /// use std::time::{Duration, Instant};
227 /// let mut ae = AutoEject::new(true, 1, Duration::from_millis(10));
228 /// ae.record_failure(Instant::now());
229 /// ae.reset();
230 /// assert_eq!(ae.failure_count(), 0);
231 /// ```
232 pub fn reset(&mut self) {
233 self.failures = 0;
234 self.next_retry = None;
235 }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// A disabled tracker reports `Reachable` no matter how many
    /// failures are fed into it.
    #[test]
    fn disabled_never_ejects() {
        let t0 = Instant::now();
        let mut tracker = AutoEject::new(false, 1, Duration::from_secs(1));
        for _round in 0..5 {
            let state = tracker.record_failure(t0);
            assert_eq!(state, AutoEjectState::Reachable);
        }
        let state = tracker.record_attempt(t0);
        assert_eq!(state, AutoEjectState::Reachable);
    }

    /// The third failure (limit 3) ejects; the target becomes
    /// reachable again once the 50ms window is over.
    #[test]
    fn ejects_after_threshold_and_recovers_after_window() {
        let t0 = Instant::now();
        let mut tracker = AutoEject::new(true, 3, Duration::from_millis(50));
        let expected = [
            AutoEjectState::Reachable,
            AutoEjectState::Reachable,
            AutoEjectState::Ejected,
        ];
        for want in expected {
            assert_eq!(tracker.record_failure(t0), want);
        }
        assert_eq!(tracker.record_attempt(t0), AutoEjectState::Ejected);
        let past_window = t0 + Duration::from_millis(51);
        assert_eq!(
            tracker.record_attempt(past_window),
            AutoEjectState::Reachable
        );
    }

    /// A success wipes both the eject window and the failure count.
    #[test]
    fn record_success_clears_state() {
        let t0 = Instant::now();
        let mut tracker = AutoEject::new(true, 2, Duration::from_secs(1));
        for _round in 0..2 {
            tracker.record_failure(t0);
        }
        assert_eq!(tracker.record_attempt(t0), AutoEjectState::Ejected);
        tracker.record_success(t0);
        assert_eq!(tracker.record_attempt(t0), AutoEjectState::Reachable);
        assert_eq!(tracker.failure_count(), 0);
    }
}