another_rxrust/operators/
zip.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 collections::VecDeque,
5 sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct Zip<'a, Item>
10where
11 Item: Clone + Send + Sync,
12{
13 observables: Vec<Observable<'a, Item>>,
14}
15
16impl<'a, Item> Zip<'a, Item>
17where
18 Item: Clone + Send + Sync,
19{
20 pub fn new(observables: &[Observable<'a, Item>]) -> Zip<'a, Item> {
21 Zip { observables: observables.to_vec() }
22 }
23 pub fn execute(
24 &self,
25 source: Observable<'a, Item>,
26 ) -> Observable<'a, Vec<Item>> {
27 let observables = self.observables.clone();
28 Observable::<Vec<Item>>::create(move |s| {
29 let sctl = StreamController::new(s);
30
31 let results = Arc::new(RwLock::new({
32 let mut r = Vec::<VecDeque<Item>>::new();
33 (0..(observables.len() + 1)).for_each(|_| {
34 r.push(VecDeque::<Item>::new());
35 });
36 r
37 }));
38
39 let sctl_f = sctl.clone();
40 let results_f = Arc::clone(&results);
41 let register = move |id: &usize, item: Item| {
42 results_f
43 .write()
44 .unwrap()
45 .get_mut(id.clone())
46 .unwrap()
47 .push_back(item);
48
49 let re = Arc::clone(&results_f);
50 let get = move || {
51 let mut re = re.write().unwrap();
52 let filled = re.iter().filter(|x| x.len() > 0).count();
53 if filled == re.len() {
54 let mut v = Vec::new();
55 for items in re.iter_mut() {
56 v.push(items.pop_front().unwrap());
57 }
58 Some(v)
59 } else {
60 None
61 }
62 };
63 while let Some(items) = get() {
64 if !sctl_f.is_subscribed() {
65 break;
66 }
67 sctl_f.sink_next(items);
68 }
69 };
70
71 let mut sbs = {
73 let sctl = sctl.clone();
74 VecDeque::from_iter(
75 (0..(observables.len() + 1)).map(move |id| {
76 let register = register.clone();
77 let sctl_error = sctl.clone();
78 let sctl_complete = sctl.clone();
79 sctl.new_observer(
80 move |_, x| register(&id, x),
81 move |_, e| {
82 sctl_error.sink_error(e);
83 },
84 move |serial| {
85 sctl_complete.sink_complete(&serial);
86 },
87 )
88 }),
89 )
90 };
91
92 source.inner_subscribe(sbs.pop_front().unwrap());
93 observables.iter().for_each(|o| {
94 o.inner_subscribe(sbs.pop_front().unwrap());
95 });
96 })
97 }
98}
99
100impl<'a, Item> Observable<'a, Item>
101where
102 Item: Clone + Send + Sync,
103{
104 pub fn zip(
105 &self,
106 observables: &[Observable<'a, Item>],
107 ) -> Observable<'a, Vec<Item>> {
108 Zip::new(observables).execute(self.clone())
109 }
110}
111
112#[cfg(all(test, not(feature = "web")))]
113mod test {
114 use crate::prelude::*;
115 use std::{thread, time};
116
117 #[test]
118 fn basic() {
119 let ob = observables::from_iter(0..10);
120
121 ob.zip(&[ob.map(|x| x + 10), ob.map(|x| x + 20)]).subscribe(
122 print_next_fmt!("{:?}"),
123 print_error!(),
124 print_complete!(),
125 );
126 }
127
128 #[test]
129 fn thread() {
130 observables::from_iter(0..10)
131 .observe_on(schedulers::new_thread_scheduler())
132 .zip(&[
133 observables::from_iter(10..20)
134 .observe_on(schedulers::new_thread_scheduler()),
135 observables::from_iter(20..30)
136 .observe_on(schedulers::new_thread_scheduler()),
137 ])
138 .subscribe(
139 print_next_fmt!("{:?}"),
140 print_error!(),
141 print_complete!(),
142 );
143 thread::sleep(time::Duration::from_millis(1000));
144 }
145}