another_rxrust/operators/
time_interval.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6  time::{Duration, Instant},
7};
8
/// Zero-sized operator object for `time_interval`.
///
/// It stores no data of its own; the `Item` parameter only records which
/// element type of the upstream observable the operator is applied to.
#[derive(Clone)]
pub struct TimeInterval<Item: Clone + Send + Sync> {
  // Marker only: ties the operator to `Item` without holding a value.
  _item: PhantomData<Item>,
}
16
17impl<'a, Item> TimeInterval<Item>
18where
19  Item: Clone + Send + Sync,
20{
21  pub fn new() -> TimeInterval<Item> {
22    TimeInterval { _item: PhantomData }
23  }
24  pub fn execute(
25    &self,
26    source: Observable<'a, Item>,
27  ) -> Observable<'a, Duration> {
28    Observable::create(move |s| {
29      let start_time = Arc::new(RwLock::new(None::<Instant>));
30      let start_time_next = Arc::clone(&start_time);
31      let start_time_complete = Arc::clone(&start_time);
32
33      let sctl = StreamController::new(s);
34      let sctl_next = sctl.clone();
35      let sctl_error = sctl.clone();
36      let sctl_complete = sctl.clone();
37
38      source.inner_subscribe(sctl.new_observer(
39        move |_, _| {
40          if let Some(start_time) = *start_time_next.read().unwrap() {
41            sctl_next.sink_next(start_time.elapsed());
42          }
43          *start_time_next.write().unwrap() = Some(Instant::now());
44        },
45        move |_, e| {
46          sctl_error.sink_error(e);
47        },
48        move |serial| {
49          if let Some(start_time) = *start_time_complete.read().unwrap() {
50            sctl_complete.sink_next(start_time.elapsed());
51          }
52          sctl_complete.sink_complete(&serial);
53        },
54      ));
55    })
56  }
57}
58
59impl<'a, Item> Observable<'a, Item>
60where
61  Item: Clone + Send + Sync,
62{
63  pub fn time_interval(&self) -> Observable<'a, Duration> {
64    TimeInterval::new().execute(self.clone())
65  }
66}
67
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Smoke test: prints the first ten values produced by `time_interval`
  /// over a 100 ms interval source running on a new-thread scheduler.
  #[test]
  fn basic() {
    let period = time::Duration::from_millis(100);
    // Sleep long enough for the background stream to emit before returning.
    let wait = time::Duration::from_millis(1500);

    observables::interval(period, schedulers::new_thread_scheduler())
      .time_interval()
      .take(10)
      .subscribe(
        print_next_fmt!("{:?}"),
        print_error!(),
        print_complete!(),
      );

    thread::sleep(wait);
  }
}