Skip to main content

adaptive_timeout/
timeout.rs

1use std::hash::{BuildHasher, Hash};
2use std::time::Duration;
3
4use crate::clock;
5use crate::config::TimeoutConfig;
6use crate::tracker::LatencyTracker;
7
8/// Computes adaptive timeouts based on observed latency quantiles.
9///
10/// For each destination, queries the tracker for a high quantile (default:
11/// P99.99), applies a safety factor and exponential backoff, clamps between
12/// floor and ceiling, and takes the maximum across all destinations.
13///
14/// Falls back to pure exponential backoff when histogram data is insufficient.
15///
16/// # Example
17///
18/// ```
19/// use std::time::{Duration, Instant};
20/// use adaptive_timeout::{AdaptiveTimeout, LatencyTracker};
21///
22/// let now = Instant::now();
23/// let mut tracker = LatencyTracker::<u32, Instant>::default();
24/// let timeout = AdaptiveTimeout::default();
25///
26/// // No data yet — falls back to exponential backoff (min_timeout).
27/// let t = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
28/// assert_eq!(t, Duration::from_millis(250));
29/// ```
30#[derive(Default, Clone)]
31pub struct AdaptiveTimeout {
32    config: TimeoutConfig,
33}
34
35impl AdaptiveTimeout {
36    /// Creates a new `AdaptiveTimeout` with the given configuration.
37    pub fn new(config: TimeoutConfig) -> Self {
38        Self { config }
39    }
40
41    /// Computes an adaptive timeout for a request to the given destinations.
42    ///
43    /// Returns the maximum timeout across all destinations, clamped to
44    /// `[min_timeout, max_timeout]`. `attempt` is 1-based; higher attempts
45    /// produce longer timeouts via exponential backoff.
46    #[inline]
47    pub fn select_timeout<'a, D, I, H, const N: usize>(
48        &self,
49        tracker: &mut LatencyTracker<D, I, H, N>,
50        destinations: impl IntoIterator<Item = &'a D>,
51        attempt: u32,
52        now: I,
53    ) -> Duration
54    where
55        D: Hash + Eq + Clone + 'a,
56        I: clock::Instant,
57        H: BuildHasher,
58    {
59        Duration::from_millis(self.select_timeout_ms(tracker, destinations, attempt, now))
60    }
61
62    /// Computes an adaptive timeout in milliseconds.
63    /// See [`select_timeout`](Self::select_timeout).
64    pub fn select_timeout_ms<'a, D, I, H, const N: usize>(
65        &self,
66        tracker: &mut LatencyTracker<D, I, H, N>,
67        destinations: impl IntoIterator<Item = &'a D>,
68        attempt: u32,
69        now: I,
70    ) -> u64
71    where
72        D: Hash + Eq + Clone + 'a,
73        I: clock::Instant,
74        H: BuildHasher,
75    {
76        let multiplier = Self::attempt_multiplier(attempt);
77        let floor = self.config.backoff.min_ms.get() as u64;
78        let ceiling = self.config.backoff.max_ms.get() as u64;
79        let fallback = (floor * multiplier).min(ceiling);
80        let mut selected = fallback;
81
82        for dest in destinations.into_iter() {
83            if let Some(estimate_ms) = tracker.quantile_ms(dest, self.config.quantile, now) {
84                let adaptive_ms = self.compute_adaptive_ms(estimate_ms, multiplier);
85                let clamped = adaptive_ms.max(floor).min(ceiling);
86                selected = selected.max(clamped);
87            }
88        }
89
90        selected
91    }
92
93    /// Pure exponential backoff: `min_timeout * 2^(attempt - 1)`, clamped to
94    /// `max_timeout`. Fallback when histogram data is insufficient.
95    #[inline]
96    pub fn exponential_backoff(&self, attempt: u32) -> Duration {
97        Duration::from_millis(self.exponential_backoff_ms(attempt))
98    }
99
100    /// Pure exponential backoff in milliseconds.
101    #[inline]
102    pub fn exponential_backoff_ms(&self, attempt: u32) -> u64 {
103        let multiplier = Self::attempt_multiplier(attempt);
104        let base = self.config.backoff.min_ms.get() as u64;
105        let ceiling = self.config.backoff.max_ms.get() as u64;
106        (base * multiplier).min(ceiling)
107    }
108
109    /// `2^(attempt - 1)`, capped at `2^20`.
110    #[inline]
111    fn attempt_multiplier(attempt: u32) -> u64 {
112        let exponent = attempt.saturating_sub(1).min(20);
113        1u64 << exponent
114    }
115
116    /// `safety_factor * estimate_ms * multiplier`.
117    #[inline]
118    fn compute_adaptive_ms(&self, estimate_ms: u64, multiplier: u64) -> u64 {
119        let base = estimate_ms.saturating_mul(multiplier);
120        (self.config.safety_factor * base as f64) as u64
121    }
122
123    /// Returns a reference to the timeout configuration.
124    #[inline]
125    pub fn config(&self) -> &TimeoutConfig {
126        &self.config
127    }
128
129    // -----------------------------------------------------------------------
130    // SyncLatencyTracker variants (feature = "sync")
131    // -----------------------------------------------------------------------
132
133    /// Like [`select_timeout`](Self::select_timeout) but for
134    /// [`SyncLatencyTracker`](crate::SyncLatencyTracker).
135    ///
136    /// Takes `&tracker` (shared reference) instead of `&mut tracker`.
137    #[cfg(feature = "sync")]
138    #[inline]
139    pub fn select_timeout_sync<'a, D, I, H, const N: usize>(
140        &self,
141        tracker: &crate::sync_tracker::SyncLatencyTracker<D, I, H, N>,
142        destinations: impl IntoIterator<Item = &'a D>,
143        attempt: u32,
144        now: I,
145    ) -> Duration
146    where
147        D: Hash + Eq + Clone + Send + Sync + 'a,
148        I: clock::Instant,
149        H: BuildHasher + Clone,
150    {
151        Duration::from_millis(self.select_timeout_sync_ms(tracker, destinations, attempt, now))
152    }
153
154    /// Like [`select_timeout_ms`](Self::select_timeout_ms) but for
155    /// [`SyncLatencyTracker`](crate::SyncLatencyTracker).
156    #[cfg(feature = "sync")]
157    pub fn select_timeout_sync_ms<'a, D, I, H, const N: usize>(
158        &self,
159        tracker: &crate::sync_tracker::SyncLatencyTracker<D, I, H, N>,
160        destinations: impl IntoIterator<Item = &'a D>,
161        attempt: u32,
162        now: I,
163    ) -> u64
164    where
165        D: Hash + Eq + Clone + Send + Sync + 'a,
166        I: clock::Instant,
167        H: BuildHasher + Clone,
168    {
169        let multiplier = Self::attempt_multiplier(attempt);
170        let floor = self.config.backoff.min_ms.get() as u64;
171        let ceiling = self.config.backoff.max_ms.get() as u64;
172        let fallback = (floor * multiplier).min(ceiling);
173        let mut selected = fallback;
174
175        for dest in destinations.into_iter() {
176            if let Some(estimate_ms) = tracker.quantile_ms(dest, self.config.quantile, now) {
177                let adaptive_ms = self.compute_adaptive_ms(estimate_ms, multiplier);
178                let clamped = adaptive_ms.max(floor).min(ceiling);
179                selected = selected.max(clamped);
180            }
181        }
182
183        selected
184    }
185}
186
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use super::*;
    use crate::config::TrackerConfig;
    use crate::parse::BackoffInterval;

    /// Builds a tracker with `min_samples: 5` paired with a timeout that uses
    /// a `10ms..60s` backoff interval, the 0.99 quantile and a safety factor
    /// of 2.
    fn fixture<I: clock::Instant>() -> (LatencyTracker<u32, I>, AdaptiveTimeout) {
        let tracker = LatencyTracker::new(TrackerConfig {
            min_samples: 5,
            ..TrackerConfig::default()
        });
        let timeout = AdaptiveTimeout::new(TimeoutConfig {
            backoff: "10ms..60s".parse::<BackoffInterval>().unwrap(),
            quantile: 0.99,
            safety_factor: 2.0,
        });
        (tracker, timeout)
    }

    /// Records 100 rounds of `samples` (`(destination, latency in ms)` pairs),
    /// interleaving the destinations within each round.
    fn feed(tracker: &mut LatencyTracker<u32, Instant>, samples: &[(u32, u64)], now: Instant) {
        for _ in 0..100 {
            for &(dest, latency_ms) in samples {
                tracker.record_latency(&dest, Duration::from_millis(latency_ms), now);
            }
        }
    }

    #[test]
    fn fallback_exponential_backoff_no_data() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();

        // Without samples the timeout doubles per attempt, starting at the floor.
        for (attempt, expected_ms) in [(1, 10), (2, 20), (3, 40)] {
            let selected = timeout.select_timeout(&mut tracker, &[1u32], attempt, now);
            assert_eq!(selected, Duration::from_millis(expected_ms));
        }
    }

    #[test]
    fn exponential_backoff_capped_at_max() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();

        let selected = timeout.select_timeout(&mut tracker, &[1u32], 100, now);
        assert_eq!(selected, Duration::from_secs(60));
    }

    #[test]
    fn adaptive_timeout_with_data() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();
        feed(&mut tracker, &[(1, 50)], now);

        // p99 ~50ms, safety_factor=2, attempt=1: 2 * 50 * 1 = 100ms
        let selected = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        assert_eq!(selected, Duration::from_millis(100));
    }

    #[test]
    fn adaptive_timeout_respects_floor() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();
        feed(&mut tracker, &[(1, 1)], now);

        let selected = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        assert_eq!(selected, Duration::from_millis(10));
    }

    #[test]
    fn adaptive_timeout_respects_ceiling() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();
        feed(&mut tracker, &[(1, 50_000)], now);

        let selected = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        assert_eq!(selected, Duration::from_secs(60));
    }

    #[test]
    fn max_across_destinations() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();
        feed(&mut tracker, &[(1, 10), (2, 500)], now);

        // The slower destination (~500ms * 2) should dominate.
        let t = timeout.select_timeout(&mut tracker, &[1u32, 2u32], 1, now);
        let expected = Duration::from_millis(990)..=Duration::from_millis(1010);
        assert!(expected.contains(&t), "timeout was {t:?}");
    }

    #[test]
    fn attempt_multiplier_increases_timeout() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();
        feed(&mut tracker, &[(1, 50)], now);

        let first = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        let second = timeout.select_timeout(&mut tracker, &[1u32], 2, now);
        let third = timeout.select_timeout(&mut tracker, &[1u32], 3, now);

        assert_eq!(first, Duration::from_millis(100));
        assert_eq!(second, Duration::from_millis(200));
        assert_eq!(third, Duration::from_millis(400));
    }

    #[test]
    fn mixed_data_and_no_data_destinations() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();
        feed(&mut tracker, &[(1, 50)], now);

        // Destination 2 has no samples; destination 1 decides the result.
        let selected = timeout.select_timeout(&mut tracker, &[1u32, 2u32], 1, now);
        assert_eq!(selected, Duration::from_millis(100));
    }

    #[test]
    fn ms_variants_match_duration_variants() {
        let now = Instant::now();
        let (mut tracker, timeout) = fixture();
        feed(&mut tracker, &[(1, 50)], now);

        let as_duration = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        let as_ms = timeout.select_timeout_ms(&mut tracker, &[1u32], 1, now);
        assert_eq!(as_duration, Duration::from_millis(as_ms));

        let backoff_duration = timeout.exponential_backoff(3);
        let backoff_ms = timeout.exponential_backoff_ms(3);
        assert_eq!(backoff_duration, Duration::from_millis(backoff_ms));
    }

    // -----------------------------------------------------------------------
    // SyncLatencyTracker tests (feature = "sync")
    // -----------------------------------------------------------------------

    #[cfg(feature = "sync")]
    mod sync_tests {
        use std::time::{Duration, Instant};

        use crate::config::{TimeoutConfig, TrackerConfig};
        use crate::parse::BackoffInterval;
        use crate::sync_tracker::SyncLatencyTracker;
        use crate::timeout::AdaptiveTimeout;

        /// Sync counterpart of the parent module's fixture: `min_samples: 5`,
        /// `10ms..60s` backoff, 0.99 quantile, safety factor 2.
        fn sync_fixture() -> (SyncLatencyTracker<u32>, AdaptiveTimeout) {
            let tracker = SyncLatencyTracker::new(TrackerConfig {
                min_samples: 5,
                ..TrackerConfig::default()
            });
            let timeout = AdaptiveTimeout::new(TimeoutConfig {
                backoff: "10ms..60s".parse::<BackoffInterval>().unwrap(),
                quantile: 0.99,
                safety_factor: 2.0,
            });
            (tracker, timeout)
        }

        /// Records 100 rounds of `samples` (`(destination, latency in ms)`
        /// pairs) through the shared-reference tracker, interleaving the
        /// destinations within each round.
        fn feed(tracker: &SyncLatencyTracker<u32>, samples: &[(u32, u64)], now: Instant) {
            for _ in 0..100 {
                for &(dest, latency_ms) in samples {
                    tracker.record_latency(&dest, Duration::from_millis(latency_ms), now);
                }
            }
        }

        #[test]
        fn sync_fallback_exponential_backoff_no_data() {
            let now = Instant::now();
            let (tracker, timeout) = sync_fixture();

            for (attempt, expected_ms) in [(1, 10), (2, 20), (3, 40)] {
                let selected = timeout.select_timeout_sync(&tracker, &[1u32], attempt, now);
                assert_eq!(selected, Duration::from_millis(expected_ms));
            }
        }

        #[test]
        fn sync_adaptive_timeout_with_data() {
            let now = Instant::now();
            let (tracker, timeout) = sync_fixture();
            feed(&tracker, &[(1, 50)], now);

            // p99 ~50ms, safety_factor=2, attempt=1: 2 * 50 * 1 = 100ms
            let selected = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
            assert_eq!(selected, Duration::from_millis(100));
        }

        #[test]
        fn sync_respects_floor_and_ceiling() {
            let now = Instant::now();
            let (tracker, timeout) = sync_fixture();

            // Floor: tiny latency clamped to min_timeout
            feed(&tracker, &[(1, 1)], now);
            let at_floor = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
            assert_eq!(at_floor, Duration::from_millis(10));

            // Ceiling: huge latency clamped to max_timeout
            feed(&tracker, &[(2, 50_000)], now);
            let at_ceiling = timeout.select_timeout_sync(&tracker, &[2u32], 1, now);
            assert_eq!(at_ceiling, Duration::from_secs(60));
        }

        #[test]
        fn sync_max_across_destinations() {
            let now = Instant::now();
            let (tracker, timeout) = sync_fixture();
            feed(&tracker, &[(1, 10), (2, 500)], now);

            let t = timeout.select_timeout_sync(&tracker, &[1u32, 2u32], 1, now);
            let expected = Duration::from_millis(990)..=Duration::from_millis(1010);
            assert!(expected.contains(&t), "timeout was {t:?}");
        }

        #[test]
        fn sync_ms_variants_match_duration_variants() {
            let now = Instant::now();
            let (tracker, timeout) = sync_fixture();
            feed(&tracker, &[(1, 50)], now);

            let as_duration = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
            let as_ms = timeout.select_timeout_sync_ms(&tracker, &[1u32], 1, now);
            assert_eq!(as_duration, Duration::from_millis(as_ms));
        }

        #[test]
        fn sync_matches_mutable_tracker_results() {
            use crate::tracker::LatencyTracker;

            let now = Instant::now();
            let tracker_config = TrackerConfig {
                min_samples: 5,
                ..TrackerConfig::default()
            };
            let timeout = AdaptiveTimeout::new(TimeoutConfig {
                backoff: "10ms..60s".parse::<BackoffInterval>().unwrap(),
                quantile: 0.99,
                safety_factor: 2.0,
            });

            let mut mutable_tracker = LatencyTracker::<u32, Instant>::new(tracker_config);
            let sync_tracker = SyncLatencyTracker::<u32>::new(tracker_config);

            // Same data in both trackers, recorded in lockstep.
            let latency = Duration::from_millis(50);
            for _ in 0..100 {
                mutable_tracker.record_latency(&1u32, latency, now);
                sync_tracker.record_latency(&1u32, latency, now);
            }

            let from_mutable = timeout.select_timeout_ms(&mut mutable_tracker, &[1u32], 1, now);
            let from_sync = timeout.select_timeout_sync_ms(&sync_tracker, &[1u32], 1, now);
            assert_eq!(from_mutable, from_sync);
        }
    }
}