1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::Relaxed};
23use std::time::{Duration, Instant};
24
25use crate::{Stop, StopReason};
26
/// Default target interval between clock reads, in nanoseconds (100 µs).
///
/// Both constructors store this value in `target_nanos`; callers can replace
/// it with `with_target_interval`.
const DEFAULT_TARGET_NANOS: u64 = 100_000;
33
/// Converts a [`Duration`] to whole nanoseconds, saturating at `u64::MAX`.
///
/// `Duration::as_nanos` yields a `u128`; any value that does not fit in a
/// `u64` is reported as `u64::MAX` instead of wrapping.
#[inline]
fn duration_to_nanos(d: Duration) -> u64 {
    u64::try_from(d.as_nanos()).unwrap_or(u64::MAX)
}
39
/// A [`Stop`] wrapper that adds a timeout but reads the clock on only some
/// calls.
///
/// Every `check` / `should_stop` call first consults `inner`, then bumps a
/// call counter. The clock is read only when that counter is a multiple of
/// `skip_mod`, and each clock read re-tunes `skip_mod` from the observed call
/// rate so that reads land roughly `target_nanos` apart.
pub struct DebouncedTimeout<T> {
    /// Wrapped stop source; consulted first on every call.
    inner: T,
    /// Instant captured at construction; time base for every nanosecond
    /// offset stored in this struct.
    created: Instant,
    /// Timeout point, as nanoseconds after `created`.
    deadline_nanos: u64,
    /// Desired spacing between clock reads, in nanoseconds.
    target_nanos: u64,

