Skip to main content

adaptive_timeout/
tracker.rs

1use std::borrow::Borrow;
2use std::collections::HashMap;
3use std::hash::{BuildHasher, Hash};
4use std::time::Duration;
5
6use foldhash::fast::FixedState;
7
8use crate::clock;
9use crate::config::{SIGNIFICANT_VALUE_DIGITS, TrackerConfig};
10use crate::histogram::SlidingWindowHistogram;
11
/// Default number of sub-windows for the sliding window histogram.
///
/// This is the default value of the `N` const parameter of
/// [`LatencyTracker`]. With the default `window_ms` of 60 seconds, this gives
/// 6-second sub-windows. See [`LatencyTracker`] for details on the tradeoffs
/// of changing this value.
pub const DEFAULT_SUB_WINDOWS: usize = 10;
18
/// Tracks latencies per destination and provides quantile estimates.
///
/// Each destination gets its own sliding-window histogram. Create one tracker
/// per service/operation type to track latencies independently.
///
/// # Type parameters
///
/// - `D` — destination key (node, endpoint, shard, …).
/// - `I` — time source, defaults to [`std::time::Instant`].
/// - `H` — hash builder used by the internal per-destination map, defaults
///   to [`foldhash::fast::RandomState`].
/// - `N` — number of sub-windows in each sliding-window histogram, defaults to
///   [`DEFAULT_SUB_WINDOWS`] (10).
///
/// # Sliding window and sub-windows
///
/// Think of it like the Linux load average displayed by `top`: the 1-min,
/// 5-min, and 15-min averages all track the same metric but react to changes
/// at different speeds. A shorter window (1-min) catches spikes quickly but
/// is noisy; a longer window (15-min) is smoother but slow to reflect new
/// conditions.
///
/// Here, two settings control this behaviour:
///
/// [`window_ms`](crate::TrackerConfig::window_ms) sets **how far back the
/// histogram looks** (default: 60s). This is the "memory" of the tracker —
/// like choosing between a 1-min or 15-min load average:
///
/// - **Longer window** (e.g. 5 min): more samples, more stable estimates,
///   but slower to react to sudden latency changes. Good for low-traffic
///   destinations where samples arrive infrequently.
///
/// - **Shorter window** (e.g. 5s): reacts quickly to latency shifts, but
///   with fewer samples the estimates are noisier. May drop below
///   [`min_samples`](crate::TrackerConfig::min_samples) during traffic lulls,
///   causing the system to fall back to exponential backoff.
///
/// `N` controls **how smoothly old data is shed** as the window slides
/// forward. The window is divided into `N` equal sub-windows, and as time
/// advances, sub-windows rotate out one at a time:
///
/// - **Higher `N`** (e.g. 20): samples expire in small increments
///   (`window / N` per step), so quantile estimates transition smoothly —
///   like a load average that updates every few seconds. The cost is more
///   memory (each sub-window is a separate HdrHistogram allocation).
///
/// - **Lower `N`** (e.g. 3): samples expire in large chunks. Uses less
///   memory, but a bigger fraction of data disappears at once — like a load
///   average that only updates every few minutes, causing jumpy readings.
///
/// - **`N = 1`**: the entire window expires at once — a tumbling window. The
///   histogram alternates between "full of data" and "completely empty" each
///   `window_ms` period.
///
/// The two settings interact: the effective rotation interval is
/// `window_ms / N`. With the defaults (`window_ms = 60_000`, `N = 10`), each
/// sub-window covers 6 seconds — old data is shed in 10% increments every
/// 6 seconds. If you want finer granularity (e.g. 1-second rotations at a
/// 60s window), set `N = 60`.
///
/// # Examples
///
/// ```
/// use std::time::{Duration, Instant};
/// use adaptive_timeout::LatencyTracker;
///
/// let now = Instant::now();
/// let mut tracker = LatencyTracker::<u32, Instant>::default();
///
/// for _ in 0..100 {
///     tracker.record_latency_ms(&1u32, 50, now);
/// }
///
/// let p99 = tracker.quantile_ms(&1u32, 0.99, now);
/// assert_eq!(p99, Some(50));
/// ```
pub struct LatencyTracker<
    D,
    I: clock::Instant = std::time::Instant,
    H = foldhash::fast::RandomState,
    const N: usize = DEFAULT_SUB_WINDOWS,
