pub mod fold;
use std::{
time::{Instant, Duration},
collections::VecDeque,
marker::PhantomData,
sync::{Mutex, Condvar, Arc, MutexGuard},
fmt,
error,
};
/// Creates a connected sender/receiver pair backed by one shared debouncer.
///
/// `debounce_time` is handed to the shared debouncer; `fold` is stored in the
/// returned sender, which passes it along with every raw event it sends.
/// The shared state starts with a sender count and a receiver count of one
/// each, matching the two handles returned here.
pub fn debouncer<R, D, F>(
    debounce_time: Duration,
    fold: F,
) -> (DebouncerTx<R, D, F>, DebouncerRx<D>)
where
    F: Fn(Option<D>, R) -> D,
{
    let rx = DebouncerRx {
        debouncer: Arc::new(Debouncer::new(debounce_time, 1, 1)),
    };
    // The sender shares the very same allocation as the receiver.
    let tx = DebouncerTx {
        debouncer: Arc::clone(&rx.debouncer),
        fold,
        _raw_event_marker: PhantomData,
    };
    (tx, rx)
}
/// Sending half of a debouncer.
///
/// Holds the shared debouncer state together with the fold function that is
/// handed to the debouncer for every raw event of type `R`.
pub struct DebouncerTx<R, D, F> {
    /// State shared with every other sender and receiver handle.
    debouncer: Arc<Debouncer<D>>,
    /// Combines an optional previous accumulator with a raw event.
    fold: F,
    /// Ties the handle to its raw event type without storing a value of it.
    _raw_event_marker: PhantomData<R>,
}

impl<R, D, F> DebouncerTx<R, D, F>
where
    F: Fn(Option<D>, R) -> D,
{
    /// Pushes `event` into the shared debouncer using this sender's fold
    /// function.
    ///
    /// # Errors
    ///
    /// Returns the event back inside [`SendError`] when the debouncer reports
    /// that no receivers remain.
    pub fn send(&self, event: R) -> Result<(), SendError<R>> {
        let Self { debouncer, fold, .. } = self;
        debouncer.push(event, fold)
    }
}
impl<R, D, F> Clone for DebouncerTx<R, D, F>
where
F: Clone,
{
fn clone(&self) -> Self {
self.debouncer
.lock_state()
.add_tx()
.expect("debouncer tx count should not overflow");
Self {
debouncer: self.debouncer.clone(),
fold: self.fold.clone(),
_raw_event_marker: PhantomData,
}
}
}
impl<R, D, F> Drop for DebouncerTx<R, D, F> {
    /// Unregisters this sender; when it was the last one, wakes every thread
    /// waiting on either condition variable.
    fn drop(&mut self) {
        let shared = &*self.debouncer;
        // The guard is a temporary of this statement, so the lock is already
        // released by the time any notification is sent.
        let senders_left = shared.lock_state().remove_tx();
        if senders_left != 0 {
            return;
        }
        shared.event_ready_wait_cvar.notify_all();
        shared.queue_wait_cvar.notify_all();
    }
}
/// Receiving half of a debouncer.
pub struct DebouncerRx<D> {
    /// State shared with every other sender and receiver handle.
    debouncer: Arc<Debouncer<D>>,
}

impl<D> DebouncerRx<D> {
    /// Blocking receive; delegates to the shared debouncer's `pop`.
    ///
    /// # Errors
    ///
    /// Returns [`ReceiveError`] whenever `pop` does.
    pub fn recv(&self) -> Result<D, ReceiveError> {
        let shared: &Debouncer<D> = &self.debouncer;
        shared.pop()
    }

