use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::Relaxed};
use std::time::{Duration, Instant};
use crate::{Stop, StopReason};
/// Default target spacing between real clock reads, in nanoseconds
/// (100 microseconds). Used until `with_target_interval` overrides it.
const DEFAULT_TARGET_NANOS: u64 = 100 * 1_000;
/// Converts a [`Duration`] to whole nanoseconds.
///
/// `Duration::as_nanos` yields a `u128`; values that do not fit in a `u64`
/// saturate to `u64::MAX` instead of wrapping.
#[inline]
fn duration_to_nanos(d: Duration) -> u64 {
    let wide = d.as_nanos();
    if wide > u128::from(u64::MAX) {
        u64::MAX
    } else {
        // Lossless: the branch above rules out anything wider than 64 bits.
        wide as u64
    }
}
/// Wraps an inner stop source with a deadline whose clock is only read on
/// some calls.
///
/// Each check first consults `inner`; the clock is then read only when the
/// running call counter is a multiple of `skip_mod`. Every clock read
/// re-derives `skip_mod` from the observed call rate so that reads land
/// roughly `target_nanos` apart.
pub struct DebouncedTimeout<T> {
    /// The wrapped stop source, consulted before anything else on each check.
    inner: T,
    /// Instant this wrapper was created; the origin for all nanosecond fields.
    created: Instant,
    /// Deadline as a nanosecond offset from `created`.
    deadline_nanos: u64,
    /// Desired spacing between clock reads, in nanoseconds.
    target_nanos: u64,
    /// Number of checks that got past `inner` (wraps on overflow).
    call_count: AtomicU32,
    /// The clock is read when `call_count` is a multiple of this value.
    skip_mod: AtomicU32,
    /// Elapsed nanoseconds recorded at the previous clock read.
    last_measured_nanos: AtomicU64,
    /// Value of the call counter recorded at the previous clock read.
    last_measured_count: AtomicU32,
}
impl<T: Stop> DebouncedTimeout<T> {
    /// Assembles a wrapper with calibration reset: no calls counted, a clock
    /// read on every call (`skip_mod == 1`), and no previous sample.
    #[inline]
    fn from_parts(inner: T, created: Instant, deadline_nanos: u64, target_nanos: u64) -> Self {
        Self {
            inner,
            created,
            deadline_nanos,
            target_nanos,
            call_count: AtomicU32::new(0),
            skip_mod: AtomicU32::new(1),
            last_measured_nanos: AtomicU64::new(0),
            last_measured_count: AtomicU32::new(0),
        }
    }

    /// Wraps `inner` with a timeout that expires `duration` after this call.
    ///
    /// Durations whose nanosecond count exceeds `u64::MAX` saturate.
    #[inline]
    pub fn new(inner: T, duration: Duration) -> Self {
        let now = Instant::now();
        Self::from_parts(inner, now, duration_to_nanos(duration), DEFAULT_TARGET_NANOS)
    }

    /// Wraps `inner` with a timeout that expires at `deadline`.
    ///
    /// A deadline that already lies in the past yields a zero-length timeout.
    #[inline]
    pub fn with_deadline(inner: T, deadline: Instant) -> Self {
        let now = Instant::now();
        let until_deadline = deadline.saturating_duration_since(now);
        Self::from_parts(
            inner,
            now,
            duration_to_nanos(until_deadline),
            DEFAULT_TARGET_NANOS,
        )
    }

    /// Sets the spacing between clock reads that calibration aims for.
    ///
    /// A zero interval is raised to one nanosecond so the calibration
    /// arithmetic never works with a zero target.
    #[inline]
    pub fn with_target_interval(mut self, interval: Duration) -> Self {
        self.target_nanos = duration_to_nanos(interval).max(1);
        self
    }

    /// Returns the absolute instant at which the timeout expires.
    #[inline]
    pub fn deadline(&self) -> Instant {
        self.created + Duration::from_nanos(self.deadline_nanos)
    }

