another_rxrust/operators/
start_with.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::marker::PhantomData;
4
/// Operator state for `start_with`: holds the iterator whose items are
/// emitted ahead of a source observable (see `StartWith::execute`).
#[derive(Clone)]
pub struct StartWith<
  'a,
  Item: Clone + Send + Sync,
  Iter: Iterator<Item = Item> + Clone + Send + Sync + 'a,
> {
  /// Items to emit first; `execute` clones this iterator before draining it.
  it: Iter,
  /// Zero-sized marker that records the `Item` type parameter.
  _item: PhantomData<Item>,
  /// Zero-sized marker that records the `'a` lifetime parameter.
  _lifetime: PhantomData<&'a ()>,
}
15
16impl<'a, Item, Iter> StartWith<'a, Item, Iter>
17where
18  Item: Clone + Send + Sync,
19  Iter: Iterator<Item = Item> + Clone + Send + Sync + 'a,
20{
21  pub fn new(it: Iter) -> StartWith<'a, Item, Iter> {
22    StartWith {
23      it,
24      _item: PhantomData,
25      _lifetime: PhantomData,
26    }
27  }
28  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
29    let it = self.it.clone();
30    Observable::<Item>::create(move |s| {
31      for n in it.clone() {
32        if !s.is_subscribed() {
33          break;
34        }
35        s.next(n);
36      }
37
38      if s.is_subscribed() {
39        let sctl = StreamController::new(s);
40        let sctl_next = sctl.clone();
41        let sctl_error = sctl.clone();
42        let sctl_complete = sctl.clone();
43
44        source.inner_subscribe(sctl.new_observer(
45          move |_, x| {
46            sctl_next.sink_next(x);
47          },
48          move |_, e| {
49            sctl_error.sink_error(e);
50          },
51          move |serial| sctl_complete.sink_complete(&serial),
52        ));
53      }
54    })
55  }
56}
57
58impl<'a, Item> Observable<'a, Item>
59where
60  Item: Clone + Send + Sync,
61{
62  pub fn start_with<Iter>(&self, iter: Iter) -> Observable<'a, Item>
63  where
64    Iter: Iterator<Item = Item> + Clone + Send + Sync + 'a,
65  {
66    StartWith::new(iter).execute(self.clone())
67  }
68}
69
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Builds the source shared by both tests: emits `0..5`, then completes.
  fn counting_source() -> Observable<'static, i32> {
    Observable::create(|emitter| {
      for value in 0..5 {
        emitter.next(value);
      }
      emitter.complete();
    })
  }

  /// Prepends a single item taken from an array iterator and prints the
  /// resulting notifications.
  #[test]
  fn single() {
    let prefixed = counting_source().start_with([1000].into_iter());
    prefixed.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Prepends several items taken from a range and prints the resulting
  /// notifications.
  #[test]
  fn multiple() {
    let prefixed = counting_source().start_with(-5..0);
    prefixed.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }
}