another_rxrust/operators/
concat.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  collections::VecDeque,
5  sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct Concat<'a, Item>
10where
11  Item: Clone + Send + Sync,
12{
13  observables: Vec<Observable<'a, Item>>,
14}
15
16impl<'a, Item> Concat<'a, Item>
17where
18  Item: Clone + Send + Sync,
19{
20  pub fn new(observables: &[Observable<'a, Item>]) -> Concat<'a, Item> {
21    Concat { observables: observables.to_vec() }
22  }
23  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
24    let observables = Arc::new(RwLock::new(VecDeque::from_iter(
25      self.observables.clone().into_iter(),
26    )));
27    Observable::create(move |s| {
28      let sctl = StreamController::new(s);
29
30      fn complete_and_next<'a, Item>(
31        idx: usize,
32        observables: Arc<RwLock<VecDeque<Observable<'a, Item>>>>,
33        sctl: StreamController<'a, Item>,
34      ) where
35        Item: Clone + Send + Sync,
36      {
37        if observables.read().unwrap().is_empty() {
38          sctl.sink_complete_force();
39          return;
40        }
41        let o = observables.write().unwrap().pop_front().unwrap();
42
43        let sctl_next = sctl.clone();
44        let sctl_error = sctl.clone();
45        let sctl_complete = sctl.clone();
46
47        o.inner_subscribe(sctl.new_observer(
48          move |_, x| {
49            sctl_next.sink_next(x);
50          },
51          move |_, e| {
52            sctl_error.sink_error(e);
53          },
54          move |_| {
55            complete_and_next(
56              idx + 1,
57              Arc::clone(&observables),
58              sctl_complete.clone(),
59            );
60          },
61        ));
62      }
63
64      {
65        let complete_and_next = complete_and_next.clone();
66        let sctl_error = sctl.clone();
67        let sctl_next = sctl.clone();
68        let sctl_complete = sctl.clone();
69        let observables = Arc::clone(&observables);
70        source.inner_subscribe(sctl.new_observer(
71          move |_, x| {
72            sctl_next.sink_next(x);
73          },
74          move |_, e| {
75            sctl_error.sink_error(e);
76          },
77          move |_| {
78            complete_and_next(
79              0,
80              Arc::clone(&observables),
81              sctl_complete.clone(),
82            );
83          },
84        ));
85      }
86    })
87  }
88}
89
90impl<'a, Item> Observable<'a, Item>
91where
92  Item: Clone + Send + Sync,
93{
94  pub fn concat(
95    &self,
96    observables: &[Observable<'a, Item>],
97  ) -> Observable<'a, Item> {
98    Concat::new(observables).execute(self.clone())
99  }
100}
101
102#[cfg(all(test, not(feature = "web")))]
103mod test {
104  use crate::prelude::*;
105  use std::{thread, time};
106
107  #[test]
108  fn basic() {
109    observables::from_iter(0..10)
110      .concat(&[
111        observables::from_iter(10..20),
112        observables::from_iter(20..30),
113      ])
114      .subscribe(
115        print_next_fmt!("{:?}"),
116        print_error!(),
117        print_complete!(),
118      );
119  }
120
121  #[test]
122  fn thread() {
123    observables::from_iter(0..10)
124      .observe_on(schedulers::new_thread_scheduler())
125      .concat(&[
126        observables::from_iter(10..20)
127          .observe_on(schedulers::new_thread_scheduler()),
128        observables::from_iter(20..30)
129          .observe_on(schedulers::new_thread_scheduler()),
130      ])
131      .subscribe(
132        print_next_fmt!("{:?}"),
133        print_error!(),
134        print_complete!(),
135      );
136    thread::sleep(time::Duration::from_millis(1000));
137  }
138}