another_rxrust/operators/
amb.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct Amb<'a, Item>
7where
8 Item: Clone + Send + Sync,
9{
10 observables: Vec<Observable<'a, Item>>,
11}
12
13impl<'a, Item> Amb<'a, Item>
14where
15 Item: Clone + Send + Sync,
16{
17 pub fn new(observables: &[Observable<'a, Item>]) -> Amb<'a, Item> {
18 Amb { observables: observables.to_vec() }
19 }
20 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
21 let observables = self.observables.clone();
22 Observable::<Item>::create(move |s| {
23 let sctl = StreamController::new(s);
24 let winner = Arc::new(RwLock::new(None::<i32>));
25
26 let winner_check = winner.clone();
27 let is_win = move |serial: &i32| -> bool {
28 let mut w = winner_check.write().unwrap();
29 if let Some(w) = &*w {
30 *serial == *w
31 } else {
32 *w = Some(serial.clone());
33 true
34 }
35 };
36
37 let mut sbs = {
39 let sctl = sctl.clone();
40 Vec::from_iter(
41 (0..(observables.len() + 1)).map(move |_| {
42 let is_win_next = is_win.clone();
43 let is_win_error = is_win.clone();
44 let is_win_complete = is_win.clone();
45
46 let sctl_next = sctl.clone();
47 let sctl_error = sctl.clone();
48 let sctl_complete = sctl.clone();
49
50 sctl.new_observer(
51 move |serial, x| {
52 if is_win_next(&serial) {
53 sctl_next.sink_next(x);
54 } else {
55 sctl_next.upstream_abort_observe(&serial);
56 }
57 },
58 move |serial, e| {
59 if is_win_error(&serial) {
60 sctl_error.sink_error(e);
61 } else {
62 sctl_error.upstream_abort_observe(&serial);
63 }
64 },
65 move |serial| {
66 if is_win_complete(&serial) {
67 sctl_complete.sink_complete(&serial);
68 } else {
69 sctl_complete.upstream_abort_observe(&serial);
70 }
71 },
72 )
73 }),
74 )
75 };
76
77 source.inner_subscribe(sbs.pop().unwrap());
78 observables.iter().for_each(|o| {
79 o.inner_subscribe(sbs.pop().unwrap());
80 });
81 })
82 }
83}
84
85impl<'a, Item> Observable<'a, Item>
86where
87 Item: Clone + Send + Sync,
88{
89 pub fn amb(
90 &self,
91 observables: &[Observable<'a, Item>],
92 ) -> Observable<'a, Item> {
93 Amb::new(observables).execute(self.clone())
94 }
95}
96
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::{thread, time};

    /// Runs `amb` over four iterator-backed observables and prints every
    /// signal that reaches the subscriber.
    #[test]
    fn basic() {
        // Emits `len` labelled strings built from the range `0..len`.
        fn ob(len: usize, maker: &'static str) -> Observable<String> {
            let numbers = observables::from_iter(0..len);
            numbers.map(move |x| format!("{} - {} / {}", maker, x + 1, len))
        }

        let source = ob(5, "#1");
        let rivals = [ob(3, "#2"), ob(2, "#3"), ob(6, "#4")];

        source.amb(&rivals).subscribe(
            print_next_fmt!("{}"),
            print_error!(),
            print_complete!(),
        );
    }

    /// Same scenario as `basic`, but every observable is driven by an
    /// `interval` on its own new-thread scheduler.
    #[test]
    fn thread() {
        // Emits `len` labelled strings, one per 100 ms interval tick.
        fn ob(len: usize, maker: &'static str) -> Observable<String> {
            let period = time::Duration::from_millis(100);
            let ticks = observables::interval(period, schedulers::new_thread_scheduler());
            ticks
                .take(len)
                .map(move |x| format!("{} - {} / {}", maker, x + 1, len))
        }

        let source = ob(5, "#1");
        let rivals = [ob(3, "#2"), ob(2, "#3"), ob(6, "#4")];

        source.amb(&rivals).subscribe(
            print_next_fmt!("{}"),
            print_error!(),
            print_complete!(),
        );

        // Keep the test thread alive while the scheduler threads emit.
        let grace_period = time::Duration::from_millis(1500);
        thread::sleep(grace_period);
    }
}