// another_rxrust/operators/start_with.rs
use crate::internals::stream_controller::*;
use crate::prelude::*;
use std::marker::PhantomData;
4
/// Operator state for `start_with`: holds the iterator whose items are
/// emitted before the items of the source observable.
///
/// `Item` is the element type flowing through the stream and `Iter` is the
/// concrete iterator type that yields the leading items.
#[derive(Clone)]
pub struct StartWith<'a, Item, Iter>
where
  Item: Clone + Send + Sync,
  Iter: Iterator<Item = Item> + Clone + Send + Sync + 'a,
{
  /// Iterator producing the leading items; cloned before it is consumed.
  it: Iter,
  /// Marker tying the otherwise unused `Item` parameter to the struct.
  _item: PhantomData<Item>,
  /// Marker tying the otherwise unused `'a` lifetime to the struct.
  _lifetime: PhantomData<&'a ()>,
}
15
16impl<'a, Item, Iter> StartWith<'a, Item, Iter>
17where
18 Item: Clone + Send + Sync,
19 Iter: Iterator<Item = Item> + Clone + Send + Sync + 'a,
20{
21 pub fn new(it: Iter) -> StartWith<'a, Item, Iter> {
22 StartWith {
23 it,
24 _item: PhantomData,
25 _lifetime: PhantomData,
26 }
27 }
28 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
29 let it = self.it.clone();
30 Observable::<Item>::create(move |s| {
31 for n in it.clone() {
32 if !s.is_subscribed() {
33 break;
34 }
35 s.next(n);
36 }
37
38 if s.is_subscribed() {
39 let sctl = StreamController::new(s);
40 let sctl_next = sctl.clone();
41 let sctl_error = sctl.clone();
42 let sctl_complete = sctl.clone();
43
44 source.inner_subscribe(sctl.new_observer(
45 move |_, x| {
46 sctl_next.sink_next(x);
47 },
48 move |_, e| {
49 sctl_error.sink_error(e);
50 },
51 move |serial| sctl_complete.sink_complete(&serial),
52 ));
53 }
54 })
55 }
56}
57
58impl<'a, Item> Observable<'a, Item>
59where
60 Item: Clone + Send + Sync,
61{
62 pub fn start_with<Iter>(&self, iter: Iter) -> Observable<'a, Item>
63 where
64 Iter: Iterator<Item = Item> + Clone + Send + Sync + 'a,
65 {
66 StartWith::new(iter).execute(self.clone())
67 }
68}
69
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Prepends a single value to a source that emits 0 through 4 and then
  /// completes; every notification is printed.
  #[test]
  fn single() {
    let source = Observable::create(|s| {
      for value in 0..5 {
        s.next(value);
      }
      s.complete();
    });

    source.start_with([1000].into_iter()).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Prepends the range -5..0 to a source that emits 0 through 4 and then
  /// completes; every notification is printed.
  #[test]
  fn multiple() {
    let source = Observable::create(|s| {
      for value in 0..5 {
        s.next(value);
      }
      s.complete();
    });

    source.start_with(-5..0).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }
}
105}