another_rxrust/operators/
concat.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 collections::VecDeque,
5 sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct Concat<'a, Item>
10where
11 Item: Clone + Send + Sync,
12{
13 observables: Vec<Observable<'a, Item>>,
14}
15
16impl<'a, Item> Concat<'a, Item>
17where
18 Item: Clone + Send + Sync,
19{
20 pub fn new(observables: &[Observable<'a, Item>]) -> Concat<'a, Item> {
21 Concat { observables: observables.to_vec() }
22 }
23 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
24 let observables = Arc::new(RwLock::new(VecDeque::from_iter(
25 self.observables.clone().into_iter(),
26 )));
27 Observable::create(move |s| {
28 let sctl = StreamController::new(s);
29
30 fn complete_and_next<'a, Item>(
31 idx: usize,
32 observables: Arc<RwLock<VecDeque<Observable<'a, Item>>>>,
33 sctl: StreamController<'a, Item>,
34 ) where
35 Item: Clone + Send + Sync,
36 {
37 if observables.read().unwrap().is_empty() {
38 sctl.sink_complete_force();
39 return;
40 }
41 let o = observables.write().unwrap().pop_front().unwrap();
42
43 let sctl_next = sctl.clone();
44 let sctl_error = sctl.clone();
45 let sctl_complete = sctl.clone();
46
47 o.inner_subscribe(sctl.new_observer(
48 move |_, x| {
49 sctl_next.sink_next(x);
50 },
51 move |_, e| {
52 sctl_error.sink_error(e);
53 },
54 move |_| {
55 complete_and_next(
56 idx + 1,
57 Arc::clone(&observables),
58 sctl_complete.clone(),
59 );
60 },
61 ));
62 }
63
64 {
65 let complete_and_next = complete_and_next.clone();
66 let sctl_error = sctl.clone();
67 let sctl_next = sctl.clone();
68 let sctl_complete = sctl.clone();
69 let observables = Arc::clone(&observables);
70 source.inner_subscribe(sctl.new_observer(
71 move |_, x| {
72 sctl_next.sink_next(x);
73 },
74 move |_, e| {
75 sctl_error.sink_error(e);
76 },
77 move |_| {
78 complete_and_next(
79 0,
80 Arc::clone(&observables),
81 sctl_complete.clone(),
82 );
83 },
84 ));
85 }
86 })
87 }
88}
89
90impl<'a, Item> Observable<'a, Item>
91where
92 Item: Clone + Send + Sync,
93{
94 pub fn concat(
95 &self,
96 observables: &[Observable<'a, Item>],
97 ) -> Observable<'a, Item> {
98 Concat::new(observables).execute(self.clone())
99 }
100}
101
102#[cfg(all(test, not(feature = "web")))]
103mod test {
104 use crate::prelude::*;
105 use std::{thread, time};
106
107 #[test]
108 fn basic() {
109 observables::from_iter(0..10)
110 .concat(&[
111 observables::from_iter(10..20),
112 observables::from_iter(20..30),
113 ])
114 .subscribe(
115 print_next_fmt!("{:?}"),
116 print_error!(),
117 print_complete!(),
118 );
119 }
120
121 #[test]
122 fn thread() {
123 observables::from_iter(0..10)
124 .observe_on(schedulers::new_thread_scheduler())
125 .concat(&[
126 observables::from_iter(10..20)
127 .observe_on(schedulers::new_thread_scheduler()),
128 observables::from_iter(20..30)
129 .observe_on(schedulers::new_thread_scheduler()),
130 ])
131 .subscribe(
132 print_next_fmt!("{:?}"),
133 print_error!(),
134 print_complete!(),
135 );
136 thread::sleep(time::Duration::from_millis(1000));
137 }
138}