rustrade-integration 0.2.0

Low-level framework for composing flexible web integrations, especially with financial exchanges
Documentation
use crate::stream::ext::{
    forward_by::ForwardBy,
    forward_clone_by::ForwardCloneBy,
    indexed::{IndexedStream, Indexer},
};
use futures::Stream;

/// Forward matching items, yield non-matching items.
pub mod forward_by;

/// Forward clones of matching items, yield all items.
pub mod forward_clone_by;

/// Indexed Stream.
pub mod indexed;

/// `Stream` extension trait.
pub trait BarterStreamExt
where
    Self: Stream,
{
    /// Add a "consecutive event timeout" to the `Stream`
    ///
    /// Upon timeout, the `Stream` ends after executing the provided `TimeoutHandler` function.
    fn with_timeout<TimeoutHandler>(
        self,
        timeout_next_item: std::time::Duration,
        on_timeout: TimeoutHandler,
    ) -> impl Stream<Item = Self::Item>
    where
        Self: Stream + Sized,
        TimeoutHandler: FnOnce() + 'static,
    {
        use tokio_stream::StreamExt;
        let mut on_timeout = Some(on_timeout);
        self.timeout(timeout_next_item)
            .map_while(move |timeout_result| match timeout_result {
                Ok(item) => Some(item),
                Err(_elapsed) => {
                    if let Some(handler) = on_timeout.take() {
                        handler();
                    }
                    None
                }
            })
    }

    /// Indexes the `Stream` using the provided `Indexer`.
    fn with_index<I>(self, indexer: I) -> IndexedStream<Self, I>
    where
        Self: Stream<Item = I::Unindexed> + Sized,
        I: Indexer,
    {
        IndexedStream::new(self, indexer)
    }

    /// Forward a subset of `Stream::Item`s.
    ///
    /// All items that the predicate returns as `Left` and forwarded, whilst `Right` items
    /// continue through the existing `Stream`.
    fn forward_by<A, B, FnPredicate, FnForward, FwdErr>(
        self,
        predicate: FnPredicate,
        forward: FnForward,
    ) -> ForwardBy<Self, FnPredicate, FnForward>
    where
        Self: Stream + Sized,
        FnPredicate: Fn(Self::Item) -> futures::future::Either<A, B>,
        FnForward: FnMut(A) -> Result<(), FwdErr>,
    {
        ForwardBy::new(self, predicate, forward)
    }

    /// Forward a clone of a subset of `Stream::Item`s that match the predicate. The `Stream`
    /// still yields all items.
    fn forward_clone_by<FnPredicate, FnForward, FwdErr>(
        self,
        predicate: FnPredicate,
        forward: FnForward,
    ) -> ForwardCloneBy<Self, FnPredicate, FnForward>
    where
        Self: Stream + Sized,
        Self::Item: Clone,
        FnPredicate: FnMut(&Self::Item) -> bool,
        FnForward: FnMut(Self::Item) -> Result<(), FwdErr>,
    {
        ForwardCloneBy::new(self, predicate, forward)
    }
}

impl<S> BarterStreamExt for S where S: Stream {}