use crate::{
context::{Context, RcDerefMut},
observable::{CoreObservable, ObservableType},
observer::Observer,
scheduler::{Duration, Scheduler, Task, TaskHandle, TaskState},
subscription::{SourceWithHandle, Subscription},
};
/// Debounce operator: wraps `source` so that a value is forwarded only after
/// `duration` has elapsed without a newer value arriving.
///
/// Each incoming value replaces the pending one and restarts the timer on
/// `scheduler`; when the source completes, a still-pending value is forwarded
/// immediately before the completion.
#[doc(alias = "debounceTime")]
#[derive(Clone)]
pub struct Debounce<S, Sch> {
/// Upstream observable whose emissions are debounced.
pub source: S,
/// Delay passed to the scheduler for every pending emission.
pub duration: Duration,
/// Scheduler used to run the delayed emission task.
pub scheduler: Sch,
}
/// `Debounce` does not change what flows through it: both the item type and
/// the error type are forwarded unchanged from the source `S`.
impl<S, Sch> ObservableType for Debounce<S, Sch>
where
S: ObservableType,
{
// Same item type as the source, for every lifetime `'a`.
type Item<'a>
= S::Item<'a>
where
Self: 'a;
// Same error type as the source.
type Err = S::Err;
}
/// Subscription type produced by subscribing to a [`Debounce`]: the source
/// subscription `U` paired with the shared task-handle slot `H`.
pub type DebounceSubscription<U, H> = SourceWithHandle<U, H>;
/// Observer installed on the source by [`Debounce`].
///
/// It keeps the most recent value in `trailing_value` and the handle of the
/// task that will emit it in `handle_state`.
pub struct DebounceObserver<P, Sch, H, V> {
/// Downstream observer that receives the debounced values; it is cloned
/// into every scheduled task.
pub observer: P,
/// Scheduler on which the delayed emission task is scheduled.
pub scheduler: Sch,
/// Delay applied to every scheduled emission.
pub duration: Duration,
/// Slot holding the handle of the currently scheduled task, if any.
pub handle_state: H,
/// Slot holding the latest value that has not been emitted yet, if any.
pub trailing_value: V,
}
/// Task body run by the scheduler once the debounce delay has elapsed.
///
/// `task_state` is the `(observer, value slot)` pair captured when the task
/// was scheduled. Whatever value is still stored in the slot is taken out
/// (leaving `None` behind) and forwarded to the observer; if the slot is
/// already empty nothing is emitted.
///
/// Always returns [`TaskState::Finished`].
fn debounce_emit_handler<P, Item, Err, V>(task_state: &mut (P, V)) -> TaskState
where
    P: Observer<Item, Err>,
    V: RcDerefMut<Target = Option<Item>>,
{
    let (observer, value_rc) = task_state;
    // Take the value in its own statement so the temporary returned by
    // `rc_deref_mut()` is dropped *before* the downstream observer runs.
    // With `if let Some(..) = value_rc.rc_deref_mut().take() { .. }` that
    // temporary would stay alive for the whole body, i.e. the slot would
    // remain borrowed while `next` executes, and a re-entrant emission that
    // writes the same slot would run into it. `complete` uses the same shape.
    // NOTE(review): presumably the temporary is a `RefCell`/`Mutex`-style
    // guard — confirm against the `RcDerefMut` implementations.
    let pending = value_rc.rc_deref_mut().take();
    if let Some(value) = pending {
        observer.next(value);
    }
    TaskState::Finished
}
/// Observer side of the debounce operator.
///
/// Every `next` supersedes the previously pending value and reschedules the
/// delayed emission; `error` drops the pending emission; `complete` flushes
/// it before completing downstream.
impl<P, Sch, Item, Err, H, V> Observer<Item, Err> for DebounceObserver<P, Sch, H, V>
where
    P: Observer<Item, Err> + Clone,
    H: RcDerefMut<Target = Option<TaskHandle>>,
    V: RcDerefMut<Target = Option<Item>> + Clone,
    Sch: Scheduler<Task<(P, V)>>,
{
    /// Stores `v` as the pending value and (re)schedules its emission after
    /// `self.duration`.
    fn next(&mut self, v: Item) {
        // A newer value supersedes the one that was waiting: unsubscribe the
        // handle of the previously scheduled task, if there is one.
        if let Some(stale) = self.handle_state.rc_deref_mut().take() {
            stale.unsubscribe();
        }
        *self.trailing_value.rc_deref_mut() = Some(v);

        // The task gets its own clones of the downstream observer and of the
        // value slot, so it emits whatever the slot holds when it fires.
        let scheduled = self.scheduler.schedule(
            Task::new(
                (self.observer.clone(), self.trailing_value.clone()),
                debounce_emit_handler::<P, Item, Err, V>,
            ),
            Some(self.duration),
        );
        *self.handle_state.rc_deref_mut() = Some(scheduled);
    }

    /// Unsubscribes the pending task (if any) and forwards the error; the
    /// pending value is not emitted.
    fn error(self, e: Err) {
        if let Some(stale) = self.handle_state.rc_deref_mut().take() {
            stale.unsubscribe();
        }
        self.observer.error(e);
    }

    /// Unsubscribes the pending task (if any), emits the pending value right
    /// away when one exists, then completes the downstream observer.
    fn complete(self) {
        if let Some(stale) = self.handle_state.rc_deref_mut().take() {
            stale.unsubscribe();
        }
        // Taken in a separate statement so the slot is no longer borrowed
        // while the downstream observer runs.
        let flushed = self.trailing_value.rc_deref_mut().take();
        if let Some(last) = flushed {
            // `next` needs `&mut`, so emit through a clone of the observer.
            let mut downstream = self.observer.clone();
            downstream.next(last);
        }
        self.observer.complete();
    }

    /// Closed exactly when the downstream observer is closed.
    fn is_closed(&self) -> bool { self.observer.is_closed() }
}
/// Subscription logic for [`Debounce`].
///
/// The downstream observer carried by the context is wrapped in a
/// [`DebounceObserver`], which is then subscribed to the source. The returned
/// subscription pairs the source subscription with the shared task-handle
/// slot.
impl<S, Sch, C, Unsub> CoreObservable<C> for Debounce<S, Sch>
where
    C: Context,
    C::RcMut<Option<TaskHandle>>: Subscription,
    S: for<'a> CoreObservable<
        C::With<
            DebounceObserver<
                C::RcMut<Option<C::Inner>>,
                Sch,
                C::RcMut<Option<TaskHandle>>,
                C::RcMut<Option<<S as ObservableType>::Item<'a>>>,
            >,
        >,
        Unsub = Unsub,
    >,
    Unsub: Subscription,
{
    type Unsub = DebounceSubscription<Unsub, C::RcMut<Option<TaskHandle>>>;

    /// Subscribes to the source through a debouncing observer.
    fn subscribe(self, context: C) -> Self::Unsub {
        let Self { source, duration, scheduler } = self;

        // One slot for the handle of the scheduled task, shared between the
        // observer (which replaces it on every value) and the returned
        // subscription.
        let handle_state: C::RcMut<Option<TaskHandle>> = C::RcMut::from(None);
        let observer_handle_state = handle_state.clone();
        // Slot for the latest value that has not been emitted yet.
        let trailing_value = C::RcMut::from(None);

        let debounced = context.transform(move |downstream| DebounceObserver {
            // Wrapped in the context's shared pointer so the scheduled tasks
            // can hold clones of the downstream observer.
            observer: C::RcMut::from(Some(downstream)),
            scheduler,
            duration,
            handle_state: observer_handle_state,
            trailing_value,
        });

        SourceWithHandle::new(source.subscribe(debounced), handle_state)
    }
}
// Behavioural tests for the debounce operator, covering timer-driven
// emission, flushing on completion, cancellation through unsubscribe, and the
// shared (thread-safe) context.
#[cfg(test)]
mod tests {
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use super::*;
use crate::prelude::*;
// Values arrive every 20 ms while the debounce window is 30 ms, so each value
// is superseded before its window elapses; only the last one (4) is expected.
#[rxrust_macro::test(local)]
async fn test_debounce_emits_last_after_quiet_period() {
let values = Arc::new(Mutex::new(Vec::new()));
let values_c = values.clone();
Local::interval(Duration::from_millis(20))
.take(5)
.debounce(Duration::from_millis(30))
.subscribe(move |v| {
values_c.lock().unwrap().push(v);
});
// Wait long enough for all five ticks plus the trailing emission.
LocalScheduler
.sleep(Duration::from_millis(200))
.await;
let result = values.lock().unwrap().clone();
assert_eq!(result, vec![4]);
}
// Uses the test scheduler's virtual time: the 30 ms gaps exceed the 20 ms
// debounce window, so 0 and 1 are each emitted by their timer, and 2 is
// flushed by `complete`.
#[rxrust_macro::test]
fn test_debounce_emits_each_when_spacing_exceeds_duration() {
use std::{cell::RefCell, rc::Rc};
use crate::{context::TestCtx, factory::ObservableFactory, prelude::TestScheduler};
TestScheduler::init();
let values = Rc::new(RefCell::new(Vec::new()));
let values_c = values.clone();
let mut subject = TestCtx::subject::<i32, std::convert::Infallible>();
subject
.clone()
.debounce(Duration::from_millis(20))
.subscribe(move |v| {
values_c.borrow_mut().push(v);
});
subject.next(0);
TestScheduler::advance_by(Duration::from_millis(30));
subject.next(1);
TestScheduler::advance_by(Duration::from_millis(30));
subject.next(2);
// No time advance after the last value: completion must flush it.
subject.complete();
let result = values.borrow().clone();
assert_eq!(result, vec![0, 1, 2]);
}
// The sleep (20 ms) is shorter than the debounce window (100 ms), so seeing
// 42 means it was flushed by the source's completion rather than by the timer.
#[rxrust_macro::test(local)]
async fn test_debounce_complete_emits_trailing() {
let values = Arc::new(Mutex::new(Vec::new()));
let values_c = values.clone();
let completed = Arc::new(Mutex::new(false));
let completed_c = completed.clone();
Local::of(42)
.debounce(Duration::from_millis(100))
.on_complete(move || *completed_c.lock().unwrap() = true)
.subscribe(move |v| {
values_c.lock().unwrap().push(v);
});
LocalScheduler
.sleep(Duration::from_millis(20))
.await;
let result = values.lock().unwrap().clone();
assert_eq!(result, vec![42]);
assert!(*completed.lock().unwrap());
}
// Unsubscribing while a value is still pending must prevent its emission,
// even after waiting well past the debounce window.
#[rxrust_macro::test(local)]
async fn test_debounce_unsubscribe_cancels_pending() {
let values = Arc::new(Mutex::new(Vec::new()));
let values_c = values.clone();
let mut subject = Local::subject::<i32, std::convert::Infallible>();
let subscription = subject
.clone()
.debounce(Duration::from_millis(50))
.subscribe(move |v| {
values_c.lock().unwrap().push(v);
});
subject.next(42);
subscription.unsubscribe();
// Sleep past the 50 ms window to prove the pending task never fires.
LocalScheduler
.sleep(Duration::from_millis(100))
.await;
let result = values.lock().unwrap().clone();
assert!(result.is_empty());
}
// Same operator under the `Shared` context: three merged values are expected
// to collapse into exactly one emission, followed by completion.
#[rxrust_macro::test]
async fn test_debounce_with_shared_scheduler() {
let values = Arc::new(Mutex::new(Vec::new()));
let values_c = values.clone();
let completed = Arc::new(Mutex::new(false));
let completed_c = completed.clone();
Shared::of(1)
.merge(Shared::of(2))
.merge(Shared::of(3))
.debounce(Duration::from_millis(10))
.on_complete(move || *completed_c.lock().unwrap() = true)
.subscribe(move |v| {
values_c.lock().unwrap().push(v);
});
LocalScheduler
.sleep(Duration::from_millis(50))
.await;
let result = values.lock().unwrap().clone();
// Only the count is asserted, not which of the merged values was emitted.
assert_eq!(result.len(), 1);
assert!(*completed.lock().unwrap());
}
// With a source that completes during `subscribe`, the value and the
// completion must be observable without advancing time, and the returned
// subscription must already report itself closed.
#[rxrust_macro::test]
fn test_debounce_subscription_closes_immediately_on_sync_complete() {
use std::{cell::RefCell, rc::Rc};
use crate::{context::TestCtx, prelude::TestScheduler, subscription::Subscription};
TestScheduler::init();
let values = Rc::new(RefCell::new(Vec::new()));
let values_c = values.clone();
let completed = Rc::new(RefCell::new(false));
let completed_c = completed.clone();
let subscription = TestCtx::of(42)
.debounce(Duration::from_millis(100))
.on_complete(move || *completed_c.borrow_mut() = true)
.subscribe(move |v| values_c.borrow_mut().push(v));
assert_eq!(*values.borrow(), vec![42]);
assert!(*completed.borrow());
assert!(subscription.is_closed());
}
}