1use std::hash::{BuildHasher, Hash};
2use std::time::Duration;
3
4use crate::clock;
5use crate::config::TimeoutConfig;
6use crate::tracker::LatencyTracker;
7
8#[derive(Default, Clone)]
31pub struct AdaptiveTimeout {
32 config: TimeoutConfig,
33}
34
35impl AdaptiveTimeout {
36 pub fn new(config: TimeoutConfig) -> Self {
38 Self { config }
39 }
40
41 #[inline]
47 pub fn select_timeout<'a, D, I, H, const N: usize>(
48 &self,
49 tracker: &mut LatencyTracker<D, I, H, N>,
50 destinations: impl IntoIterator<Item = &'a D>,
51 attempt: u32,
52 now: I,
53 ) -> Duration
54 where
55 D: Hash + Eq + Clone + 'a,
56 I: clock::Instant,
57 H: BuildHasher,
58 {
59 Duration::from_millis(self.select_timeout_ms(tracker, destinations, attempt, now))
60 }
61
62 pub fn select_timeout_ms<'a, D, I, H, const N: usize>(
65 &self,
66 tracker: &mut LatencyTracker<D, I, H, N>,
67 destinations: impl IntoIterator<Item = &'a D>,
68 attempt: u32,
69 now: I,
70 ) -> u64
71 where
72 D: Hash + Eq + Clone + 'a,
73 I: clock::Instant,
74 H: BuildHasher,
75 {
76 let multiplier = Self::attempt_multiplier(attempt);
77 let floor = self.config.backoff.min_ms.get() as u64;
78 let ceiling = self.config.backoff.max_ms.get() as u64;
79 let fallback = (floor * multiplier).min(ceiling);
80 let mut selected = fallback;
81
82 for dest in destinations.into_iter() {
83 if let Some(estimate_ms) = tracker.quantile_ms(dest, self.config.quantile, now) {
84 let adaptive_ms = self.compute_adaptive_ms(estimate_ms, multiplier);
85 let clamped = adaptive_ms.max(floor).min(ceiling);
86 selected = selected.max(clamped);
87 }
88 }
89
90 selected
91 }
92
93 #[inline]
96 pub fn exponential_backoff(&self, attempt: u32) -> Duration {
97 Duration::from_millis(self.exponential_backoff_ms(attempt))
98 }
99
100 #[inline]
102 pub fn exponential_backoff_ms(&self, attempt: u32) -> u64 {
103 let multiplier = Self::attempt_multiplier(attempt);
104 let base = self.config.backoff.min_ms.get() as u64;
105 let ceiling = self.config.backoff.max_ms.get() as u64;
106 (base * multiplier).min(ceiling)
107 }
108
109 #[inline]
111 fn attempt_multiplier(attempt: u32) -> u64 {
112 let exponent = attempt.saturating_sub(1).min(20);
113 1u64 << exponent
114 }
115
116 #[inline]
118 fn compute_adaptive_ms(&self, estimate_ms: u64, multiplier: u64) -> u64 {
119 let base = estimate_ms.saturating_mul(multiplier);
120 (self.config.safety_factor * base as f64) as u64
121 }
122
123 #[inline]
125 pub fn config(&self) -> &TimeoutConfig {
126 &self.config
127 }
128
129 #[cfg(feature = "sync")]
138 #[inline]
139 pub fn select_timeout_sync<'a, D, I, H, const N: usize>(
140 &self,
141 tracker: &crate::sync_tracker::SyncLatencyTracker<D, I, H, N>,
142 destinations: impl IntoIterator<Item = &'a D>,
143 attempt: u32,
144 now: I,
145 ) -> Duration
146 where
147 D: Hash + Eq + Clone + Send + Sync + 'a,
148 I: clock::Instant,
149 H: BuildHasher + Clone,
150 {
151 Duration::from_millis(self.select_timeout_sync_ms(tracker, destinations, attempt, now))
152 }
153
154 #[cfg(feature = "sync")]
157 pub fn select_timeout_sync_ms<'a, D, I, H, const N: usize>(
158 &self,
159 tracker: &crate::sync_tracker::SyncLatencyTracker<D, I, H, N>,
160 destinations: impl IntoIterator<Item = &'a D>,
161 attempt: u32,
162 now: I,
163 ) -> u64
164 where
165 D: Hash + Eq + Clone + Send + Sync + 'a,
166 I: clock::Instant,
167 H: BuildHasher + Clone,
168 {
169 let multiplier = Self::attempt_multiplier(attempt);
170 let floor = self.config.backoff.min_ms.get() as u64;
171 let ceiling = self.config.backoff.max_ms.get() as u64;
172 let fallback = (floor * multiplier).min(ceiling);
173 let mut selected = fallback;
174
175 for dest in destinations.into_iter() {
176 if let Some(estimate_ms) = tracker.quantile_ms(dest, self.config.quantile, now) {
177 let adaptive_ms = self.compute_adaptive_ms(estimate_ms, multiplier);
178 let clamped = adaptive_ms.max(floor).min(ceiling);
179 selected = selected.max(clamped);
180 }
181 }
182
183 selected
184 }
185}
186
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use super::*;
    use crate::config::TrackerConfig;
    use crate::parse::BackoffInterval;

    /// Number of identical samples fed to a tracker before it is queried.
    const SAMPLE_COUNT: usize = 100;

    /// Shorthand for a millisecond [`Duration`].
    fn ms(value: u64) -> Duration {
        Duration::from_millis(value)
    }

    /// Tracker and timeout settings shared by every test in this module:
    /// 5 minimum samples, a `10ms..60s` backoff, the 0.99 quantile and a
    /// safety factor of 2.
    fn test_configs() -> (TrackerConfig, TimeoutConfig) {
        let tracker_config = TrackerConfig {
            min_samples: 5,
            ..TrackerConfig::default()
        };
        let timeout_config = TimeoutConfig {
            backoff: "10ms..60s".parse::<BackoffInterval>().unwrap(),
            quantile: 0.99,
            safety_factor: 2.0,
        };
        (tracker_config, timeout_config)
    }

    /// Builds an empty tracker and a timeout selector from [`test_configs`].
    fn make_tracker_and_timeout<I: clock::Instant>() -> (LatencyTracker<u32, I>, AdaptiveTimeout) {
        let (tracker_config, timeout_config) = test_configs();
        (
            LatencyTracker::new(tracker_config),
            AdaptiveTimeout::new(timeout_config),
        )
    }

    /// Records [`SAMPLE_COUNT`] samples of `latency_ms` for `dest` at `now`.
    fn record_samples(
        tracker: &mut LatencyTracker<u32, Instant>,
        dest: u32,
        latency_ms: u64,
        now: Instant,
    ) {
        for _ in 0..SAMPLE_COUNT {
            tracker.record_latency(&dest, ms(latency_ms), now);
        }
    }

    /// With no samples, the timeout is the exponential backoff: 10, 20, 40 ms.
    #[test]
    fn fallback_exponential_backoff_no_data() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();

        for (attempt, expected_ms) in [(1u32, 10u64), (2, 20), (3, 40)] {
            let selected = timeout.select_timeout(&mut tracker, &[1u32], attempt, now);
            assert_eq!(selected, ms(expected_ms));
        }
    }

    /// A very high attempt number is capped at the 60 s ceiling.
    #[test]
    fn exponential_backoff_capped_at_max() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();

        let selected = timeout.select_timeout(&mut tracker, &[1u32], 100, now);
        assert_eq!(selected, Duration::from_secs(60));
    }

    /// 50 ms samples with a safety factor of 2 give a 100 ms timeout.
    #[test]
    fn adaptive_timeout_with_data() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();
        record_samples(&mut tracker, 1, 50, now);

        let selected = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        assert_eq!(selected, ms(100));
    }

    /// Tiny latencies are raised to the 10 ms floor.
    #[test]
    fn adaptive_timeout_respects_floor() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();
        record_samples(&mut tracker, 1, 1, now);

        let selected = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        assert_eq!(selected, ms(10));
    }

    /// Huge latencies are lowered to the 60 s ceiling.
    #[test]
    fn adaptive_timeout_respects_ceiling() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();
        record_samples(&mut tracker, 1, 50_000, now);

        let selected = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        assert_eq!(selected, Duration::from_secs(60));
    }

    /// The slowest destination determines the timeout (about 2 x 500 ms).
    #[test]
    fn max_across_destinations() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();

        // Samples for both destinations are interleaved on purpose.
        for _ in 0..SAMPLE_COUNT {
            tracker.record_latency(&1u32, ms(10), now);
            tracker.record_latency(&2u32, ms(500), now);
        }

        let t = timeout.select_timeout(&mut tracker, &[1u32, 2u32], 1, now);
        assert!((ms(990)..=ms(1010)).contains(&t), "timeout was {t:?}");
    }

    /// Each further attempt doubles the adaptive timeout.
    #[test]
    fn attempt_multiplier_increases_timeout() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();
        record_samples(&mut tracker, 1, 50, now);

        let first = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        let second = timeout.select_timeout(&mut tracker, &[1u32], 2, now);
        let third = timeout.select_timeout(&mut tracker, &[1u32], 3, now);

        assert_eq!(first, ms(100));
        assert_eq!(second, ms(200));
        assert_eq!(third, ms(400));
    }

    /// A destination without samples does not lower the adaptive result.
    #[test]
    fn mixed_data_and_no_data_destinations() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();
        record_samples(&mut tracker, 1, 50, now);

        let selected = timeout.select_timeout(&mut tracker, &[1u32, 2u32], 1, now);
        assert_eq!(selected, ms(100));
    }

    /// The `Duration` entry points agree with their `_ms` counterparts.
    #[test]
    fn ms_variants_match_duration_variants() {
        let now = Instant::now();
        let (mut tracker, timeout) = make_tracker_and_timeout();
        record_samples(&mut tracker, 1, 50, now);

        let as_duration = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
        let as_millis = timeout.select_timeout_ms(&mut tracker, &[1u32], 1, now);
        assert_eq!(as_duration, ms(as_millis));

        let backoff_duration = timeout.exponential_backoff(3);
        let backoff_millis = timeout.exponential_backoff_ms(3);
        assert_eq!(backoff_duration, ms(backoff_millis));
    }

    #[cfg(feature = "sync")]
    mod sync_tests {
        use std::time::{Duration, Instant};

        use super::{ms, test_configs, SAMPLE_COUNT};
        use crate::sync_tracker::SyncLatencyTracker;
        use crate::timeout::AdaptiveTimeout;

        /// Builds an empty sync tracker and a timeout selector from the shared
        /// test configuration.
        fn make_sync_tracker_and_timeout() -> (SyncLatencyTracker<u32>, AdaptiveTimeout) {
            let (tracker_config, timeout_config) = test_configs();
            (
                SyncLatencyTracker::new(tracker_config),
                AdaptiveTimeout::new(timeout_config),
            )
        }

        /// Records `SAMPLE_COUNT` samples of `latency_ms` for `dest` at `now`.
        fn record_samples(
            tracker: &SyncLatencyTracker<u32>,
            dest: u32,
            latency_ms: u64,
            now: Instant,
        ) {
            for _ in 0..SAMPLE_COUNT {
                tracker.record_latency(&dest, ms(latency_ms), now);
            }
        }

        /// With no samples, the sync path also yields 10, 20, 40 ms.
        #[test]
        fn sync_fallback_exponential_backoff_no_data() {
            let now = Instant::now();
            let (tracker, timeout) = make_sync_tracker_and_timeout();

            for (attempt, expected_ms) in [(1u32, 10u64), (2, 20), (3, 40)] {
                let selected = timeout.select_timeout_sync(&tracker, &[1u32], attempt, now);
                assert_eq!(selected, ms(expected_ms));
            }
        }

        /// 50 ms samples with a safety factor of 2 give a 100 ms timeout.
        #[test]
        fn sync_adaptive_timeout_with_data() {
            let now = Instant::now();
            let (tracker, timeout) = make_sync_tracker_and_timeout();
            record_samples(&tracker, 1, 50, now);

            let selected = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
            assert_eq!(selected, ms(100));
        }

        /// Results are limited to the 10 ms floor and the 60 s ceiling.
        #[test]
        fn sync_respects_floor_and_ceiling() {
            let now = Instant::now();
            let (tracker, timeout) = make_sync_tracker_and_timeout();

            record_samples(&tracker, 1, 1, now);
            let low = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
            assert_eq!(low, ms(10));

            record_samples(&tracker, 2, 50_000, now);
            let high = timeout.select_timeout_sync(&tracker, &[2u32], 1, now);
            assert_eq!(high, Duration::from_secs(60));
        }

        /// The slowest destination determines the timeout (about 2 x 500 ms).
        #[test]
        fn sync_max_across_destinations() {
            let now = Instant::now();
            let (tracker, timeout) = make_sync_tracker_and_timeout();

            // Samples for both destinations are interleaved on purpose.
            for _ in 0..SAMPLE_COUNT {
                tracker.record_latency(&1u32, ms(10), now);
                tracker.record_latency(&2u32, ms(500), now);
            }

            let t = timeout.select_timeout_sync(&tracker, &[1u32, 2u32], 1, now);
            assert!((ms(990)..=ms(1010)).contains(&t), "timeout was {t:?}");
        }

        /// The sync `Duration` entry point agrees with its `_ms` counterpart.
        #[test]
        fn sync_ms_variants_match_duration_variants() {
            let now = Instant::now();
            let (tracker, timeout) = make_sync_tracker_and_timeout();
            record_samples(&tracker, 1, 50, now);

            let as_duration = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
            let as_millis = timeout.select_timeout_sync_ms(&tracker, &[1u32], 1, now);
            assert_eq!(as_duration, ms(as_millis));
        }

        /// Fed identical samples, the sync and mutable trackers give the same
        /// timeout.
        #[test]
        fn sync_matches_mutable_tracker_results() {
            use crate::tracker::LatencyTracker;

            let now = Instant::now();
            let (tracker_config, timeout_config) = test_configs();

            let mut mutable_tracker = LatencyTracker::<u32, Instant>::new(tracker_config);
            let sync_tracker = SyncLatencyTracker::<u32>::new(tracker_config);
            let timeout = AdaptiveTimeout::new(timeout_config);

            for _ in 0..SAMPLE_COUNT {
                mutable_tracker.record_latency(&1u32, ms(50), now);
                sync_tracker.record_latency(&1u32, ms(50), now);
            }

            let ms_mut = timeout.select_timeout_ms(&mut mutable_tracker, &[1u32], 1, now);
            let ms_sync = timeout.select_timeout_sync_ms(&sync_tracker, &[1u32], 1, now);
            assert_eq!(ms_mut, ms_sync);
        }
    }
}