rx_rust/operators/creating/
from_future.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6    scheduler::Scheduler,
7};
8use educe::Educe;
9use std::convert::Infallible;
10
11/// Converts a Future into an Observable.
12/// See <https://reactivex.io/documentation/operators/from.html>
13///
14/// # Examples
15/// ```rust
16/// # #[cfg(not(feature = "tokio-scheduler"))]
17/// # fn main() {
18/// #     panic!("Use tokio-scheduler feature to run tests.");
19/// # }
20/// # #[cfg(feature = "tokio-scheduler")]
21/// #[tokio::main]
22/// async fn main() {
23///     use rx_rust::{
24///         observable::observable_ext::ObservableExt,
25///         observer::Termination,
26///         operators::creating::from_future::FromFuture,
27///     };
28///     use std::sync::{Arc, Mutex};
29///     use tokio::time::{sleep, Duration};
30///
31///     let values = Arc::new(Mutex::new(Vec::new()));
32///     let terminations = Arc::new(Mutex::new(Vec::new()));
33///     let values_observer = Arc::clone(&values);
34///     let terminations_observer = Arc::clone(&terminations);
35///     let handle = tokio::runtime::Handle::current();
36///
37///     let subscription = FromFuture::new(async { 7 }, handle).subscribe_with_callback(
38///         move |value| values_observer.lock().unwrap().push(value),
39///         move |termination| terminations_observer
40///             .lock()
41///             .unwrap()
42///             .push(termination),
43///     );
44///
45///     sleep(Duration::from_millis(10)).await;
46///     drop(subscription);
47///
48///     assert_eq!(&*values.lock().unwrap(), &[7]);
49///     assert_eq!(
50///         &*terminations.lock().unwrap(),
51///         &[Termination::Completed]
52///     );
53/// }
54/// ```
55#[derive(Educe)]
56#[educe(Debug, Clone)]
57pub struct FromFuture<FU, S> {
58    future: FU,
59    scheduler: S,
60}
61
62impl<FU, S> FromFuture<FU, S> {
63    pub fn new(future: FU, scheduler: S) -> Self
64    where
65        FU: Future + NecessarySendSync + 'static,
66    {
67        Self { future, scheduler }
68    }
69}
70
71impl<'sub, T, FU, S> Observable<'static, 'sub, T, Infallible> for FromFuture<FU, S>
72where
73    FU: Future<Output = T> + NecessarySendSync + 'static,
74    S: Scheduler,
75{
76    fn subscribe(
77        self,
78        mut observer: impl Observer<T, Infallible> + NecessarySendSync + 'static,
79    ) -> Subscription<'sub> {
80        let disposal = self.scheduler.schedule_future(async {
81            let result = self.future.await;
82            observer.on_next(result);
83            observer.on_termination(Termination::Completed);
84        });
85        Subscription::new_with_disposal(disposal)
86    }
87}