    /// Non-blocking receive; delegates to the shared debouncer's `try_pop`.
    ///
    /// `Ok(None)` means nothing could be handed out right now.
    ///
    /// # Errors
    ///
    /// Returns [`ReceiveError`] whenever `try_pop` does.
    pub fn try_recv(&self) -> Result<Option<D>, ReceiveError> {
        let shared: &Debouncer<D> = &self.debouncer;
        shared.try_pop()
    }
}
impl<D> Clone for DebouncerRx<D> {
fn clone(&self) -> Self {
self.debouncer
.lock_state()
.add_rx()
.expect("debouncer rx count should not overflow");
Self {
debouncer: self.debouncer.clone(),
}
}
}
impl<D> Drop for DebouncerRx<D> {
    /// Unregisters this receiver from the shared state.
    ///
    /// No condition variable is notified here; the updated count is simply
    /// left in the shared state.
    fn drop(&mut self) {
        let mut state = self.debouncer.lock_state();
        let _receivers_left = state.remove_rx();
    }
}
/// Error carrying back a value that could not be sent.
///
/// The rejected value is available through the public tuple field.
pub struct SendError<T>(pub T);

impl<T> fmt::Debug for SendError<T> {
    /// Prints the type name only, so no `Debug` bound is needed on `T`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("SendError");
        builder.finish_non_exhaustive()
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed message, honoring width/precision/alignment flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("sending through a closed debouncer")
    }
}

impl<T> error::Error for SendError<T> {}
/// Error reported by receive operations on a closed debouncer.
#[derive(Debug)]
pub struct ReceiveError;

impl fmt::Display for ReceiveError {
    /// Writes a fixed message, honoring width/precision/alignment flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("receiving from a closed debouncer")
    }
}

impl error::Error for ReceiveError {}
/// State shared by every sender and receiver handle of one debouncer.
struct Debouncer<T> {
    /// Accumulator queue and handle counts, all guarded by a single mutex.
    state: Mutex<DebouncerState<T>>,
    /// Passed to the queue on every push to compute a new accumulator's
    /// ready time.
    debounce_time: Duration,
    /// Receivers wait here while the queue is empty; signalled when a push
    /// creates a new accumulator.
    queue_wait_cvar: Condvar,
    /// Receivers wait here, with a timeout, until the oldest accumulator is
    /// ready.
    event_ready_wait_cvar: Condvar,
}

impl<T> Debouncer<T> {
    /// Builds the shared state with the given initial sender and receiver
    /// counts.
    fn new(debounce_time: Duration, tx_count: usize, rx_count: usize) -> Self {
        Self {
            state: Mutex::new(DebouncerState::new(tx_count, rx_count)),
            debounce_time,
            queue_wait_cvar: Condvar::new(),
            event_ready_wait_cvar: Condvar::new(),
        }
    }

    /// Locks the state, recovering the guard if the mutex was poisoned by a
    /// panic on another thread.
    fn lock_state(&self) -> MutexGuard<'_, DebouncerState<T>> {
        self.state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Folds `raw_event` into the queue and wakes one receiver if that
    /// created a new accumulator.
    ///
    /// # Errors
    ///
    /// Hands `raw_event` back inside [`SendError`] when no receivers remain.
    fn push<R, F>(&self, raw_event: R, fold: F) -> Result<(), SendError<R>>
    where
        F: Fn(Option<T>, R) -> T,
    {
        let now = Instant::now();
        let outcome = {
            let mut state = self.lock_state();
            if state.has_no_rxs() {
                return Err(SendError(raw_event));
            }
            state
                .acc_queue
                .push_latest(raw_event, fold, now, self.debounce_time)
        };
        // The lock is released before notifying. Only a brand-new accumulator
        // can unblock a receiver that is waiting on an empty queue.
        match outcome {
            PushOutcome::NewAcc => self.queue_wait_cvar.notify_one(),
            PushOutcome::NoNewAcc => {}
        }
        Ok(())
    }

