1use std::borrow::Borrow;
2use std::collections::HashMap;
3use std::hash::{BuildHasher, Hash};
4use std::time::Duration;
5
6use foldhash::fast::FixedState;
7
8use crate::clock;
9use crate::config::{SIGNIFICANT_VALUE_DIGITS, TrackerConfig};
10use crate::histogram::SlidingWindowHistogram;
11
12pub const DEFAULT_SUB_WINDOWS: usize = 10;
18
19pub struct LatencyTracker<
94 D,
95 I: clock::Instant = std::time::Instant,
96 H = foldhash::fast::RandomState,
97 const N: usize = DEFAULT_SUB_WINDOWS,
98> {
99 config: TrackerConfig,
100 histograms: HashMap<D, SlidingWindowHistogram<I, N>, H>,
101}
102
103impl<D, I> Default for LatencyTracker<D, I, foldhash::fast::RandomState, DEFAULT_SUB_WINDOWS>
104where
105 D: Hash + Eq + Clone,
106 I: clock::Instant,
107{
108 fn default() -> Self {
109 Self::new(TrackerConfig::default())
110 }
111}
112
113impl<D, I, const N: usize> LatencyTracker<D, I, foldhash::fast::FixedState, N>
114where
115 D: Hash + Eq + Clone,
116 I: clock::Instant,
117{
118 pub const fn const_new(config: TrackerConfig) -> Self {
120 Self::with_hasher_and_config(FixedState::with_seed(125322317734512), config)
121 }
122}
123
124impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
125where
126 D: Hash + Eq + Clone,
127 I: clock::Instant,
128 H: Default,
129{
130 pub fn new(config: TrackerConfig) -> Self {
132 Self {
133 config,
134 histograms: HashMap::default(),
135 }
136 }
137}
138
139impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
140where
141 D: Hash + Eq + Clone,
142 I: clock::Instant,
143 H: BuildHasher,
144{
145 pub const fn with_hasher_and_config(hasher: H, config: TrackerConfig) -> Self {
146 Self {
147 config,
148 histograms: HashMap::with_hasher(hasher),
149 }
150 }
151}
152
153impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
154where
155 D: Hash + Eq + Clone,
156 I: clock::Instant,
157 H: BuildHasher,
158{
159 #[inline]
172 pub fn record_latency_from<Q>(&mut self, dest: &Q, earlier: I, now: I) -> Duration
173 where
174 D: Borrow<Q>,
175 Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
176 {
177 let latency = now.duration_since(earlier);
178 self.record_latency_ms(dest, latency.as_millis() as u64, now);
179 latency
180 }
181
182 #[inline]
184 pub fn record_latency<Q>(&mut self, dest: &Q, latency: Duration, now: I)
185 where
186 D: Borrow<Q>,
187 Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
188 {
189 self.record_latency_ms(dest, latency.as_millis() as u64, now);
190 }
191
192 #[inline]
196 pub fn record_latency_ms<Q>(&mut self, dest: &Q, latency_ms: u64, now: I)
197 where
198 D: Borrow<Q>,
199 Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
200 {
201 if let Some(histogram) = self.histograms.get_mut(dest) {
202 histogram.record(latency_ms, now);
203 return;
204 }
205 self.record_latency_ms_cold(dest.to_owned(), latency_ms, now);
206 }
207
208 #[cold]
209 fn record_latency_ms_cold(&mut self, dest: D, latency_ms: u64, now: I) {
210 let mut histogram = self.new_histogram(now);
211 histogram.record(latency_ms, now);
212 self.histograms.insert(dest, histogram);
213 }
214
215 #[inline]
218 pub fn quantile_ms<Q>(&mut self, dest: &Q, quantile: f64, now: I) -> Option<u64>
219 where
220 D: Borrow<Q>,
221 Q: Hash + Eq + ?Sized,
222 {
223 let histogram = self.histograms.get_mut(dest)?;
224 histogram.quantile(quantile, self.config.min_samples as u64, now)
225 }
226
227 #[inline]
230 pub fn quantile<Q>(&mut self, dest: &Q, quantile: f64, now: I) -> Option<Duration>
231 where
232 D: Borrow<Q>,
233 Q: Hash + Eq + ?Sized,
234 {
235 self.quantile_ms(dest, quantile, now)
236 .map(Duration::from_millis)
237 }
238
239 pub fn clear(&mut self) {
241 self.histograms.clear();
242 }
243
244 #[inline]
246 pub fn config(&self) -> &TrackerConfig {
247 &self.config
248 }
249
250 fn new_histogram(&self, now: I) -> SlidingWindowHistogram<I, N> {
251 SlidingWindowHistogram::new(
252 self.config.window(),
253 SIGNIFICANT_VALUE_DIGITS,
254 self.config.max_trackable_latency_ms as u64,
255 now,
256 )
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use std::time::Instant;
263
264 use super::*;
265
266 type TestTracker = LatencyTracker<u32, Instant>;
267
268 fn make_tracker() -> TestTracker {
269 let config = TrackerConfig {
270 min_samples: 5,
271 ..TrackerConfig::default()
272 };
273 LatencyTracker::new(config)
274 }
275
276 #[test]
277 fn no_data_returns_none() {
278 let now = Instant::now();
279 let mut tracker = make_tracker();
280 assert_eq!(tracker.quantile(&1, 0.5, now), None);
281 }
282
283 #[test]
284 fn record_latency_directly() {
285 let now = Instant::now();
286 let mut tracker = make_tracker();
287
288 for _ in 0..10 {
289 tracker.record_latency(&1, Duration::from_millis(100), now);
290 }
291
292 let p50 = tracker.quantile(&1, 0.5, now).unwrap();
293 assert_eq!(p50, Duration::from_millis(100));
294 }
295
296 #[test]
297 fn record_latency_ms_directly() {
298 let now = Instant::now();
299 let mut tracker = make_tracker();
300
301 for _ in 0..10 {
302 tracker.record_latency_ms(&1, 100, now);
303 }
304
305 let p50 = tracker.quantile_ms(&1, 0.5, now).unwrap();
306 assert_eq!(p50, 100);
307 }
308
309 #[test]
310 fn record_latency_from_computes_duration() {
311 let now = Instant::now();
312 let mut tracker = make_tracker();
313 let later = now + Duration::from_millis(42);
314
315 for _ in 0..10 {
316 let d = tracker.record_latency_from(&1, now, later);
317 assert_eq!(d, Duration::from_millis(42));
318 }
319
320 let p50 = tracker.quantile_ms(&1, 0.5, later).unwrap();
321 assert_eq!(p50, 42);
322 }
323
324 #[test]
325 fn per_destination_isolation() {
326 let now = Instant::now();
327 let mut tracker = make_tracker();
328
329 for _ in 0..10 {
330 tracker.record_latency(&1, Duration::from_millis(100), now);
331 tracker.record_latency(&2, Duration::from_millis(500), now);
332 }
333
334 let p1 = tracker.quantile(&1, 0.5, now).unwrap();
335 let p2 = tracker.quantile(&2, 0.5, now).unwrap();
336
337 assert_eq!(p1, Duration::from_millis(100));
338 assert!(
339 p2 >= Duration::from_millis(495) && p2 <= Duration::from_millis(505),
340 "p2 was {p2:?}"
341 );
342
343 assert_eq!(tracker.quantile(&3, 0.5, now), None);
344 }
345
346 #[test]
347 fn clear_resets_all_state() {
348 let now = Instant::now();
349 let mut tracker = make_tracker();
350
351 for _ in 0..10 {
352 tracker.record_latency(&1, Duration::from_millis(100), now);
353 }
354
355 tracker.clear();
356
357 assert_eq!(tracker.quantile(&1, 0.5, now), None);
358 }
359
360 #[test]
361 fn insufficient_samples_returns_none() {
362 let now = Instant::now();
363 let mut tracker = make_tracker(); for _ in 0..4 {
366 tracker.record_latency(&1, Duration::from_millis(100), now);
367 }
368
369 assert_eq!(tracker.quantile(&1, 0.5, now), None);
370
371 tracker.record_latency(&1, Duration::from_millis(100), now);
373 assert!(tracker.quantile(&1, 0.5, now).is_some());
374 }
375}