future_utils/
stream_ext.rs

1use std::fmt::Display;
2use std::time::{Instant, Duration};
3use futures::{Future, Stream};
4use log::LogLevel;
5use void::Void;
6
7use until::Until;
8use first_ok::FirstOk;
9use log_errors::LogErrors;
10use infallible::Infallible;
11use next_or_else::NextOrElse;
12use finally::Finally;
13use with_timeout::WithTimeout;
14use with_readiness_timeout::WithReadinessTimeout;
15use {BoxStream, BoxSendStream};
16
17/// Extension trait for `Stream`.
18pub trait StreamExt: Stream + Sized {
19    /// Wraps a stream into a boxed stream, making type-checking easier at the expense of an extra
20    /// layer of indirection at runtime.
21    fn into_boxed(self) -> BoxStream<Self::Item, Self::Error>
22    where
23        Self: 'static
24    {
25        Box::new(self)
26    }
27
28    fn into_send_boxed(self) -> BoxSendStream<Self::Item, Self::Error>
29    where
30        Self: Send + 'static,
31    {
32        Box::new(self)
33    }
34
35    /// Run this stream until some condition is met. `condition` is a future which returns `()`,
36    /// after which this stream will be finished.
37    ///
38    /// # Example
39    /// ```rust
40    /// let my_stream_with_timeout = my_stream.until(Delay::new(Instant::now() + Duration::from_secs(1)));
41    /// ```
42    fn until<C>(self, condition: C) -> Until<Self, C>
43    where
44        C: Future<Item=()>,
45        Self::Error: From<C::Error>
46    {
47        Until::new(self, condition)
48    }
49
50    /// Adapts a stream to a future by taking the first successful item yielded by the stream. If
51    /// the stream ends before yielding an `Ok` then all the errors that were yielded by the stream
52    /// are returned in a vector.
53    fn first_ok(self) -> FirstOk<Self> {
54        FirstOk::new(self)
55    }
56
57    /// Removes the errors from this stream and log them. `description` is prepended to the log
58    /// messages. The returned stream has error type `Void` since the errors have been removed.
59    fn log_errors(self, level: LogLevel, description: &'static str) -> LogErrors<Self>
60    where
61        Self::Error: Display
62    {
63        LogErrors::new(self, level, description)
64    }
65
66    /// For streams which can't fail (ie. which have error type `Void`), cast the error type to
67    /// some inferred type.
68    fn infallible<E>(self) -> Infallible<Self, E>
69    where
70        Self: Stream<Error=Void>
71    {
72        Infallible::new(self)
73    }
74
75    /// Returns a future which returns the next item from the stream, along with the stream itself.
76    /// If the stream errors then just the error is returned. If the stream ends then the provided
77    /// closure is used to produce an error value.
78    fn next_or_else<F, E>(self, f: F) -> NextOrElse<Self, F>
79    where
80        F: FnOnce() -> E,
81        E: From<Self::Error>,
82    {
83        NextOrElse::new(self, f)
84    }
85
86    /// Yields items from the stream and runs the provided callback when the stream finishes. The
87    /// callback will also be run if the entire stream is dropped.
88    fn finally<D>(self, on_drop: D) -> Finally<Self, D>
89    where
90        D: FnOnce()
91    {
92        Finally::new(self, on_drop)
93    }
94
95    /// Runs the stream for the given duration.
96    fn with_timeout(self, duration: Duration) -> WithTimeout<Self> {
97        WithTimeout::new(self, duration)
98    }
99
100    /// Runs the stream until the given timeout.
101    fn with_timeout_at(self, instant: Instant) -> WithTimeout<Self> {
102        WithTimeout::new_at(self, instant)
103    }
104
105    fn with_readiness_timeout(self, duration: Duration) -> WithReadinessTimeout<Self> {
106        WithReadinessTimeout::new(self, duration)
107    }
108}
109
110impl<T: Stream + Sized> StreamExt for T {}
111