another_rxrust/operators/
debounce.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use scheduler::IScheduler;
4use std::sync::{Arc, RwLock};
5use std::{marker::PhantomData, thread, time::Duration};
6
7#[derive(Clone)]
8pub struct Debounce<'a, Scheduler, Item>
9where
10 Scheduler: IScheduler<'a> + Clone + Send + Sync,
11 Item: Clone + Send + Sync,
12{
13 dur: Duration,
14 scheduler_ctor: FunctionWrapper<'a, (), Scheduler>,
15 _item: PhantomData<Item>,
16 _lifetime: PhantomData<&'a ()>,
17}
18
19impl<'a, Scheduler, Item> Debounce<'a, Scheduler, Item>
20where
21 Scheduler: IScheduler<'a> + Clone + Send + Sync,
22 Item: Clone + Send + Sync,
23{
24 pub fn new<SchedulerCreator>(
25 dur: Duration,
26 scheduler_ctor: SchedulerCreator,
27 ) -> Debounce<'a, Scheduler, Item>
28 where
29 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
30 {
31 Debounce {
32 dur,
33 scheduler_ctor: FunctionWrapper::new(move |_| scheduler_ctor()),
34 _item: PhantomData,
35 _lifetime: PhantomData,
36 }
37 }
38 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
39 let dur = self.dur;
40 let scheduler_ctor = self.scheduler_ctor.clone();
41
42 Observable::<Item>::create(move |s| {
43 let value = Arc::new(RwLock::new(None::<Item>));
44
45 let sctl = StreamController::new(s);
46
47 let scheduler = scheduler_ctor.call(());
48 {
49 let scheduler = scheduler.clone();
50 sctl.set_on_finalize(move || {
51 scheduler.abort();
52 });
53 }
54 {
55 let sctl = sctl.clone();
56 let value = Arc::clone(&value);
57 scheduler.post(move || {
58 while sctl.is_subscribed() {
59 thread::sleep(dur);
60 let value = {
61 let mut value = value.write().unwrap();
62 let v = value.clone();
63 *value = None;
64 v
65 };
66 if let Some(value) = value {
67 sctl.sink_next(value);
68 }
69 }
70 })
71 }
72
73 let sctl_error = sctl.clone();
74 let sctl_complete = sctl.clone();
75
76 source.inner_subscribe(sctl.new_observer(
77 move |_, x| {
78 *value.write().unwrap() = Some(x);
79 },
80 move |_, e| {
81 sctl_error.sink_error(e);
82 },
83 move |serial| sctl_complete.sink_complete(&serial),
84 ));
85 })
86 }
87}
88
89impl<'a, Item> Observable<'a, Item>
90where
91 Item: Clone + Send + Sync,
92{
93 pub fn debounce<Scheduler, SchedulerCreator>(
94 &self,
95 dur: Duration,
96 scheduler_ctor: SchedulerCreator,
97 ) -> Observable<'a, Item>
98 where
99 Scheduler: IScheduler<'a> + Clone + Send + Sync + 'a,
100 SchedulerCreator: Fn() -> Scheduler + Send + Sync + 'a,
101 {
102 Debounce::new(dur, scheduler_ctor).execute(self.clone())
103 }
104}
105
106#[cfg(all(test, not(feature = "web")))]
107mod test {
108 use crate::prelude::*;
109 use std::{thread, time};
110
111 #[test]
112 fn basic() {
113 observables::interval(
114 time::Duration::from_millis(10),
115 schedulers::new_thread_scheduler(),
116 )
117 .debounce(
118 time::Duration::from_millis(100),
119 schedulers::new_thread_scheduler(),
120 )
121 .take(5)
122 .subscribe(
123 print_next_fmt!("{}"),
124 print_error!(),
125 print_complete!(),
126 );
127
128 thread::sleep(time::Duration::from_millis(1000));
129 }
130}