barter_integration/stream/ext/
mod.rs1use crate::stream::ext::{
2 forward_by::ForwardBy,
3 forward_clone_by::ForwardCloneBy,
4 indexed::{IndexedStream, Indexer},
5};
6use futures::Stream;
7
/// Submodule from which `ForwardBy` is imported at the top of this file.
8pub mod forward_by;
10
/// Submodule from which `ForwardCloneBy` is imported at the top of this file.
11pub mod forward_clone_by;
13
/// Submodule from which `IndexedStream` and `Indexer` are imported at the top of this file.
14pub mod indexed;
16
17pub trait BarterStreamExt
19where
20 Self: Stream,
21{
22 fn with_timeout<TimeoutHandler>(
26 self,
27 timeout_next_item: std::time::Duration,
28 on_timeout: TimeoutHandler,
29 ) -> impl Stream<Item = Self::Item>
30 where
31 Self: Stream + Sized,
32 TimeoutHandler: FnOnce() + 'static,
33 {
34 use tokio_stream::StreamExt;
35 let mut on_timeout = Some(on_timeout);
36 self.timeout(timeout_next_item)
37 .map_while(move |timeout_result| match timeout_result {
38 Ok(item) => Some(item),
39 Err(_elapsed) => {
40 if let Some(handler) = on_timeout.take() {
41 handler();
42 }
43 None
44 }
45 })
46 }
47
48 fn with_index<I>(self, indexer: I) -> IndexedStream<Self, I>
50 where
51 Self: Stream<Item = I::Unindexed> + Sized,
52 I: Indexer,
53 {
54 IndexedStream::new(self, indexer)
55 }
56
57 fn forward_by<A, B, FnPredicate, FnForward, FwdErr>(
62 self,
63 predicate: FnPredicate,
64 forward: FnForward,
65 ) -> ForwardBy<Self, FnPredicate, FnForward>
66 where
67 Self: Stream + Sized,
68 FnPredicate: Fn(Self::Item) -> futures::future::Either<A, B>,
69 FnForward: FnMut(A) -> Result<(), FwdErr>,
70 {
71 ForwardBy::new(self, predicate, forward)
72 }
73
74 fn forward_clone_by<FnPredicate, FnForward, FwdErr>(
77 self,
78 predicate: FnPredicate,
79 forward: FnForward,
80 ) -> ForwardCloneBy<Self, FnPredicate, FnForward>
81 where
82 Self: Stream + Sized,
83 Self::Item: Clone,
84 FnPredicate: FnMut(&Self::Item) -> bool,
85 FnForward: FnMut(Self::Item) -> Result<(), FwdErr>,
86 {
87 ForwardCloneBy::new(self, predicate, forward)
88 }
89}
90
91impl<S> BarterStreamExt for S where S: Stream {}