    /// Blocks until the oldest accumulator's ready time has passed, then
    /// returns it.
    ///
    /// Once no senders remain, whatever is still queued is handed out
    /// immediately, ignoring ready times, one accumulator per call.
    ///
    /// A mutex poisoned while this thread is waiting is recovered the same
    /// way `lock_state` recovers it, so a panic on another thread (for
    /// example inside a fold function) does not make the receiver panic too.
    ///
    /// # Errors
    ///
    /// Returns [`ReceiveError`] when no senders remain and nothing is queued.
    fn pop(&self) -> Result<T, ReceiveError> {
        let mut state = self.lock_state();
        loop {
            // The sender count only changes under the lock, so re-checking it
            // once per iteration (at entry and after every wait) is enough.
            if state.has_no_txs() {
                return state
                    .acc_queue
                    .pop_oldest_acc_discard_none()
                    .ok_or(ReceiveError);
            }

            // `None`: queue is empty. `Some(zero)`: oldest accumulator is ready.
            let time_until_ready = state
                .acc_queue
                .peek_oldest()
                .map(|oldest| oldest.ready_time.saturating_duration_since(Instant::now()));

            match time_until_ready {
                None => {
                    state = self
                        .queue_wait_cvar
                        .wait(state)
                        .unwrap_or_else(|poisoned| poisoned.into_inner());
                }
                Some(remaining) if !remaining.is_zero() => {
                    (state, _) = self
                        .event_ready_wait_cvar
                        .wait_timeout(state, remaining)
                        .unwrap_or_else(|poisoned| poisoned.into_inner());
                }
                Some(_) => {
                    // An entry may hold no value (its fold panicked after the
                    // previous value was taken); such entries are skipped.
                    let ready = state
                        .acc_queue
                        .pop_oldest()
                        .and_then(|entry| entry.into_acc());
                    if let Some(event) = ready {
                        return Ok(event);
                    }
                }
            }
        }
    }

    /// Non-blocking variant of `pop`.
    ///
    /// Returns `Ok(None)` when the queue is empty or its oldest accumulator
    /// is not ready yet. Once no senders remain, queued accumulators are
    /// handed out regardless of their ready time.
    ///
    /// # Errors
    ///
    /// Returns [`ReceiveError`] when no senders remain and nothing is queued.
    fn try_pop(&self) -> Result<Option<T>, ReceiveError> {
        let mut state = self.lock_state();
        if state.has_no_txs() {
            return state
                .acc_queue
                .pop_oldest_acc_discard_none()
                .map(Some)
                .ok_or(ReceiveError);
        }

        let now = Instant::now();
        loop {
            match state.acc_queue.peek_oldest() {
                Some(oldest) if oldest.ready_time <= now => {}
                _ => return Ok(None),
            }
            // Entries without a value are dropped and the next one is tried.
            let ready = state
                .acc_queue
                .pop_oldest()
                .and_then(|entry| entry.into_acc());
            if let Some(event) = ready {
                return Ok(Some(event));
            }
        }
    }
}
/// Accumulator queue plus the number of live sender and receiver handles.
struct DebouncerState<T> {
    /// Pending accumulators, oldest first.
    acc_queue: EventAccQueue<T>,
    /// Number of registered senders.
    tx_count: usize,
    /// Number of registered receivers.
    rx_count: usize,
}

impl<T> DebouncerState<T> {
    /// Starts with an empty queue and the given handle counts.
    fn new(tx_count: usize, rx_count: usize) -> Self {
        let acc_queue = EventAccQueue::new();
        Self {
            acc_queue,
            tx_count,
            rx_count,
        }
    }

    /// `true` when the sender count is zero.
    fn has_no_txs(&self) -> bool {
        matches!(self.tx_count, 0)
    }

    /// Increments the sender count, failing instead of wrapping on overflow.
    fn add_tx(&mut self) -> Result<(), CountOverflowError> {
        match self.tx_count.checked_add(1) {
            Some(incremented) => {
                self.tx_count = incremented;
                Ok(())
            }
            None => Err(CountOverflowError),
        }
    }

    /// Decrements the sender count (never below zero) and returns the new
    /// count.
    fn remove_tx(&mut self) -> usize {
        let remaining = self.tx_count.saturating_sub(1);
        self.tx_count = remaining;
        remaining
    }

