tokio_debouncer/lib.rs
1//! # tokio-debouncer
2//!
3//! A lightweight, cancel-safe async debouncer for [Tokio](https://tokio.rs/) tasks.
4//!
5//! ## Overview
6//!
7//! This crate provides a simple, robust, and deterministic debouncer for batching signals or jobs in async workflows.
8//! It is especially suited for job queues, event batching, and select-based async workers where you want to coalesce bursts of work and process them efficiently.
9//!
10//! - Supports both **leading** and **trailing** debounce modes.
11//! - Designed for use with `tokio::select!` for robust, cancel-safe batching.
12//! - Can be triggered from any thread or task.
13//! - Fully tested with simulated time.
14//!
15//! ## Example
16//!
17//! ```rust
18//! use tokio_debouncer::{Debouncer, DebounceMode};
19//! use tokio::time::Duration;
20//!
21//! #[tokio::main]
22//! async fn main() {
23//! // Create a debouncer with a 100ms cooldown in trailing mode
24//! let debouncer = Debouncer::new(Duration::from_millis(100), DebounceMode::Trailing);
25//! debouncer.trigger(); // Signal an event
26//! let _guard = debouncer.ready().await; // Wait until ready; debounce is finalized on drop
27//! // Do your work here
28//! }
29//! ```
30//!
31//! ## Select-based Job Queue Example
32//!
33//! ```rust
34//! use tokio::{select, time::{sleep, Duration}};
35//! use tokio_debouncer::{Debouncer, DebounceMode};
36//!
37//! #[tokio::main]
38//! async fn main() {
39//! let debouncer = Debouncer::new(Duration::from_secs(1), DebounceMode::Trailing);
40//! let debouncer2 = debouncer.clone();
41//! tokio::spawn(async move {
42//! loop {
43//! debouncer2.trigger();
44//! sleep(Duration::from_millis(200)).await;
45//! }
46//! });
47//! let mut iterations = 10;
48//! loop {
49//! iterations -= 1;
50//! if iterations == 0 {
51//! break;
52//! }
53//! // Wait for the debouncer to be ready
54//! select! {
55//! _guard = debouncer.ready() => {
56//! // Debounce is finalized automatically on drop
57//! println!("Processing job batch!");
58//! }
59//! _ = sleep(Duration::from_millis(100)) => {
60//! // Handle other events
61//! }
62//! }
63//! }
64//! }
65//! ```
66//!
67//! ## Best Practice
68//!
69//! The debounce state is now finalized automatically when the guard is dropped. You do not need to call any method to commit the debounce; simply let the guard go out of scope after acquiring it. This ensures robust, cancellation-safe batching, even if your task is cancelled or panics after acquiring the guard.
70//!
71//! If you need to do work after acquiring the guard, do it after awaiting `ready()` and let the guard drop naturally.
72
73use std::marker::PhantomData;
74use std::sync::{Arc};
75use tokio::sync::Notify;
76use tokio::time::{Duration, Instant};
77
78
79// --- parking_lot feature support ---
80#[cfg(feature = "parking_lot")]
81pub use parking_lot::{Mutex, MutexGuard};
82#[cfg(not(feature = "parking_lot"))]
83pub use std::sync::{Mutex, MutexGuard};
84
85
86/// --- MutexExt for poison handling ---
87#[cfg(not(feature = "parking_lot"))]
88pub trait MutexExt<T> {
89 /// Lock the mutex, panicking if poisoned.
90 fn risky_lock(&self) -> MutexGuard<T>;
91}
92#[cfg(not(feature = "parking_lot"))]
93impl<T> MutexExt<T> for Mutex<T> {
94 fn risky_lock(&self) -> MutexGuard<T> {
95 self.lock().expect("Mutex poisoned")
96 }
97}
98#[cfg(feature = "parking_lot")]
99pub trait MutexExt<T> {
100 /// Lock the parking_lot mutex (never poisoned).
101 fn risky_lock(&self) -> MutexGuard<T>;
102}
103#[cfg(feature = "parking_lot")]
104impl<T> MutexExt<T> for Mutex<T> {
105 fn risky_lock(&self) -> MutexGuard<T> {
106 self.lock()
107 }
108}
109
110/// The debounce mode: Leading or Trailing.
111/// - Leading: fires immediately, then cools down.
112/// - Trailing: fires after the last trigger and cooldown (default).
113#[derive(Debug)]
114pub enum DebounceMode {
115 Leading,
116 Trailing,
117}
118
119/// Internal state for the debouncer.
120struct DebouncerState {
121 has_run: bool,
122 last_run: Instant,
123 triggered: bool,
124}
125
126/// Shared inner struct for Debouncer.
127struct DebouncerInner {
128 mode: DebounceMode,
129 notifier: Notify,
130 cooldown: Duration,
131 state: Mutex<DebouncerState>,
132}
133
134impl DebouncerInner {
135 /// Finalize the debounce state after work is done or dropped.
136 fn finalize(&self, pending: bool) {
137 let mut state = self.state.risky_lock();
138 if state.triggered {
139 state.has_run = true;
140 state.triggered = pending;
141 state.last_run = tokio::time::Instant::now();
142 self.notifier.notify_one();
143 }
144 }
145}
146
147/// Guard returned by Debouncer::ready().
148///
149/// The debounce state is finalized automatically when this guard is dropped.
150/// You do not need to call any method to commit the debounce; simply let the guard go out of scope.
151pub struct DebouncerGuard<'a> {
152 inner: Arc<DebouncerInner>,
153 completed: bool,
154 _not_send: PhantomData<*const ()>,
155 _not_static: PhantomData<&'a ()>,
156}
157
158impl<'a> DebouncerGuard<'a> {
159 fn new(inner: Arc<DebouncerInner>) -> Self {
160 Self {
161 inner,
162 completed: false,
163 _not_send: PhantomData,
164 _not_static: PhantomData,
165 }
166 }
167}
168
169impl<'a> Drop for DebouncerGuard<'a> {
170 /// Finalizes the debounce state when the guard is dropped.
171 ///
172 /// This ensures cancel-safety: if your task is cancelled or panics after acquiring the guard,
173 /// the debounce state is still committed and the next batch can proceed.
174 fn drop(&mut self) {
175 if !self.completed {
176 let inner = self.inner.clone();
177 self.completed = true;
178 inner.finalize(false);
179 }
180 }
181}
182
183/// Debouncer struct for batching events or jobs.
184/// Can be cloned and shared between tasks.
185#[derive(Clone)]
186pub struct Debouncer {
187 inner: Arc<DebouncerInner>,
188}
189
190impl Debouncer {
191 /// Create a new Debouncer with a cooldown time and mode (Leading or Trailing).
192 /// Cooldown is the minimum time between triggers.
193 pub fn new(cooldown: Duration, mode: DebounceMode) -> Self {
194 let inner = Arc::new(DebouncerInner {
195 notifier: Notify::new(),
196 cooldown,
197 state: Mutex::new(DebouncerState {
198 has_run: if matches!(mode, DebounceMode::Leading) {
199 false
200 } else {
201 true
202 },
203 last_run: tokio::time::Instant::now(),
204 triggered: false,
205 }),
206 mode,
207 });
208 Self { inner }
209 }
210
211 /// Check if the debouncer is currently triggered (for diagnostics/testing).
212 pub async fn is_triggered(&self) -> bool {
213 let state = self.inner.state.risky_lock();
214 state.triggered
215 }
216
217 /// Trigger the debouncer. Can be called from any thread or task.
218 /// Notifies the worker if not already pending.
219 pub fn trigger(&self) {
220 {
221 let mut guard = self.inner.state.risky_lock();
222 if matches!(self.inner.mode, DebounceMode::Trailing) {
223 guard.last_run = tokio::time::Instant::now();
224 }
225 if guard.triggered {
226 // Already pending, just update the value
227 return;
228 }
229 guard.triggered = true;
230 } // guard dropped here
231 self.inner.notifier.notify_one();
232 }
233
234 /// Wait until the debouncer is ready to run.
235 /// Returns a guard that finalizes the debounce state when dropped.
236 ///
237 /// # Cancel Safety
238 /// This method is cancel-safe and does not change internal state until the guard is used.
239 /// The debounce is committed automatically when the guard is dropped, so you do not need to call any method.
240 pub async fn ready<'a>(&self) -> DebouncerGuard<'a> {
241 // Do not change state here to keep it cancel-safe for use inside select
242 loop {
243 let notified = self.inner.notifier.notified();
244 {
245 let state = self.inner.state.risky_lock();
246 if !state.triggered {
247 drop(state);
248 notified.await;
249 continue;
250 }
251 let now = tokio::time::Instant::now();
252 let next_allowed = state.last_run + self.inner.cooldown;
253 match self.inner.mode {
254 DebounceMode::Leading => {
255 if !state.has_run || now >= next_allowed {
256 break;
257 } else {
258 drop(state);
259 tokio::time::sleep_until(next_allowed).await;
260 }
261 }
262 DebounceMode::Trailing => {
263 if now >= next_allowed {
264 break;
265 } else {
266 drop(state);
267 tokio::time::sleep_until(next_allowed).await;
268 }
269 }
270 }
271 }
272 }
273 DebouncerGuard::new(self.inner.clone())
274 }
275}