use bouncing::{
DebounceEvent, DebouncedTask, Debouncer, StoredTaskDebouncer, TaskDebouncer, TaskExit,
};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
/// Issues five debounce requests 10 ms apart against a 100 ms debounce window and
/// asserts that, 200 ms after the last request, the task body has run exactly once
/// and the cancellation branch was never taken.
#[tokio::test]
async fn test_basic_debounce() {
    let shutdown = CancellationToken::new();
    let debouncer = Debouncer::new(Duration::from_millis(100), shutdown.clone(), "test_task");
    let runs = Arc::new(AtomicU32::new(0));
    let aborts = Arc::new(AtomicU32::new(0));

    for _ in 0..5 {
        // Each request gets its own handles to the shared counters.
        let (runs, aborts) = (Arc::clone(&runs), Arc::clone(&aborts));
        debouncer
            .debounce(move |token| async move {
                tokio::select! {
                    _ = token.cancelled() => {
                        aborts.fetch_add(1, Ordering::SeqCst);
                    }
                    _ = async {
                        runs.fetch_add(1, Ordering::SeqCst);
                    } => {}
                }
            })
            .await;
        sleep(Duration::from_millis(10)).await;
    }

    // Wait well past the debounce window. The shared token is never cancelled by
    // this test, so that branch firing means something cancelled it unexpectedly.
    tokio::select! {
        _ = sleep(Duration::from_millis(200)) => {
            assert_eq!(runs.load(Ordering::Acquire), 1);
            assert_eq!(aborts.load(Ordering::Acquire), 0);
        }
        _ = shutdown.cancelled() => {
            panic!("Test cancelled unexpectedly");
        }
    }
}
/// Starts a single long-running task, cancels the shared token once the task has
/// signalled that it began, and asserts after `stop()` that the task took its
/// cancellation branch and ran to its end without completing the 10 s sleep.
#[tokio::test]
async fn test_cancel_task() {
    let debounce_ms: u64 = 1;
    let shutdown = CancellationToken::new();
    let debouncer = Debouncer::new(
        Duration::from_millis(debounce_ms),
        shutdown.clone(),
        "test_task",
    );

    // One counter per lifecycle stage the task body can reach.
    let started = Arc::new(AtomicU32::new(0));
    let cancelled = Arc::new(AtomicU32::new(0));
    let exited = Arc::new(AtomicU32::new(0));
    let finished = Arc::new(AtomicU32::new(0));
    // Lets the test wait until the task body is actually running.
    let (began_tx, began_rx) = oneshot::channel();

    {
        // Shadow the counters with clones that the task closure takes ownership of.
        let started = Arc::clone(&started);
        let cancelled = Arc::clone(&cancelled);
        let exited = Arc::clone(&exited);
        let finished = Arc::clone(&finished);
        debouncer
            .debounce(move |token| async move {
                started.fetch_add(1, Ordering::SeqCst);
                began_tx.send(()).unwrap();
                tokio::select! {
                    _ = token.cancelled() => {
                        cancelled.fetch_add(1, Ordering::SeqCst);
                    }
                    _ = sleep(Duration::from_secs(10)) => {
                        finished.fetch_add(1, Ordering::SeqCst);
                    }
                }
                exited.fetch_add(1, Ordering::SeqCst);
            })
            .await;
    }

    began_rx.await.unwrap();
    assert_eq!(
        started.load(Ordering::Acquire),
        1,
        "Task should have started"
    );

    shutdown.cancel();
    debouncer.stop().await;

    assert_eq!(
        finished.load(Ordering::Acquire),
        0,
        "Task should not have finished"
    );
    assert_eq!(
        cancelled.load(Ordering::Acquire),
        1,
        "Task should have cancelled"
    );
    assert_eq!(exited.load(Ordering::Acquire), 1, "Task should have exited");
}
#[tokio::test]
async fn test_rapid_debounce_tasks() {
let cancel_token = CancellationToken::new();
let debounce_ms: u64 = 15;
let debouncer = Debouncer::new(
Duration::from_millis(debounce_ms),
cancel_token.clone(),
"test_task",
);
let started = Arc::new(AtomicU32::new(0));
let cancelled = Arc::new(AtomicU32::new(0));
let exited = Arc::new(AtomicU32::new(0));
let finished = Arc::new(AtomicU32::new(0));
let trigger_task = || {
let started_clone = started.clone();
let cancelled_clone = cancelled.clone();
let excited_clone = exited.clone();
let finished_clone = finished.clone();
debouncer.debounce(move |token| async move {
started_clone.fetch_add(1, Ordering::SeqCst);
tokio::select! {
_ = token.cancelled() => {
cancelled_clone.fetch_add(1, Ordering::SeqCst);
}
_ = sleep(Duration::from_secs(10)) => {
finished_clone.fetch_add(1, Ordering::SeqCst);
}
}
excited_clone.fetch_add(1, Ordering::SeqCst);
})
};
for _ in 0..5 {
trigger_task().await;
}
tokio::time::sleep(Duration::from_millis(debounce_ms * 2)).await;
assert_eq!(
started.load(Ordering::Acquire),
1,
"Only 1 Task should have started"
);
cancel_token.cancel();
debouncer.stop().await;
assert_eq!(
finished.load(Ordering::Acquire),
0,
"No Task should have finished"
);
assert_eq!(
cancelled.load(Ordering::Acquire),
1,
"Only 1 task should have cancelled"
);
assert_eq!(
exited.load(Ordering::Acquire),
1,
"Only 1 task should have exited"
);
}
/// Runs a task that refuses to terminate after cancellation (its cancel branch
/// loops forever on 1 ms sleeps) under a debouncer configured with a 1 ms task
/// timeout, and asserts that `stop()` still returns with the task never reaching
/// the statement after its `select!`.
///
/// NOTE(review): `stop()` returning here presumably relies on `with_task_timeout`
/// abandoning the stuck task — confirm against the `Debouncer` documentation.
#[tokio::test]
async fn test_cancel_zombie_task() {
    let time_ms: u64 = 1;
    let shutdown = CancellationToken::new();
    let debouncer = Debouncer::new(
        Duration::from_millis(time_ms),
        shutdown.clone(),
        "test_task",
    )
    .with_task_timeout(Duration::from_millis(time_ms));

    let started = Arc::new(AtomicU32::new(0));
    let cancelled = Arc::new(AtomicU32::new(0));
    let exited = Arc::new(AtomicU32::new(0));
    let finished = Arc::new(AtomicU32::new(0));
    // Lets the test wait until the task body is actually running.
    let (began_tx, began_rx) = oneshot::channel();

    {
        // Shadow the counters with clones that the task closure takes ownership of.
        let started = Arc::clone(&started);
        let cancelled = Arc::clone(&cancelled);
        let exited = Arc::clone(&exited);
        let finished = Arc::clone(&finished);
        debouncer
            .debounce(move |token| async move {
                started.fetch_add(1, Ordering::SeqCst);
                began_tx.send(()).unwrap();
                tokio::select! {
                    _ = token.cancelled() => {
                        cancelled.fetch_add(1, Ordering::SeqCst);
                        // Deliberately never return: this is the "zombie" part.
                        loop {
                            sleep(Duration::from_millis(1)).await;
                        }
                    }
                    _ = sleep(Duration::from_secs(10)) => {
                        finished.fetch_add(1, Ordering::SeqCst);
                    }
                }
                exited.fetch_add(1, Ordering::SeqCst);
            })
            .await;
    }

    began_rx.await.unwrap();
    assert_eq!(
        started.load(Ordering::Acquire),
        1,
        "Task should have started"
    );

    shutdown.cancel();
    debouncer.stop().await;

    assert_eq!(
        finished.load(Ordering::Acquire),
        0,
        "Task should not have finished"
    );
    assert_eq!(
        cancelled.load(Ordering::Acquire),
        1,
        "Task should have cancelled"
    );
    assert_eq!(
        exited.load(Ordering::Acquire),
        0,
        "Task should not have exited"
    );
}
/// Submits a second task while the first one is still running and asserts that
/// both started, exactly one finished, and exactly one was cancelled.
///
/// The first task waits on a 10 s sleep, which cannot complete within this test,
/// so the single `finished` increment can only come from the second task (2 ms
/// sleep). Each task's `select!` takes exactly one branch, so the single
/// `cancelled` increment must come from the first task.
#[tokio::test]
async fn test_overlapping_tasks() {
    let time_ms: u64 = 1;
    let cancel_token = CancellationToken::new();
    let debouncer = Debouncer::new(Duration::from_millis(time_ms), cancel_token, "test_task");

    let started = Arc::new(AtomicU32::new(0));
    let cancelled = Arc::new(AtomicU32::new(0));
    let finished = Arc::new(AtomicU32::new(0));
    // `first_running_*` fires once the first task is past its initial sleep;
    // `second_done_*` fires when the second task has left its `select!`.
    let (first_running_tx, first_running_rx) = oneshot::channel();
    let (second_done_tx, second_done_rx) = oneshot::channel();

    let first_started = Arc::clone(&started);
    let first_cancelled = Arc::clone(&cancelled);
    let first_finished = Arc::clone(&finished);
    debouncer
        .debounce(move |token| async move {
            first_started.fetch_add(1, Ordering::SeqCst);
            sleep(Duration::from_millis(time_ms * 2)).await;
            first_running_tx.send(()).unwrap();
            tokio::select! {
                _ = token.cancelled() => {
                    first_cancelled.fetch_add(1, Ordering::SeqCst);
                }
                _ = sleep(Duration::from_secs(10)) => {
                    first_finished.fetch_add(1, Ordering::SeqCst);
                }
            }
        })
        .await;
    // Only submit the second task once the first is known to be mid-flight.
    first_running_rx.await.unwrap();

    let second_started = Arc::clone(&started);
    let second_cancelled = Arc::clone(&cancelled);
    let second_finished = Arc::clone(&finished);
    debouncer
        .debounce(move |token| async move {
            second_started.fetch_add(1, Ordering::SeqCst);
            tokio::select! {
                _ = token.cancelled() => {
                    second_cancelled.fetch_add(1, Ordering::SeqCst);
                }
                _ = sleep(Duration::from_millis(time_ms * 2)) => {
                    second_finished.fetch_add(1, Ordering::SeqCst);
                }
            }
            second_done_tx.send(()).unwrap();
        })
        .await;
    second_done_rx.await.unwrap();
    sleep(Duration::from_millis(time_ms * 2)).await;

    assert_eq!(
        started.load(Ordering::Acquire),
        2,
        "Both tasks should have started"
    );
    assert_eq!(
        finished.load(Ordering::Acquire),
        1,
        "Only last task should finish"
    );
    // Fixed message: it is the first (superseded) task that gets cancelled, not the last.
    assert_eq!(
        cancelled.load(Ordering::Acquire),
        1,
        "Only first task should be cancelled"
    );
}
/// Triggers a `StoredTaskDebouncer` five times, 10 ms apart, with a 100 ms debounce
/// window and asserts that the stored task has run exactly once 200 ms later.
#[tokio::test]
async fn test_fixed_debouncer() {
    let shutdown = CancellationToken::new();
    let runs = Arc::new(AtomicU32::new(0));

    let task_runs = Arc::clone(&runs);
    let task_debouncer = StoredTaskDebouncer::new(
        Duration::from_millis(100),
        shutdown,
        "test_task",
        move |_token| {
            // The stored closure is reusable, so each invocation clones its own handle.
            let runs = Arc::clone(&task_runs);
            async move {
                runs.fetch_add(1, Ordering::SeqCst);
            }
        },
    );

    for _ in 0..5 {
        task_debouncer.debounce().await;
        sleep(Duration::from_millis(10)).await;
    }

    sleep(Duration::from_millis(200)).await;
    assert_eq!(runs.load(Ordering::Acquire), 1);
}
/// Triggers a `StoredTaskDebouncer` whose task waits on a 10 s sleep, checks that
/// the task has started after 100 ms, then cancels the shared token and stops the
/// debouncer, asserting the task took its cancellation branch and never finished.
#[tokio::test]
async fn test_task_debouncer_with_cancellation() {
    let shutdown = CancellationToken::new();
    let started = Arc::new(AtomicU32::new(0));
    let cancelled = Arc::new(AtomicU32::new(0));
    let finished = Arc::new(AtomicU32::new(0));

    // Handles owned by the stored closure; every invocation clones from these.
    let (task_started, task_cancelled, task_finished) = (
        Arc::clone(&started),
        Arc::clone(&cancelled),
        Arc::clone(&finished),
    );
    let task_debouncer = StoredTaskDebouncer::new(
        Duration::from_millis(50),
        shutdown.clone(),
        "test_task",
        move |token| {
            let started = Arc::clone(&task_started);
            let cancelled = Arc::clone(&task_cancelled);
            let finished = Arc::clone(&task_finished);
            async move {
                started.fetch_add(1, Ordering::SeqCst);
                tokio::select! {
                    _ = token.cancelled() => {
                        cancelled.fetch_add(1, Ordering::SeqCst);
                    }
                    _ = sleep(Duration::from_secs(10)) => {
                        finished.fetch_add(1, Ordering::SeqCst);
                    }
                }
            }
        },
    );

    task_debouncer.debounce().await;
    sleep(Duration::from_millis(100)).await;
    assert_eq!(started.load(Ordering::Acquire), 1);

    shutdown.cancel();
    task_debouncer.stop().await;

    assert_eq!(cancelled.load(Ordering::Acquire), 1);
    assert_eq!(finished.load(Ordering::Acquire), 0);
}
/// Verifies that `StoredTaskDebouncer::set_task` swaps the stored task: the first
/// trigger increments only the first counter, and after `set_task` the next trigger
/// increments only the second counter.
#[tokio::test]
async fn test_task_debouncer_set_task() {
    let shutdown = CancellationToken::new();
    let first_runs = Arc::new(AtomicU32::new(0));
    let second_runs = Arc::new(AtomicU32::new(0));

    let first_handle = Arc::clone(&first_runs);
    let mut task_debouncer = StoredTaskDebouncer::new(
        Duration::from_millis(50),
        shutdown,
        "test_task",
        move |_token| {
            let runs = Arc::clone(&first_handle);
            async move {
                runs.fetch_add(1, Ordering::SeqCst);
            }
        },
    );

    // Round one: the original task is the one that runs.
    task_debouncer.debounce().await;
    sleep(Duration::from_millis(100)).await;
    assert_eq!(first_runs.load(Ordering::Acquire), 1);
    assert_eq!(second_runs.load(Ordering::Acquire), 0);

    let second_handle = Arc::clone(&second_runs);
    task_debouncer.set_task(move |_token| {
        let runs = Arc::clone(&second_handle);
        async move {
            runs.fetch_add(1, Ordering::SeqCst);
        }
    });

    // Round two: only the replacement task runs; the first counter is unchanged.
    task_debouncer.debounce().await;
    sleep(Duration::from_millis(100)).await;
    assert_eq!(first_runs.load(Ordering::Acquire), 1);
    assert_eq!(second_runs.load(Ordering::Acquire), 1);
}
#[tokio::test]
async fn test_max_wait_timeout() {
let cancel_token = CancellationToken::new();
let debounce_ms: u64 = 100;
let max_wait_ms: u64 = 250;
let debouncer = Debouncer::new(
Duration::from_millis(debounce_ms),
cancel_token.clone(),
"test_task",
)
.with_max_wait(Duration::from_millis(max_wait_ms));
let started = Arc::new(AtomicU32::new(0));
let executed = Arc::new(AtomicU32::new(0));
let (tx, mut rx) = tokio::sync::mpsc::channel::<tokio::time::Instant>(32);
for _ in 0..10 {
let started_clone = started.clone();
let executed_clone = executed.clone();
let tx = tx.clone();
debouncer
.debounce(move |token| async move {
started_clone.fetch_add(1, Ordering::SeqCst);
let exec_time = tokio::time::Instant::now();
tx.send(exec_time).await.unwrap();
tokio::select! {
_ = token.cancelled() => {}
_ = async {
executed_clone.fetch_add(1, Ordering::SeqCst);
} => {}
}
})
.await;
sleep(Duration::from_millis(50)).await;
}
let mut exec_times = Vec::new();
let timeout = sleep(Duration::from_millis(max_wait_ms + 100));
tokio::pin!(timeout);
loop {
tokio::select! {
Some(time) = rx.recv() => exec_times.push(time),
_ = &mut timeout => break,
}
}
assert!(
exec_times.len() >= 2,
"Should have at least 2 executions due to max_wait"
);
assert_eq!(
started.load(Ordering::SeqCst),
executed.load(Ordering::SeqCst),
"All started tasks should have executed"
);
for window in exec_times.windows(2) {
let duration = window[1].duration_since(window[0]);
assert!(
duration <= Duration::from_millis(max_wait_ms + 50),
"Time between executions ({:?}) should not exceed max_wait ({:?})",
duration,
Duration::from_millis(max_wait_ms)
);
}
}
/// Submits two tasks in immediate succession and asserts the exact event sequence
/// reported through the debouncer's event handler:
/// `Debounced` (no `first_debounce_at`), `Debounced` (with `first_debounce_at`),
/// `Started`, then `Ended` with `TaskExit::Normal`. It also asserts that only one
/// of the two submitted task bodies executed.
#[tokio::test]
async fn test_simple_debounce_events() {
    let cancel_token = CancellationToken::new();
    let debounce_ms: u64 = 100;
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<DebounceEvent>(32);
    let debouncer = Debouncer::new(
        Duration::from_millis(debounce_ms),
        cancel_token.clone(),
        "test_task",
    )
    .with_event_handler(move |event| {
        // Forward synchronously so events reach the channel in the order the
        // handler was called; spawning a task per event would leave the order the
        // assertions below depend on up to the scheduler. The channel holds 32
        // events and this test produces 4, so `try_send` cannot hit a full buffer.
        let _ = event_tx.try_send(event);
    });

    let executed = Arc::new(AtomicU32::new(0));
    // Two identical requests back-to-back, both inside the same debounce window.
    for _ in 0..2 {
        let executed = Arc::clone(&executed);
        debouncer
            .debounce(move |token| async move {
                tokio::select! {
                    _ = token.cancelled() => {}
                    _ = async {
                        executed.fetch_add(1, Ordering::SeqCst);
                        sleep(Duration::from_millis(50)).await;
                    } => {}
                }
            })
            .await;
    }

    // Collect every event emitted within three debounce windows.
    let mut events = Vec::new();
    let timeout = sleep(Duration::from_millis(debounce_ms * 3));
    tokio::pin!(timeout);
    loop {
        tokio::select! {
            Some(event) = event_rx.recv() => events.push(event),
            _ = &mut timeout => break,
        }
    }
    assert_eq!(events.len(), 4, "Should have received 4 events");

    let mut iter = events.iter();

    let Some(DebounceEvent::Debounced {
        first_debounce_at,
        instant,
        debounce_ends_at,
    }) = iter.next()
    else {
        panic!("First event should be Debounced");
    };
    assert!(
        first_debounce_at.is_none(),
        "First debounce should have no first_debounce_at"
    );
    assert!(debounce_ends_at > instant, "Spawn time should be after now");

    let Some(DebounceEvent::Debounced {
        first_debounce_at,
        instant,
        debounce_ends_at,
    }) = iter.next()
    else {
        panic!("Second event should be Debounced");
    };
    assert!(
        first_debounce_at.is_some(),
        "Second debounce should have first_debounce_at set"
    );
    assert!(debounce_ends_at > instant, "Spawn time should be after now");

    assert!(
        matches!(iter.next(), Some(DebounceEvent::Started { instant: _ })),
        "Third event should be Started"
    );
    assert!(
        matches!(
            iter.next(),
            Some(DebounceEvent::Ended {
                instant: _,
                exit_status: TaskExit::Normal,
            })
        ),
        "Fourth event should be Ended with TaskExit::Normal"
    );

    assert_eq!(
        executed.load(Ordering::SeqCst),
        1,
        "Only second task should have executed"
    );
}
/// Minimal task type for the trait-based debouncer tests; its `DebouncedTask`
/// implementation increments `counter` when it runs.
struct TestTask {
    /// Shared with the test so it can observe how many times the task body ran.
    counter: Arc<AtomicU32>,
}
impl DebouncedTask for TestTask {
    /// Increments `counter` once, unless the `select!` picks the cancellation
    /// branch first, in which case nothing is recorded.
    async fn execute(&self, token: CancellationToken) {
        // Completes on its first poll; named so the `select!` below reads clearly.
        let bump = async {
            self.counter.fetch_add(1, Ordering::SeqCst);
        };
        tokio::select! {
            _ = token.cancelled() => {}
            _ = bump => {}
        }
    }
}
/// Triggers a `TaskDebouncer` wrapping a `TestTask` five times, 10 ms apart, with a
/// 100 ms debounce window and asserts the task's counter is exactly 1 after a
/// further 200 ms.
#[tokio::test]
async fn test_trait_based_task_debouncer() {
    let shutdown = CancellationToken::new();
    let runs = Arc::new(AtomicU32::new(0));
    let task_debouncer = TaskDebouncer::new(
        Duration::from_millis(100),
        shutdown,
        "test_task",
        TestTask {
            counter: Arc::clone(&runs),
        },
    );

    for _ in 0..5 {
        task_debouncer.debounce().await;
        sleep(Duration::from_millis(10)).await;
    }

    sleep(Duration::from_millis(200)).await;
    assert_eq!(runs.load(Ordering::Acquire), 1);
}
/// Long-running task type for the trait-based cancellation test; its
/// `DebouncedTask` implementation records which lifecycle stage it reached.
struct CancellableTestTask {
    /// Incremented as soon as `execute` begins.
    started: Arc<AtomicU32>,
    /// Incremented when `execute` observes cancellation of its token.
    cancelled: Arc<AtomicU32>,
    /// Incremented when `execute` completes its 10 s sleep without being cancelled.
    finished: Arc<AtomicU32>,
}
impl DebouncedTask for CancellableTestTask {
    /// Records that the task started, then waits for whichever comes first:
    /// cancellation of `token` (bumps `cancelled`) or a 10 s sleep (bumps `finished`).
    async fn execute(&self, token: CancellationToken) {
        let Self {
            started,
            cancelled,
            finished,
        } = self;

        started.fetch_add(1, Ordering::SeqCst);
        tokio::select! {
            _ = token.cancelled() => {
                cancelled.fetch_add(1, Ordering::SeqCst);
            }
            _ = sleep(Duration::from_secs(10)) => {
                finished.fetch_add(1, Ordering::SeqCst);
            }
        }
    }
}
/// Triggers a `TaskDebouncer` wrapping a `CancellableTestTask`, checks the task has
/// started after 100 ms, then cancels the shared token and stops the debouncer,
/// asserting the task recorded a cancellation and never finished.
#[tokio::test]
async fn test_trait_based_task_debouncer_with_cancellation() {
    let shutdown = CancellationToken::new();
    let started = Arc::new(AtomicU32::new(0));
    let cancelled = Arc::new(AtomicU32::new(0));
    let finished = Arc::new(AtomicU32::new(0));

    let task_debouncer = TaskDebouncer::new(
        Duration::from_millis(50),
        shutdown.clone(),
        "test_task",
        CancellableTestTask {
            started: Arc::clone(&started),
            cancelled: Arc::clone(&cancelled),
            finished: Arc::clone(&finished),
        },
    );

    task_debouncer.debounce().await;
    sleep(Duration::from_millis(100)).await;
    assert_eq!(started.load(Ordering::Acquire), 1);

    shutdown.cancel();
    task_debouncer.stop().await;

    assert_eq!(cancelled.load(Ordering::Acquire), 1);
    assert_eq!(finished.load(Ordering::Acquire), 0);
}
/// Verifies that `TaskDebouncer::set_task` swaps the wrapped task: the first
/// trigger increments only the first task's counter, and after `set_task` the next
/// trigger increments only the second task's counter.
#[tokio::test]
async fn test_trait_based_task_debouncer_set_task() {
    let shutdown = CancellationToken::new();
    let first_runs = Arc::new(AtomicU32::new(0));
    let second_runs = Arc::new(AtomicU32::new(0));

    let mut task_debouncer = TaskDebouncer::new(
        Duration::from_millis(50),
        shutdown,
        "test_task",
        TestTask {
            counter: Arc::clone(&first_runs),
        },
    );

    // Round one: the original task is the one that runs.
    task_debouncer.debounce().await;
    sleep(Duration::from_millis(100)).await;
    assert_eq!(first_runs.load(Ordering::Acquire), 1);
    assert_eq!(second_runs.load(Ordering::Acquire), 0);

    task_debouncer.set_task(TestTask {
        counter: Arc::clone(&second_runs),
    });

    // Round two: only the replacement task runs; the first counter is unchanged.
    task_debouncer.debounce().await;
    sleep(Duration::from_millis(100)).await;
    assert_eq!(first_runs.load(Ordering::Acquire), 1);
    assert_eq!(second_runs.load(Ordering::Acquire), 1);
}