another_rxrust/operators/
switch_on_next.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct SwitchOnNext<'a, Item>
7where
8 Item: Clone + Send + Sync,
9{
10 target: Observable<'a, Item>,
11}
12
13impl<'a, Item> SwitchOnNext<'a, Item>
14where
15 Item: Clone + Send + Sync,
16{
17 pub fn new(target: Observable<'a, Item>) -> SwitchOnNext<'a, Item> {
18 SwitchOnNext { target }
19 }
20 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
21 let target = self.target.clone();
22 Observable::<Item>::create(move |s| {
23 let sctl = StreamController::new(s);
24 let emitted = Arc::new(RwLock::new(false));
25
26 {
27 let emitted = Arc::clone(&emitted);
28 let sctl_next = sctl.clone();
29 let sctl_error = sctl.clone();
30 let sctl_complete = sctl.clone();
31
32 source.inner_subscribe(sctl.new_observer(
33 move |serial, x| {
34 if *emitted.read().unwrap() {
35 sctl_next.upstream_abort_observe(&serial);
36 } else {
37 sctl_next.sink_next(x);
38 }
39 },
40 move |_, e| {
41 sctl_error.sink_error(e);
42 },
43 move |serial| sctl_complete.sink_complete(&serial),
44 ));
45 };
46
47 {
48 let emitted = Arc::clone(&emitted);
49 let sctl_next = sctl.clone();
50 let sctl_error = sctl.clone();
51 let sctl_complete = sctl.clone();
52 target.inner_subscribe(sctl.new_observer(
53 move |_, x| {
54 *emitted.write().unwrap() = true;
55 sctl_next.sink_next(x);
56 },
57 move |_, e| {
58 sctl_error.sink_error(e);
59 },
60 move |_| sctl_complete.sink_complete_force(),
61 ));
62 }
63 })
64 }
65}
66
67impl<'a, Item> Observable<'a, Item>
68where
69 Item: Clone + Send + Sync,
70{
71 pub fn switch_on_next(
72 &self,
73 target: Observable<'a, Item>,
74 ) -> Observable<'a, Item> {
75 SwitchOnNext::new(target).execute(self.clone())
76 }
77}
78
79#[cfg(all(test, not(feature = "web")))]
80mod test {
81 use crate::prelude::*;
82 use std::{thread, time};
83
84 #[test]
85 fn basic() {
86 let o = observables::interval(
87 time::Duration::from_millis(100),
88 schedulers::new_thread_scheduler(),
89 );
90 let sbj = subjects::Subject::<u64>::new();
91
92 o.switch_on_next(sbj.observable()).subscribe(
93 print_next_fmt!("{}"),
94 print_error!(),
95 print_complete!(),
96 );
97
98 thread::sleep(time::Duration::from_millis(1000));
99
100 sbj.next(1000);
101 sbj.next(2000);
102 sbj.complete();
103 thread::sleep(time::Duration::from_millis(1000));
104 }
105}