    /// `true` when the receiver count is zero.
    fn has_no_rxs(&self) -> bool {
        matches!(self.rx_count, 0)
    }

    /// Increments the receiver count, failing instead of wrapping on overflow.
    fn add_rx(&mut self) -> Result<(), CountOverflowError> {
        match self.rx_count.checked_add(1) {
            Some(incremented) => {
                self.rx_count = incremented;
                Ok(())
            }
            None => Err(CountOverflowError),
        }
    }

    /// Decrements the receiver count (never below zero) and returns the new
    /// count.
    fn remove_rx(&mut self) -> usize {
        let remaining = self.rx_count.saturating_sub(1);
        self.rx_count = remaining;
        remaining
    }
}
/// FIFO of event accumulators: oldest at the front, newest at the back.
struct EventAccQueue<T> {
    inner: VecDeque<EventAcc<T>>,
}

impl<T> EventAccQueue<T> {
    /// Creates an empty queue.
    fn new() -> Self {
        let inner = VecDeque::new();
        Self { inner }
    }

    /// `true` when no accumulator is queued.
    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Borrows the oldest accumulator without removing it.
    fn peek_oldest(&self) -> Option<&EventAcc<T>> {
        self.inner.front()
    }

    /// Removes and returns the oldest accumulator.
    fn pop_oldest(&mut self) -> Option<EventAcc<T>> {
        self.inner.pop_front()
    }

    /// Pops accumulators from the front until one yields a value; entries
    /// holding no value are dropped along the way.
    fn pop_oldest_acc_discard_none(&mut self) -> Option<T> {
        loop {
            // An exhausted queue ends the search with `None`.
            let entry = self.pop_oldest()?;
            if let Some(value) = entry.into_acc() {
                return Some(value);
            }
        }
    }

    /// Adds `raw_event` to the queue.
    ///
    /// When the newest accumulator's ready time is still strictly after
    /// `now`, the event is folded into it. Otherwise a new accumulator with
    /// ready time `now + debounce_time` is appended.
    fn push_latest<R, F>(&mut self, raw_event: R, f: F, now: Instant, debounce_time: Duration)
        -> PushOutcome
    where
        F: FnOnce(Option<T>, R) -> T,
    {
        if let Some(latest) = self.inner.back_mut() {
            if latest.ready_time > now {
                latest.fold(raw_event, f);
                return PushOutcome::NoNewAcc;
            }
        }
        let fresh = EventAcc::new_from_fold(raw_event, f, now + debounce_time);
        self.inner.push_back(fresh);
        PushOutcome::NewAcc
    }
}

/// Tells the caller of `push_latest` whether the queue grew.
enum PushOutcome {
    /// A new accumulator was appended to the queue.
    NewAcc,
    /// The event was folded into the already-queued newest accumulator.
    NoNewAcc,
}

/// One accumulated value together with the instant it becomes ready.
struct EventAcc<T> {
    /// Accumulated value; left as `None` if a fold function panics after the
    /// previous value was taken out.
    acc: Option<T>,
    /// Instant from which the accumulator counts as ready.
    ready_time: Instant,
}

impl<T> EventAcc<T> {
    /// Seeds a new accumulator by folding `raw_event` into "nothing".
    fn new_from_fold<R, F>(raw_event: R, f: F, ready_time: Instant) -> Self
    where
        F: FnOnce(Option<T>, R) -> T,
    {
        let seeded = f(None, raw_event);
        Self {
            acc: Some(seeded),
            ready_time,
        }
    }

    /// Replaces the accumulated value with `f(previous, raw_event)`.
    fn fold<R, F>(&mut self, raw_event: R, f: F)
    where
        F: FnOnce(Option<T>, R) -> T,
    {
        let previous = self.acc.take();
        let combined = f(previous, raw_event);
        self.acc = Some(combined);
    }

