use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;
use tokio::{sync::oneshot, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
#[derive(Clone)]
pub struct Debouncer {
name: Arc<str>,
debounce_duration: Duration,
max_debounce: Option<Duration>,
cancel_task_timeout: Duration,
current_task: Arc<Mutex<Option<TaskToken>>>,
timer_handle: Arc<Mutex<Option<TimerHandle>>>,
event_handler: Option<EventHandler>,
cancel_token: CancellationToken,
}
pub type StoredTask = Arc<
dyn Fn(CancellationToken) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync,
>;
#[derive(Clone)]
pub struct StoredTaskDebouncer {
debouncer: Debouncer,
task_fn: StoredTask,
}
pub trait DebouncedTask: Send + Sync + 'static {
fn execute(&self, token: CancellationToken) -> impl Future<Output = ()> + Send + Sync;
}
#[derive(Clone)]
pub struct TaskDebouncer<T: DebouncedTask> {
debouncer: Debouncer,
task: Arc<T>,
}
#[derive(Debug)]
pub struct TimerHandle {
debounced_at: Instant,
first_debounce_at: Option<Instant>,
timer_token: CancellationToken,
join_handle: JoinHandle<()>,
exit_rx: oneshot::Receiver<TaskExit>,
}
#[derive(Debug)]
pub struct TaskToken {
pub started_at: Instant,
pub task_token: CancellationToken,
}
#[derive(Clone, Debug, Copy)]
pub enum TaskExit {
Normal,
Cancelled,
Aborted,
NotStarted,
}
#[derive(Clone, Debug)]
pub enum DebounceEvent {
Debounced {
instant: Instant,
first_debounce_at: Option<Instant>,
debounce_ends_at: Instant,
},
Started {
instant: Instant,
},
Ended {
instant: Instant,
exit_status: TaskExit,
},
}
pub type EventHandler = Arc<dyn Fn(DebounceEvent) + Send + Sync + 'static>;
impl Debouncer {
pub fn new(
debounce_duration: Duration,
cancel_token: CancellationToken,
task_name: impl AsRef<str>,
) -> Self {
Self {
debounce_duration,
max_debounce: None,
cancel_task_timeout: Duration::from_secs(2),
timer_handle: Arc::new(Mutex::new(None)),
cancel_token,
name: Arc::from(task_name.as_ref()),
current_task: Arc::new(Mutex::new(None)),
event_handler: None,
}
}
pub fn with_task_timeout(mut self, task_timeout: Duration) -> Self {
self.cancel_task_timeout = task_timeout;
self
}
pub fn with_max_wait(mut self, max_wait: Duration) -> Self {
self.max_debounce = Some(max_wait);
self
}
pub fn with_event_handler<E: Fn(DebounceEvent) + Send + Sync + 'static>(
mut self,
event_handler: E,
) -> Self {
self.event_handler = Some(Arc::new(event_handler));
self
}
fn fire_event(&self, event: DebounceEvent) {
if let Some(event_handler) = &self.event_handler {
event_handler(event)
}
}
async fn should_wait_for_debounce(&self, first_debounce_at: Option<Instant>) -> bool {
if let Some(max_wait) = self.max_debounce {
if let Some(first_debounce) = first_debounce_at {
let wait_period = Instant::now().duration_since(first_debounce);
if wait_period >= max_wait {
trace!(
"task {:?} exceeded the max debounce waiting period, {wait_period:?} >= {max_wait:?}",
self.name
);
return false;
}
}
}
true
}
async fn spawn_task<Task, Fut>(&self, task: Task) -> TaskExit
where
Task: FnOnce(CancellationToken) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let task_token = CancellationToken::new();
let now = {
let mut current_task_token = self.current_task.lock().await;
if let Some(current_token) = current_task_token.take() {
current_token.task_token.cancel();
}
let now = Instant::now();
*current_task_token = Some(TaskToken {
started_at: now,
task_token: task_token.clone(),
});
now
};
let (tx, rx) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let task_handle = tokio::spawn({
let task_token = task_token.clone();
async move {
task(task_token).await;
let _ = tx.send(());
let _ = tx2.send(());
}
});
self.fire_event(DebounceEvent::Started { instant: now });
let exit_status = tokio::select! {
_ = rx => TaskExit::Normal,
_ = self.wait_for_cancellation(&task_token) => {
tokio::select! {
_ = tokio::time::sleep(self.cancel_task_timeout) => {
task_handle.abort();
TaskExit::Aborted
}
_ = rx2 => {
TaskExit::Cancelled
}
}
}
};
self.fire_event(DebounceEvent::Ended {
exit_status,
instant: Instant::now(),
});
exit_status
}
async fn wait_for_cancellation(&self, task_token: &CancellationToken) {
tokio::select! {
_ = self.cancel_token.cancelled() => {
trace!("task {:?} debouncer token cancelled", self.name);
task_token.cancel();
}
_ = task_token.cancelled() => {
trace!("task {:?} task token cancelled", self.name);
}
}
}
async fn timer<F, Fut>(
&self,
now: Instant,
first_debounce_at: Option<Instant>,
exit_tx: oneshot::Sender<TaskExit>,
timer_token: CancellationToken,
f: F,
) where
F: FnOnce(CancellationToken) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.fire_event(DebounceEvent::Debounced {
instant: now,
debounce_ends_at: now.checked_add(self.debounce_duration).unwrap_or(now),
first_debounce_at,
});
let exit_status = tokio::select! {
_ = timer_token.cancelled() => {
trace!("task {:?} timer cancelled by next trigger", self.name);
TaskExit::NotStarted
}
_ = self.cancel_token.cancelled() => {
trace!("task {:?} cancelled while waiting to execute next task", self.name);
TaskExit::NotStarted
}
_ = self.wait_for_debounce(first_debounce_at) => {
self.spawn_task(f).await
}
};
let _ = exit_tx.send(exit_status);
}
async fn wait_for_debounce(&self, first_debounce_at: Option<Instant>) {
if self.should_wait_for_debounce(first_debounce_at).await {
tokio::time::sleep(self.debounce_duration).await;
}
}
pub async fn run_now<Task, Fut>(&self, task: Task)
where
Task: FnOnce(CancellationToken) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let mut timer_handle = self.timer_handle.lock().await;
if let Some(timer_handle) = timer_handle.take() {
if !timer_handle
.cancel_with_timeout(self.cancel_task_timeout)
.await
{
warn!("task {:?} aborted timer handle", self.name);
}
}
drop(timer_handle);
self.spawn_task(task).await;
}
pub async fn debounce<Task, Fut>(&self, task: Task)
where
Task: FnOnce(CancellationToken) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let timer_token = CancellationToken::new();
let (exit_tx, exit_rx) = oneshot::channel();
let mut timer_handle = self.timer_handle.lock().await;
let first_debounce_at = if let Some(existing_timer) = timer_handle.take() {
let first_debounce = Some(
existing_timer
.first_debounce_at
.unwrap_or(existing_timer.debounced_at),
);
if !existing_timer
.cancel_with_timeout(self.cancel_task_timeout)
.await
{
warn!("task {:?} aborted timer handle", self.name);
}
first_debounce
} else {
None
};
let now = Instant::now();
let debouncer = self.clone();
*timer_handle = Some(TimerHandle {
exit_rx,
timer_token: timer_token.clone(),
debounced_at: now,
first_debounce_at,
join_handle: tokio::spawn(async move {
debouncer
.timer(now, first_debounce_at, exit_tx, timer_token, task)
.await;
}),
});
}
pub async fn stop(&self) {
debug!("task {:?} stopping...", self.name);
if let Some(current_token) = self.current_task.lock().await.take() {
trace!("task {:?} running, cancelling task...", self.name);
current_token.task_token.cancel();
}
if let Some(timer_handle) = self.timer_handle.lock().await.take() {
trace!("task {:?} waiting on timer....", self.name);
if !timer_handle
.cancel_with_timeout(self.cancel_task_timeout)
.await
{
warn!("task {:?} aborted timer handle", self.name);
}
}
debug!("task {:?} stopped", self.name);
}
}
impl TimerHandle {
pub async fn cancel_with_timeout(self, timeout: Duration) -> bool {
self.timer_token.cancel();
tokio::select! {
_ = tokio::time::sleep(timeout) => {
self.join_handle.abort();
false
}
_ = self.exit_rx => {
true
}
}
}
}
impl StoredTaskDebouncer {
pub fn new<F, Fut>(
debounce_timeout: Duration,
debouncer_token: CancellationToken,
task_type: impl AsRef<str>,
task_fn: F,
) -> Self
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
StoredTaskDebouncer {
debouncer: Debouncer::new(debounce_timeout, debouncer_token, task_type),
task_fn: Arc::new(move |token| Box::pin(task_fn(token))),
}
}
pub fn with_task_timeout(mut self, task_timeout: Duration) -> Self {
self.debouncer = self.debouncer.with_task_timeout(task_timeout);
self
}
pub fn with_max_wait(mut self, max_wait: Duration) -> Self {
self.debouncer = self.debouncer.with_max_wait(max_wait);
self
}
pub fn with_event_handler<E: Fn(DebounceEvent) + Send + Sync + 'static>(
mut self,
event_handler: E,
) -> Self {
self.debouncer = self.debouncer.with_event_handler(event_handler);
self
}
pub fn set_task<F, Fut>(&mut self, task_fn: F)
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.task_fn = Arc::new(move |token| Box::pin(task_fn(token)));
}
pub async fn debounce(&self) {
let task_fn = self.task_fn.clone();
self.debouncer
.debounce(move |token| async move { task_fn(token).await })
.await
}
pub async fn stop(&self) {
self.debouncer.stop().await
}
}
impl<T: DebouncedTask> TaskDebouncer<T> {
pub fn new(
debounce_timeout: Duration,
debouncer_token: CancellationToken,
task_type: impl AsRef<str>,
task: T,
) -> Self {
TaskDebouncer {
debouncer: Debouncer::new(debounce_timeout, debouncer_token, task_type),
task: Arc::new(task),
}
}
pub fn with_task_timeout(mut self, task_timeout: Duration) -> Self {
self.debouncer = self.debouncer.with_task_timeout(task_timeout);
self
}
pub fn with_max_wait(mut self, max_wait: Duration) -> Self {
self.debouncer = self.debouncer.with_max_wait(max_wait);
self
}
pub fn with_event_handler<E: Fn(DebounceEvent) + Send + Sync + 'static>(
mut self,
event_handler: E,
) -> Self {
self.debouncer = self.debouncer.with_event_handler(event_handler);
self
}
pub fn set_task(&mut self, task: T) {
self.task = Arc::new(task);
}
pub async fn debounce(&self) {
let task = self.task.clone();
self.debouncer
.debounce(move |token| async move { task.execute(token).await })
.await
}
pub async fn stop(&self) {
self.debouncer.stop().await
}
}
#[cfg(test)]
mod test_task_debouncer {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::time::sleep;
#[tokio::test]
async fn test_basic_debounce() {
let cancel_token = CancellationToken::new();
let debouncer = Debouncer::new(Duration::from_millis(100), cancel_token, "test_task");
let counter = Arc::new(AtomicU32::new(0));
let cancelled = Arc::new(AtomicU32::new(0));
for _ in 0..5 {
let counter = counter.clone();
let cancelled = cancelled.clone();
debouncer
.debounce(move |token| async move {
tokio::select! {
_ = token.cancelled() => {
cancelled.fetch_add(1, Ordering::SeqCst);
}
_ = async {
counter.fetch_add(1, Ordering::SeqCst);
} => {}
}
})
.await;
sleep(Duration::from_millis(10)).await;
}
tokio::select! {
_ = sleep(Duration::from_millis(200)) => {
assert_eq!(counter.load(Ordering::Acquire), 1);
assert_eq!(cancelled.load(Ordering::Acquire), 0);
}
_ = debouncer.cancel_token.cancelled() => {
panic!("Test cancelled unexpectedly");
}
}
}
#[tokio::test]
async fn test_cancel_task() {
let cancel_token = CancellationToken::new();
let task_ms: u64 = 1;
let debouncer = Debouncer::new(
Duration::from_millis(task_ms),
cancel_token.clone(),
"test_task",
);
let started = Arc::new(AtomicU32::new(0));
let cancelled = Arc::new(AtomicU32::new(0));
let exited = Arc::new(AtomicU32::new(0));
let finished = Arc::new(AtomicU32::new(0));
let (started1_tx, started1_rx) = oneshot::channel();
let started_clone = started.clone();
let cancelled_clone = cancelled.clone();
let excited_clone = exited.clone();
let finished_clone = finished.clone();
debouncer
.debounce(move |token| async move {
started_clone.fetch_add(1, Ordering::SeqCst);
started1_tx.send(()).unwrap();
tokio::select! {
_ = token.cancelled() => {
cancelled_clone.fetch_add(1, Ordering::SeqCst);
}
_ = sleep(Duration::from_secs(10)) => {
finished_clone.fetch_add(1, Ordering::SeqCst);
}
}
excited_clone.fetch_add(1, Ordering::SeqCst);
})
.await;
started1_rx.await.unwrap();
assert_eq!(
started.load(Ordering::Acquire),
1,
"Task should have started"
);
cancel_token.cancel();
debouncer.stop().await;
assert_eq!(
finished.load(Ordering::Acquire),
0,
"Task should not have finished"
);
assert_eq!(
cancelled.load(Ordering::Acquire),
1,
"Task should have cancelled"
);
assert_eq!(exited.load(Ordering::Acquire), 1, "Task should have exited");
}
#[tokio::test]
async fn test_rapid_debounce_tasks() {
let cancel_token = CancellationToken::new();
let debounce_ms: u64 = 15;
let debouncer = Debouncer::new(
Duration::from_millis(debounce_ms),
cancel_token.clone(),
"test_task",
);
let started = Arc::new(AtomicU32::new(0));
let cancelled = Arc::new(AtomicU32::new(0));
let exited = Arc::new(AtomicU32::new(0));
let finished = Arc::new(AtomicU32::new(0));
let trigger_task = || {
let started_clone = started.clone();
let cancelled_clone = cancelled.clone();
let excited_clone = exited.clone();
let finished_clone = finished.clone();
debouncer.debounce(move |token| async move {
started_clone.fetch_add(1, Ordering::SeqCst);
tokio::select! {
_ = token.cancelled() => {
cancelled_clone.fetch_add(1, Ordering::SeqCst);
}
_ = sleep(Duration::from_secs(10)) => {
finished_clone.fetch_add(1, Ordering::SeqCst);
}
}
excited_clone.fetch_add(1, Ordering::SeqCst);
})
};
for _ in 0..5 {
trigger_task().await;
}
tokio::time::sleep(Duration::from_millis(debounce_ms * 2)).await;
assert_eq!(
started.load(Ordering::Acquire),
1,
"Only 1 Task should have started"
);
cancel_token.cancel();
debouncer.stop().await;
assert_eq!(
finished.load(Ordering::Acquire),
0,
"No Task should have finished"
);
assert_eq!(
cancelled.load(Ordering::Acquire),
1,
"Only 1 task should have cancelled"
);
assert_eq!(
exited.load(Ordering::Acquire),
1,
"Only 1 task should have exited"
);
}
#[tokio::test]
async fn test_cancel_zombie_task() {
let cancel_token = CancellationToken::new();
let time_ms: u64 = 1;
let debouncer = Debouncer::new(
Duration::from_millis(time_ms),
cancel_token.clone(),
"test_task",
)
.with_task_timeout(Duration::from_millis(time_ms));
let started = Arc::new(AtomicU32::new(0));
let cancelled = Arc::new(AtomicU32::new(0));
let exited = Arc::new(AtomicU32::new(0));
let finished = Arc::new(AtomicU32::new(0));
let (started1_tx, started1_rx) = oneshot::channel();
let started_clone = started.clone();
let cancelled_clone = cancelled.clone();
let excited_clone = exited.clone();
let finished_clone = finished.clone();
debouncer
.debounce(move |token| async move {
started_clone.fetch_add(1, Ordering::SeqCst);
started1_tx.send(()).unwrap();
tokio::select! {
_ = token.cancelled() => {
cancelled_clone.fetch_add(1, Ordering::SeqCst);
loop {
sleep(Duration::from_millis(1)).await;
}
}
_ = sleep(Duration::from_secs(10)) => {
finished_clone.fetch_add(1, Ordering::SeqCst);
}
}
excited_clone.fetch_add(1, Ordering::SeqCst);
})
.await;
started1_rx.await.unwrap();
assert_eq!(
started.load(Ordering::Acquire),
1,
"Task should have started"
);
cancel_token.cancel();
debouncer.stop().await;
assert_eq!(
finished.load(Ordering::Acquire),
0,
"Task should not have finished"
);
assert_eq!(
cancelled.load(Ordering::Acquire),
1,
"Task should have cancelled"
);
assert_eq!(
exited.load(Ordering::Acquire),
0,
"Task should not have exited"
);
}
#[tokio::test]
async fn test_overlapping_tasks() {
let time_ms: u64 = 1;
let cancel_token = CancellationToken::new();
let debouncer = Debouncer::new(Duration::from_millis(time_ms), cancel_token, "test_task");
let started = Arc::new(AtomicU32::new(0));
let cancelled = Arc::new(AtomicU32::new(0));
let finished = Arc::new(AtomicU32::new(0));
let (started1_tx, started1_rx) = oneshot::channel();
let (ended2_tx, ended2_rx) = oneshot::channel();
let started_clone = started.clone();
let started_clone2 = started.clone();
let cancelled_clone2 = cancelled.clone();
let cancelled_clone = cancelled.clone();
let finished_clone = finished.clone();
let finished_clone2 = finished.clone();
debouncer
.debounce(move |token| async move {
started_clone.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_millis(time_ms * 2)).await;
started1_tx.send(()).unwrap();
tokio::select! {
_ = token.cancelled() => {
cancelled_clone.fetch_add(1, Ordering::SeqCst);
}
_ = sleep(Duration::from_secs(10)) => {
finished_clone.fetch_add(1, Ordering::SeqCst);
}
}
})
.await;
started1_rx.await.unwrap();
debouncer
.debounce(move |token| async move {
started_clone2.fetch_add(1, Ordering::SeqCst);
tokio::select! {
_ = token.cancelled() => {
cancelled_clone2.fetch_add(1, Ordering::SeqCst);
}
_ = sleep(Duration::from_millis(time_ms * 2)) => {
finished_clone2.fetch_add(1, Ordering::SeqCst);
}
}
ended2_tx.send(()).unwrap();
})
.await;
ended2_rx.await.unwrap();
sleep(Duration::from_millis(time_ms * 2)).await;
assert_eq!(
started.load(Ordering::Acquire),
2,
"Both tasks should have started"
);
assert_eq!(
finished.load(Ordering::Acquire),
1,
"Only last task should finish"
);
assert_eq!(
cancelled.load(Ordering::Acquire),
1,
"Only last task should be cancelled"
);
}
#[tokio::test]
async fn test_fixed_debouncer() {
let cancel_token = CancellationToken::new();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = counter.clone();
let task_debouncer = StoredTaskDebouncer::new(
Duration::from_millis(100),
cancel_token,
"test_task",
move |_token| {
let counter = counter_clone.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
},
);
for _ in 0..5 {
task_debouncer.debounce().await;
sleep(Duration::from_millis(10)).await;
}
sleep(Duration::from_millis(200)).await;
assert_eq!(counter.load(Ordering::Acquire), 1);
}
#[tokio::test]
async fn test_task_debouncer_with_cancellation() {
let cancel_token = CancellationToken::new();
let started = Arc::new(AtomicU32::new(0));
let cancelled = Arc::new(AtomicU32::new(0));
let finished = Arc::new(AtomicU32::new(0));
let started_clone = started.clone();
let cancelled_clone = cancelled.clone();
let finished_clone = finished.clone();
let task_debouncer = StoredTaskDebouncer::new(
Duration::from_millis(50),
cancel_token.clone(),
"test_task",
move |token| {
let started = started_clone.clone();
let cancelled = cancelled_clone.clone();
let finished = finished_clone.clone();
async move {
started.fetch_add(1, Ordering::SeqCst);
tokio::select! {
_ = token.cancelled() => {
cancelled.fetch_add(1, Ordering::SeqCst);
}
_ = sleep(Duration::from_secs(10)) => {
finished.fetch_add(1, Ordering::SeqCst);
}
}
}
},
);
task_debouncer.debounce().await;
sleep(Duration::from_millis(100)).await;
assert_eq!(started.load(Ordering::Acquire), 1);
cancel_token.cancel();
task_debouncer.stop().await;
assert_eq!(cancelled.load(Ordering::Acquire), 1);
assert_eq!(finished.load(Ordering::Acquire), 0);
}
#[tokio::test]
async fn test_task_debouncer_set_task() {
let cancel_token = CancellationToken::new();
let counter1 = Arc::new(AtomicU32::new(0));
let counter2 = Arc::new(AtomicU32::new(0));
let counter1_clone = counter1.clone();
let mut task_debouncer = StoredTaskDebouncer::new(
Duration::from_millis(50),
cancel_token,
"test_task",
move |_token| {
let counter = counter1_clone.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
},
);
task_debouncer.debounce().await;
sleep(Duration::from_millis(100)).await;
assert_eq!(counter1.load(Ordering::Acquire), 1);
assert_eq!(counter2.load(Ordering::Acquire), 0);
let counter2_clone = counter2.clone();
task_debouncer.set_task(move |_token| {
let counter = counter2_clone.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
}
});
task_debouncer.debounce().await;
sleep(Duration::from_millis(100)).await;
assert_eq!(counter1.load(Ordering::Acquire), 1); assert_eq!(counter2.load(Ordering::Acquire), 1); }
#[tokio::test]
async fn test_max_wait_timeout() {
let cancel_token = CancellationToken::new();
let debounce_ms: u64 = 100;
let max_wait_ms: u64 = 250;
let debouncer = Debouncer::new(
Duration::from_millis(debounce_ms),
cancel_token.clone(),
"test_task",
)
.with_max_wait(Duration::from_millis(max_wait_ms));
let started = Arc::new(AtomicU32::new(0));
let executed = Arc::new(AtomicU32::new(0));
let (tx, mut rx) = tokio::sync::mpsc::channel::<Instant>(32);
for _ in 0..10 {
let started_clone = started.clone();
let executed_clone = executed.clone();
let tx = tx.clone();
debouncer
.debounce(move |token| async move {
started_clone.fetch_add(1, Ordering::SeqCst);
let exec_time = Instant::now();
tx.send(exec_time).await.unwrap();
tokio::select! {
_ = token.cancelled() => {}
_ = async {
executed_clone.fetch_add(1, Ordering::SeqCst);
} => {}
}
})
.await;
sleep(Duration::from_millis(50)).await;
}
let mut exec_times = Vec::new();
let timeout = sleep(Duration::from_millis(max_wait_ms + 100));
tokio::pin!(timeout);
loop {
tokio::select! {
Some(time) = rx.recv() => exec_times.push(time),
_ = &mut timeout => break,
}
}
assert!(
exec_times.len() >= 2,
"Should have at least 2 executions due to max_wait"
);
assert_eq!(
started.load(Ordering::SeqCst),
executed.load(Ordering::SeqCst),
"All started tasks should have executed"
);
for window in exec_times.windows(2) {
let duration = window[1].duration_since(window[0]);
assert!(
duration <= Duration::from_millis(max_wait_ms + 50),
"Time between executions ({:?}) should not exceed max_wait ({:?})",
duration,
Duration::from_millis(max_wait_ms)
);
}
}
#[tokio::test]
async fn test_simple_debounce_events() {
let cancel_token = CancellationToken::new();
let debounce_ms: u64 = 100;
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<DebounceEvent>(32);
let debouncer = Debouncer::new(
Duration::from_millis(debounce_ms),
cancel_token.clone(),
"test_task",
)
.with_event_handler(move |event| {
let event_tx = event_tx.clone();
tokio::spawn(async move {
let _ = event_tx.send(event).await;
});
});
let executed = Arc::new(AtomicU32::new(0));
let executed_clone = executed.clone();
debouncer
.debounce(move |token| async move {
tokio::select! {
_ = token.cancelled() => {}
_ = async {
executed_clone.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_millis(50)).await;
} => {}
}
})
.await;
let executed_clone = executed.clone();
debouncer
.debounce(move |token| async move {
tokio::select! {
_ = token.cancelled() => {}
_ = async {
executed_clone.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_millis(50)).await;
} => {}
}
})
.await;
let mut events = Vec::new();
let timeout = sleep(Duration::from_millis(debounce_ms * 3));
tokio::pin!(timeout);
loop {
tokio::select! {
Some(event) = event_rx.recv() => events.push(event),
_ = &mut timeout => break,
}
}
assert_eq!(events.len(), 4, "Should have received 4 events");
let mut iter = events.iter();
if let Some(DebounceEvent::Debounced {
first_debounce_at,
instant,
debounce_ends_at,
}) = iter.next()
{
assert!(
first_debounce_at.is_none(),
"First debounce should have no first_debounce_at"
);
assert!(debounce_ends_at > instant, "Spawn time should be after now");
} else {
panic!("First event should be Debounced");
}
if let Some(DebounceEvent::Debounced {
first_debounce_at,
instant,
debounce_ends_at,
}) = iter.next()
{
assert!(
first_debounce_at.is_some(),
"Second debounce should have first_debounce_at set"
);
assert!(debounce_ends_at > instant, "Spawn time should be after now");
} else {
panic!("Second event should be Debounced");
}
if let Some(DebounceEvent::Started { instant: _ }) = iter.next() {
} else {
panic!("Third event should be Started");
}
if let Some(DebounceEvent::Ended {
instant: _,
exit_status: TaskExit::Normal,
}) = iter.next()
{
} else {
panic!("Fourth event should be Exit(Normal)");
}
assert_eq!(
executed.load(Ordering::SeqCst),
1,
"Only second task should have executed"
);
}
struct TestTask {
counter: Arc<AtomicU32>,
}
impl DebouncedTask for TestTask {
async fn execute(&self, token: CancellationToken) {
tokio::select! {
_ = token.cancelled() => {}
_ = async {
self.counter.fetch_add(1, Ordering::SeqCst);
} => {}
}
}
}
#[tokio::test]
async fn test_trait_based_task_debouncer() {
let cancel_token = CancellationToken::new();
let counter = Arc::new(AtomicU32::new(0));
let task = TestTask {
counter: counter.clone(),
};
let task_debouncer =
TaskDebouncer::new(Duration::from_millis(100), cancel_token, "test_task", task);
for _ in 0..5 {
task_debouncer.debounce().await;
sleep(Duration::from_millis(10)).await;
}
sleep(Duration::from_millis(200)).await;
assert_eq!(counter.load(Ordering::Acquire), 1);
}
struct CancellableTestTask {
started: Arc<AtomicU32>,
cancelled: Arc<AtomicU32>,
finished: Arc<AtomicU32>,
}
impl DebouncedTask for CancellableTestTask {
async fn execute(&self, token: CancellationToken) {
self.started.fetch_add(1, Ordering::SeqCst);
tokio::select! {
_ = token.cancelled() => {
self.cancelled.fetch_add(1, Ordering::SeqCst);
}
_ = sleep(Duration::from_secs(10)) => {
self.finished.fetch_add(1, Ordering::SeqCst);
}
}
}
}
#[tokio::test]
async fn test_trait_based_task_debouncer_with_cancellation() {
let cancel_token = CancellationToken::new();
let started = Arc::new(AtomicU32::new(0));
let cancelled = Arc::new(AtomicU32::new(0));
let finished = Arc::new(AtomicU32::new(0));
let task = CancellableTestTask {
started: started.clone(),
cancelled: cancelled.clone(),
finished: finished.clone(),
};
let task_debouncer = TaskDebouncer::new(
Duration::from_millis(50),
cancel_token.clone(),
"test_task",
task,
);
task_debouncer.debounce().await;
sleep(Duration::from_millis(100)).await;
assert_eq!(started.load(Ordering::Acquire), 1);
cancel_token.cancel();
task_debouncer.stop().await;
assert_eq!(cancelled.load(Ordering::Acquire), 1);
assert_eq!(finished.load(Ordering::Acquire), 0);
}
#[tokio::test]
async fn test_trait_based_task_debouncer_set_task() {
let cancel_token = CancellationToken::new();
let counter1 = Arc::new(AtomicU32::new(0));
let counter2 = Arc::new(AtomicU32::new(0));
let task1 = TestTask {
counter: counter1.clone(),
};
let mut task_debouncer =
TaskDebouncer::new(Duration::from_millis(50), cancel_token, "test_task", task1);
task_debouncer.debounce().await;
sleep(Duration::from_millis(100)).await;
assert_eq!(counter1.load(Ordering::Acquire), 1);
assert_eq!(counter2.load(Ordering::Acquire), 0);
let task2 = TestTask {
counter: counter2.clone(),
};
task_debouncer.set_task(task2);
task_debouncer.debounce().await;
sleep(Duration::from_millis(100)).await;
assert_eq!(counter1.load(Ordering::Acquire), 1); assert_eq!(counter2.load(Ordering::Acquire), 1); }
}