futures_ext/stream/
mod.rs1mod return_remainder;
13mod stream_with_timeout;
14mod weight_limited_buffered_stream;
15mod yield_periodically;
16
17use std::time::Duration;
18
19use futures::Future;
20use futures::Stream;
21use futures::StreamExt;
22use futures::TryFuture;
23use futures::TryStream;
24
25pub use self::return_remainder::ReturnRemainder;
26pub use self::stream_with_timeout::StreamTimeoutError;
27pub use self::stream_with_timeout::StreamWithTimeout;
28pub use self::weight_limited_buffered_stream::BufferedParams;
29pub use self::weight_limited_buffered_stream::WeightLimitedBufferedStream;
30pub use self::weight_limited_buffered_stream::WeightLimitedBufferedTryStream;
31pub use self::yield_periodically::YieldPeriodically;
32use crate::future::ConservativeReceiver;
33
34pub trait FbStreamExt: Stream {
37 fn return_remainder(self) -> (ReturnRemainder<Self>, ConservativeReceiver<Self>)
41 where
42 Self: Sized,
43 {
44 ReturnRemainder::new(self)
45 }
46
47 fn buffered_weight_limited<'a, I, Fut>(
50 self,
51 params: BufferedParams,
52 ) -> WeightLimitedBufferedStream<'a, Self, I>
53 where
54 Self: Sized + Send + 'a,
55 Self: Stream<Item = (Fut, u64)>,
56 Fut: Future<Output = I>,
57 {
58 WeightLimitedBufferedStream::new(params, self)
59 }
60
61 fn whole_stream_timeout(self, timeout: Duration) -> StreamWithTimeout<Self>
63 where
64 Self: Sized,
65 {
66 StreamWithTimeout::new(self, timeout)
67 }
68
69 fn yield_periodically(self) -> YieldPeriodically<Self>
71 where
72 Self: Sized,
73 {
74 YieldPeriodically::new(self, Duration::from_millis(10))
75 }
76}
77
78impl<T> FbStreamExt for T where T: Stream + ?Sized {}
79
80pub trait FbTryStreamExt: TryStream {
83 fn try_buffered_weight_limited<'a, I, Fut, E>(
86 self,
87 params: BufferedParams,
88 ) -> WeightLimitedBufferedTryStream<'a, Self, I, E>
89 where
90 Self: Sized + Send + 'a,
91 Self: TryStream<Ok = (Fut, u64), Error = E>,
92 Fut: TryFuture<Ok = I, Error = E>,
93 {
94 WeightLimitedBufferedTryStream::new(params, self)
95 }
96
97 #[allow(clippy::type_complexity)]
100 fn flatten_err<I, E1, E2>(
101 self,
102 ) -> futures::stream::Map<Self, fn(Result<Result<I, E1>, E2>) -> Result<I, E1>>
103 where
104 Self: Sized,
105 Self: Stream<Item = Result<Result<I, E1>, E2>>,
106 E1: From<E2>,
107 {
108 fn flatten_err<I, E1, E2>(e: Result<Result<I, E1>, E2>) -> Result<I, E1>
109 where
110 E1: From<E2>,
111 {
112 match e {
113 Ok(Ok(i)) => Ok(i),
114 Ok(Err(e1)) => Err(e1),
115 Err(e2) => Err(E1::from(e2)),
116 }
117 }
118
119 self.map(flatten_err)
120 }
121}
122
123impl<T> FbTryStreamExt for T where T: TryStream + ?Sized {}