    /// Returns the time left until the deadline, or zero once it has passed.
    ///
    /// This always reads the clock; it is not debounced.
    #[inline]
    pub fn remaining(&self) -> Duration {
        self.deadline().saturating_duration_since(Instant::now())
    }

    /// Borrows the wrapped stop source.
    #[inline]
    pub fn inner(&self) -> &T {
        &self.inner
    }

    /// Consumes the wrapper and returns the wrapped stop source.
    #[inline]
    pub fn into_inner(self) -> T {
        self.inner
    }

    /// Returns the current calibration: the clock is read once per this many
    /// checks.
    #[inline]
    pub fn checks_per_clock_read(&self) -> u32 {
        self.skip_mod.load(Relaxed)
    }

    /// Reads the clock and reports whether the deadline has passed.
    ///
    /// When the deadline has not passed, `skip_mod` is re-derived from the
    /// call rate observed since the previous clock read. A lower ideal value
    /// is adopted immediately; a higher one is approached by one eighth of
    /// the gap per sample. `count` is the call counter value of the current
    /// check.
    #[cold]
    #[inline(never)]
    fn measure_and_recalibrate(&self, count: u32) -> bool {
        // Saturating conversion, consistent with how `deadline_nanos` is
        // derived, instead of a silently truncating `as u64` cast.
        let elapsed_nanos = duration_to_nanos(self.created.elapsed());
        if elapsed_nanos >= self.deadline_nanos {
            return true;
        }

        let prev_nanos = self.last_measured_nanos.swap(elapsed_nanos, Relaxed);
        let prev_count = self.last_measured_count.swap(count, Relaxed);
        let delta_nanos = elapsed_nanos.saturating_sub(prev_nanos);
        // The call counter wraps, so the difference has to wrap as well.
        let delta_calls = u64::from(count.wrapping_sub(prev_count));

        if delta_calls > 0 && delta_nanos > 0 {
            let nanos_per_call = delta_nanos / delta_calls;
            // A zero per-call cost carries no usable rate; keep the old value.
            if nanos_per_call > 0 {
                // Lossless cast: the value is clamped into the `u32` range.
                let ideal_skip =
                    (self.target_nanos / nanos_per_call).clamp(1, u64::from(u32::MAX)) as u32;
                let current_skip = self.skip_mod.load(Relaxed);
                let new_skip = if ideal_skip <= current_skip {
                    // Calls got slower (or stayed put): follow right away.
                    ideal_skip
                } else {
                    // Calls look faster: move only an eighth of the way up.
                    current_skip
                        .saturating_add((ideal_skip - current_skip) / 8)
                        .max(1)
                };
                self.skip_mod.store(new_skip, Relaxed);
            }
        }
        false
    }
}
impl<T: Stop> DebouncedTimeout<T> {
    /// Counts one call and reads the clock only when the counter is a
    /// multiple of the current `skip_mod`.
    ///
    /// Returns `true` only when the clock was read on this call and the
    /// deadline had passed.
    #[inline]
    fn tick_and_poll_deadline(&self) -> bool {
        let calls_so_far = self.call_count.fetch_add(1, Relaxed).wrapping_add(1);
        let stride = self.skip_mod.load(Relaxed);
        calls_so_far % stride == 0 && self.measure_and_recalibrate(calls_so_far)
    }
}

impl<T: Stop> Stop for DebouncedTimeout<T> {
    /// Propagates any error from the inner stop source first; otherwise
    /// reports `StopReason::TimedOut` when this call read the clock and the
    /// deadline had passed.
    #[inline]
    fn check(&self) -> Result<(), StopReason> {
        self.inner.check()?;
        if self.tick_and_poll_deadline() {
            return Err(StopReason::TimedOut);
        }
        Ok(())
    }

