1use std::hash::{BuildHasher, Hash};
14use std::time::Duration;
15
16use dashmap::DashMap;
17
18use crate::clock;
19use crate::config::{SIGNIFICANT_VALUE_DIGITS, TrackerConfig};
20use crate::histogram::SlidingWindowHistogram;
21use crate::tracker::DEFAULT_SUB_WINDOWS;
22
23pub struct SyncLatencyTracker<
50 D,
51 I: clock::Instant = std::time::Instant,
52 H = foldhash::fast::RandomState,
53 const N: usize = DEFAULT_SUB_WINDOWS,
54> {
55 config: TrackerConfig,
56 histograms: DashMap<D, SlidingWindowHistogram<I, N>, H>,
57}
58
59impl<D, I> Default for SyncLatencyTracker<D, I, foldhash::fast::RandomState, DEFAULT_SUB_WINDOWS>
60where
61 D: Hash + Eq + Clone + Send + Sync,
62 I: clock::Instant,
63{
64 fn default() -> Self {
65 Self::new(TrackerConfig::default())
66 }
67}
68
69impl<D, I, H, const N: usize> SyncLatencyTracker<D, I, H, N>
70where
71 D: Hash + Eq + Clone + Send + Sync,
72 I: clock::Instant,
73 H: BuildHasher + Default + Clone,
74{
75 pub fn new(config: TrackerConfig) -> Self {
77 Self {
78 config,
79 histograms: DashMap::default(),
80 }
81 }
82}
83
84impl<D, I, H, const N: usize> SyncLatencyTracker<D, I, H, N>
85where
86 D: Hash + Eq + Clone + Send + Sync,
87 I: clock::Instant,
88 H: BuildHasher + Clone,
89{
90 pub fn with_hasher_and_config(hasher: H, config: TrackerConfig) -> Self {
91 Self {
92 config,
93 histograms: DashMap::with_hasher(hasher),
94 }
95 }
96
97 #[inline]
100 pub fn record_latency_from(&self, dest: &D, earlier: I, now: I) -> Duration {
101 let latency = now.duration_since(earlier);
102 self.record_latency_ms(dest, latency.as_millis() as u64, now);
103 latency
104 }
105
106 #[inline]
108 pub fn record_latency(&self, dest: &D, latency: Duration, now: I) {
109 self.record_latency_ms(dest, latency.as_millis() as u64, now);
110 }
111
112 #[inline]
118 pub fn record_latency_ms(&self, dest: &D, latency_ms: u64, now: I) {
119 if let Some(mut entry) = self.histograms.get_mut(dest) {
120 entry.value_mut().record(latency_ms, now);
121 return;
122 }
123 self.record_latency_ms_cold(dest.clone(), latency_ms, now);
124 }
125
126 #[cold]
127 fn record_latency_ms_cold(&self, dest: D, latency_ms: u64, now: I) {
128 let mut histogram = self.new_histogram(now);
129 histogram.record(latency_ms, now);
130 self.histograms.insert(dest, histogram);
131 }
132
133 #[inline]
138 pub fn quantile_ms(&self, dest: &D, quantile: f64, now: I) -> Option<u64> {
139 let mut entry = self.histograms.get_mut(dest)?;
140 entry
141 .value_mut()
142 .quantile(quantile, self.config.min_samples as u64, now)
143 }
144
145 #[inline]
148 pub fn quantile(&self, dest: &D, quantile: f64, now: I) -> Option<Duration> {
149 self.quantile_ms(dest, quantile, now)
150 .map(Duration::from_millis)
151 }
152
153 pub fn clear(&self) {
155 self.histograms.clear();
156 }
157
158 #[inline]
160 pub fn config(&self) -> &TrackerConfig {
161 &self.config
162 }
163
164 fn new_histogram(&self, now: I) -> SlidingWindowHistogram<I, N> {
165 SlidingWindowHistogram::new(
166 self.config.window(),
167 SIGNIFICANT_VALUE_DIGITS,
168 self.config.max_trackable_latency_ms as u64,
169 now,
170 )
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use std::time::Instant;
177
178 use super::*;
179
180 type TestTracker = SyncLatencyTracker<u32>;
181
182 fn make_tracker() -> TestTracker {
183 let config = TrackerConfig {
184 min_samples: 5,
185 ..TrackerConfig::default()
186 };
187 SyncLatencyTracker::new(config)
188 }
189
190 #[test]
191 fn no_data_returns_none() {
192 let now = Instant::now();
193 let tracker = make_tracker();
194 assert_eq!(tracker.quantile(&1, 0.5, now), None);
195 }
196
197 #[test]
198 fn record_latency_directly() {
199 let now = Instant::now();
200 let tracker = make_tracker();
201
202 for _ in 0..10 {
203 tracker.record_latency(&1, Duration::from_millis(100), now);
204 }
205
206 let p50 = tracker.quantile(&1, 0.5, now).unwrap();
207 assert_eq!(p50, Duration::from_millis(100));
208 }
209
210 #[test]
211 fn record_latency_ms_directly() {
212 let now = Instant::now();
213 let tracker = make_tracker();
214
215 for _ in 0..10 {
216 tracker.record_latency_ms(&1, 100, now);
217 }
218
219 let p50 = tracker.quantile_ms(&1, 0.5, now).unwrap();
220 assert_eq!(p50, 100);
221 }
222
223 #[test]
224 fn record_latency_from_computes_duration() {
225 let now = Instant::now();
226 let tracker = make_tracker();
227 let later = now + Duration::from_millis(42);
228
229 for _ in 0..10 {
230 let d = tracker.record_latency_from(&1, now, later);
231 assert_eq!(d, Duration::from_millis(42));
232 }
233
234 let p50 = tracker.quantile_ms(&1, 0.5, later).unwrap();
235 assert_eq!(p50, 42);
236 }
237
238 #[test]
239 fn per_destination_isolation() {
240 let now = Instant::now();
241 let tracker = make_tracker();
242
243 for _ in 0..10 {
244 tracker.record_latency(&1, Duration::from_millis(100), now);
245 tracker.record_latency(&2, Duration::from_millis(500), now);
246 }
247
248 let p1 = tracker.quantile(&1, 0.5, now).unwrap();
249 let p2 = tracker.quantile(&2, 0.5, now).unwrap();
250
251 assert_eq!(p1, Duration::from_millis(100));
252 assert!(
253 p2 >= Duration::from_millis(495) && p2 <= Duration::from_millis(505),
254 "p2 was {p2:?}"
255 );
256
257 assert_eq!(tracker.quantile(&3, 0.5, now), None);
258 }
259
260 #[test]
261 fn clear_resets_all_state() {
262 let now = Instant::now();
263 let tracker = make_tracker();
264
265 for _ in 0..10 {
266 tracker.record_latency(&1, Duration::from_millis(100), now);
267 }
268
269 tracker.clear();
270
271 assert_eq!(tracker.quantile(&1, 0.5, now), None);
272 }
273
274 #[test]
275 fn insufficient_samples_returns_none() {
276 let now = Instant::now();
277 let tracker = make_tracker(); for _ in 0..4 {
280 tracker.record_latency(&1, Duration::from_millis(100), now);
281 }
282
283 assert_eq!(tracker.quantile(&1, 0.5, now), None);
284
285 tracker.record_latency(&1, Duration::from_millis(100), now);
287 assert!(tracker.quantile(&1, 0.5, now).is_some());
288 }
289
290 #[test]
291 fn is_send_and_sync() {
292 fn assert_send_sync<T: Send + Sync>() {}
293 assert_send_sync::<SyncLatencyTracker<u32>>();
294 assert_send_sync::<SyncLatencyTracker<String>>();
295 }
296
297 #[test]
302 fn concurrent_record_same_destination() {
303 use std::sync::Arc;
304 use std::thread;
305
306 let now = Instant::now();
307 let tracker = Arc::new(make_tracker());
308 let num_threads = 8;
309 let samples_per_thread = 1_000;
310
311 let handles: Vec<_> = (0..num_threads)
312 .map(|_| {
313 let tracker = Arc::clone(&tracker);
314 thread::spawn(move || {
315 for _ in 0..samples_per_thread {
316 tracker.record_latency_ms(&1, 50, now);
317 }
318 })
319 })
320 .collect();
321
322 for h in handles {
323 h.join().unwrap();
324 }
325
326 let p50 = tracker.quantile_ms(&1, 0.5, now).unwrap();
327 assert_eq!(p50, 50);
328 }
329
330 #[test]
331 fn concurrent_record_different_destinations() {
332 use std::sync::Arc;
333 use std::thread;
334
335 let now = Instant::now();
336 let tracker = Arc::new(make_tracker());
337 let num_threads = 8;
338 let samples_per_thread = 1_000;
339
340 let handles: Vec<_> = (0..num_threads)
341 .map(|tid| {
342 let tracker = Arc::clone(&tracker);
343 thread::spawn(move || {
344 let dest = tid as u32;
345 for _ in 0..samples_per_thread {
346 tracker.record_latency_ms(&dest, (tid as u64 + 1) * 10, now);
347 }
348 })
349 })
350 .collect();
351
352 for h in handles {
353 h.join().unwrap();
354 }
355
356 for tid in 0..num_threads {
358 let dest = tid as u32;
359 let expected = (tid as u64 + 1) * 10;
360 let p50 = tracker.quantile_ms(&dest, 0.5, now).unwrap();
361 assert_eq!(p50, expected, "dest {dest}");
362 }
363 }
364
365 #[test]
366 fn concurrent_read_and_write() {
367 use std::sync::Arc;
368 use std::thread;
369
370 let now = Instant::now();
371 let tracker = Arc::new(make_tracker());
372
373 for _ in 0..100 {
375 tracker.record_latency_ms(&1, 50, now);
376 }
377
378 let num_writers = 4;
379 let num_readers = 4;
380 let iterations = 5_000;
381
382 let mut handles = Vec::new();
383
384 for _ in 0..num_writers {
386 let tracker = Arc::clone(&tracker);
387 handles.push(thread::spawn(move || {
388 for _ in 0..iterations {
389 tracker.record_latency_ms(&1, 50, now);
390 }
391 }));
392 }
393
394 for _ in 0..num_readers {
396 let tracker = Arc::clone(&tracker);
397 handles.push(thread::spawn(move || {
398 for _ in 0..iterations {
399 if let Some(p) = tracker.quantile_ms(&1, 0.5, now) {
400 assert_eq!(p, 50, "unexpected quantile: {p}");
401 }
402 }
403 }));
404 }
405
406 for h in handles {
407 h.join().unwrap();
408 }
409 }
410}