    /// Consumes the accumulator and yields its value, if it holds one.
    fn into_acc(self) -> Option<T> {
        let Self { acc, .. } = self;
        acc
    }
}
/// Returned by the `add_tx` / `add_rx` counters when incrementing the handle
/// count would overflow.
#[derive(Debug)]
struct CountOverflowError;
#[cfg(test)]
mod tests {
    use std::{time::{Duration, Instant}, thread};
    use super::{debouncer, fold};

    /// Events sent in three bursts must come out as three separate batches.
    ///
    /// Each burst sends ten events 4 ms apart and is followed by a 20 ms
    /// pause, against a 50 ms debounce time.
    /// NOTE(review): this relies on the sleeps not overshooting by much —
    /// confirm it is stable on slow or heavily loaded machines.
    #[test]
    fn test_debounce() {
        let (tx, rx) = debouncer(Duration::from_millis(50), fold::fold_vec_push::<u8>);
        for burst in 0..3 {
            for offset in 0..10 {
                tx.send(burst * 10 + offset).unwrap();
                thread::sleep(Duration::from_millis(4));
            }
            // Pause between bursts.
            thread::sleep(Duration::from_millis(20));
        }
        assert_eq!(rx.recv().unwrap(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(rx.recv().unwrap(), &[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
        assert_eq!(rx.recv().unwrap(), &[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]);
    }

    /// After the only sender is dropped, queued events are still delivered
    /// and the following receive fails.
    #[test]
    fn test_debouncer_shutdown() {
        let (tx, rx) = debouncer(Duration::from_millis(100), fold::fold_vec_push::<u8>);
        for event in 1..=3 {
            tx.send(event).unwrap();
        }
        drop(tx);
        assert_eq!(rx.recv().unwrap(), &[1, 2, 3]);
        assert!(rx.recv().is_err());
    }

    /// Cloned senders feed the same queue; the debouncer only shuts down
    /// once the last sender is gone.
    #[test]
    fn test_multi_tx() {
        let (first_tx, rx) = debouncer(Duration::from_millis(100), fold::fold_vec_push::<u8>);
        let second_tx = first_tx.clone();

        first_tx.send(1).unwrap();
        second_tx.send(2).unwrap();
        assert_eq!(rx.recv().unwrap(), &[1, 2]);

        // Dropping one of two senders must not close the debouncer.
        drop(first_tx);
        second_tx.send(3).unwrap();
        second_tx.send(4).unwrap();
        assert_eq!(rx.recv().unwrap(), &[3, 4]);

        // With a sender still alive, receiving waits out the debounce time.
        let start_time = Instant::now();
        second_tx.send(5).unwrap();
        second_tx.send(6).unwrap();
        assert_eq!(rx.recv().unwrap(), &[5, 6]);
        assert!(start_time.elapsed() >= Duration::from_millis(100));

        second_tx.send(7).unwrap();
        second_tx.send(8).unwrap();
        drop(second_tx);
        assert_eq!(rx.recv().unwrap(), &[7, 8]);
        assert!(rx.recv().is_err());
    }

    /// Cloned receivers drain the same queue; sending fails only after the
    /// last receiver is gone.
    #[test]
    fn test_multi_rx() {
        let (tx, first_rx) = debouncer(Duration::from_millis(100), fold::fold_vec_push::<u8>);
        let second_rx = first_rx.clone();

        tx.send(1).unwrap();
        tx.send(2).unwrap();
        assert_eq!(first_rx.recv().unwrap(), &[1, 2]);

        tx.send(3).unwrap();
        tx.send(4).unwrap();
        assert_eq!(second_rx.recv().unwrap(), &[3, 4]);

        // One receiver left: sending still works.
        drop(first_rx);
        tx.send(5).unwrap();
        tx.send(6).unwrap();
        assert_eq!(second_rx.recv().unwrap(), &[5, 6]);

        // No receivers left: sending is rejected.
        drop(second_rx);
        assert!(tx.send(7).is_err());
    }
}