split_stream_by/lib.rs
1//!This crate offers two `futures::Stream` extension traits which allows for
2//! splitting a `Stream` into two streams using a predicate function thats
3//! checked on each `Stream::Item`.
4//!
5//!```rust
6//! use futures::StreamExt;
7//! use split_stream_by::SplitStreamByExt;
8//!
9//! tokio::runtime::Runtime::new().unwrap().block_on(async {
10//! let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
11//! let (mut even_stream, mut odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);
12//!
13//!
14//! tokio::spawn(async move {
15//! assert_eq!(vec![0,2,4], even_stream.collect::<Vec<_>>().await);
16//! });
17//!
18//! assert_eq!(vec![1,3,5], odd_stream.collect::<Vec<_>>().await);
19//! })
20//! ```
21//!
22//! The following is how to use the version that can buffer more than one value.
23//! In this case
24//!```rust
25//! use futures::StreamExt;
26//! use split_stream_by::SplitStreamByExt;
27//!
28//! tokio::runtime::Runtime::new().unwrap().block_on(async {
29//! const BUFSIZE: usize = 10;
30//! let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
31//! let (mut even_stream, mut odd_stream) = incoming_stream.split_by_buffered::<BUFSIZE>(|&n| n % 2 == 0);
32//!
33//!
34//! tokio::spawn(async move {
35//! assert_eq!(vec![0,2,4], even_stream.collect::<Vec<_>>().await);
36//! });
37//!
38//! assert_eq!(vec![1,3,5], odd_stream.collect::<Vec<_>>().await);
39//! })
40//! ```
41//!
42//!A more advanced usage uses `split_by_map` which allows for extracting values
43//! while splitting
44//!
45//!```rust
46//! use split_stream_by::{Either,SplitStreamByMapExt};
47//! use futures::StreamExt;
48//!
49//! #[derive(Debug, PartialEq)]
50//! struct Request;
51//!
52//! #[derive(Debug, PartialEq)]
53//! struct Response;
54//!
55//! enum Message {
56//! Request(Request),
57//! Response(Response)
58//! }
59//!
60//! tokio::runtime::Runtime::new().unwrap().block_on(async {
61//! let incoming_stream = futures::stream::iter([
62//! Message::Request(Request),
63//! Message::Response(Response),
64//! Message::Response(Response),
65//! ]);
66//! let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
67//! Message::Request(req) => Either::Left(req),
68//! Message::Response(res) => Either::Right(res),
69//! });
70//!
71//! let requests_fut = tokio::spawn(request_stream.collect::<Vec<_>>());
72//! let responses_fut = tokio::spawn(response_stream.collect::<Vec<_>>());
73//! let (requests,responses) = tokio::join!(requests_fut,responses_fut);
74//! assert_eq!(vec![Request], requests.unwrap());
75//! assert_eq!(vec![Response,Response], responses.unwrap());
76//! })
77//! ```
78mod ring_buf;
79mod split_by;
80mod split_by_buffered;
81mod split_by_map;
82mod split_by_map_buffered;
83
84pub(crate) use split_by::SplitBy;
85pub use split_by::{FalseSplitBy, TrueSplitBy};
86pub(crate) use split_by_buffered::SplitByBuffered;
87pub use split_by_buffered::{FalseSplitByBuffered, TrueSplitByBuffered};
88pub(crate) use split_by_map::SplitByMap;
89pub use split_by_map::{LeftSplitByMap, RightSplitByMap};
90pub(crate) use split_by_map_buffered::SplitByMapBuffered;
91pub use split_by_map_buffered::{LeftSplitByMapBuffered, RightSplitByMapBuffered};
92
93pub use futures::future::Either;
94use futures::Stream;
95
96/// This extension trait provides the functionality for splitting a
97/// stream by a predicate of type `Fn(&Self::Item) -> bool`. The two resulting
98/// streams will both yield `Self::Item`
99pub trait SplitStreamByExt<P>: Stream {
100 /// This takes ownership of a stream and returns two streams based on a
101 /// predicate. When the predicate returns `true`, the item will appear in
102 /// the first of the pair of streams returned. Items that return false will
103 /// go into the second of the pair of streams
104 ///
105 ///```rust
106 /// use split_stream_by::SplitStreamByExt;
107 ///
108 /// let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
109 /// let (even_stream, odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);
110 /// ```
111 fn split_by(
112 self,
113 predicate: P,
114 ) -> (
115 TrueSplitBy<Self::Item, Self, P>,
116 FalseSplitBy<Self::Item, Self, P>,
117 )
118 where
119 P: Fn(&Self::Item) -> bool,
120 Self: Sized,
121 {
122 let stream = SplitBy::new(self, predicate);
123 let true_stream = TrueSplitBy::new(stream.clone());
124 let false_stream = FalseSplitBy::new(stream);
125 (true_stream, false_stream)
126 }
127
128 /// This takes ownership of a stream and returns two streams based on a
129 /// predicate. When the predicate returns `true`, the item will appear in
130 /// the first of the pair of streams returned. Items that return false will
131 /// go into the second of the pair of streams. This will buffer up to N
132 /// items of the inactive stream before returning Pending and notifying that
133 /// stream
134 ///
135 ///```rust
136 /// use split_stream_by::SplitStreamByExt;
137 ///
138 /// let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
139 /// let (even_stream, odd_stream) = incoming_stream.split_by_buffered::<3>(|&n| n % 2 == 0);
140 /// ```
141 fn split_by_buffered<const N: usize>(
142 self,
143 predicate: P,
144 ) -> (
145 TrueSplitByBuffered<Self::Item, Self, P, N>,
146 FalseSplitByBuffered<Self::Item, Self, P, N>,
147 )
148 where
149 P: Fn(&Self::Item) -> bool,
150 Self: Sized,
151 {
152 let stream = SplitByBuffered::new(self, predicate);
153 let true_stream = TrueSplitByBuffered::new(stream.clone());
154 let false_stream = FalseSplitByBuffered::new(stream);
155 (true_stream, false_stream)
156 }
157}
158
159impl<T, P> SplitStreamByExt<P> for T where T: Stream + ?Sized {}
160
161/// This extension trait provides the functionality for splitting a
162/// stream by a predicate of type `Fn(Self::Item) -> Either<L,R>`. The resulting
163/// streams will yield types `L` and `R` respectively
164pub trait SplitStreamByMapExt<P, L, R>: Stream {
165 /// This takes ownership of a stream and returns two streams based on a
166 /// predicate. The predicate takes an item by value and returns
167 /// `Either::Left(..)` or `Either::Right(..)` where the inner
168 /// values of `Left` and `Right` become the items of the two respective
169 /// streams
170 ///
171 /// ```
172 /// use split_stream_by::{Either,SplitStreamByMapExt};
173 /// struct Request {
174 /// //...
175 /// }
176 /// struct Response {
177 /// //...
178 /// }
179 /// enum Message {
180 /// Request(Request),
181 /// Response(Response)
182 /// }
183 /// let incoming_stream = futures::stream::iter([
184 /// Message::Request(Request {}),
185 /// Message::Response(Response {}),
186 /// Message::Response(Response {}),
187 /// ]);
188 /// let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
189 /// Message::Request(req) => Either::Left(req),
190 /// Message::Response(res) => Either::Right(res),
191 /// });
192 /// ```
193
194 fn split_by_map(
195 self,
196 predicate: P,
197 ) -> (
198 LeftSplitByMap<Self::Item, L, R, Self, P>,
199 RightSplitByMap<Self::Item, L, R, Self, P>,
200 )
201 where
202 P: Fn(Self::Item) -> Either<L, R>,
203 Self: Sized,
204 {
205 let stream = SplitByMap::new(self, predicate);
206 let true_stream = LeftSplitByMap::new(stream.clone());
207 let false_stream = RightSplitByMap::new(stream);
208 (true_stream, false_stream)
209 }
210
211 /// This takes ownership of a stream and returns two streams based on a
212 /// predicate. The predicate takes an item by value and returns
213 /// `Either::Left(..)` or `Either::Right(..)` where the inner
214 /// values of `Left` and `Right` become the items of the two respective
215 /// streams. This will buffer up to N items of the inactive stream before
216 /// returning Pending and notifying that stream
217 ///
218 /// ```
219 /// use split_stream_by::{Either,SplitStreamByMapExt};
220 /// struct Request {
221 /// //...
222 /// }
223 /// struct Response {
224 /// //...
225 /// }
226 /// enum Message {
227 /// Request(Request),
228 /// Response(Response)
229 /// }
230 /// let incoming_stream = futures::stream::iter([
231 /// Message::Request(Request {}),
232 /// Message::Response(Response {}),
233 /// Message::Response(Response {}),
234 /// ]);
235 /// let (mut request_stream, mut response_stream) = incoming_stream.split_by_map_buffered::<3>(|item| match item {
236 /// Message::Request(req) => Either::Left(req),
237 /// Message::Response(res) => Either::Right(res),
238 /// });
239 /// ```
240
241 fn split_by_map_buffered<const N: usize>(
242 self,
243 predicate: P,
244 ) -> (
245 LeftSplitByMapBuffered<Self::Item, L, R, Self, P, N>,
246 RightSplitByMapBuffered<Self::Item, L, R, Self, P, N>,
247 )
248 where
249 P: Fn(Self::Item) -> Either<L, R>,
250 Self: Sized,
251 {
252 let stream = SplitByMapBuffered::new(self, predicate);
253 let true_stream = LeftSplitByMapBuffered::new(stream.clone());
254 let false_stream = RightSplitByMapBuffered::new(stream);
255 (true_stream, false_stream)
256 }
257}
258
259impl<T, P, L, R> SplitStreamByMapExt<P, L, R> for T where T: Stream + ?Sized {}