// almost_enough/time/debounced.rs
1//! Debounced timeout that skips most `Instant::now()` calls.
2//!
3//! [`DebouncedTimeout`] wraps any [`Stop`] and adds deadline-based cancellation,
4//! like [`WithTimeout`]. The key difference: it learns how fast `check()` is
5//! being called and skips the expensive clock read on most calls.
6//!
7//! This matters for codecs and libraries where the caller controls the `Stop`
8//! implementation but not the check frequency, and vice versa. A library
9//! calling `stop.check()` every 4KB row can't know whether the caller passed a
10//! `WithTimeout` (which calls `Instant::now()` every check) or a plain
11//! `Stopper`. `DebouncedTimeout` lets callers add deadlines without imposing
12//! a hidden ~17ns-per-check tax on the library's hot path.
13//!
14//! # Deadline precision
15//!
16//! The maximum overshoot equals the target interval (default 100μs). For
17//! timeouts measured in seconds or minutes this is negligible. If you need
18//! sub-100μs precision, either lower the target with
19//! [`with_target_interval`](DebouncedTimeout::with_target_interval) or use
20//! [`WithTimeout`] directly.
21
22use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::Relaxed};
23use std::time::{Duration, Instant};
24
25use crate::{Stop, StopReason};
26
/// Default target interval between clock reads: 100μs (0.1ms).
///
/// This means we aim to call `Instant::now()` roughly every 100 microseconds,
/// regardless of how fast `check()` is called. At 10ns per call, that's one
/// clock read per ~10,000 checks. `skip_mod` is recalibrated to hit this
/// target; see `measure_and_recalibrate`.
const DEFAULT_TARGET_NANOS: u64 = 100_000;
33
/// Convert a `Duration` to whole nanoseconds, saturating at `u64::MAX`
/// (only reachable for durations beyond ~584 years).
#[inline]
fn duration_to_nanos(d: Duration) -> u64 {
    u64::try_from(d.as_nanos()).unwrap_or(u64::MAX)
}
39
/// A [`Stop`] wrapper that debounces the `Instant::now()` call.
///
/// After a brief calibration phase, `check()` only reads the clock every
/// N calls, where N is chosen so clock reads happen approximately once
/// per [`target_interval`](DebouncedTimeout::with_target_interval).
///
/// **Adaptation behavior:**
/// - If calls slow down (longer between checks), immediately increases
///   check frequency to avoid missing the deadline.
/// - If calls speed up (shorter between checks), gradually decreases
///   check frequency to avoid over-checking.
///
/// # When to Use
///
/// Prefer `DebouncedTimeout` over [`WithTimeout`](super::WithTimeout) when
/// the `Stop` implementation crosses an API boundary — the caller adds the
/// deadline, but the library controls check frequency. This avoids coupling
/// timeout overhead to the library's internal loop structure.
///
/// In tight loops (sub-microsecond between checks), the savings are ~10x.
/// In codec workloads (~4KB between checks), both types are equivalent
/// because the real work dwarfs the clock read.
///
/// # Example
///
/// ```rust
/// use almost_enough::{StopSource, Stop};
/// use almost_enough::time::DebouncedTimeout;
/// use std::time::Duration;
///
/// let source = StopSource::new();
/// let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_millis(100));
///
/// // Fast loop — most check() calls skip the clock read
/// let mut i = 0u64;
/// while !stop.should_stop() {
///     i += 1;
///     if i > 1_000_000 { break; }
/// }
/// ```
pub struct DebouncedTimeout<T> {
    inner: T,
    /// Instant when this timeout was created (reference point for nanos math).
    created: Instant,
    /// Deadline as nanoseconds since `created`.
    deadline_nanos: u64,
    /// Target interval between clock reads, in nanoseconds. Always >= 1.
    target_nanos: u64,