    /// Returns `true` when the inner stop source says so, or when this call
    /// read the clock and the deadline had passed. Calls stopped by the
    /// inner source are not counted.
    #[inline]
    fn should_stop(&self) -> bool {
        self.inner.should_stop() || self.tick_and_poll_deadline()
    }
}
impl<T: Stop> DebouncedTimeout<T> {
    /// Returns a wrapper whose deadline is the earlier of the current one and
    /// `duration` from now. Calibration state is reset.
    #[inline]
    pub fn tighten(self, duration: Duration) -> Self {
        let since_creation = Instant::now().saturating_duration_since(self.created);
        let candidate_nanos =
            duration_to_nanos(since_creation).saturating_add(duration_to_nanos(duration));
        self.with_capped_deadline(candidate_nanos)
    }

    /// Returns a wrapper whose deadline is the earlier of the current one and
    /// `deadline`. Calibration state is reset.
    #[inline]
    pub fn tighten_deadline(self, deadline: Instant) -> Self {
        let offset_from_creation = deadline.saturating_duration_since(self.created);
        self.with_capped_deadline(duration_to_nanos(offset_from_creation))
    }

    /// Rebuilds the wrapper with `deadline_nanos` lowered to `candidate_nanos`
    /// when that is smaller, keeping `inner`, `created` and `target_nanos`
    /// and starting calibration over.
    #[inline]
    fn with_capped_deadline(self, candidate_nanos: u64) -> Self {
        Self {
            deadline_nanos: self.deadline_nanos.min(candidate_nanos),
            call_count: AtomicU32::new(0),
            skip_mod: AtomicU32::new(1),
            last_measured_nanos: AtomicU64::new(0),
            last_measured_count: AtomicU32::new(0),
            ..self
        }
    }
}
impl<T: Clone + Stop> Clone for DebouncedTimeout<T> {
    /// Clones the inner stop source and copies the deadline and target
    /// interval. The clone starts with fresh calibration state rather than
    /// copying the counters of the original.
    fn clone(&self) -> Self {
        let Self {
            inner,
            created,
            deadline_nanos,
            target_nanos,
            ..
        } = self;
        Self {
            inner: T::clone(inner),
            created: *created,
            deadline_nanos: *deadline_nanos,
            target_nanos: *target_nanos,
            call_count: AtomicU32::new(0),
            skip_mod: AtomicU32::new(1),
            last_measured_nanos: AtomicU64::new(0),
            last_measured_count: AtomicU32::new(0),
        }
    }
}
impl<T: core::fmt::Debug> core::fmt::Debug for DebouncedTimeout<T> {
    /// Formats the inner value, the absolute deadline, the target interval
    /// in whole microseconds, and the current `skip_mod`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let absolute_deadline = self.created + Duration::from_nanos(self.deadline_nanos);
        let mut out = f.debug_struct("DebouncedTimeout");
        out.field("inner", &self.inner);
        out.field("deadline", &absolute_deadline);
        out.field("target_interval_us", &(self.target_nanos / 1_000));
        out.field("skip_mod", &self.skip_mod.load(Relaxed));
        out.finish()
    }
}
/// Extension methods that wrap any `Stop` value in a [`DebouncedTimeout`].
pub trait DebouncedTimeoutExt: Stop + Sized {
    /// Wraps `self` so that it also stops `duration` from now.
    ///
    /// Equivalent to `DebouncedTimeout::new(self, duration)`.
    #[inline]
    fn with_debounced_timeout(self, duration: Duration) -> DebouncedTimeout<Self> {
        DebouncedTimeout::<Self>::new(self, duration)
    }

    /// Wraps `self` so that it also stops at `deadline`.
    ///
    /// Equivalent to `DebouncedTimeout::with_deadline(self, deadline)`.
    #[inline]
    fn with_debounced_deadline(self, deadline: Instant) -> DebouncedTimeout<Self> {
        DebouncedTimeout::<Self>::with_deadline(self, deadline)
    }
}

