1use crate::prelude::*;
2use std::{
3 cell::RefCell,
4 rc::Rc,
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
/// Selects which value of a throttle window is forwarded downstream.
///
/// A window opens with the first value received while no timer is pending
/// and lasts for the operator's configured duration.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ThrottleEdge {
  /// Forward the most recent value seen during the window once the window's
  /// timer fires (or when the source completes with a value still buffered).
  Tailing,
  /// Forward the value that opens the window immediately; values arriving
  /// while the window is still open are dropped.
  Leading,
}
15
16#[derive(Clone)]
17pub struct ThrottleTimeOp<S, SD> {
18 pub(crate) source: S,
19 pub(crate) scheduler: SD,
20 pub(crate) duration: Duration,
21 pub(crate) edge: ThrottleEdge,
22}
23
24observable_proxy_impl!(ThrottleTimeOp, S, SD);
25
26impl<Item, Err, S, SD, Unsub> LocalObservable<'static> for ThrottleTimeOp<S, SD>
27where
28 S: LocalObservable<'static, Item = Item, Err = Err, Unsub = Unsub>,
29 Unsub: SubscriptionLike + 'static,
30 Item: Clone + 'static,
31 SD: LocalScheduler + 'static,
32{
33 type Unsub = Unsub;
34
35 fn actual_subscribe<
36 O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
37 >(
38 self,
39 subscriber: Subscriber<O, LocalSubscription>,
40 ) -> Self::Unsub {
41 let Self {
42 source,
43 duration,
44 edge,
45 scheduler,
46 } = self;
47
48 source.actual_subscribe(Subscriber {
49 observer: LocalThrottleObserver(Rc::new(RefCell::new(
50 ThrottleObserver {
51 observer: subscriber.observer,
52 edge,
53 delay: duration,
54 trailing_value: None,
55 throttled: None,
56 subscription: subscriber.subscription.clone(),
57 scheduler,
58 },
59 ))),
60 subscription: subscriber.subscription,
61 })
62 }
63}
64
65impl<S, SD> SharedObservable for ThrottleTimeOp<S, SD>
66where
67 S: SharedObservable,
68 S::Item: Clone + Send + 'static,
69 SD: SharedScheduler + Send + 'static,
70{
71 type Unsub = S::Unsub;
72 fn actual_subscribe<
73 O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
74 >(
75 self,
76 subscriber: Subscriber<O, SharedSubscription>,
77 ) -> S::Unsub {
78 let Self {
79 source,
80 duration,
81 edge,
82 scheduler,
83 } = self;
84 let Subscriber {
85 observer,
86 subscription,
87 } = subscriber;
88 source.actual_subscribe(Subscriber {
89 observer: SharedThrottleObserver(Arc::new(Mutex::new(
90 ThrottleObserver {
91 observer,
92 edge,
93 delay: duration,
94 trailing_value: None,
95 throttled: None,
96 subscription: subscription.clone(),
97 scheduler,
98 },
99 ))),
100 subscription,
101 })
102 }
103}
104
105struct ThrottleObserver<O, S, Item, Sub> {
106 scheduler: S,
107 observer: O,
108 edge: ThrottleEdge,
109 delay: Duration,
110 trailing_value: Option<Item>,
111 throttled: Option<SpawnHandle>,
112 subscription: Sub,
113}
114
115struct SharedThrottleObserver<O, S, Item>(
116 Arc<Mutex<ThrottleObserver<O, S, Item, SharedSubscription>>>,
117);
118
119struct LocalThrottleObserver<O, S, Item>(
120 Rc<RefCell<ThrottleObserver<O, S, Item, LocalSubscription>>>,
121);
122
123impl<O, S> Observer for SharedThrottleObserver<O, S, O::Item>
124where
125 O: Observer + Send + 'static,
126 S: SharedScheduler + Send + 'static,
127 O::Item: Clone + Send + 'static,
128{
129 type Item = O::Item;
130 type Err = O::Err;
131 fn next(&mut self, value: Self::Item) {
132 let c_inner = self.0.clone();
133 let mut inner = self.0.lock().unwrap();
134 if inner.edge == ThrottleEdge::Tailing {
135 inner.trailing_value = Some(value.clone());
136 }
137
138 if inner.throttled.is_none() {
139 let delay = inner.delay;
140 let spawn_handle = inner.scheduler.schedule(
141 move |_| {
142 let mut inner = c_inner.lock().unwrap();
143 if let Some(v) = inner.trailing_value.take() {
144 inner.observer.next(v);
145 }
146 if let Some(mut throttled) = inner.throttled.take() {
147 throttled.unsubscribe();
148 }
149 },
150 Some(delay),
151 (),
152 );
153 inner.throttled = Some(SpawnHandle::new(spawn_handle.handle.clone()));
154 inner.subscription.add(spawn_handle);
155 if inner.edge == ThrottleEdge::Leading {
156 inner.observer.next(value);
157 }
158 }
159 }
160
161 fn error(&mut self, err: Self::Err) {
162 let mut inner = self.0.lock().unwrap();
163 inner.observer.error(err)
164 }
165
166 fn complete(&mut self) {
167 let mut inner = self.0.lock().unwrap();
168 if let Some(value) = inner.trailing_value.take() {
169 inner.observer.next(value);
170 }
171 inner.observer.complete();
172 }
173
174 fn is_stopped(&self) -> bool {
175 let inner = self.0.lock().unwrap();
176 inner.observer.is_stopped()
177 }
178}
179
180impl<O, S> Observer for LocalThrottleObserver<O, S, O::Item>
181where
182 O: Observer + 'static,
183 S: LocalScheduler + 'static,
184 O::Item: Clone + 'static,
185{
186 type Item = O::Item;
187 type Err = O::Err;
188 fn next(&mut self, value: Self::Item) {
189 let c_inner = self.0.clone();
190 let mut inner = self.0.borrow_mut();
191 if inner.edge == ThrottleEdge::Tailing {
192 inner.trailing_value = Some(value.clone());
193 }
194
195 if inner.throttled.is_none() {
196 let delay = inner.delay;
197 let spawn_handle = inner.scheduler.schedule(
198 move |_| {
199 let mut inner = c_inner.borrow_mut();
200 if let Some(v) = inner.trailing_value.take() {
201 inner.observer.next(v);
202 }
203 if let Some(mut throttled) = inner.throttled.take() {
204 throttled.unsubscribe();
205 }
206 },
207 Some(delay),
208 (),
209 );
210 inner.throttled = Some(SpawnHandle::new(spawn_handle.handle.clone()));
211 inner.subscription.add(spawn_handle);
212 if inner.edge == ThrottleEdge::Leading {
213 inner.observer.next(value);
214 }
215 }
216 }
217
218 fn error(&mut self, err: Self::Err) {
219 let mut inner = self.0.borrow_mut();
220 inner.observer.error(err)
221 }
222
223 fn complete(&mut self) {
224 let mut inner = self.0.borrow_mut();
225 if let Some(value) = inner.trailing_value.take() {
226 inner.observer.next(value);
227 }
228 inner.observer.complete();
229 }
230
231 fn is_stopped(&self) -> bool {
232 let inner = self.0.borrow_mut();
233 inner.observer.is_stopped()
234 }
235}
236
#[cfg(test)]
mod tests {
  use super::*;
  use crate::test_scheduler::ManualScheduler;

