split_stream_by/
lib.rs

1//!This crate offers two `futures::Stream` extension traits which allows for
2//! splitting a `Stream` into two streams using a predicate function thats
3//! checked on each `Stream::Item`.
4//!
5//!```rust
6//! use futures::StreamExt;
7//! use split_stream_by::SplitStreamByExt;
8//!
9//! tokio::runtime::Runtime::new().unwrap().block_on(async {
10//!     let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
11//!     let (mut even_stream, mut odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);
12//!
13//!
14//!     tokio::spawn(async move {
15//!     	assert_eq!(vec![0,2,4], even_stream.collect::<Vec<_>>().await);
16//!     });
17//!
18//!     assert_eq!(vec![1,3,5], odd_stream.collect::<Vec<_>>().await);
19//! })
20//! ```
21//!
22//! The following is how to use the version that can buffer more than one value.
23//! In this case
24//!```rust
25//! use futures::StreamExt;
26//! use split_stream_by::SplitStreamByExt;
27//!
28//! tokio::runtime::Runtime::new().unwrap().block_on(async {
29//!     const BUFSIZE: usize = 10;
30//!     let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
31//!     let (mut even_stream, mut odd_stream) = incoming_stream.split_by_buffered::<BUFSIZE>(|&n| n % 2 == 0);
32//!
33//!
34//!     tokio::spawn(async move {
35//!     	assert_eq!(vec![0,2,4], even_stream.collect::<Vec<_>>().await);
36//!     });
37//!
38//!     assert_eq!(vec![1,3,5], odd_stream.collect::<Vec<_>>().await);
39//! })
40//! ```
41//!
42//!A more advanced usage uses `split_by_map` which allows for extracting values
43//! while splitting
44//!
45//!```rust
46//! use split_stream_by::{Either,SplitStreamByMapExt};
47//! use futures::StreamExt;
48//!
49//! #[derive(Debug, PartialEq)]
50//! struct Request;
51//!
52//! #[derive(Debug, PartialEq)]
53//! struct Response;
54//!
55//! enum Message {
56//! 	Request(Request),
57//! 	Response(Response)
58//! }
59//!
60//! tokio::runtime::Runtime::new().unwrap().block_on(async {
61//!     let incoming_stream = futures::stream::iter([
62//!     	Message::Request(Request),
63//!     	Message::Response(Response),
64//!     	Message::Response(Response),
65//!     ]);
66//!     let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
67//!     	Message::Request(req) => Either::Left(req),
68//!     	Message::Response(res) => Either::Right(res),
69//!     });
70//!
71//!     let requests_fut = tokio::spawn(request_stream.collect::<Vec<_>>());
72//!     let responses_fut = tokio::spawn(response_stream.collect::<Vec<_>>());
73//!     let (requests,responses) = tokio::join!(requests_fut,responses_fut);
74//!    	assert_eq!(vec![Request], requests.unwrap());
75//!     assert_eq!(vec![Response,Response], responses.unwrap());
76//! })
77//! ```
78mod ring_buf;
79mod split_by;
80mod split_by_buffered;
81mod split_by_map;
82mod split_by_map_buffered;
83
84pub(crate) use split_by::SplitBy;
85pub use split_by::{FalseSplitBy, TrueSplitBy};
86pub(crate) use split_by_buffered::SplitByBuffered;
87pub use split_by_buffered::{FalseSplitByBuffered, TrueSplitByBuffered};
88pub(crate) use split_by_map::SplitByMap;
89pub use split_by_map::{LeftSplitByMap, RightSplitByMap};
90pub(crate) use split_by_map_buffered::SplitByMapBuffered;
91pub use split_by_map_buffered::{LeftSplitByMapBuffered, RightSplitByMapBuffered};
92
93pub use futures::future::Either;
94use futures::Stream;
95
96/// This extension trait provides the functionality for splitting a
97/// stream by a predicate of type `Fn(&Self::Item) -> bool`. The two resulting
98/// streams will both yield `Self::Item`
99pub trait SplitStreamByExt<P>: Stream {
100    /// This takes ownership of a stream and returns two streams based on a
101    /// predicate. When the predicate returns `true`, the item will appear in
102    /// the first of the pair of streams returned. Items that return false will
103    /// go into the second of the pair of streams
104    ///
105    ///```rust
106    /// use split_stream_by::SplitStreamByExt;
107    ///
108    /// let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
109    /// let (even_stream, odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);
110    /// ```
111    fn split_by(
112        self,
113        predicate: P,
114    ) -> (
115        TrueSplitBy<Self::Item, Self, P>,
116        FalseSplitBy<Self::Item, Self, P>,
117    )
118    where
119        P: Fn(&Self::Item) -> bool,
120        Self: Sized,
121    {
122        let stream = SplitBy::new(self, predicate);
123        let true_stream = TrueSplitBy::new(stream.clone());
124        let false_stream = FalseSplitBy::new(stream);
125        (true_stream, false_stream)
126    }
127
128    /// This takes ownership of a stream and returns two streams based on a
129    /// predicate. When the predicate returns `true`, the item will appear in
130    /// the first of the pair of streams returned. Items that return false will
131    /// go into the second of the pair of streams. This will buffer up to N
132    /// items of the inactive stream before returning Pending and notifying that
133    /// stream
134    ///
135    ///```rust
136    /// use split_stream_by::SplitStreamByExt;
137    ///
138    /// let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
139    /// let (even_stream, odd_stream) = incoming_stream.split_by_buffered::<3>(|&n| n % 2 == 0);
140    /// ```
141    fn split_by_buffered<const N: usize>(
142        self,
143        predicate: P,
144    ) -> (
145        TrueSplitByBuffered<Self::Item, Self, P, N>,
146        FalseSplitByBuffered<Self::Item, Self, P, N>,
147    )
148    where
149        P: Fn(&Self::Item) -> bool,
150        Self: Sized,
151    {
152        let stream = SplitByBuffered::new(self, predicate);
153        let true_stream = TrueSplitByBuffered::new(stream.clone());
154        let false_stream = FalseSplitByBuffered::new(stream);
155        (true_stream, false_stream)
156    }
157}
158
159impl<T, P> SplitStreamByExt<P> for T where T: Stream + ?Sized {}
160
161/// This extension trait provides the functionality for splitting a
162/// stream by a predicate of type `Fn(Self::Item) -> Either<L,R>`. The resulting
163/// streams will yield types `L` and `R` respectively
164pub trait SplitStreamByMapExt<P, L, R>: Stream {
165    /// This takes ownership of a stream and returns two streams based on a
166    /// predicate. The predicate takes an item by value and returns
167    /// `Either::Left(..)` or `Either::Right(..)` where the inner
168    /// values of `Left` and `Right` become the items of the two respective
169    /// streams
170    ///
171    /// ```
172    /// use split_stream_by::{Either,SplitStreamByMapExt};
173    /// struct Request {
174    /// 	//...
175    /// }
176    /// struct Response {
177    /// 	//...
178    /// }
179    /// enum Message {
180    /// 	Request(Request),
181    /// 	Response(Response)
182    /// }
183    /// let incoming_stream = futures::stream::iter([
184    /// 	Message::Request(Request {}),
185    /// 	Message::Response(Response {}),
186    /// 	Message::Response(Response {}),
187    /// ]);
188    /// let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
189    /// 	Message::Request(req) => Either::Left(req),
190    /// 	Message::Response(res) => Either::Right(res),
191    /// });
192    /// ```
193
194    fn split_by_map(
195        self,
196        predicate: P,
197    ) -> (
198        LeftSplitByMap<Self::Item, L, R, Self, P>,
199        RightSplitByMap<Self::Item, L, R, Self, P>,
200    )
201    where
202        P: Fn(Self::Item) -> Either<L, R>,
203        Self: Sized,
204    {
205        let stream = SplitByMap::new(self, predicate);
206        let true_stream = LeftSplitByMap::new(stream.clone());
207        let false_stream = RightSplitByMap::new(stream);
208        (true_stream, false_stream)
209    }
210
211    /// This takes ownership of a stream and returns two streams based on a
212    /// predicate. The predicate takes an item by value and returns
213    /// `Either::Left(..)` or `Either::Right(..)` where the inner
214    /// values of `Left` and `Right` become the items of the two respective
215    /// streams. This will buffer up to N items of the inactive stream before
216    /// returning Pending and notifying that stream
217    ///
218    /// ```
219    /// use split_stream_by::{Either,SplitStreamByMapExt};
220    /// struct Request {
221    /// 	//...
222    /// }
223    /// struct Response {
224    /// 	//...
225    /// }
226    /// enum Message {
227    /// 	Request(Request),
228    /// 	Response(Response)
229    /// }
230    /// let incoming_stream = futures::stream::iter([
231    /// 	Message::Request(Request {}),
232    /// 	Message::Response(Response {}),
233    /// 	Message::Response(Response {}),
234    /// ]);
235    /// let (mut request_stream, mut response_stream) = incoming_stream.split_by_map_buffered::<3>(|item| match item {
236    /// 	Message::Request(req) => Either::Left(req),
237    /// 	Message::Response(res) => Either::Right(res),
238    /// });
239    /// ```
240
241    fn split_by_map_buffered<const N: usize>(
242        self,
243        predicate: P,
244    ) -> (
245        LeftSplitByMapBuffered<Self::Item, L, R, Self, P, N>,
246        RightSplitByMapBuffered<Self::Item, L, R, Self, P, N>,
247    )
248    where
249        P: Fn(Self::Item) -> Either<L, R>,
250        Self: Sized,
251    {
252        let stream = SplitByMapBuffered::new(self, predicate);
253        let true_stream = LeftSplitByMapBuffered::new(stream.clone());
254        let false_stream = RightSplitByMapBuffered::new(stream);
255        (true_stream, false_stream)
256    }
257}
258
259impl<T, P, L, R> SplitStreamByMapExt<P, L, R> for T where T: Stream + ?Sized {}