another_rxrust/operators/
delay.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{marker::PhantomData, thread, time::Duration};
4
/// Operator state for `delay`: the pause applied before each item is
/// forwarded downstream.
///
/// `Item` is the element type of the stream. It appears only as a
/// `PhantomData` marker; no value of that type is stored here.
#[derive(Clone)]
pub struct Delay<Item: Clone + Send + Sync> {
  /// How long to sleep before passing an item on.
  dur: Duration,
  /// Zero-sized marker tying this operator to its item type.
  _item: PhantomData<Item>,
}
13
14impl<'a, Item> Delay<Item>
15where
16  Item: Clone + Send + Sync,
17{
18  pub fn new(dur: Duration) -> Delay<Item> {
19    Delay { dur, _item: PhantomData }
20  }
21  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
22    let dur = self.dur;
23
24    Observable::<Item>::create(move |s| {
25      let sctl = StreamController::new(s);
26      let sctl_next = sctl.clone();
27      let sctl_error = sctl.clone();
28      let sctl_complete = sctl.clone();
29
30      source.inner_subscribe(sctl.new_observer(
31        move |_, x| {
32          thread::sleep(dur);
33          sctl_next.sink_next(x);
34        },
35        move |_, e| {
36          sctl_error.sink_error(e);
37        },
38        move |serial| sctl_complete.sink_complete(&serial),
39      ));
40    })
41  }
42}
43
44impl<'a, Item> Observable<'a, Item>
45where
46  Item: Clone + Send + Sync,
47{
48  pub fn delay(&self, dur: Duration) -> Observable<'a, Item> {
49    Delay::new(dur).execute(self.clone())
50  }
51}
52
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Emits two values 500 ms apart, then completes, and subscribes through a
  /// 1000 ms `delay`, printing whatever arrives.
  #[test]
  fn basic() {
    let o = Observable::create(|s| {
      for n in 0..2 {
        println!("emit {}", n);
        s.next(n);
        thread::sleep(time::Duration::from_millis(500));
      }
      s.complete();
    });

    o.delay(time::Duration::from_millis(1000)).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Emits up to five values 100 ms apart, stopping early if the subscriber
  /// is no longer subscribed, and routes them through `delay`.
  #[test]
  fn thread() {
    let o = Observable::create(|s| {
      for n in 0..5 {
        if !s.is_subscribed() {
          println!("break!");
          break;
        }
        println!("emit {}", n);
        s.next(n);
        thread::sleep(time::Duration::from_millis(100));
      }
      if s.is_subscribed() {
        s.complete();
      }
    });

    // Subscribe through `delay` so this test covers the operator defined in
    // this file rather than an unrelated one.
    o.delay(time::Duration::from_millis(100)).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    // NOTE(review): presumably this keeps the test alive for any output that
    // is still pending after `subscribe` returns — confirm it is needed.
    thread::sleep(time::Duration::from_millis(1000));
  }
}