rxrust/ops/
throttle_time.rs

1use crate::prelude::*;
2use std::{
3  cell::RefCell,
4  rc::Rc,
5  sync::{Arc, Mutex},
6  time::Duration,
7};
8
9/// Config to define leading and trailing behavior for throttle
10#[derive(PartialEq, Clone, Copy)]
11pub enum ThrottleEdge {
12  Tailing,
13  Leading,
14}
15
16#[derive(Clone)]
17pub struct ThrottleTimeOp<S, SD> {
18  pub(crate) source: S,
19  pub(crate) scheduler: SD,
20  pub(crate) duration: Duration,
21  pub(crate) edge: ThrottleEdge,
22}
23
24observable_proxy_impl!(ThrottleTimeOp, S, SD);
25
26impl<Item, Err, S, SD, Unsub> LocalObservable<'static> for ThrottleTimeOp<S, SD>
27where
28  S: LocalObservable<'static, Item = Item, Err = Err, Unsub = Unsub>,
29  Unsub: SubscriptionLike + 'static,
30  Item: Clone + 'static,
31  SD: LocalScheduler + 'static,
32{
33  type Unsub = Unsub;
34
35  fn actual_subscribe<
36    O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
37  >(
38    self,
39    subscriber: Subscriber<O, LocalSubscription>,
40  ) -> Self::Unsub {
41    let Self {
42      source,
43      duration,
44      edge,
45      scheduler,
46    } = self;
47
48    source.actual_subscribe(Subscriber {
49      observer: LocalThrottleObserver(Rc::new(RefCell::new(
50        ThrottleObserver {
51          observer: subscriber.observer,
52          edge,
53          delay: duration,
54          trailing_value: None,
55          throttled: None,
56          subscription: subscriber.subscription.clone(),
57          scheduler,
58        },
59      ))),
60      subscription: subscriber.subscription,
61    })
62  }
63}
64
65impl<S, SD> SharedObservable for ThrottleTimeOp<S, SD>
66where
67  S: SharedObservable,
68  S::Item: Clone + Send + 'static,
69  SD: SharedScheduler + Send + 'static,
70{
71  type Unsub = S::Unsub;
72  fn actual_subscribe<
73    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
74  >(
75    self,
76    subscriber: Subscriber<O, SharedSubscription>,
77  ) -> S::Unsub {
78    let Self {
79      source,
80      duration,
81      edge,
82      scheduler,
83    } = self;
84    let Subscriber {
85      observer,
86      subscription,
87    } = subscriber;
88    source.actual_subscribe(Subscriber {
89      observer: SharedThrottleObserver(Arc::new(Mutex::new(
90        ThrottleObserver {
91          observer,
92          edge,
93          delay: duration,
94          trailing_value: None,
95          throttled: None,
96          subscription: subscription.clone(),
97          scheduler,
98        },
99      ))),
100      subscription,
101    })
102  }
103}
104
105struct ThrottleObserver<O, S, Item, Sub> {
106  scheduler: S,
107  observer: O,
108  edge: ThrottleEdge,
109  delay: Duration,
110  trailing_value: Option<Item>,
111  throttled: Option<SpawnHandle>,
112  subscription: Sub,
113}
114
115struct SharedThrottleObserver<O, S, Item>(
116  Arc<Mutex<ThrottleObserver<O, S, Item, SharedSubscription>>>,
117);
118
119struct LocalThrottleObserver<O, S, Item>(
120  Rc<RefCell<ThrottleObserver<O, S, Item, LocalSubscription>>>,
121);
122
123impl<O, S> Observer for SharedThrottleObserver<O, S, O::Item>
124where
125  O: Observer + Send + 'static,
126  S: SharedScheduler + Send + 'static,
127  O::Item: Clone + Send + 'static,
128{
129  type Item = O::Item;
130  type Err = O::Err;
131  fn next(&mut self, value: Self::Item) {
132    let c_inner = self.0.clone();
133    let mut inner = self.0.lock().unwrap();
134    if inner.edge == ThrottleEdge::Tailing {
135      inner.trailing_value = Some(value.clone());
136    }
137
138    if inner.throttled.is_none() {
139      let delay = inner.delay;
140      let spawn_handle = inner.scheduler.schedule(
141        move |_| {
142          let mut inner = c_inner.lock().unwrap();
143          if let Some(v) = inner.trailing_value.take() {
144            inner.observer.next(v);
145          }
146          if let Some(mut throttled) = inner.throttled.take() {
147            throttled.unsubscribe();
148          }
149        },
150        Some(delay),
151        (),
152      );
153      inner.throttled = Some(SpawnHandle::new(spawn_handle.handle.clone()));
154      inner.subscription.add(spawn_handle);
155      if inner.edge == ThrottleEdge::Leading {
156        inner.observer.next(value);
157      }
158    }
159  }
160
161  fn error(&mut self, err: Self::Err) {
162    let mut inner = self.0.lock().unwrap();
163    inner.observer.error(err)
164  }
165
166  fn complete(&mut self) {
167    let mut inner = self.0.lock().unwrap();
168    if let Some(value) = inner.trailing_value.take() {
169      inner.observer.next(value);
170    }
171    inner.observer.complete();
172  }
173
174  fn is_stopped(&self) -> bool {
175    let inner = self.0.lock().unwrap();
176    inner.observer.is_stopped()
177  }
178}
179
180impl<O, S> Observer for LocalThrottleObserver<O, S, O::Item>
181where
182  O: Observer + 'static,
183  S: LocalScheduler + 'static,
184  O::Item: Clone + 'static,
185{
186  type Item = O::Item;
187  type Err = O::Err;
188  fn next(&mut self, value: Self::Item) {
189    let c_inner = self.0.clone();
190    let mut inner = self.0.borrow_mut();
191    if inner.edge == ThrottleEdge::Tailing {
192      inner.trailing_value = Some(value.clone());
193    }
194
195    if inner.throttled.is_none() {
196      let delay = inner.delay;
197      let spawn_handle = inner.scheduler.schedule(
198        move |_| {
199          let mut inner = c_inner.borrow_mut();
200          if let Some(v) = inner.trailing_value.take() {
201            inner.observer.next(v);
202          }
203          if let Some(mut throttled) = inner.throttled.take() {
204            throttled.unsubscribe();
205          }
206        },
207        Some(delay),
208        (),
209      );
210      inner.throttled = Some(SpawnHandle::new(spawn_handle.handle.clone()));
211      inner.subscription.add(spawn_handle);
212      if inner.edge == ThrottleEdge::Leading {
213        inner.observer.next(value);
214      }
215    }
216  }
217
218  fn error(&mut self, err: Self::Err) {
219    let mut inner = self.0.borrow_mut();
220    inner.observer.error(err)
221  }
222
223  fn complete(&mut self) {
224    let mut inner = self.0.borrow_mut();
225    if let Some(value) = inner.trailing_value.take() {
226      inner.observer.next(value);
227    }
228    inner.observer.complete();
229  }
230
231  fn is_stopped(&self) -> bool {
232    let inner = self.0.borrow_mut();
233    inner.observer.is_stopped()
234  }
235}
236
237#[cfg(test)]
238mod tests {
239  use super::*;
240  use crate::test_scheduler::ManualScheduler;
241
242  #[test]
243  fn smoke() {
244    let x = Rc::new(RefCell::new(vec![]));
245    let x_c = x.clone();
246    let scheduler = ManualScheduler::now();
247
248    let interval =
249      observable::interval(Duration::from_millis(5), scheduler.clone());
250    let throttle_subscribe = |edge| {
251      let x = x.clone();
252      interval
253        .clone()
254        .take(5)
255        .throttle_time(Duration::from_millis(11), edge, scheduler.clone())
256        .subscribe(move |v| x.borrow_mut().push(v))
257    };
258
259    // tailing throttle
260
261    let mut sub = throttle_subscribe(ThrottleEdge::Tailing);
262    scheduler.advance_and_run(Duration::from_millis(1), 25);
263    sub.unsubscribe();
264    assert_eq!(&*x_c.borrow(), &[2, 4]);
265
266    // leading throttle
267    x_c.borrow_mut().clear();
268    throttle_subscribe(ThrottleEdge::Leading);
269    scheduler.advance_and_run(Duration::from_millis(1), 25);
270    assert_eq!(&*x_c.borrow(), &[0, 3]);
271  }
272
273  #[test]
274  fn fork_and_shared() {
275    use futures::executor::ThreadPool;
276    let scheduler = ThreadPool::new().unwrap();
277    observable::from_iter(0..10)
278      .throttle_time(Duration::from_nanos(1), ThrottleEdge::Leading, scheduler)
279      .into_shared()
280      .into_shared()
281      .subscribe(|_| {});
282  }
283}