/// Every `Stop` implementor gets the extension methods.
impl<S> DebouncedTimeoutExt for S where S: Stop {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StopSource;

    /// A short timeout is eventually reported once the deadline has passed.
    #[test]
    fn basic_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_millis(50));
        assert!(!stop.should_stop());
        assert!(stop.check().is_ok());
        std::thread::sleep(Duration::from_millis(80));
        // The checks above may already have raised the stride past 100 on a
        // fast machine. Any run of `skip_mod` consecutive calls contains one
        // clock read, so probe at least that many times.
        let probes = stop.checks_per_clock_read().max(100);
        for _ in 0..probes {
            if stop.should_stop() {
                return;
            }
        }
        panic!("should have detected timeout");
    }

    /// Cancellation of the inner source wins over a distant deadline.
    #[test]
    fn cancel_before_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));
        source.cancel();
        assert!(stop.should_stop());
        assert_eq!(stop.check(), Err(StopReason::Cancelled));
    }

    /// Rapid checks make the stride grow above its initial value of 1.
    #[test]
    fn calibration_ramps_up() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));
        assert_eq!(stop.checks_per_clock_read(), 1);
        for _ in 0..10_000 {
            let _ = stop.check();
        }
        assert!(
            stop.checks_per_clock_read() > 1,
            "skip_mod should have increased, got {}",
            stop.checks_per_clock_read()
        );
    }

    /// `remaining` is close to the configured duration right after creation.
    #[test]
    fn remaining_accuracy() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));
        let remaining = stop.remaining();
        assert!(remaining > Duration::from_secs(9));
        assert!(remaining <= Duration::from_secs(10));
    }

    /// `tighten` shortens a long timeout.
    #[test]
    fn tighten_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .tighten(Duration::from_secs(1));
        let remaining = stop.remaining();
        assert!(remaining < Duration::from_secs(2));
    }

    /// A clone starts with a stride of 1 even if the original has ramped up.
    #[test]
    fn clone_resets_calibration() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));
        for _ in 0..10_000 {
            let _ = stop.check();
        }
        assert!(stop.checks_per_clock_read() > 1);
        let cloned = stop.clone();
        assert_eq!(cloned.checks_per_clock_read(), 1);
    }

    /// The extension trait builds a working wrapper.
    #[test]
    fn extension_trait() {
        use super::DebouncedTimeoutExt;
        let source = StopSource::new();
        let stop = source
            .as_ref()
            .with_debounced_timeout(Duration::from_secs(10));
        assert!(!stop.should_stop());
    }

    /// Compile-time check that the wrapper is `Send + Sync`.
    #[test]
    fn is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<DebouncedTimeout<crate::StopRef<'_>>>();
    }

    /// A zero duration times out on the very first check.
    #[test]
    fn zero_duration_immediate_timeout() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::ZERO);
        assert_eq!(stop.check(), Err(StopReason::TimedOut));
    }

    /// A deadline in the past times out on the very first check.
    #[test]
    fn deadline_in_the_past() {
        let source = StopSource::new();
        let past = Instant::now() - Duration::from_secs(1);
        let stop = DebouncedTimeout::with_deadline(source.as_ref(), past);
        assert_eq!(stop.check(), Err(StopReason::TimedOut));
    }

    /// An absolute deadline is eventually reported once it has passed.
    #[test]
    fn with_deadline_basic() {
        let source = StopSource::new();
        let deadline = Instant::now() + Duration::from_millis(100);
        let stop = DebouncedTimeout::with_deadline(source.as_ref(), deadline);
        assert!(!stop.should_stop());
        std::thread::sleep(Duration::from_millis(150));
        // Probe at least one full stride so a clock read is guaranteed even
        // if the first check already raised `skip_mod` above 100.
        let probes = stop.checks_per_clock_read().max(100);
        for _ in 0..probes {
            if stop.should_stop() {
                return;
            }
        }
        panic!("should have detected timeout");
    }

    /// `deadline` and `remaining` agree with the configured duration.
    #[test]
    fn deadline_accessor() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));
        let deadline = stop.deadline();
        let remaining = stop.remaining();
        assert!(remaining > Duration::from_secs(9));
        assert!(remaining <= Duration::from_secs(10));
        assert!(deadline > Instant::now() + Duration::from_secs(9));
    }

    /// `inner` exposes the wrapped source and reflects its cancellation.
    #[test]
    fn inner_access() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));
        assert!(!stop.inner().should_stop());
        source.cancel();
        assert!(stop.inner().should_stop());
    }

    /// `into_inner` hands back the wrapped source.
    #[test]
    fn into_inner_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));
        let inner = stop.into_inner();
        assert!(!inner.should_stop());
    }

    /// `tighten_deadline` shortens a long timeout.
    #[test]
    fn tighten_deadline_works() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .tighten_deadline(Instant::now() + Duration::from_secs(1));
        let remaining = stop.remaining();
        assert!(remaining < Duration::from_secs(2));
    }

    /// `tighten` never extends an existing, shorter deadline.
    #[test]
    fn tighten_does_not_loosen() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(1))
            .tighten(Duration::from_secs(60));
        let remaining = stop.remaining();
        assert!(remaining < Duration::from_secs(2));
    }

    /// The `Debug` output names the type and its calibration fields.
    #[test]
    fn debug_format() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(10));
        let debug = format!("{stop:?}");
        assert!(debug.contains("DebouncedTimeout"));
        assert!(debug.contains("skip_mod"));
        assert!(debug.contains("target_interval_us"));
    }

    /// A zero target interval is accepted and checks still succeed.
    #[test]
    fn with_target_interval_zero_clamps_to_one() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .with_target_interval(Duration::ZERO);
        assert!(stop.check().is_ok());
    }

    /// The deadline flavour of the extension trait builds a working wrapper.
    #[test]
    fn with_debounced_deadline_ext() {
        let source = StopSource::new();
        let deadline = Instant::now() + Duration::from_secs(10);
        let stop = source.as_ref().with_debounced_deadline(deadline);
        assert!(!stop.should_stop());
        assert!(stop.remaining() > Duration::from_secs(9));
    }

    /// `check` and `should_stop` give matching answers before and after
    /// cancellation.
    #[test]
    fn check_and_should_stop_agree() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60));
        for _ in 0..1000 {
            assert!(!stop.should_stop());
            assert!(stop.check().is_ok());
        }
        source.cancel();
        assert!(stop.should_stop());
        assert_eq!(stop.check(), Err(StopReason::Cancelled));
    }

    /// `remaining` saturates at zero after the deadline.
    #[test]
    fn remaining_after_expiry_is_zero() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_millis(1));
        std::thread::sleep(Duration::from_millis(10));
        assert_eq!(stop.remaining(), Duration::ZERO);
    }

    /// The stride drops again when calls become slow.
    #[test]
    fn adapts_to_slowdown() {
        let source = StopSource::new();
        let stop = DebouncedTimeout::new(source.as_ref(), Duration::from_secs(60))
            .with_target_interval(Duration::from_micros(10));
        // Fast phase: back-to-back checks push the stride up.
        for _ in 0..50_000 {
            let _ = stop.check();
        }
        let fast_skip = stop.checks_per_clock_read();
        assert!(fast_skip > 1, "should have ramped up, got {fast_skip}");
        // Slow phase: run past one full stride so at least one clock read
        // observes the slower call rate.
        for _ in 0..(fast_skip as usize + 100) {
            std::thread::sleep(Duration::from_micros(50));
            let _ = stop.check();
        }
        let slow_skip = stop.checks_per_clock_read();
        assert!(
            slow_skip < fast_skip,
            "should have reduced skip_mod from {fast_skip} to less, got {slow_skip}"
        );
    }
}