    // ── Mutable state (atomics for Send+Sync) ──────────────────────
    /// Monotonic call counter (wraps at u32::MAX, which is fine: the
    /// modulo test below is still well-defined after a wrap).
    call_count: AtomicU32,
    /// Check the clock when `call_count % skip_mod == 0`. Minimum 1.
    skip_mod: AtomicU32,
    /// Nanoseconds since `created` at the last clock read.
    last_measured_nanos: AtomicU64,
    /// `call_count` value at the last clock read.
    last_measured_count: AtomicU32,
}
99
100impl<T: Stop> DebouncedTimeout<T> {
101    /// Create a new debounced timeout with the default target interval (100μs).
102    ///
103    /// The deadline is calculated as `Instant::now() + duration`.
104    /// Durations longer than ~584 years are clamped to `u64::MAX` nanoseconds.
105    #[inline]
106    pub fn new(inner: T, duration: Duration) -> Self {
107        let now = Instant::now();
108        Self {
109            inner,
110            created: now,
111            deadline_nanos: duration_to_nanos(duration),
112            target_nanos: DEFAULT_TARGET_NANOS,
113            call_count: AtomicU32::new(0),
114            skip_mod: AtomicU32::new(1),
115            last_measured_nanos: AtomicU64::new(0),
116            last_measured_count: AtomicU32::new(0),
117        }
118    }
119
120    /// Create a debounced timeout with an absolute deadline.
121    ///
122    /// If the deadline is in the past, the first clock read will trigger
123    /// [`StopReason::TimedOut`].
124    #[inline]
125    pub fn with_deadline(inner: T, deadline: Instant) -> Self {
126        let now = Instant::now();
127        Self {
128            inner,
129            created: now,
130            deadline_nanos: duration_to_nanos(deadline.saturating_duration_since(now)),
131            target_nanos: DEFAULT_TARGET_NANOS,
132            call_count: AtomicU32::new(0),
133            skip_mod: AtomicU32::new(1),
134            last_measured_nanos: AtomicU64::new(0),
135            last_measured_count: AtomicU32::new(0),
136        }
137    }
138
139    /// Set the target interval between clock reads.
140    ///
141    /// Smaller values check the clock more often (more responsive but more
142    /// overhead). Larger values check less often (less overhead but may
143    /// overshoot the deadline by up to this amount).
144    ///
145    /// Default: 100μs (0.1ms).
146    #[inline]
147    pub fn with_target_interval(mut self, interval: Duration) -> Self {
148        self.target_nanos = duration_to_nanos(interval).max(1);
149        self
150    }
151
152    /// Get the deadline as an `Instant`.
153    #[inline]
154    pub fn deadline(&self) -> Instant {
155        self.created + Duration::from_nanos(self.deadline_nanos)
156    }
157
158    /// Get the remaining time until deadline.
159    ///
160    /// Returns `Duration::ZERO` if the deadline has passed.
161    #[inline]
162    pub fn remaining(&self) -> Duration {
163        self.deadline().saturating_duration_since(Instant::now())
164    }
165
166    /// Get a reference to the inner stop.
167    #[inline]
168    pub fn inner(&self) -> &T {
169        &self.inner
170    }
171
172    /// Unwrap and return the inner stop.
173    #[inline]
174    pub fn into_inner(self) -> T {
175        self.inner
176    }
177
178    /// Current number of `check()` calls between clock reads.
179    ///
180    /// Starts at 1 (every call) and adapts upward as the call rate is measured.
181    /// Useful for diagnostics and testing.
182    #[inline]
183    pub fn checks_per_clock_read(&self) -> u32 {
184        self.skip_mod.load(Relaxed)
185    }
186
187    /// The cold path: read the clock, check the deadline, recalibrate.
188    #[cold]
189    #[inline(never)]
190    fn measure_and_recalibrate(&self, count: u32) -> bool {
191        let elapsed_nanos = self.created.elapsed().as_nanos() as u64;
192
193        if elapsed_nanos >= self.deadline_nanos {
194            return true; // timed out
195        }
196
197        // Recalibrate skip_mod based on observed call rate.
198        let prev_nanos = self.last_measured_nanos.swap(elapsed_nanos, Relaxed);
199        let prev_count = self.last_measured_count.swap(count, Relaxed);
200
201        let delta_nanos = elapsed_nanos.saturating_sub(prev_nanos);
202        let delta_calls = count.wrapping_sub(prev_count) as u64;
203
204        if delta_calls > 0 && delta_nanos > 0 {
205            let nanos_per_call = delta_nanos / delta_calls;
206            if nanos_per_call > 0 {
207                let ideal_skip =
208                    (self.target_nanos / nanos_per_call).clamp(1, u32::MAX as u64) as u32;
209                let current_skip = self.skip_mod.load(Relaxed);
210
211                let new_skip = if ideal_skip <= current_skip {
212                    // Calls are slower → need to check more often → adapt immediately
213                    ideal_skip
214                } else {
215                    // Calls are faster → can check less often → adapt slowly (1/8 step)
216                    current_skip
217                        .saturating_add((ideal_skip - current_skip) / 8)
218                        .max(1)
219                };
220
221                self.skip_mod.store(new_skip, Relaxed);
222            }
223        }
224
225        false // not timed out
226    }
227}
228
229impl<T: Stop> Stop for DebouncedTimeout<T> {
230    #[inline]
231    fn check(&self) -> Result<(), StopReason> {
232        // Always check the inner stop (typically a single atomic load).
233        self.inner.check()?;
234
235        // Increment call counter and decide whether to read the clock.
236        let count = self.call_count.fetch_add(1, Relaxed).wrapping_add(1);
237        let skip = self.skip_mod.load(Relaxed);
238
239        // Hot path: skip the clock read.
240        if !count.is_multiple_of(skip) {
241            return Ok(());
242        }
243
244        // Cold path: read clock, check deadline, recalibrate.
245        if self.measure_and_recalibrate(count) {
246            Err(StopReason::TimedOut)
247        } else {
248            Ok(())
249        }
250    }
251
252    #[inline]
253    fn should_stop(&self) -> bool {
254        if self.inner.should_stop() {
255            return true;
256        }
257
258        let count = self.call_count.fetch_add(1, Relaxed).wrapping_add(1);
259        let skip = self.skip_mod.load(Relaxed);
260
261        if !count.is_multiple_of(skip) {
262            return false;
263        }
264
265        self.measure_and_recalibrate(count)
266    }
267}
268
269impl<T: Stop> DebouncedTimeout<T> {
270    /// Add another timeout, taking the tighter of the two deadlines.
271    ///
272    /// Resets calibration state since the new deadline may require
273    /// a different check frequency.
274    #[inline]
275    pub fn tighten(self, duration: Duration) -> Self {
276        let elapsed = duration_to_nanos(Instant::now().saturating_duration_since(self.created));
277        let new_deadline_nanos = elapsed.saturating_add(duration_to_nanos(duration));
278        let deadline_nanos = self.deadline_nanos.min(new_deadline_nanos);
279        Self {
280            inner: self.inner,
281            created: self.created,
282            deadline_nanos,
283            target_nanos: self.target_nanos,
284            call_count: AtomicU32::new(0),
285            skip_mod: AtomicU32::new(1),
286            last_measured_nanos: AtomicU64::new(0),
287            last_measured_count: AtomicU32::new(0),
288        }
289    }
290
291    /// Add another deadline, taking the earlier of the two.
292    ///
293    /// Resets calibration state since the new deadline may require
294    /// a different check frequency.
295    #[inline]
296    pub fn tighten_deadline(self, deadline: Instant) -> Self {
297        let new_deadline_nanos =
298            duration_to_nanos(deadline.saturating_duration_since(self.created));
299        let deadline_nanos = self.deadline_nanos.min(new_deadline_nanos);
300        Self {
301            inner: self.inner,
302            created: self.created,
303            deadline_nanos,
304            target_nanos: self.target_nanos,
305            call_count: AtomicU32::new(0),
306            skip_mod: AtomicU32::new(1),
307            last_measured_nanos: AtomicU64::new(0),
308            last_measured_count: AtomicU32::new(0),
309        }
310    }
311}
312
313impl<T: Clone + Stop> Clone for DebouncedTimeout<T> {
314    /// Clone resets calibration state — each clone starts fresh.
315    fn clone(&self) -> Self {
316        Self {
317            inner: self.inner.clone(),
318            created: self.created,
319            deadline_nanos: self.deadline_nanos,
320            target_nanos: self.target_nanos,
321            call_count: AtomicU32::new(0),
322            skip_mod: AtomicU32::new(1),
323            last_measured_nanos: AtomicU64::new(0),
324            last_measured_count: AtomicU32::new(0),
325        }
326    }
327}
328
329impl<T: core::fmt::Debug> core::fmt::Debug for DebouncedTimeout<T> {
330    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
331        let deadline = self.created + Duration::from_nanos(self.deadline_nanos);
332        f.debug_struct("DebouncedTimeout")
333            .field("inner", &self.inner)
334            .field("deadline", &deadline)
335            .field("target_interval_us", &(self.target_nanos / 1_000))
336            .field("skip_mod", &self.skip_mod.load(Relaxed))
337            .finish()
338    }
339}
340
/// Extension trait for creating [`DebouncedTimeout`] wrappers.
///
/// Automatically implemented for all [`Stop`] types when the `std` feature
/// is enabled (see the blanket impl below).
pub trait DebouncedTimeoutExt: Stop + Sized {
    /// Add a debounced timeout to this stop.
    ///
    /// Like [`TimeoutExt::with_timeout`](super::TimeoutExt::with_timeout),
    /// but skips most `Instant::now()` calls by learning the call rate.
    ///
    /// Default target interval between clock reads: 100μs.
    #[inline]
    fn with_debounced_timeout(self, duration: Duration) -> DebouncedTimeout<Self> {
        DebouncedTimeout::new(self, duration)
    }

