future_utils/
stream_ext.rs1use std::fmt::Display;
2use std::time::{Instant, Duration};
3use futures::{Future, Stream};
4use log::LogLevel;
5use void::Void;
6
7use until::Until;
8use first_ok::FirstOk;
9use log_errors::LogErrors;
10use infallible::Infallible;
11use next_or_else::NextOrElse;
12use finally::Finally;
13use with_timeout::WithTimeout;
14use with_readiness_timeout::WithReadinessTimeout;
15use {BoxStream, BoxSendStream};
16
17pub trait StreamExt: Stream + Sized {
19 fn into_boxed(self) -> BoxStream<Self::Item, Self::Error>
22 where
23 Self: 'static
24 {
25 Box::new(self)
26 }
27
28 fn into_send_boxed(self) -> BoxSendStream<Self::Item, Self::Error>
29 where
30 Self: Send + 'static,
31 {
32 Box::new(self)
33 }
34
35 fn until<C>(self, condition: C) -> Until<Self, C>
43 where
44 C: Future<Item=()>,
45 Self::Error: From<C::Error>
46 {
47 Until::new(self, condition)
48 }
49
50 fn first_ok(self) -> FirstOk<Self> {
54 FirstOk::new(self)
55 }
56
57 fn log_errors(self, level: LogLevel, description: &'static str) -> LogErrors<Self>
60 where
61 Self::Error: Display
62 {
63 LogErrors::new(self, level, description)
64 }
65
66 fn infallible<E>(self) -> Infallible<Self, E>
69 where
70 Self: Stream<Error=Void>
71 {
72 Infallible::new(self)
73 }
74
75 fn next_or_else<F, E>(self, f: F) -> NextOrElse<Self, F>
79 where
80 F: FnOnce() -> E,
81 E: From<Self::Error>,
82 {
83 NextOrElse::new(self, f)
84 }
85
86 fn finally<D>(self, on_drop: D) -> Finally<Self, D>
89 where
90 D: FnOnce()
91 {
92 Finally::new(self, on_drop)
93 }
94
95 fn with_timeout(self, duration: Duration) -> WithTimeout<Self> {
97 WithTimeout::new(self, duration)
98 }
99
100 fn with_timeout_at(self, instant: Instant) -> WithTimeout<Self> {
102 WithTimeout::new_at(self, instant)
103 }
104
105 fn with_readiness_timeout(self, duration: Duration) -> WithReadinessTimeout<Self> {
106 WithReadinessTimeout::new(self, duration)
107 }
108}
109
110impl<T: Stream + Sized> StreamExt for T {}
111