use super::ThreadPool;
use crate::fd_monitor::FdEventSignaller;
use crate::fd_readable_set;
use std::num::NonZeroU64;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
pub(super) type WorkItem = Box<dyn FnOnce() + 'static + Send>;
#[derive(Clone)]
pub struct Debounce<R> {
pool: Arc<ThreadPool>,
timeout: Duration,
data: Arc<Mutex<DebounceData<R>>>,
event_signaller: Arc<FdEventSignaller>,
}
struct DebounceData<R> {
next_req: Option<WorkItem>,
active_token: Option<NonZeroU64>,
next_token: NonZeroU64,
start_time: Instant,
result: Option<R>,
}
impl<R: Send + 'static> Debounce<R> {
pub fn new(
pool: &Arc<ThreadPool>,
event_signaller: &Arc<FdEventSignaller>,
timeout: Duration,
) -> Self {
Self {
pool: Arc::clone(pool),
timeout,
event_signaller: Arc::clone(event_signaller),
data: Arc::new(Mutex::new(DebounceData {
next_req: None,
active_token: None,
next_token: NonZeroU64::new(1).unwrap(),
start_time: Instant::now(),
result: None,
})),
}
}
fn run_next(data: &Mutex<DebounceData<R>>, token: NonZeroU64) -> bool {
let request = {
let mut data = data.lock().expect("Mutex poisoned!");
if let Some(req) = data.next_req.take() {
data.start_time = Instant::now();
req
} else {
if Some(token) == data.active_token {
data.active_token = None;
}
return false;
}
};
(request)();
true
}
pub fn perform_void(&self, handler: impl FnOnce() + 'static + Send) -> NonZeroU64 {
self.perform_inner(Box::new(handler))
}
pub fn perform<Handler>(&self, handler: Handler) -> NonZeroU64
where
Handler: FnOnce() -> R + 'static + Send,
{
let data = Arc::clone(&self.data);
let event_signaller = Arc::clone(&self.event_signaller);
let work_item = Box::new(move || {
let result = handler();
data.lock().unwrap().result = Some(result);
event_signaller.post();
});
self.perform_inner(work_item)
}
fn perform_inner(&self, work_item: WorkItem) -> NonZeroU64 {
let mut spawn = false;
let active_token = {
let mut data = self.data.lock().expect("Mutex poisoned!");
data.next_req = Some(work_item);
if data.active_token.is_some()
&& !self.timeout.is_zero()
&& (Instant::now() - data.start_time > self.timeout)
{
data.active_token = None;
}
if data.active_token.is_none() {
spawn = true;
data.active_token = Some(data.next_token);
data.next_token = data.next_token.checked_add(1).unwrap();
data.start_time = Instant::now();
}
data.active_token.expect("Something should be active now.")
};
if spawn {
let data = Arc::clone(&self.data);
self.pool.perform(move || {
while Self::run_next(&data, active_token) {
}
});
}
active_token
}
pub fn take_result(&mut self) -> Option<R> {
self.data.lock().unwrap().result.take()
}
pub fn take_result_with_timeout(&mut self, timeout: Duration) -> Option<R> {
let timeout = fd_readable_set::Timeout::Duration(timeout);
if fd_readable_set::is_fd_readable(self.event_signaller.read_fd(), timeout) {
self.take_result()
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::{Debounce, ThreadPool};
use crate::fd_monitor::FdEventSignaller;
use crate::global_safety::RelaxedAtomicBool;
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc, Condvar, Mutex,
};
use std::time::Duration;
#[test]
fn test_debounce() {
let pool = ThreadPool::new(1, 16);
let event_signaller = Arc::new(FdEventSignaller::new());
let mut db = Debounce::new(&pool, &event_signaller, Duration::from_secs(0));
const COUNT: usize = 8;
let mut result_ready: [bool; COUNT] = Default::default();
struct Context {
handler_ran: [RelaxedAtomicBool; COUNT],
ready_to_go: Mutex<bool>,
cv: Condvar,
}
let ctx = Arc::new(Context {
handler_ran: std::array::from_fn(|_i| RelaxedAtomicBool::new(false)),
ready_to_go: Mutex::new(false),
cv: Condvar::new(),
});
for idx in 0..COUNT {
assert!(!ctx.handler_ran[idx].load());
let performer = {
let ctx = ctx.clone();
move || {
let guard = ctx.ready_to_go.lock().unwrap();
let _guard = ctx.cv.wait_while(guard, |ready| !*ready).unwrap();
ctx.handler_ran[idx].store(true);
idx
}
};
db.perform(performer);
}
*ctx.ready_to_go.lock().unwrap() = true;
ctx.cv.notify_all();
while !result_ready.last().unwrap() {
if let Some(result_idx) = db.take_result() {
result_ready[result_idx] = true;
}
}
assert!(ctx.handler_ran.last().unwrap().load());
assert!(result_ready.last().unwrap());
let mut total_ran = 0;
for idx in 0..COUNT {
if ctx.handler_ran[idx].load() {
total_ran += 1;
}
}
assert!(total_ran <= 2);
}
#[test]
fn test_debounce_timeout() {
let pool = ThreadPool::new(1, 16);
let event_signaller = Arc::new(FdEventSignaller::new());
let timeout = Duration::from_millis(500);
let db = Debounce::new(&pool, &event_signaller, timeout);
struct Data {
db: Debounce<usize>,
exit_ok: Mutex<bool>,
cv: Condvar,
running: AtomicU32,
}
let data = Arc::new(Data {
db,
exit_ok: Mutex::new(false),
cv: Condvar::new(),
running: AtomicU32::new(0),
});
let handler = {
let data = data.clone();
move || {
data.running.fetch_add(1, Ordering::Relaxed);
let guard = data.exit_ok.lock().unwrap();
let _guard = data.cv.wait_while(guard, |exit_ok| !*exit_ok);
}
};
let token1 = data.db.perform_void(handler.clone());
let token2 = data.db.perform_void(handler.clone());
assert_eq!(token1, token2);
std::thread::sleep(timeout + timeout / 2);
assert_eq!(data.running.load(Ordering::Relaxed), 1);
let token3 = data.db.perform_void(handler);
assert!(token3 > token2);
let mut exit_ok = data.exit_ok.lock().unwrap();
*exit_ok = true;
data.cv.notify_all();
}
}