    /// Number of calls that got past the `inner` check (wraps on overflow).
    call_count: AtomicU32,
    /// The clock is read when `call_count` is a multiple of this value.
    skip_mod: AtomicU32,
    /// Elapsed nanoseconds (since `created`) recorded at the previous clock
    /// read; 0 until the first read.
    last_measured_nanos: AtomicU64,
    /// Value of `call_count` recorded at the previous clock read; 0 until
    /// the first read.
    last_measured_count: AtomicU32,
}
99
100impl<T: Stop> DebouncedTimeout<T> {
101 #[inline]
106 pub fn new(inner: T, duration: Duration) -> Self {
107 let now = Instant::now();
108 Self {
109 inner,
110 created: now,
111 deadline_nanos: duration_to_nanos(duration),
112 target_nanos: DEFAULT_TARGET_NANOS,
113 call_count: AtomicU32::new(0),
114 skip_mod: AtomicU32::new(1),
115 last_measured_nanos: AtomicU64::new(0),
116 last_measured_count: AtomicU32::new(0),
117 }
118 }
119
120 #[inline]
125 pub fn with_deadline(inner: T, deadline: Instant) -> Self {
126 let now = Instant::now();
127 Self {
128 inner,
129 created: now,
130 deadline_nanos: duration_to_nanos(deadline.saturating_duration_since(now)),
131 target_nanos: DEFAULT_TARGET_NANOS,
132 call_count: AtomicU32::new(0),
133 skip_mod: AtomicU32::new(1),
134 last_measured_nanos: AtomicU64::new(0),
135 last_measured_count: AtomicU32::new(0),
136 }
137 }
138
139 #[inline]
147 pub fn with_target_interval(mut self, interval: Duration) -> Self {
148 self.target_nanos = duration_to_nanos(interval).max(1);
149 self
150 }
151
152 #[inline]
154 pub fn deadline(&self) -> Instant {
155 self.created + Duration::from_nanos(self.deadline_nanos)
156 }
157
158 #[inline]
162 pub fn remaining(&self) -> Duration {
163 self.deadline().saturating_duration_since(Instant::now())
164 }
165
166 #[inline]
168 pub fn inner(&self) -> &T {
169 &self.inner
170 }
171
172 #[inline]
174 pub fn into_inner(self) -> T {
175 self.inner
176 }
177
178 #[inline]
183 pub fn checks_per_clock_read(&self) -> u32 {
184 self.skip_mod.load(Relaxed)
185 }
186
187 #[cold]
189 #[inline(never)]
190 fn measure_and_recalibrate(&self, count: u32) -> bool {
191 let elapsed_nanos = self.created.elapsed().as_nanos() as u64;
192
193 if elapsed_nanos >= self.deadline_nanos {
194 return true; }
196
197 let prev_nanos = self.last_measured_nanos.swap(elapsed_nanos, Relaxed);
199 let prev_count = self.last_measured_count.swap(count, Relaxed);
200
201 let delta_nanos = elapsed_nanos.saturating_sub(prev_nanos);
202 let delta_calls = count.wrapping_sub(prev_count) as u64;
203
204 if delta_calls > 0 && delta_nanos > 0 {
205 let nanos_per_call = delta_nanos / delta_calls;
206 if nanos_per_call > 0 {
207 let ideal_skip =
208 (self.target_nanos / nanos_per_call).clamp(1, u32::MAX as u64) as u32;
209 let current_skip = self.skip_mod.load(Relaxed);
210
211 let new_skip = if ideal_skip <= current_skip {
212 ideal_skip
214 } else {
215 current_skip
217 .saturating_add((ideal_skip - current_skip) / 8)
218 .max(1)
219 };
220
221 self.skip_mod.store(new_skip, Relaxed);
222 }
223 }
224
225 false }
227}
228
229impl<T: Stop> Stop for DebouncedTimeout<T> {
230 #[inline]
231 fn check(&self) -> Result<(), StopReason> {
232 self.inner.check()?;
234
235 let count = self.call_count.fetch_add(1, Relaxed).wrapping_add(1);
237 let skip = self.skip_mod.load(Relaxed);
238
239 if !count.is_multiple_of(skip) {
241 return Ok(());
242 }
243
244 if self.measure_and_recalibrate(count) {
246 Err(StopReason::TimedOut)
247 } else {
248 Ok(())
249 }
250 }
251
252 #[inline]
253 fn should_stop(&self) -> bool {
254 if self.inner.should_stop() {
255 return true;
256 }
257
258 let count = self.call_count.fetch_add(1, Relaxed).wrapping_add(1);
259 let skip = self.skip_mod.load(Relaxed);
260
261 if !count.is_multiple_of(skip) {
262 return false;
263 }
264
265 self.measure_and_recalibrate(count)
266 }
267}
268
269impl<T: Stop> DebouncedTimeout<T> {
270 #[inline]
275 pub fn tighten(self, duration: Duration) -> Self {
276 let elapsed = duration_to_nanos(Instant::now().saturating_duration_since(self.created));
277 let new_deadline_nanos = elapsed.saturating_add(duration_to_nanos(duration));
278 let deadline_nanos = self.deadline_nanos.min(new_deadline_nanos);
279 Self {
280 inner: self.inner,
281 created: self.created,
282 deadline_nanos,
283 target_nanos: self.target_nanos,
284 call_count: AtomicU32::new(0),
285 skip_mod: AtomicU32::new(1),
286 last_measured_nanos: AtomicU64::new(0),
287 last_measured_count: AtomicU32::new(0),
288 }
289 }
290
291 #[inline]
296 pub fn tighten_deadline(self, deadline: Instant) -> Self {
297 let new_deadline_nanos =
298 duration_to_nanos(deadline.saturating_duration_since(self.created));
299 let deadline_nanos = self.deadline_nanos.min(new_deadline_nanos);
300 Self {
301 inner: self.inner,
302 created: self.created,
303 deadline_nanos,
304 target_nanos: self.target_nanos,
305 call_count: AtomicU32::new(0),
306 skip_mod: AtomicU32::new(1),
307 last_measured_nanos: AtomicU64::new(0),
308 last_measured_count: AtomicU32::new(0),
309 }
310 }
311}
312
313impl<T: Clone + Stop> Clone for DebouncedTimeout<T> {
314 fn clone(&self) -> Self {
316 Self {
317 inner: self.inner.clone(),
318 created: self.created,
319 deadline_nanos: self.deadline_nanos,
320 target_nanos: self.target_nanos,
321 call_count: AtomicU32::new(0),
322 skip_mod: AtomicU32::new(1),
323 last_measured_nanos: AtomicU64::new(0),
324 last_measured_count: AtomicU32::new(0),
325 }
326 }
327}
328
329impl<T: core::fmt::Debug> core::fmt::Debug for DebouncedTimeout<T> {
330 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
331 let deadline = self.created + Duration::from_nanos(self.deadline_nanos);
332 f.debug_struct("DebouncedTimeout")
333 .field("inner", &self.inner)
334 .field("deadline", &deadline)
335 .field("target_interval_us", &(self.target_nanos / 1_000))
336 .field("skip_mod", &self.skip_mod.load(Relaxed))
337 .finish()
338 }
339}
340
341pub trait DebouncedTimeoutExt: Stop + Sized {
346 #[inline]
353 fn with_debounced_timeout(self, duration: Duration) -> DebouncedTimeout<Self> {
354 DebouncedTimeout::new(self, duration)
355 }
356
357 #[inline]
359 fn with_debounced_deadline(self, deadline: Instant) -> DebouncedTimeout<Self> {
360 DebouncedTimeout::with_deadline(self, deadline)
361 }
362}
363
364impl<T: Stop> DebouncedTimeoutExt for T {}
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StopSource;

    /// A 50 ms timeout is not reported immediately but is reported after
    /// sleeping past it.
    #[test]
    fn basic_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_millis(50));

        assert!(!stop.should_stop());
        assert!(stop.check().is_ok());

        std::thread::sleep(Duration::from_millis(80));

        // Detection may need several calls: the clock is read only when the
        // call count is a multiple of skip_mod.
        // NOTE(review): this relies on skip_mod being at most ~100 after the
        // two warm-up calls above; confirm that holds on fast machines.
        for _ in 0..100 {
            if stop.should_stop() {
                return;
            }
        }
        panic!("should have detected timeout");
    }

    /// Cancelling the wrapped source is reported at once, with the inner
    /// reason rather than a timeout.
    #[test]
    fn cancel_before_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));

        source.cancel();

        assert!(stop.should_stop());
        // The inner check runs first, so the reason is Cancelled, not TimedOut.
        assert_eq!(stop.check(), Err(StopReason::Cancelled));
    }

    /// Many rapid checks raise skip_mod above its initial value of 1.
    #[test]
    fn calibration_ramps_up() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));

        // A fresh wrapper reads the clock on every call.
        assert_eq!(stop.checks_per_clock_read(), 1);

        // Back-to-back calls give the calibrator a fast call rate to observe.
        for _ in 0..10_000 {
            let _ = stop.check();
        }

        assert!(
            stop.checks_per_clock_read() > 1,
            "skip_mod should have increased, got {}",
            stop.checks_per_clock_read()
        );
    }

    /// `remaining()` right after construction is just under the configured
    /// duration.
    #[test]
    fn remaining_accuracy() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));

        let remaining = stop.remaining();
        assert!(remaining > Duration::from_secs(9));
        assert!(remaining <= Duration::from_secs(10));
    }

    /// `tighten` with a shorter duration shortens the deadline.
    #[test]
    fn tighten_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .tighten(Duration::from_secs(1));

        let remaining = stop.remaining();
        assert!(remaining < Duration::from_secs(2));
    }

    /// A clone does not inherit the original's calibrated skip_mod.
    #[test]
    fn clone_resets_calibration() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));

        // Calibrate the original first.
        for _ in 0..10_000 {
            let _ = stop.check();
        }
        assert!(stop.checks_per_clock_read() > 1);

        let cloned = stop.clone();
        // The clone starts over at one check per clock read.
        assert_eq!(cloned.checks_per_clock_read(), 1);
    }

    /// `with_debounced_timeout` is available on any `Stop` value via the
    /// extension trait.
    #[test]
    fn extension_trait() {
        use super::DebouncedTimeoutExt;
        let source = StopSource::new();
        let stop = source
            .as_ref()
            .with_debounced_timeout(Duration::from_secs(10));
        assert!(!stop.should_stop());
    }

    /// Compile-time check that the wrapper around a `StopRef` is `Send + Sync`.
    #[test]
    fn is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<DebouncedTimeout<crate::StopRef<'_>>>();
    }

    /// A zero duration times out on the very first check.
    #[test]
    fn zero_duration_immediate_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::ZERO);

        // skip_mod starts at 1, so the first call reads the clock.
        assert_eq!(stop.check(), Err(StopReason::TimedOut));
    }

    /// A deadline already in the past times out on the very first check.
    #[test]
    fn deadline_in_the_past() {
        let source = StopSource::new();
        let past = Instant::now() - Duration::from_secs(1);
        let stop = DebouncedTimeout::with_deadline(source.as_ref(), past);

        // The past deadline saturates to a zero offset from `created`.
        assert_eq!(stop.check(), Err(StopReason::TimedOut));
    }

    /// An absolute deadline 100 ms ahead is not reported immediately but is
    /// reported after sleeping past it.
    #[test]
    fn with_deadline_basic() {
        let source = StopSource::new();
        let deadline = Instant::now() + Duration::from_millis(100);
        let stop = DebouncedTimeout::with_deadline(source.as_ref(), deadline);

        assert!(!stop.should_stop());

        std::thread::sleep(Duration::from_millis(150));

        // Several calls may be needed before one lands on a clock read.
        // NOTE(review): this relies on skip_mod being at most ~100 after the
        // single warm-up call above; confirm that holds on fast machines.
        for _ in 0..100 {
            if stop.should_stop() {
                return;
            }
        }
        panic!("should have detected timeout");
    }

    /// `deadline()` and `remaining()` agree with the configured duration.
    #[test]
    fn deadline_accessor() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));

        let deadline = stop.deadline();
        let remaining = stop.remaining();
        assert!(remaining > Duration::from_secs(9));
        assert!(remaining <= Duration::from_secs(10));
        // The absolute deadline is still more than 9 s in the future.
        assert!(deadline > Instant::now() + Duration::from_secs(9));
    }

    /// `inner()` exposes the wrapped source and reflects its cancellation.
    #[test]
    fn inner_access() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));

        assert!(!stop.inner().should_stop());

        source.cancel();

        assert!(stop.inner().should_stop());
    }

    /// `into_inner()` hands back a usable wrapped source.
    #[test]
    fn into_inner_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));

        let inner = stop.into_inner();
        assert!(!inner.should_stop());
    }

    /// `tighten_deadline` with an earlier instant shortens the deadline.
    #[test]
    fn tighten_deadline_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .tighten_deadline(Instant::now() + Duration::from_secs(1));

        let remaining = stop.remaining();
        assert!(remaining < Duration::from_secs(2));
    }

    /// `tighten` with a longer duration leaves the existing deadline alone.
    #[test]
    fn tighten_does_not_loosen() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(1))
            .tighten(Duration::from_secs(60));

        let remaining = stop.remaining();
        // Still bounded by the original 1 s timeout, not the 60 s request.
        assert!(remaining < Duration::from_secs(2));
    }

    /// The `Debug` output names the type and its calibration fields.
    #[test]
    fn debug_format() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));
        let debug = format!("{stop:?}");
        assert!(debug.contains("DebouncedTimeout"));
        assert!(debug.contains("skip_mod"));
        assert!(debug.contains("target_interval_us"));
    }

    /// A zero target interval is accepted and checks still work.
    #[test]
    fn with_target_interval_zero_clamps_to_one() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .with_target_interval(Duration::ZERO);

        // `with_target_interval` floors the interval at 1 ns; the check must
        // succeed well before the 60 s deadline.
        assert!(stop.check().is_ok());
    }

    /// `with_debounced_deadline` from the extension trait builds a working
    /// wrapper.
    #[test]
    fn with_debounced_deadline_ext() {
        let source = StopSource::new();
        let deadline = Instant::now() + Duration::from_secs(10);
        let stop = source.as_ref().with_debounced_deadline(deadline);

        assert!(!stop.should_stop());
        assert!(stop.remaining() > Duration::from_secs(9));
    }

    /// `check` and `should_stop` give matching answers before and after
    /// cancellation.
    #[test]
    fn check_and_should_stop_agree() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));

        // Interleave both entry points; they share one call counter.
        for _ in 0..1000 {
            assert!(!stop.should_stop());
            assert!(stop.check().is_ok());
        }

        source.cancel();

        assert!(stop.should_stop());
        // Cancellation comes from the inner source, so the reason is Cancelled.
        assert_eq!(stop.check(), Err(StopReason::Cancelled));
    }

    /// `remaining()` saturates at zero once the deadline has passed.
    #[test]
    fn remaining_after_expiry_is_zero() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_millis(1));

        std::thread::sleep(Duration::from_millis(10));

        assert_eq!(stop.remaining(), Duration::ZERO);
    }

    /// skip_mod drops again when calls become slower than the target interval.
    #[test]
    fn adapts_to_slowdown() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            // 10 µs target, so the 50 µs sleeps below are slower than target.
            .with_target_interval(Duration::from_micros(10));

        // Phase 1: rapid calls ramp skip_mod up.
        for _ in 0..50_000 {
            let _ = stop.check();
        }
        let fast_skip = stop.checks_per_clock_read();
        assert!(fast_skip > 1, "should have ramped up, got {fast_skip}");

        // Phase 2: slow calls. Running more than `fast_skip` iterations
        // guarantees at least one call lands on a clock read.
        for _ in 0..(fast_skip as usize + 100) {
            std::thread::sleep(Duration::from_micros(50));
            let _ = stop.check();
        }

        let slow_skip = stop.checks_per_clock_read();
        assert!(
            slow_skip < fast_skip,
            "should have reduced skip_mod from {fast_skip} to less, got {slow_skip}"
        );
    }
}