rx_rust/operators/utility/subscribe_on.rs
1use crate::{
2 disposable::subscription::Subscription,
3 observable::Observable,
4 observer::Observer,
5 scheduler::Scheduler,
6 utils::types::{Mutable, MutableHelper, NecessarySend, Shared},
7};
8use educe::Educe;
9
10/// Specifies the `Scheduler` on which an observer will subscribe to this Observable.
11/// See <https://reactivex.io/documentation/operators/subscribeon.html>
12///
13/// # Examples
14/// ```rust
15/// # #[cfg(not(feature = "tokio-scheduler"))]
16/// # fn main() {
17/// # panic!("Use tokio-scheduler feature to run tests.");
18/// # }
19/// # #[cfg(feature = "tokio-scheduler")]
20/// #[tokio::main]
21/// async fn main() {
22/// use rx_rust::{
23/// observable::observable_ext::ObservableExt,
24/// observer::Termination,
25/// operators::{
26/// creating::from_iter::FromIter,
27/// utility::subscribe_on::SubscribeOn,
28/// },
29/// };
30/// use std::sync::{Arc, Mutex};
31/// use tokio::time::{sleep, Duration};
32///
33/// let handle = tokio::runtime::Handle::current();
34/// let values = Arc::new(Mutex::new(Vec::new()));
35/// let terminations = Arc::new(Mutex::new(Vec::new()));
36/// let values_observer = Arc::clone(&values);
37/// let terminations_observer = Arc::clone(&terminations);
38///
39/// let subscription = SubscribeOn::new(FromIter::new(vec![1, 2, 3]), handle.clone())
40/// .subscribe_with_callback(
41/// move |value| values_observer.lock().unwrap().push(value),
42/// move |termination| terminations_observer
43/// .lock()
44/// .unwrap()
45/// .push(termination),
46/// );
47///
48/// sleep(Duration::from_millis(10)).await;
49/// drop(subscription);
50///
51/// assert_eq!(&*values.lock().unwrap(), &[1, 2, 3]);
52/// assert_eq!(
53/// &*terminations.lock().unwrap(),
54/// &[Termination::Completed]
55/// );
56/// }
57/// ```
58#[derive(Educe)]
59#[educe(Debug, Clone)]
60pub struct SubscribeOn<OE, S> {
61 source: OE,
62 scheduler: S,
63}
64
65impl<OE, S> SubscribeOn<OE, S> {
66 pub fn new(source: OE, scheduler: S) -> Self {
67 Self { source, scheduler }
68 }
69}
70
71impl<'or, 'sub, T, E, OE, S> Observable<'static, 'sub, T, E> for SubscribeOn<OE, S>
72where
73 OE: Observable<'or, 'static, T, E> + NecessarySend + 'static,
74 S: Scheduler,
75{
76 fn subscribe(
77 self,
78 observer: impl Observer<T, E> + NecessarySend + 'static,
79 ) -> Subscription<'sub> {
80 let sub = Shared::new(Mutable::new(Some(Subscription::default()))); // Placeholder
81 let sub_cloned = sub.clone();
82 let disposal = self.scheduler.schedule(
83 move || {
84 sub_cloned.lock_mut(|mut lock| {
85 if lock.is_some() {
86 // Only subscribe if not unsubscribed yet.
87 let sub = self.source.subscribe(observer);
88 lock.replace(sub);
89 }
90 });
91 },
92 None,
93 );
94 Subscription::new_with_disposal(sub) + disposal
95 }
96}