rx-rust 0.3.0

Reactive Programming in Rust inspired by ReactiveX https://reactivex.io/
Documentation
use futures::{
    channel::oneshot::{Canceled, Receiver},
    future::abortable,
    stream::AbortHandle,
};
use pin_project::pin_project;
use rx_rust::disposable::Disposable;

#[pin_project]
pub(crate) struct JoinHandle<FU>
where
    FU: Future,
{
    #[pin]
    rx: Receiver<FU::Output>,
    abort_handle: AbortHandle,
}

impl<FU> JoinHandle<FU>
where
    FU: Future,
{
    pub(crate) fn wrap(future: FU) -> (Self, impl Future<Output = ()>) {
        let (tx, rx) = futures::channel::oneshot::channel();
        let (future, abort_handle) = abortable(future);

        let future = async {
            let result = future.await;
            if let Ok(value) = result {
                _ = tx.send(value);
            }
        };

        (JoinHandle { rx, abort_handle }, future)
    }
}

impl<FU> Future for JoinHandle<FU>
where
    FU: Future,
{
    type Output = Result<FU::Output, Canceled>;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = self.project();
        Future::poll(this.rx, cx)
    }
}

impl<FU> Disposable for JoinHandle<FU>
where
    FU: Future,
{
    fn dispose(self) {
        self.abort_handle.abort();
    }
}