// adaptive_timeout/sync_tracker.rs

//! Thread-safe latency tracker using [`DashMap`] for concurrent access.
//!
//! [`SyncLatencyTracker`] provides the same per-destination sliding-window
//! histogram tracking as [`LatencyTracker`](crate::LatencyTracker), but
//! allows concurrent recording and querying from multiple threads without
//! an external `Mutex`.
//!
//! Internally, each destination's histogram lives in a [`DashMap`] entry
//! with per-shard locking. Concurrent operations on **different**
//! destinations never contend. Operations on the **same** destination
//! serialize, but the critical section is very short (~40-80ns).
13use std::hash::{BuildHasher, Hash};
14use std::time::Duration;
15
16use dashmap::DashMap;
17
18use crate::clock;
19use crate::config::{SIGNIFICANT_VALUE_DIGITS, TrackerConfig};
20use crate::histogram::SlidingWindowHistogram;
21use crate::tracker::DEFAULT_SUB_WINDOWS;
22
23/// Thread-safe latency tracker backed by [`DashMap`].
24///
25/// Drop-in concurrent replacement for [`LatencyTracker`](crate::LatencyTracker).
26/// All methods take `&self` instead of `&mut self`, and the type is both
27/// `Send` and `Sync`.
28///
29/// # Type parameters
30///
31/// Same as [`LatencyTracker`](crate::LatencyTracker):
32/// - `D` — destination key.
33/// - `I` — time source (defaults to [`std::time::Instant`]).
34/// - `N` — number of sub-windows (defaults to [`DEFAULT_SUB_WINDOWS`]).
35///
36/// # Example
37///
38/// ```
39/// use std::time::Instant;
40/// use adaptive_timeout::SyncLatencyTracker;
41///
42/// let now = Instant::now();
43/// let tracker = SyncLatencyTracker::<u32>::default();
44///
45/// // Can be called from any thread via &tracker.
46/// tracker.record_latency_ms(&1u32, 50, now);
47/// let p99 = tracker.quantile_ms(&1u32, 0.99, now);
48/// ```
49pub struct SyncLatencyTracker<
50    D,
51    I: clock::Instant = std::time::Instant,
52    H = foldhash::fast::RandomState,
53    const N: usize = DEFAULT_SUB_WINDOWS,
54> {
55    config: TrackerConfig,
56    histograms: DashMap<D, SlidingWindowHistogram<I, N>, H>,
57}
58
59impl<D, I> Default for SyncLatencyTracker<D, I, foldhash::fast::RandomState, DEFAULT_SUB_WINDOWS>
60where
61    D: Hash + Eq + Clone + Send + Sync,
62    I: clock::Instant,
63{
64    fn default() -> Self {
65        Self::new(TrackerConfig::default())
66    }
67}
68
69impl<D, I, H, const N: usize> SyncLatencyTracker<D, I, H, N>
70where
71    D: Hash + Eq + Clone + Send + Sync,
72    I: clock::Instant,
73    H: BuildHasher + Default + Clone,
74{
75    /// Creates a new tracker with the given configuration.
76    pub fn new(config: TrackerConfig) -> Self {
77        Self {
78            config,
79            histograms: DashMap::default(),
80        }
81    }
82}
83
84impl<D, I, H, const N: usize> SyncLatencyTracker<D, I, H, N>
85where
86    D: Hash + Eq + Clone + Send + Sync,
87    I: clock::Instant,
88    H: BuildHasher + Clone,
89{
90    pub fn with_hasher_and_config(hasher: H, config: TrackerConfig) -> Self {
91        Self {
92            config,
93            histograms: DashMap::with_hasher(hasher),
94        }
95    }
96
97    /// Records a latency sample given two instants. Returns the computed
98    /// duration.
99    #[inline]
100    pub fn record_latency_from(&self, dest: &D, earlier: I, now: I) -> Duration {
101        let latency = now.duration_since(earlier);
102        self.record_latency_ms(dest, latency.as_millis() as u64, now);
103        latency
104    }
105
106    /// Records a latency sample as a [`Duration`].
107    #[inline]
108    pub fn record_latency(&self, dest: &D, latency: Duration, now: I) {
109        self.record_latency_ms(dest, latency.as_millis() as u64, now);
110    }
111
112    /// Records a latency sample in milliseconds.
113    ///
114    /// Takes `&self` — safe to call from multiple threads concurrently.
115    /// Operations on different destinations proceed in parallel; operations
116    /// on the same destination serialize briefly.
117    #[inline]
118    pub fn record_latency_ms(&self, dest: &D, latency_ms: u64, now: I) {
119        if let Some(mut entry) = self.histograms.get_mut(dest) {
120            entry.value_mut().record(latency_ms, now);
121            return;
122        }
123        self.record_latency_ms_cold(dest.clone(), latency_ms, now);
124    }
125
126    #[cold]
127    fn record_latency_ms_cold(&self, dest: D, latency_ms: u64, now: I) {
128        let mut histogram = self.new_histogram(now);
129        histogram.record(latency_ms, now);
130        self.histograms.insert(dest, histogram);
131    }
132
133    /// Returns the estimated latency in milliseconds at the given quantile,
134    /// or `None` if insufficient data.
135    ///
136    /// Takes `&self` — safe to call from multiple threads concurrently.
137    #[inline]
138    pub fn quantile_ms(&self, dest: &D, quantile: f64, now: I) -> Option<u64> {
139        let mut entry = self.histograms.get_mut(dest)?;
140        entry
141            .value_mut()
142            .quantile(quantile, self.config.min_samples as u64, now)
143    }
144
145    /// Returns the estimated latency as a [`Duration`] at the given quantile,
146    /// or `None` if insufficient data.
147    #[inline]
148    pub fn quantile(&self, dest: &D, quantile: f64, now: I) -> Option<Duration> {
149        self.quantile_ms(dest, quantile, now)
150            .map(Duration::from_millis)
151    }
152
153    /// Clears all tracked state.
154    pub fn clear(&self) {
155        self.histograms.clear();
156    }
157
158    /// Returns a reference to the tracker configuration.
159    #[inline]
160    pub fn config(&self) -> &TrackerConfig {
161        &self.config
162    }
163
164    fn new_histogram(&self, now: I) -> SlidingWindowHistogram<I, N> {
165        SlidingWindowHistogram::new(
166            self.config.window(),
167            SIGNIFICANT_VALUE_DIGITS,
168            self.config.max_trackable_latency_ms as u64,
169            now,
170        )
171    }
172}
173
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use super::*;

    type TestTracker = SyncLatencyTracker<u32>;

    /// Tracker with `min_samples` lowered to 5 so tests need only a handful
    /// of samples before quantiles become available.
    fn make_tracker() -> TestTracker {
        SyncLatencyTracker::new(TrackerConfig {
            min_samples: 5,
            ..TrackerConfig::default()
        })
    }

    #[test]
    fn no_data_returns_none() {
        let now = Instant::now();
        assert_eq!(make_tracker().quantile(&1, 0.5, now), None);
    }

    #[test]
    fn record_latency_directly() {
        let now = Instant::now();
        let tracker = make_tracker();

        (0..10).for_each(|_| tracker.record_latency(&1, Duration::from_millis(100), now));

        assert_eq!(
            tracker.quantile(&1, 0.5, now),
            Some(Duration::from_millis(100))
        );
    }

    #[test]
    fn record_latency_ms_directly() {
        let now = Instant::now();
        let tracker = make_tracker();

        (0..10).for_each(|_| tracker.record_latency_ms(&1, 100, now));

        assert_eq!(tracker.quantile_ms(&1, 0.5, now), Some(100));
    }

    #[test]
    fn record_latency_from_computes_duration() {
        let start = Instant::now();
        let tracker = make_tracker();
        let end = start + Duration::from_millis(42);

        for _ in 0..10 {
            assert_eq!(
                tracker.record_latency_from(&1, start, end),
                Duration::from_millis(42)
            );
        }

        assert_eq!(tracker.quantile_ms(&1, 0.5, end), Some(42));
    }

    #[test]
    fn per_destination_isolation() {
        let now = Instant::now();
        let tracker = make_tracker();

        for _ in 0..10 {
            tracker.record_latency(&1, Duration::from_millis(100), now);
            tracker.record_latency(&2, Duration::from_millis(500), now);
        }

        assert_eq!(
            tracker.quantile(&1, 0.5, now),
            Some(Duration::from_millis(100))
        );

        // Allow the histogram's quantization error around 500ms.
        let p2 = tracker.quantile(&2, 0.5, now).unwrap();
        let acceptable = Duration::from_millis(495)..=Duration::from_millis(505);
        assert!(acceptable.contains(&p2), "p2 was {p2:?}");

        // A destination that was never recorded stays empty.
        assert_eq!(tracker.quantile(&3, 0.5, now), None);
    }

    #[test]
    fn clear_resets_all_state() {
        let now = Instant::now();
        let tracker = make_tracker();

        (0..10).for_each(|_| tracker.record_latency(&1, Duration::from_millis(100), now));

        tracker.clear();

        assert_eq!(tracker.quantile(&1, 0.5, now), None);
    }

    #[test]
    fn insufficient_samples_returns_none() {
        let now = Instant::now();
        let tracker = make_tracker(); // min_samples = 5

        (0..4).for_each(|_| tracker.record_latency(&1, Duration::from_millis(100), now));
        assert_eq!(tracker.quantile(&1, 0.5, now), None);

        // The fifth sample crosses the threshold.
        tracker.record_latency(&1, Duration::from_millis(100), now);
        assert!(tracker.quantile(&1, 0.5, now).is_some());
    }

    #[test]
    fn is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<SyncLatencyTracker<u32>>();
        assert_send_sync::<SyncLatencyTracker<String>>();
    }

    // -----------------------------------------------------------------------
    // Concurrent stress tests
    // -----------------------------------------------------------------------

    #[test]
    fn concurrent_record_same_destination() {
        use std::thread;

        let now = Instant::now();
        let tracker = make_tracker();
        const THREADS: usize = 8;
        const SAMPLES_PER_THREAD: usize = 1_000;

        // Scoped threads borrow the tracker directly; the scope joins every
        // thread (and propagates panics) before returning.
        thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|| {
                    for _ in 0..SAMPLES_PER_THREAD {
                        tracker.record_latency_ms(&1, 50, now);
                    }
                });
            }
        });

        assert_eq!(tracker.quantile_ms(&1, 0.5, now), Some(50));
    }

    #[test]
    fn concurrent_record_different_destinations() {
        use std::thread;

        let now = Instant::now();
        let tracker = make_tracker();
        const THREADS: usize = 8;
        const SAMPLES_PER_THREAD: usize = 1_000;

        thread::scope(|scope| {
            for tid in 0..THREADS {
                let tracker = &tracker;
                scope.spawn(move || {
                    let dest = tid as u32;
                    for _ in 0..SAMPLES_PER_THREAD {
                        tracker.record_latency_ms(&dest, (tid as u64 + 1) * 10, now);
                    }
                });
            }
        });

        // Every destination keeps its own latency distribution.
        for tid in 0..THREADS {
            let dest = tid as u32;
            assert_eq!(
                tracker.quantile_ms(&dest, 0.5, now),
                Some((tid as u64 + 1) * 10),
                "dest {dest}"
            );
        }
    }

    #[test]
    fn concurrent_read_and_write() {
        use std::thread;

        let now = Instant::now();
        let tracker = make_tracker();

        // Pre-fill with enough data for quantile to return Some.
        for _ in 0..100 {
            tracker.record_latency_ms(&1, 50, now);
        }

        const WRITERS: usize = 4;
        const READERS: usize = 4;
        const ITERATIONS: usize = 5_000;

        thread::scope(|scope| {
            for _ in 0..WRITERS {
                scope.spawn(|| {
                    for _ in 0..ITERATIONS {
                        tracker.record_latency_ms(&1, 50, now);
                    }
                });
            }
            for _ in 0..READERS {
                scope.spawn(|| {
                    for _ in 0..ITERATIONS {
                        if let Some(p) = tracker.quantile_ms(&1, 0.5, now) {
                            assert_eq!(p, 50, "unexpected quantile: {p}");
                        }
                    }
                });
            }
        });
    }
}