> {
    /// Settings shared by every per-destination histogram.
    config: TrackerConfig,
    /// One sliding-window histogram per destination key, created the first
    /// time a latency is recorded for that destination.
    histograms: HashMap<D, SlidingWindowHistogram<I, N>, H>,
}
102
103impl<D, I> Default for LatencyTracker<D, I, foldhash::fast::RandomState, DEFAULT_SUB_WINDOWS>
104where
105    D: Hash + Eq + Clone,
106    I: clock::Instant,
107{
108    fn default() -> Self {
109        Self::new(TrackerConfig::default())
110    }
111}
112
113impl<D, I, const N: usize> LatencyTracker<D, I, foldhash::fast::FixedState, N>
114where
115    D: Hash + Eq + Clone,
116    I: clock::Instant,
117{
118    /// Creates a new tracker with the given configuration.
119    pub const fn const_new(config: TrackerConfig) -> Self {
120        Self::with_hasher_and_config(FixedState::with_seed(125322317734512), config)
121    }
122}
123
124impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
125where
126    D: Hash + Eq + Clone,
127    I: clock::Instant,
128    H: Default,
129{
130    /// Creates a new tracker with the given configuration.
131    pub fn new(config: TrackerConfig) -> Self {
132        Self {
133            config,
134            histograms: HashMap::default(),
135        }
136    }
137}
138
139impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
140where
141    D: Hash + Eq + Clone,
142    I: clock::Instant,
143    H: BuildHasher,
144{
145    pub const fn with_hasher_and_config(hasher: H, config: TrackerConfig) -> Self {
146        Self {
147            config,
148            histograms: HashMap::with_hasher(hasher),
149        }
150    }
151}
152
153impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
154where
155    D: Hash + Eq + Clone,
156    I: clock::Instant,
157    H: BuildHasher,
158{
159    /// Records a latency sample given two instants. Returns the computed
160    /// duration.
161    ///
162    /// ```
163    /// # use std::time::{Duration, Instant};
164    /// # use adaptive_timeout::LatencyTracker;
165    /// let now = Instant::now();
166    /// let mut tracker = LatencyTracker::<u32, Instant>::default();
167    /// let later = now + Duration::from_millis(42);
168    /// let latency = tracker.record_latency_from(&1u32, now, later);
169    /// assert_eq!(latency, Duration::from_millis(42));
170    /// ```
171    #[inline]
172    pub fn record_latency_from<Q>(&mut self, dest: &Q, earlier: I, now: I) -> Duration
173    where
174        D: Borrow<Q>,
175        Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
176    {
177        let latency = now.duration_since(earlier);
178        self.record_latency_ms(dest, latency.as_millis() as u64, now);
179        latency
180    }
181
182    /// Records a latency sample as a [`Duration`].
183    #[inline]
184    pub fn record_latency<Q>(&mut self, dest: &Q, latency: Duration, now: I)
185    where
186        D: Borrow<Q>,
187        Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
188    {
189        self.record_latency_ms(dest, latency.as_millis() as u64, now);
190    }
191
192    /// Records a latency sample in milliseconds. This is the fastest
193    /// recording path — no `Duration` conversion, no allocation on the
194    /// hot path (destination already seen).
195    #[inline]
196    pub fn record_latency_ms<Q>(&mut self, dest: &Q, latency_ms: u64, now: I)
197    where
198        D: Borrow<Q>,
199        Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
200    {
201        if let Some(histogram) = self.histograms.get_mut(dest) {
202            histogram.record(latency_ms, now);
203            return;
204        }
205        self.record_latency_ms_cold(dest.to_owned(), latency_ms, now);
206    }
207
208    #[cold]
209    fn record_latency_ms_cold(&mut self, dest: D, latency_ms: u64, now: I) {
210        let mut histogram = self.new_histogram(now);
211        histogram.record(latency_ms, now);
212        self.histograms.insert(dest, histogram);
213    }
214
215    /// Returns the estimated latency in milliseconds at the given quantile,
216    /// or `None` if insufficient data.
217    #[inline]
218    pub fn quantile_ms<Q>(&mut self, dest: &Q, quantile: f64, now: I) -> Option<u64>
219    where
220        D: Borrow<Q>,
221        Q: Hash + Eq + ?Sized,
222    {
223        let histogram = self.histograms.get_mut(dest)?;
224        histogram.quantile(quantile, self.config.min_samples as u64, now)
225    }
226
227    /// Returns the estimated latency as a [`Duration`] at the given quantile,
228    /// or `None` if insufficient data.
229    #[inline]
230    pub fn quantile<Q>(&mut self, dest: &Q, quantile: f64, now: I) -> Option<Duration>
231    where
232        D: Borrow<Q>,
233        Q: Hash + Eq + ?Sized,
234    {
235        self.quantile_ms(dest, quantile, now)
236            .map(Duration::from_millis)
237    }
238
239    /// Clears all tracked state.
240    pub fn clear(&mut self) {
241        self.histograms.clear();
242    }
243
244    /// Returns a reference to the tracker configuration.
245    #[inline]
246    pub fn config(&self) -> &TrackerConfig {
247        &self.config
248    }
249
250    fn new_histogram(&self, now: I) -> SlidingWindowHistogram<I, N> {
251        SlidingWindowHistogram::new(
252            self.config.window(),
253            SIGNIFICANT_VALUE_DIGITS,
254            self.config.max_trackable_latency_ms as u64,
255            now,
256        )
257    }
258}
259
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use super::*;

    type TestTracker = LatencyTracker<u32, Instant>;

    /// Builds a tracker whose only non-default setting is `min_samples = 5`.
    fn make_tracker() -> TestTracker {
        TestTracker::new(TrackerConfig {
            min_samples: 5,
            ..TrackerConfig::default()
        })
    }

    /// Querying a destination that was never recorded yields `None`.
    #[test]
    fn no_data_returns_none() {
        let start = Instant::now();
        let mut tracker = make_tracker();
        assert_eq!(tracker.quantile(&1, 0.5, start), None);
    }

    /// Samples recorded as `Duration`s come back as a `Duration` median.
    #[test]
    fn record_latency_directly() {
        let start = Instant::now();
        let mut tracker = make_tracker();
        let sample = Duration::from_millis(100);

        (0..10).for_each(|_| tracker.record_latency(&1, sample, start));

        let median = tracker.quantile(&1, 0.5, start).unwrap();
        assert_eq!(median, Duration::from_millis(100));
    }

    /// Samples recorded in raw milliseconds come back as a millisecond median.
    #[test]
    fn record_latency_ms_directly() {
        let start = Instant::now();
        let mut tracker = make_tracker();

        (0..10).for_each(|_| tracker.record_latency_ms(&1, 100, start));

        let median = tracker.quantile_ms(&1, 0.5, start).unwrap();
        assert_eq!(median, 100);
    }

    /// `record_latency_from` returns and records the gap between two instants.
    #[test]
    fn record_latency_from_computes_duration() {
        let start = Instant::now();
        let mut tracker = make_tracker();
        let finish = start + Duration::from_millis(42);

        for _ in 0..10 {
            let elapsed = tracker.record_latency_from(&1, start, finish);
            assert_eq!(elapsed, Duration::from_millis(42));
        }

        let median = tracker.quantile_ms(&1, 0.5, finish).unwrap();
        assert_eq!(median, 42);
    }

    /// Each destination keeps its own samples; unknown destinations have none.
    #[test]
    fn per_destination_isolation() {
        let start = Instant::now();
        let mut tracker = make_tracker();

        for _ in 0..10 {
            tracker.record_latency(&1, Duration::from_millis(100), start);
            tracker.record_latency(&2, Duration::from_millis(500), start);
        }

        let p1 = tracker.quantile(&1, 0.5, start).unwrap();
        let p2 = tracker.quantile(&2, 0.5, start).unwrap();

        assert_eq!(p1, Duration::from_millis(100));
        // The second destination is only checked to within ±5 ms.
        let accepted = Duration::from_millis(495)..=Duration::from_millis(505);
        assert!(accepted.contains(&p2), "p2 was {p2:?}");

        assert_eq!(tracker.quantile(&3, 0.5, start), None);
    }

    /// After `clear`, previously recorded destinations report no data.
    #[test]
    fn clear_resets_all_state() {
        let start = Instant::now();
        let mut tracker = make_tracker();

        (0..10).for_each(|_| tracker.record_latency(&1, Duration::from_millis(100), start));

        tracker.clear();

        assert_eq!(tracker.quantile(&1, 0.5, start), None);
    }

    /// Quantiles stay `None` until `min_samples` (5 here) samples exist.
    #[test]
    fn insufficient_samples_returns_none() {
        let start = Instant::now();
        let mut tracker = make_tracker();
        let sample = Duration::from_millis(100);

        // One short of the threshold.
        (0..4).for_each(|_| tracker.record_latency(&1, sample, start));
        assert_eq!(tracker.quantile(&1, 0.5, start), None);

        // The fifth sample reaches the threshold.
        tracker.record_latency(&1, sample, start);
        assert!(tracker.quantile(&1, 0.5, start).is_some());
    }
}