    /// Add a debounced timeout with an absolute deadline.
    ///
    /// A deadline already in the past times out on the first clock read.
    #[inline]
    fn with_debounced_deadline(self, deadline: Instant) -> DebouncedTimeout<Self> {
        DebouncedTimeout::with_deadline(self, deadline)
    }
}

// Blanket impl: every `Stop` gets the extension methods for free.
impl<T: Stop> DebouncedTimeoutExt for T {}
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StopSource;

    #[test]
    fn basic_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_millis(50));

        assert!(!stop.should_stop());
        assert!(stop.check().is_ok());

        std::thread::sleep(Duration::from_millis(80));

        // After enough checks, should detect timeout
        for _ in 0..100 {
            if stop.should_stop() {
                return; // success
            }
        }
        panic!("should have detected timeout");
    }

    #[test]
    fn cancel_before_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));

        source.cancel();

        // Inner cancellation is always checked immediately
        assert!(stop.should_stop());
        assert_eq!(stop.check(), Err(StopReason::Cancelled));
    }

    #[test]
    fn calibration_ramps_up() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));

        // Initial: check every call
        assert_eq!(stop.checks_per_clock_read(), 1);

        // Pump through calls so calibration kicks in
        for _ in 0..10_000 {
            let _ = stop.check();
        }

        // After enough calls, should be skipping some
        assert!(
            stop.checks_per_clock_read() > 1,
            "skip_mod should have increased, got {}",
            stop.checks_per_clock_read()
        );
    }

    #[test]
    fn remaining_accuracy() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));

        let remaining = stop.remaining();
        assert!(remaining > Duration::from_secs(9));
        assert!(remaining <= Duration::from_secs(10));
    }

    #[test]
    fn tighten_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .tighten(Duration::from_secs(1));

        let remaining = stop.remaining();
        assert!(remaining < Duration::from_secs(2));
    }

    #[test]
    fn clone_resets_calibration() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));

        // Pump to get calibration going
        for _ in 0..10_000 {
            let _ = stop.check();
        }
        assert!(stop.checks_per_clock_read() > 1);

        // Clone resets
        let cloned = stop.clone();
        assert_eq!(cloned.checks_per_clock_read(), 1);
    }

    #[test]
    fn extension_trait() {
        use super::DebouncedTimeoutExt;
        let source = StopSource::new();
        let stop = source
            .as_ref()
            .with_debounced_timeout(Duration::from_secs(10));
        assert!(!stop.should_stop());
    }

    #[test]
    fn is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<DebouncedTimeout<crate::StopRef<'_>>>();
    }

    #[test]
    fn zero_duration_immediate_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::ZERO);

        // First call reads the clock (skip_mod starts at 1) and sees expiry
        assert_eq!(stop.check(), Err(StopReason::TimedOut));
    }

    #[test]
    fn deadline_in_the_past() {
        let source = StopSource::new();
        // `Instant::now() - duration` panics when the platform's Instant
        // origin is too recent (e.g. shortly after boot); use checked_sub
        // and skip the test gracefully when the past instant is unrepresentable.
        let Some(past) = Instant::now().checked_sub(Duration::from_secs(1)) else {
            return;
        };
        let stop = DebouncedTimeout::with_deadline(source.as_ref(), past);

        // deadline_nanos is 0 (saturating_duration_since clamps to zero)
        assert_eq!(stop.check(), Err(StopReason::TimedOut));
    }

    #[test]
    fn with_deadline_basic() {
        let source = StopSource::new();
        let deadline = Instant::now() + Duration::from_millis(100);
        let stop = DebouncedTimeout::with_deadline(source.as_ref(), deadline);

        assert!(!stop.should_stop());

        std::thread::sleep(Duration::from_millis(150));

        // Pump enough calls to trigger a clock read
        for _ in 0..100 {
            if stop.should_stop() {
                return;
            }
        }
        panic!("should have detected timeout");
    }

    #[test]
    fn deadline_accessor() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));

        let deadline = stop.deadline();
        let remaining = stop.remaining();
        assert!(remaining > Duration::from_secs(9));
        assert!(remaining <= Duration::from_secs(10));
        // deadline should be ~10s from now
        assert!(deadline > Instant::now() + Duration::from_secs(9));
    }

    #[test]
    fn inner_access() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));

        assert!(!stop.inner().should_stop());

        source.cancel();

        assert!(stop.inner().should_stop());
    }

    #[test]
    fn into_inner_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));

        let inner = stop.into_inner();
        assert!(!inner.should_stop());
    }

    #[test]
    fn tighten_deadline_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .tighten_deadline(Instant::now() + Duration::from_secs(1));

        let remaining = stop.remaining();
        assert!(remaining < Duration::from_secs(2));
    }

    #[test]
    fn tighten_does_not_loosen() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(1))
            .tighten(Duration::from_secs(60));

        // Should still be ~1 second, not 60
        let remaining = stop.remaining();
        assert!(remaining < Duration::from_secs(2));
    }

    #[test]
    fn debug_format() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));
        let debug = format!("{stop:?}");
        assert!(debug.contains("DebouncedTimeout"));
        assert!(debug.contains("skip_mod"));
        assert!(debug.contains("target_interval_us"));
    }

    #[test]
    fn with_target_interval_zero_clamps_to_one() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .with_target_interval(Duration::ZERO);

        // Should still work (target_nanos clamped to 1, not 0)
        assert!(stop.check().is_ok());
    }

    #[test]
    fn with_debounced_deadline_ext() {
        let source = StopSource::new();
        let deadline = Instant::now() + Duration::from_secs(10);
        let stop = source.as_ref().with_debounced_deadline(deadline);

        assert!(!stop.should_stop());
        assert!(stop.remaining() > Duration::from_secs(9));
    }

    #[test]
    fn check_and_should_stop_agree() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));

        // Both should report not stopped
        for _ in 0..1000 {
            assert!(!stop.should_stop());
            assert!(stop.check().is_ok());
        }

        source.cancel();

        // Both should report stopped (inner cancellation is immediate)
        assert!(stop.should_stop());
        assert_eq!(stop.check(), Err(StopReason::Cancelled));
    }

    #[test]
    fn remaining_after_expiry_is_zero() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_millis(1));

        std::thread::sleep(Duration::from_millis(10));

        assert_eq!(stop.remaining(), Duration::ZERO);
    }

    #[test]
    fn adapts_to_slowdown() {
        let source = StopSource::new();
        // Use a smaller target interval so skip_mod stays manageable for testing.
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .with_target_interval(Duration::from_micros(10));

        // Fast phase: pump quickly to ramp up skip_mod
        for _ in 0..50_000 {
            let _ = stop.check();
        }
        let fast_skip = stop.checks_per_clock_read();
        assert!(fast_skip > 1, "should have ramped up, got {fast_skip}");

        // Slow phase: sleep between calls. Need enough calls to trigger
        // at least one recalibration (count % skip_mod == 0).
        for _ in 0..(fast_skip as usize + 100) {
            std::thread::sleep(Duration::from_micros(50));
            let _ = stop.check();
        }

        let slow_skip = stop.checks_per_clock_read();
        assert!(
            slow_skip < fast_skip,
            "should have reduced skip_mod from {fast_skip} to less, got {slow_skip}"
        );
    }
}
653}