another_rxrust/operators/
debounce.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use scheduler::IScheduler;
4use std::sync::{Arc, RwLock};
5use std::{marker::PhantomData, thread, time::Duration};
6
7#[derive(Clone)]
8pub struct Debounce<'a, Scheduler, Item>
9where
10  Scheduler: IScheduler<'a> + Clone + Send + Sync,
11  Item: Clone + Send + Sync,
12{
13  dur: Duration,
14  scheduler_ctor: FunctionWrapper<'a, (), Scheduler>,
15  _item: PhantomData<Item>,
16  _lifetime: PhantomData<&'a ()>,
17}
18
19impl<'a, Scheduler, Item> Debounce<'a, Scheduler, Item>
20where
21  Scheduler: IScheduler<'a> + Clone + Send + Sync,
22  Item: Clone + Send + Sync,
23{
24  pub fn new<SchedulerCreator>(
25    dur: Duration,
26    scheduler_ctor: SchedulerCreator,
27  ) -> Debounce<'a, Scheduler, Item>
28  where
29    SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
30  {
31    Debounce {
32      dur,
33      scheduler_ctor: FunctionWrapper::new(move |_| scheduler_ctor()),
34      _item: PhantomData,
35      _lifetime: PhantomData,
36    }
37  }
38  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
39    let dur = self.dur;
40    let scheduler_ctor = self.scheduler_ctor.clone();
41
42    Observable::<Item>::create(move |s| {
43      let value = Arc::new(RwLock::new(None::<Item>));
44
45      let sctl = StreamController::new(s);
46
47      let scheduler = scheduler_ctor.call(());
48      {
49        let scheduler = scheduler.clone();
50        sctl.set_on_finalize(move || {
51          scheduler.abort();
52        });
53      }
54      {
55        let sctl = sctl.clone();
56        let value = Arc::clone(&value);
57        scheduler.post(move || {
58          while sctl.is_subscribed() {
59            thread::sleep(dur);
60            let value = {
61              let mut value = value.write().unwrap();
62              let v = value.clone();
63              *value = None;
64              v
65            };
66            if let Some(value) = value {
67              sctl.sink_next(value);
68            }
69          }
70        })
71      }
72
73      let sctl_error = sctl.clone();
74      let sctl_complete = sctl.clone();
75
76      source.inner_subscribe(sctl.new_observer(
77        move |_, x| {
78          *value.write().unwrap() = Some(x);
79        },
80        move |_, e| {
81          sctl_error.sink_error(e);
82        },
83        move |serial| sctl_complete.sink_complete(&serial),
84      ));
85    })
86  }
87}
88
89impl<'a, Item> Observable<'a, Item>
90where
91  Item: Clone + Send + Sync,
92{
93  pub fn debounce<Scheduler, SchedulerCreator>(
94    &self,
95    dur: Duration,
96    scheduler_ctor: SchedulerCreator,
97  ) -> Observable<'a, Item>
98  where
99    Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
100    SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
101  {
102    Debounce::new(dur, scheduler_ctor).execute(self.clone())
103  }
104}
105
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Smoke test: a 10 ms interval source is passed through a 100 ms
  /// debounce, limited to five items, and printed. There are no assertions;
  /// the final sleep keeps the test thread alive while the worker threads run.
  #[test]
  fn basic() {
    let source_period = time::Duration::from_millis(10);
    let debounce_period = time::Duration::from_millis(100);
    let run_time = time::Duration::from_millis(1000);

    observables::interval(source_period, schedulers::new_thread_scheduler())
      .debounce(debounce_period, schedulers::new_thread_scheduler())
      .take(5)
      .subscribe(
        print_next_fmt!("{}"),
        print_error!(),
        print_complete!(),
      );

    thread::sleep(run_time);
  }
}