//!This repo is for a Rust crate that offers a `futures::Stream` extension
//!trait which allows for splitting a `Stream` into two streams using a
//!predicate function that's checked on each `Stream::Item`.
//!
//!The current version of this crate buffers only one value, and only in the
//!scenario where the item yielded from the parent stream is not what the
//!child stream requested per the predicate. In that scenario, the item is
//!stored and the other stream is awakened.
//!
//!```rust
//!use futures::StreamExt;
//!use split_stream_by::SplitStreamByExt;
//!
//!# async fn example() {
//!let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
//!let (mut even_stream, mut odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);
//!
//!tokio::spawn(async move {
//!    while let Some(even_number) = even_stream.next().await {
//!        println!("Even {}",even_number);
//!    }
//!});
//!
//!while let Some(odd_number) = odd_stream.next().await {
//!    println!("Odd {}",odd_number);
//!}
//!# }
//!```
//!
//!A more advanced usage is `split_by_map`, which allows for extracting
//!values while splitting.
//!
//!```rust
//!use futures::StreamExt;
//!use split_stream_by::{Either,SplitStreamByExt};
//!
//!struct Request {
//!    //...
//!}
//!
//!struct Response {
//!    //...
//!}
//!
//!enum Message {
//!    Request(Request),
//!    Response(Response)
//!}
//!
//!# async fn example() {
//!let incoming_stream = futures::stream::iter([
//!    Message::Request(Request {}),
//!    Message::Response(Response {}),
//!    Message::Response(Response {}),
//!]);
//!let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
//!    Message::Request(req) => Either::Left(req),
//!    Message::Response(res) => Either::Right(res),
//!});
//!
//!tokio::spawn(async move {
//!    while let Some(request) = request_stream.next().await {
//!        // ...
//!    }
//!});
//!
//!while let Some(response) = response_stream.next().await {
//!    // ...
//!}
//!# }
//!```
mod split_by;
mod split_by_map;
pub(crate) use split_by::SplitBy;
pub use split_by::{FalseSplitBy, TrueSplitBy};
pub(crate) use split_by_map::SplitByMap;
pub use split_by_map::{LeftSplitByMap, RightSplitByMap};
pub use futures::future::Either;
use futures::Stream;
/// Extension trait that provides the functionality for splitting a [`Stream`]
/// into two streams.
///
/// Both methods take the stream by value and return a pair of streams; a
/// caller-supplied closure decides, item by item, which of the two receives it.
pub trait SplitStreamByExt: Stream {
    /// Takes ownership of the stream and returns two streams chosen by `predicate`.
    ///
    /// Items for which `predicate` returns `true` appear in the first stream of
    /// the returned pair; items for which it returns `false` go to the second.
    ///
    /// ```rust
    /// use split_stream_by::SplitStreamByExt;
    ///
    /// let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
    /// let (even_stream, odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);
    /// ```
    fn split_by<P>(
        self,
        predicate: P,
    ) -> (
        TrueSplitBy<Self::Item, Self, P>,
        FalseSplitBy<Self::Item, Self, P>,
    )
    where
        Self: Sized,
        P: Fn(&Self::Item) -> bool,
    {
        let shared = SplitBy::new(self, predicate);
        // The `true` half gets a clone so that `shared` itself can be moved
        // into the `false` half.
        (TrueSplitBy::new(shared.clone()), FalseSplitBy::new(shared))
    }

    /// Takes ownership of the stream and returns two streams chosen by `predicate`,
    /// transforming each item on the way.
    ///
    /// `predicate` receives every item by value and returns either
    /// `Either::Left(..)` or `Either::Right(..)`. The inner values of `Left`
    /// become the items of the first returned stream and the inner values of
    /// `Right` become the items of the second.
    ///
    /// ```
    /// use split_stream_by::{Either,SplitStreamByExt};
    /// struct Request {
    ///     //...
    /// }
    /// struct Response {
    ///     //...
    /// }
    /// enum Message {
    ///     Request(Request),
    ///     Response(Response)
    /// }
    /// let incoming_stream = futures::stream::iter([
    ///     Message::Request(Request {}),
    ///     Message::Response(Response {}),
    ///     Message::Response(Response {}),
    /// ]);
    /// let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
    ///     Message::Request(req) => Either::Left(req),
    ///     Message::Response(res) => Either::Right(res),
    /// });
    /// ```
    fn split_by_map<L, R, P>(
        self,
        predicate: P,
    ) -> (
        LeftSplitByMap<Self::Item, L, R, Self, P>,
        RightSplitByMap<Self::Item, L, R, Self, P>,
    )
    where
        Self: Sized,
        P: Fn(Self::Item) -> Either<L, R>,
    {
        let shared = SplitByMap::new(self, predicate);
        // The left half gets a clone so that `shared` itself can be moved
        // into the right half.
        let left_half = LeftSplitByMap::new(shared.clone());
        let right_half = RightSplitByMap::new(shared);
        (left_half, right_half)
    }
}
/// Blanket implementation: every type that implements [`Stream`] also
/// implements [`SplitStreamByExt`]. The trait's methods carry their own
/// `Self: Sized` bound, so unsized stream types are accepted here.
impl<S> SplitStreamByExt for S where S: ?Sized + Stream {}