another_rxrust/operators/
switch_on_next.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct SwitchOnNext<'a, Item>
7where
8  Item: Clone + Send + Sync,
9{
10  target: Observable<'a, Item>,
11}
12
13impl<'a, Item> SwitchOnNext<'a, Item>
14where
15  Item: Clone + Send + Sync,
16{
17  pub fn new(target: Observable<'a, Item>) -> SwitchOnNext<'a, Item> {
18    SwitchOnNext { target }
19  }
20  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
21    let target = self.target.clone();
22    Observable::<Item>::create(move |s| {
23      let sctl = StreamController::new(s);
24      let emitted = Arc::new(RwLock::new(false));
25
26      {
27        let emitted = Arc::clone(&emitted);
28        let sctl_next = sctl.clone();
29        let sctl_error = sctl.clone();
30        let sctl_complete = sctl.clone();
31
32        source.inner_subscribe(sctl.new_observer(
33          move |serial, x| {
34            if *emitted.read().unwrap() {
35              sctl_next.upstream_abort_observe(&serial);
36            } else {
37              sctl_next.sink_next(x);
38            }
39          },
40          move |_, e| {
41            sctl_error.sink_error(e);
42          },
43          move |serial| sctl_complete.sink_complete(&serial),
44        ));
45      };
46
47      {
48        let emitted = Arc::clone(&emitted);
49        let sctl_next = sctl.clone();
50        let sctl_error = sctl.clone();
51        let sctl_complete = sctl.clone();
52        target.inner_subscribe(sctl.new_observer(
53          move |_, x| {
54            *emitted.write().unwrap() = true;
55            sctl_next.sink_next(x);
56          },
57          move |_, e| {
58            sctl_error.sink_error(e);
59          },
60          move |_| sctl_complete.sink_complete_force(),
61        ));
62      }
63    })
64  }
65}
66
67impl<'a, Item> Observable<'a, Item>
68where
69  Item: Clone + Send + Sync,
70{
71  pub fn switch_on_next(
72    &self,
73    target: Observable<'a, Item>,
74  ) -> Observable<'a, Item> {
75    SwitchOnNext::new(target).execute(self.clone())
76  }
77}
78
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Smoke test: prints whatever the switched stream delivers; it makes no
  /// assertions on the output.
  #[test]
  fn basic() {
    let pause = time::Duration::from_millis(1000);

    // Source: an interval with a 100 ms period on a new-thread scheduler.
    let ticks = observables::interval(
      time::Duration::from_millis(100),
      schedulers::new_thread_scheduler(),
    );
    // Target: a subject driven by hand further down.
    let subject = subjects::Subject::<u64>::new();

    ticks.switch_on_next(subject.observable()).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );

    // Give the interval time to run before the target emits anything.
    thread::sleep(pause);

    subject.next(1000);
    subject.next(2000);
    subject.complete();

    // Keep the test alive for a while after the target has completed.
    thread::sleep(pause);
  }
}