rx-rust 0.3.0

Reactive Programming in Rust inspired by ReactiveX https://reactivex.io/
Documentation
use crate::utils::types::NecessarySendSync;
use crate::{
    disposable::subscription::Subscription,
    observable::Observable,
    observer::{Observer, Termination},
    scheduler::Scheduler,
};
use educe::Educe;
use futures::Stream;
use std::convert::Infallible;

/// Converts a `Stream` into an Observable.
/// See <https://reactivex.io/documentation/operators/from.html>
///
/// On subscription the wrapped stream is handed to the scheduler's
/// `schedule_stream`. Each item the stream yields is forwarded to the observer
/// through `on_next`; once the stream reports its end the observer receives
/// `Termination::Completed` exactly once and is released. The observable's
/// error type is `Infallible`.
///
/// The returned `Subscription` wraps the disposal value produced by
/// `schedule_stream`.
///
/// # Examples
/// ```rust
/// # #[cfg(not(feature = "tokio-scheduler"))]
/// # fn main() {
/// #     panic!("Use tokio-scheduler feature to run tests.");
/// # }
/// # #[cfg(feature = "tokio-scheduler")]
/// #[tokio::main]
/// async fn main() {
///     use futures::stream;
///     use rx_rust::{
///         observable::observable_ext::ObservableExt,
///         observer::Termination,
///         operators::creating::from_stream::FromStream,
///     };
///     use std::sync::{Arc, Mutex};
///     use tokio::time::{sleep, Duration};
///
///     let handle = tokio::runtime::Handle::current();
///     let values = Arc::new(Mutex::new(Vec::new()));
///     let terminations = Arc::new(Mutex::new(Vec::new()));
///     let values_observer = Arc::clone(&values);
///     let terminations_observer = Arc::clone(&terminations);
///     let stream = stream::iter([10, 20]);
///
///     let subscription = FromStream::new(stream, handle).subscribe_with_callback(
///         move |value| values_observer.lock().unwrap().push(value),
///         move |termination| terminations_observer
///             .lock()
///             .unwrap()
///             .push(termination),
///     );
///
///     sleep(Duration::from_millis(10)).await;
///     drop(subscription);
///
///     assert_eq!(&*values.lock().unwrap(), &[10, 20]);
///     assert_eq!(
///         &*terminations.lock().unwrap(),
///         &[Termination::Completed]
///     );
/// }
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct FromStream<SM, S> {
    /// Source stream whose items are forwarded to the observer.
    stream: SM,
    /// Scheduler asked to run the stream (via `schedule_stream`) on subscribe.
    scheduler: S,
}

impl<SM, S> FromStream<SM, S> {
    pub fn new(stream: SM, scheduler: S) -> Self
    where
        SM: Stream + NecessarySendSync + Unpin + 'static,
    {
        Self { stream, scheduler }
    }
}

impl<'sub, T, SM, S> Observable<'static, 'sub, T, Infallible> for FromStream<SM, S>
where
    SM: Stream<Item = T> + NecessarySendSync + Unpin + 'static,
    S: Scheduler,
{
    /// Hands the stream to the scheduler and relays its output to `observer`.
    ///
    /// Every `Some(value)` reported by the scheduler becomes an `on_next`
    /// call; the first `None` becomes `Termination::Completed`. The returned
    /// subscription wraps the disposal value produced by `schedule_stream`.
    fn subscribe(
        self,
        observer: impl Observer<T, Infallible> + NecessarySendSync + 'static,
    ) -> Subscription<'sub> {
        let Self { stream, scheduler } = self;

        // The observer lives in an `Option` so it can be moved out when the
        // stream finishes (`on_termination` is called on the owned value).
        // Afterwards the slot is empty and later callbacks do nothing.
        let mut slot = Some(observer);

        Subscription::new_with_disposal(scheduler.schedule_stream(stream, move |item| {
            if let Some(value) = item {
                if let Some(active) = slot.as_mut() {
                    active.on_next(value)
                }
            } else if let Some(finished) = slot.take() {
                finished.on_termination(Termination::Completed)
            }
        }))
    }
}