// rx_rust/operators/utility/subscribe_on.rs

1use crate::{
2    disposable::subscription::Subscription,
3    observable::Observable,
4    observer::Observer,
5    scheduler::Scheduler,
6    utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared},
7};
8use educe::Educe;
9
/// Specifies the `Scheduler` on which an observer will subscribe to this Observable.
/// See <https://reactivex.io/documentation/operators/subscribeon.html>
///
/// The operator does not subscribe to its source directly. Its `subscribe`
/// implementation wraps the call to the source's `subscribe` in a task and
/// passes that task to `Scheduler::schedule`, so the scheduler decides when
/// the upstream subscription actually happens.
///
/// # Examples
/// ```rust
/// # #[cfg(not(feature = "tokio-scheduler"))]
/// # fn main() {
/// #     panic!("Use tokio-scheduler feature to run tests.");
/// # }
/// # #[cfg(feature = "tokio-scheduler")]
/// #[tokio::main]
/// async fn main() {
///     use rx_rust::{
///         observable::observable_ext::ObservableExt,
///         observer::Termination,
///         operators::{
///             creating::from_iter::FromIter,
///             utility::subscribe_on::SubscribeOn,
///         },
///     };
///     use std::sync::{Arc, Mutex};
///     use tokio::time::{sleep, Duration};
///
///     let handle = tokio::runtime::Handle::current();
///     let values = Arc::new(Mutex::new(Vec::new()));
///     let terminations = Arc::new(Mutex::new(Vec::new()));
///     let values_observer = Arc::clone(&values);
///     let terminations_observer = Arc::clone(&terminations);
///
///     let subscription = SubscribeOn::new(FromIter::new(vec![1, 2, 3]), handle.clone())
///         .subscribe_with_callback(
///             move |value| values_observer.lock().unwrap().push(value),
///             move |termination| terminations_observer
///                 .lock()
///                 .unwrap()
///                 .push(termination),
///         );
///
///     // Give the scheduled subscription a moment to run before checking the results.
///     sleep(Duration::from_millis(10)).await;
///     drop(subscription);
///
///     assert_eq!(&*values.lock().unwrap(), &[1, 2, 3]);
///     assert_eq!(
///         &*terminations.lock().unwrap(),
///         &[Termination::Completed]
///     );
/// }
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct SubscribeOn<OE, S> {
    /// Upstream observable; subscribed to from inside the scheduled task.
    source: OE,
    /// Scheduler that receives the task performing the upstream subscription.
    scheduler: S,
}
64
65impl<OE, S> SubscribeOn<OE, S> {
66    pub fn new(source: OE, scheduler: S) -> Self {
67        Self { source, scheduler }
68    }
69}
70
71impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, E> for SubscribeOn<OE, S>
72where
73    OE: Observable<'or, 'static, T, E> + NecessarySendSync + 'static,
74    S: Scheduler,
75{
76    fn subscribe(
77        self,
78        observer: impl Observer<T, E> + NecessarySendSync + 'static,
79    ) -> Subscription<'sub> {
80        let sub = Shared::new(Mutable::new(Some(Subscription::default()))); // Placeholder
81        let sub_cloned = sub.clone();
82        let disposal = self.scheduler.schedule(
83            move || {
84                sub_cloned.lock_mut(|mut lock| {
85                    if lock.is_some() {
86                        // Only subscribe if not unsubscribed yet.
87                        let sub = self.source.subscribe(observer);
88                        lock.replace(sub);
89                    }
90                });
91            },
92            None,
93        );
94        Subscription::new_with_disposal(sub) + disposal
95    }
96}