tokio_debouncer/
lib.rs

1//! # tokio-debouncer
2//!
3//! A lightweight, cancel-safe async debouncer for [Tokio](https://tokio.rs/) tasks.
4//!
5//! ## Overview
6//!
7//! This crate provides a simple, robust, and deterministic debouncer for batching signals or jobs in async workflows.
8//! It is especially suited for job queues, event batching, and select-based async workers where you want to coalesce bursts of work and process them efficiently.
9//!
10//! - Supports both **leading** and **trailing** debounce modes.
11//! - Designed for use with `tokio::select!` for robust, cancel-safe batching.
12//! - Can be triggered from any thread or task.
13//! - Fully tested with simulated time.
14//!
15//! ## Example
16//!
17//! ```rust
18//! use tokio_debouncer::{Debouncer, DebounceMode};
19//! use tokio::time::Duration;
20//!
21//! #[tokio::main]
22//! async fn main() {
23//!     // Create a debouncer with a 100ms cooldown in trailing mode
24//!     let debouncer = Debouncer::new(Duration::from_millis(100), DebounceMode::Trailing);
25//!     debouncer.trigger(); // Signal an event
26//!     let _guard = debouncer.ready().await; // Wait until ready; debounce is finalized on drop
27//!     // Do your work here
28//! }
29//! ```
30//!
31//! ## Select-based Job Queue Example
32//!
33//! ```rust
34//! use tokio::{select, time::{sleep, Duration}};
35//! use tokio_debouncer::{Debouncer, DebounceMode};
36//!
37//! #[tokio::main]
38//! async fn main() {
39//!     let debouncer = Debouncer::new(Duration::from_secs(1), DebounceMode::Trailing);
40//!     let debouncer2 = debouncer.clone();
41//!     tokio::spawn(async move {
42//!         loop {
43//!             debouncer2.trigger();
44//!             sleep(Duration::from_millis(200)).await;
45//!         }
46//!     });
47//!    let mut iterations = 10;
48//!     loop {
49//!          iterations -= 1;
50//!          if iterations == 0 {
51//!              break;
52//!          }
53//!        // Wait for the debouncer to be ready
54//!         select! {
55//!             _guard = debouncer.ready() => {
56//!                 // Debounce is finalized automatically on drop
57//!                 println!("Processing job batch!");
58//!             }
59//!             _ = sleep(Duration::from_millis(100)) => {
60//!                 // Handle other events
61//!             }
62//!         }
63//!     }
64//! }
65//! ```
66//!
67//! ## Best Practice
68//!
69//! The debounce state is now finalized automatically when the guard is dropped. You do not need to call any method to commit the debounce; simply let the guard go out of scope after acquiring it. This ensures robust, cancellation-safe batching, even if your task is cancelled or panics after acquiring the guard.
70//!
71//! If you need to do work after acquiring the guard, do it after awaiting `ready()` and let the guard drop naturally.
72
73use std::marker::PhantomData;
74use std::sync::{Arc};
75use tokio::sync::Notify;
76use tokio::time::{Duration, Instant};
77
78
79// --- parking_lot feature support ---
80#[cfg(feature = "parking_lot")]
81pub use parking_lot::{Mutex, MutexGuard};
82#[cfg(not(feature = "parking_lot"))]
83pub use std::sync::{Mutex, MutexGuard};
84
85
86/// --- MutexExt for poison handling ---
87#[cfg(not(feature = "parking_lot"))]
88pub trait MutexExt<T> {
89    /// Lock the mutex, panicking if poisoned.
90    fn risky_lock(&self) -> MutexGuard<T>;
91}
92#[cfg(not(feature = "parking_lot"))]
93impl<T> MutexExt<T> for Mutex<T> {
94    fn risky_lock(&self) -> MutexGuard<T> {
95        self.lock().expect("Mutex poisoned")
96    }
97}
98#[cfg(feature = "parking_lot")]
99pub trait MutexExt<T> {
100    /// Lock the parking_lot mutex (never poisoned).
101    fn risky_lock(&self) -> MutexGuard<T>;
102}
103#[cfg(feature = "parking_lot")]
104impl<T> MutexExt<T> for Mutex<T> {
105    fn risky_lock(&self) -> MutexGuard<T> {
106        self.lock()
107    }
108}
109
110/// The debounce mode: Leading or Trailing.
111/// - Leading: fires immediately, then cools down.
112/// - Trailing: fires after the last trigger and cooldown (default).
113#[derive(Debug)]
114pub enum DebounceMode {
115    Leading,
116    Trailing,
117}
118
119/// Internal state for the debouncer.
120struct DebouncerState {
121    has_run: bool,
122    last_run: Instant,
123    triggered: bool,
124}
125
126/// Shared inner struct for Debouncer.
127struct DebouncerInner {
128    mode: DebounceMode,
129    notifier: Notify,
130    cooldown: Duration,
131    state: Mutex<DebouncerState>,
132}
133
134impl DebouncerInner {
135    /// Finalize the debounce state after work is done or dropped.
136    fn finalize(&self, pending: bool) {
137        let mut state = self.state.risky_lock();
138        if state.triggered {
139            state.has_run = true;
140            state.triggered = pending;
141            state.last_run = tokio::time::Instant::now();
142            self.notifier.notify_one();
143        }
144    }
145}
146
147/// Guard returned by Debouncer::ready().
148///
149/// The debounce state is finalized automatically when this guard is dropped.
150/// You do not need to call any method to commit the debounce; simply let the guard go out of scope.
151pub struct DebouncerGuard<'a> {
152    inner: Arc<DebouncerInner>,
153    completed: bool,
154    _not_send: PhantomData<*const ()>,
155    _not_static: PhantomData<&'a ()>,
156}
157
158impl<'a> DebouncerGuard<'a> {
159    fn new(inner: Arc<DebouncerInner>) -> Self {
160        Self {
161            inner,
162            completed: false,
163            _not_send: PhantomData,
164            _not_static: PhantomData,
165        }
166    }
167}
168
169impl<'a> Drop for DebouncerGuard<'a> {
170    /// Finalizes the debounce state when the guard is dropped.
171    ///
172    /// This ensures cancel-safety: if your task is cancelled or panics after acquiring the guard,
173    /// the debounce state is still committed and the next batch can proceed.
174    fn drop(&mut self) {
175        if !self.completed {
176            let inner = self.inner.clone();
177            self.completed = true;
178            inner.finalize(false);
179        }
180    }
181}
182
183/// Debouncer struct for batching events or jobs.
184/// Can be cloned and shared between tasks.
185#[derive(Clone)]
186pub struct Debouncer {
187    inner: Arc<DebouncerInner>,
188}
189
190impl Debouncer {
191    /// Create a new Debouncer with a cooldown time and mode (Leading or Trailing).
192    /// Cooldown is the minimum time between triggers.
193    pub fn new(cooldown: Duration, mode: DebounceMode) -> Self {
194        let inner = Arc::new(DebouncerInner {
195            notifier: Notify::new(),
196            cooldown,
197            state: Mutex::new(DebouncerState {
198                has_run: if matches!(mode, DebounceMode::Leading) {
199                    false
200                } else {
201                    true
202                },
203                last_run: tokio::time::Instant::now(),
204                triggered: false,
205            }),
206            mode,
207        });
208        Self { inner }
209    }
210
211    /// Check if the debouncer is currently triggered (for diagnostics/testing).
212    pub async fn is_triggered(&self) -> bool {
213        let state = self.inner.state.risky_lock();
214        state.triggered
215    }
216
217    /// Trigger the debouncer. Can be called from any thread or task.
218    /// Notifies the worker if not already pending.
219    pub fn trigger(&self) {
220        {
221            let mut guard = self.inner.state.risky_lock();
222            if matches!(self.inner.mode, DebounceMode::Trailing) {
223                guard.last_run = tokio::time::Instant::now();
224            }
225            if guard.triggered {
226                // Already pending, just update the value
227                return;
228            }
229            guard.triggered = true;
230        } // guard dropped here
231        self.inner.notifier.notify_one();
232    }
233
234    /// Wait until the debouncer is ready to run.
235    /// Returns a guard that finalizes the debounce state when dropped.
236    ///
237    /// # Cancel Safety
238    /// This method is cancel-safe and does not change internal state until the guard is used.
239    /// The debounce is committed automatically when the guard is dropped, so you do not need to call any method.
240    pub async fn ready<'a>(&self) -> DebouncerGuard<'a> {
241        // Do not change state here to keep it cancel-safe for use inside select
242        loop {
243            let notified = self.inner.notifier.notified();
244            {
245                let state = self.inner.state.risky_lock();
246                if !state.triggered {
247                    drop(state);
248                    notified.await;
249                    continue;
250                }
251                let now = tokio::time::Instant::now();
252                let next_allowed = state.last_run + self.inner.cooldown;
253                match self.inner.mode {
254                    DebounceMode::Leading => {
255                        if !state.has_run || now >= next_allowed {
256                            break;
257                        } else {
258                            drop(state);
259                            tokio::time::sleep_until(next_allowed).await;
260                        }
261                    }
262                    DebounceMode::Trailing => {
263                        if now >= next_allowed {
264                            break;
265                        } else {
266                            drop(state);
267                            tokio::time::sleep_until(next_allowed).await;
268                        }
269                    }
270                }
271            }
272        }
273        DebouncerGuard::new(self.inner.clone())
274    }
275}