  /// Runs the first five ticks of a 5 ms interval through an 11 ms throttle
  /// on a manually driven scheduler and checks what each edge lets through.
  #[test]
  fn smoke() {
    let received = Rc::new(RefCell::new(vec![]));
    let received_view = received.clone();
    let scheduler = ManualScheduler::now();

    let ticks =
      observable::interval(Duration::from_millis(5), scheduler.clone());
    // Builds a fresh throttled subscription that records into `received`.
    let throttle_subscribe = |edge| {
      let sink = received.clone();
      ticks
        .clone()
        .take(5)
        .throttle_time(Duration::from_millis(11), edge, scheduler.clone())
        .subscribe(move |v| sink.borrow_mut().push(v))
    };

    // Trailing edge: the last value of each window is reported.
    let mut trailing_sub = throttle_subscribe(ThrottleEdge::Tailing);
    scheduler.advance_and_run(Duration::from_millis(1), 25);
    trailing_sub.unsubscribe();
    assert_eq!(&*received_view.borrow(), &[2, 4]);

    received_view.borrow_mut().clear();

    // Leading edge: the first value of each window is reported.
    throttle_subscribe(ThrottleEdge::Leading);
    scheduler.advance_and_run(Duration::from_millis(1), 25);
    assert_eq!(&*received_view.borrow(), &[0, 3]);
  }

  /// Checks that the operator can be converted to a shared observable and
  /// subscribed with a thread-pool scheduler.
  #[test]
  fn fork_and_shared() {
    use futures::executor::ThreadPool;

    let pool = ThreadPool::new().unwrap();
    observable::from_iter(0..10)
      .throttle_time(Duration::from_nanos(1), ThrottleEdge::Leading, pool)
      .into_shared()
      .into_shared()
      .subscribe(|_| {});
  }
}