rx_rust/operators/utility/
observe_on.rs

1use crate::{
2    disposable::{Disposable, boxed_disposal::BoxedDisposal, subscription::Subscription},
3    observable::Observable,
4    observer::{Observer, Termination},
5    safe_lock_option_disposable, safe_lock_option_observer,
6    scheduler::{RecursionAction, Scheduler},
7    utils::types::{MutGuard, Mutable, MutableHelper, NecessarySendSync, Shared},
8};
9use educe::Educe;
10
11/// Specifies the `Scheduler` on which an observer will observe this Observable.
12/// See <https://reactivex.io/documentation/operators/observeon.html>
13///
14/// # Examples
15/// ```rust
16/// # #[cfg(not(feature = "tokio-scheduler"))]
17/// # fn main() {
18/// #     panic!("Use tokio-scheduler feature to run tests.");
19/// # }
20/// # #[cfg(feature = "tokio-scheduler")]
21/// #[tokio::main]
22/// async fn main() {
23///     use rx_rust::{
24///         observable::observable_ext::ObservableExt,
25///         observer::Termination,
26///         operators::{
27///             creating::from_iter::FromIter,
28///             utility::observe_on::ObserveOn,
29///         },
30///     };
31///     use std::sync::{Arc, Mutex};
32///     use tokio::time::{sleep, Duration};
33///
34///     let handle = tokio::runtime::Handle::current();
35///     let values = Arc::new(Mutex::new(Vec::new()));
36///     let terminations = Arc::new(Mutex::new(Vec::new()));
37///     let values_observer = Arc::clone(&values);
38///     let terminations_observer = Arc::clone(&terminations);
39///
40///     let subscription = ObserveOn::new(FromIter::new(vec![1, 2, 3]), handle.clone())
41///         .subscribe_with_callback(
42///             move |value| values_observer.lock().unwrap().push(value),
43///             move |termination| terminations_observer
44///                 .lock()
45///                 .unwrap()
46///                 .push(termination),
47///         );
48///
49///     sleep(Duration::from_millis(10)).await;
50///     drop(subscription);
51///
52///     assert_eq!(&*values.lock().unwrap(), &[1, 2, 3]);
53///     assert_eq!(
54///         &*terminations.lock().unwrap(),
55///         &[Termination::Completed]
56///     );
57/// }
58/// ```
59#[derive(Educe)]
60#[educe(Debug, Clone)]
61pub struct ObserveOn<OE, S> {
62    source: OE,
63    scheduler: S,
64}
65
66impl<OE, S> ObserveOn<OE, S> {
67    pub fn new(source: OE, scheduler: S) -> Self {
68        Self { source, scheduler }
69    }
70}
71
72impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, E> for ObserveOn<OE, S>
73where
74    T: NecessarySendSync + 'static,
75    E: NecessarySendSync + 'static,
76    OE: Observable<'or, 'sub, T, E>,
77    S: Scheduler,
78{
79    fn subscribe(
80        self,
81        observer: impl Observer<T, E> + NecessarySendSync + 'static,
82    ) -> Subscription<'sub> {
83        let context = Shared::new(Mutable::new(ObserveOnContext {
84            values: Vec::new(),
85            termination: None,
86            disposal: None,
87        }));
88        let observer = ObserveOnObserver {
89            context: context.clone(),
90            observer: Shared::new(Mutable::new(Some(observer))),
91            scheduler: self.scheduler,
92        };
93        self.source.subscribe(observer) + context
94    }
95}
96
97struct ObserveOnContext<T, E> {
98    values: Vec<T>,
99    termination: Option<Termination<E>>,
100    disposal: Option<BoxedDisposal<'static>>,
101}
102
103impl<T, E> Disposable for Shared<Mutable<ObserveOnContext<T, E>>> {
104    fn dispose(self) {
105        safe_lock_option_disposable!(dispose: self, disposal);
106    }
107}
108
109struct ObserveOnObserver<T, E, OR, S> {
110    context: Shared<Mutable<ObserveOnContext<T, E>>>,
111    observer: Shared<Mutable<Option<OR>>>,
112    scheduler: S,
113}
114
115impl<T, E, OR, S> ObserveOnObserver<T, E, OR, S> {
116    fn setup_scheduler_if_needed(&self, mut lock: MutGuard<'_, ObserveOnContext<T, E>>)
117    where
118        T: NecessarySendSync + 'static,
119        E: NecessarySendSync + 'static,
120        OR: Observer<T, E> + NecessarySendSync + 'static,
121        S: Scheduler,
122    {
123        if lock.disposal.is_some() {
124            return;
125        }
126        let context = self.context.clone();
127        let observer = self.observer.clone();
128        lock.disposal
129            .replace(BoxedDisposal::new(self.scheduler.schedule_recursively(
130                move |_| {
131                    context.lock_mut(|mut lock| {
132                        let termination = lock.termination.take();
133                        let values = std::mem::take(&mut lock.values);
134
135                        match (termination, values.is_empty()) {
136                            (None, true) => {
137                                // No more values. Stop scheduler. Set disposal to None.
138                                if let Some(disposal) = lock.disposal.take() {
139                                    disposal.dispose();
140                                }
141                                RecursionAction::Stop
142                            }
143                            (None, false) => {
144                                drop(lock);
145                                safe_lock_option_observer!(on_next: observer, values: values);
146                                RecursionAction::ContinueImmediately
147                            }
148                            (Some(termination), true) => {
149                                drop(lock);
150                                safe_lock_option_observer!(on_termination: observer, termination);
151                                RecursionAction::Stop
152                            }
153                            (Some(termination), false) => {
154                                drop(lock);
155                                match termination {
156                                    Termination::Completed => {
157                                        safe_lock_option_observer!(on_next_and_termination: observer, values: values, Termination::Completed);
158                                    }
159                                    Termination::Error(_) => {
160                                        safe_lock_option_observer!(on_termination: observer, termination);
161                                    }
162                                }
163                                RecursionAction::Stop
164                            }
165                        }
166                    })
167                },
168                None,
169            )));
170    }
171}
172
173impl<T, E, OR, S> Observer<T, E> for ObserveOnObserver<T, E, OR, S>
174where
175    T: NecessarySendSync + 'static,
176    E: NecessarySendSync + 'static,
177    OR: Observer<T, E> + NecessarySendSync + 'static,
178    S: Scheduler,
179{
180    fn on_next(&mut self, value: T) {
181        self.context.lock_mut(|mut lock| {
182            lock.values.push(value);
183            self.setup_scheduler_if_needed(lock);
184        });
185    }
186
187    fn on_termination(self, termination: Termination<E>) {
188        self.context.lock_mut(|mut lock| {
189            lock.termination.replace(termination);
190            self.setup_scheduler_if_needed(lock);
191        });
192    }
193}