futures_ext/stream/
mod.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
//! Extensions to the functionality of the [`futures::stream`] module.
11
12mod return_remainder;
13mod stream_with_timeout;
14mod weight_limited_buffered_stream;
15mod yield_periodically;
16
17use std::time::Duration;
18
19use futures::Future;
20use futures::Stream;
21use futures::StreamExt;
22use futures::TryFuture;
23use futures::TryStream;
24
25pub use self::return_remainder::ReturnRemainder;
26pub use self::stream_with_timeout::StreamTimeoutError;
27pub use self::stream_with_timeout::StreamWithTimeout;
28pub use self::weight_limited_buffered_stream::BufferedParams;
29pub use self::weight_limited_buffered_stream::WeightLimitedBufferedStream;
30pub use self::weight_limited_buffered_stream::WeightLimitedBufferedTryStream;
31pub use self::yield_periodically::YieldPeriodically;
32use crate::future::ConservativeReceiver;
33
34/// A trait implemented by default for all Streams which extends the standard
35/// functionality.
36pub trait FbStreamExt: Stream {
37    /// Creates a stream wrapper and a future. The future will resolve into the wrapped stream when
38    /// the stream wrapper returns None. It uses ConservativeReceiver to ensure that deadlocks are
39    /// easily caught when one tries to poll on the receiver before consuming the stream.
40    fn return_remainder(self) -> (ReturnRemainder<Self>, ConservativeReceiver<Self>)
41    where
42        Self: Sized,
43    {
44        ReturnRemainder::new(self)
45    }
46
47    /// Like [futures::stream::StreamExt::buffered] call,
48    /// but can also limit number of futures in a buffer by "weight".
49    fn buffered_weight_limited<'a, I, Fut>(
50        self,
51        params: BufferedParams,
52    ) -> WeightLimitedBufferedStream<'a, Self, I>
53    where
54        Self: Sized + Send + 'a,
55        Self: Stream<Item = (Fut, u64)>,
56        Fut: Future<Output = I>,
57    {
58        WeightLimitedBufferedStream::new(params, self)
59    }
60
61    /// Construct a new [self::stream_with_timeout::StreamWithTimeout].
62    fn whole_stream_timeout(self, timeout: Duration) -> StreamWithTimeout<Self>
63    where
64        Self: Sized,
65    {
66        StreamWithTimeout::new(self, timeout)
67    }
68
69    /// Construct a new [self::yield_periodically::YieldPeriodically], with a sensible default.
70    fn yield_periodically(self) -> YieldPeriodically<Self>
71    where
72        Self: Sized,
73    {
74        YieldPeriodically::new(self, Duration::from_millis(10))
75    }
76}
77
78impl<T> FbStreamExt for T where T: Stream + ?Sized {}
79
80/// A trait implemented by default for all TryStreams which extends the standard
81/// functionality.
82pub trait FbTryStreamExt: TryStream {
83    /// Like [futures::stream::StreamExt::buffered] call, but for `TryStream` and
84    /// can also limit number of futures in a buffer by "weight".
85    fn try_buffered_weight_limited<'a, I, Fut, E>(
86        self,
87        params: BufferedParams,
88    ) -> WeightLimitedBufferedTryStream<'a, Self, I, E>
89    where
90        Self: Sized + Send + 'a,
91        Self: TryStream<Ok = (Fut, u64), Error = E>,
92        Fut: TryFuture<Ok = I, Error = E>,
93    {
94        WeightLimitedBufferedTryStream::new(params, self)
95    }
96
97    /// Convert a `Stream` of `Result<Result<I, E1>, E2>` into a `Stream` of
98    /// `Result<I, E1>`, assuming `E2` can convert into `E1`.
99    #[allow(clippy::type_complexity)]
100    fn flatten_err<I, E1, E2>(
101        self,
102    ) -> futures::stream::Map<Self, fn(Result<Result<I, E1>, E2>) -> Result<I, E1>>
103    where
104        Self: Sized,
105        Self: Stream<Item = Result<Result<I, E1>, E2>>,
106        E1: From<E2>,
107    {
108        fn flatten_err<I, E1, E2>(e: Result<Result<I, E1>, E2>) -> Result<I, E1>
109        where
110            E1: From<E2>,
111        {
112            match e {
113                Ok(Ok(i)) => Ok(i),
114                Ok(Err(e1)) => Err(e1),
115                Err(e2) => Err(E1::from(e2)),
116            }
117        }
118
119        self.map(flatten_err)
120    }
121}
122
123impl<T> FbTryStreamExt for T where T: TryStream + ?Sized {}