Skip to main content

barter_integration/stream/ext/
mod.rs

1use crate::stream::ext::{
2    forward_by::ForwardBy,
3    forward_clone_by::ForwardCloneBy,
4    indexed::{IndexedStream, Indexer},
5};
6use futures::Stream;
7
8/// Forward matching items, yield non-matching items.
9pub mod forward_by;
10
11/// Forward clones of matching items, yield all items.
12pub mod forward_clone_by;
13
14/// Indexed Stream.
15pub mod indexed;
16
17/// `Stream` extension trait.
18pub trait BarterStreamExt
19where
20    Self: Stream,
21{
22    /// Add a "consecutive event timeout" to the `Stream`
23    ///
24    /// Upon timeout, the `Stream` ends after executing the provided `TimeoutHandler` function.
25    fn with_timeout<TimeoutHandler>(
26        self,
27        timeout_next_item: std::time::Duration,
28        on_timeout: TimeoutHandler,
29    ) -> impl Stream<Item = Self::Item>
30    where
31        Self: Stream + Sized,
32        TimeoutHandler: FnOnce() + 'static,
33    {
34        use tokio_stream::StreamExt;
35        let mut on_timeout = Some(on_timeout);
36        self.timeout(timeout_next_item)
37            .map_while(move |timeout_result| match timeout_result {
38                Ok(item) => Some(item),
39                Err(_elapsed) => {
40                    if let Some(handler) = on_timeout.take() {
41                        handler();
42                    }
43                    None
44                }
45            })
46    }
47
48    /// Indexes the `Stream` using the provided `Indexer`.
49    fn with_index<I>(self, indexer: I) -> IndexedStream<Self, I>
50    where
51        Self: Stream<Item = I::Unindexed> + Sized,
52        I: Indexer,
53    {
54        IndexedStream::new(self, indexer)
55    }
56
57    /// Forward a subset of `Stream::Item`s.
58    ///
59    /// All items that the predicate returns as `Left` and forwarded, whilst `Right` items
60    /// continue through the existing `Stream`.
61    fn forward_by<A, B, FnPredicate, FnForward, FwdErr>(
62        self,
63        predicate: FnPredicate,
64        forward: FnForward,
65    ) -> ForwardBy<Self, FnPredicate, FnForward>
66    where
67        Self: Stream + Sized,
68        FnPredicate: Fn(Self::Item) -> futures::future::Either<A, B>,
69        FnForward: FnMut(A) -> Result<(), FwdErr>,
70    {
71        ForwardBy::new(self, predicate, forward)
72    }
73
74    /// Forward a clone of a subset of `Stream::Item`s that match the predicate. The `Stream`
75    /// still yields all items.
76    fn forward_clone_by<FnPredicate, FnForward, FwdErr>(
77        self,
78        predicate: FnPredicate,
79        forward: FnForward,
80    ) -> ForwardCloneBy<Self, FnPredicate, FnForward>
81    where
82        Self: Stream + Sized,
83        Self::Item: Clone,
84        FnPredicate: FnMut(&Self::Item) -> bool,
85        FnForward: FnMut(Self::Item) -> Result<(), FwdErr>,
86    {
87        ForwardCloneBy::new(self, predicate, forward)
88    }
89}
90
91impl<S> BarterStreamExt for S where S: Stream {}