Expand description
This crate offers two futures::Stream
extension traits which allows for
splitting a Stream
into two streams using a predicate function thats
checked on each Stream::Item
.
use futures::StreamExt;
use split_stream_by::SplitStreamByExt;
tokio::runtime::Runtime::new().unwrap().block_on(async {
let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
let (mut even_stream, mut odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);
tokio::spawn(async move {
assert_eq!(vec![0,2,4], even_stream.collect::<Vec<_>>().await);
});
assert_eq!(vec![1,3,5], odd_stream.collect::<Vec<_>>().await);
})
The following is how to use the version that can buffer more than one value. In this case
use futures::StreamExt;
use split_stream_by::SplitStreamByExt;
tokio::runtime::Runtime::new().unwrap().block_on(async {
const BUFSIZE: usize = 10;
let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
let (mut even_stream, mut odd_stream) = incoming_stream.split_by_buffered::<BUFSIZE>(|&n| n % 2 == 0);
tokio::spawn(async move {
assert_eq!(vec![0,2,4], even_stream.collect::<Vec<_>>().await);
});
assert_eq!(vec![1,3,5], odd_stream.collect::<Vec<_>>().await);
})
A more advanced usage uses split_by_map
which allows for extracting values
while splitting
use split_stream_by::{Either,SplitStreamByMapExt};
use futures::StreamExt;
#[derive(Debug, PartialEq)]
struct Request;
#[derive(Debug, PartialEq)]
struct Response;
enum Message {
Request(Request),
Response(Response)
}
tokio::runtime::Runtime::new().unwrap().block_on(async {
let incoming_stream = futures::stream::iter([
Message::Request(Request),
Message::Response(Response),
Message::Response(Response),
]);
let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
Message::Request(req) => Either::Left(req),
Message::Response(res) => Either::Right(res),
});
let requests_fut = tokio::spawn(request_stream.collect::<Vec<_>>());
let responses_fut = tokio::spawn(response_stream.collect::<Vec<_>>());
let (requests,responses) = tokio::join!(requests_fut,responses_fut);
assert_eq!(vec![Request], requests.unwrap());
assert_eq!(vec![Response,Response], responses.unwrap());
})
Structs§
- False
Split By - A struct that implements
Stream
which returns the items where the predicate returnsfalse
- False
Split ByBuffered - A struct that implements
Stream
which returns the items where the predicate returnsfalse
- Left
Split ByMap - A struct that implements
Stream
which returns the inner values where the predicate returnsEither::Left(..)
when usingsplit_by_map
- Left
Split ByMap Buffered - A struct that implements
Stream
which returns the inner values where the predicate returnsEither::Left(..)
when usingsplit_by_map
- Right
Split ByMap - A struct that implements
Stream
which returns the inner values where the predicate returnsEither::Right(..)
when usingsplit_by_map
- Right
Split ByMap Buffered - A struct that implements
Stream
which returns the inner values where the predicate returnsEither::Right(..)
when usingsplit_by_map
- True
Split By - A struct that implements
Stream
which returns the items where the predicate returnstrue
- True
Split ByBuffered - A struct that implements
Stream
which returns the items where the predicate returnstrue
Enums§
- Either
- Combines two different futures, streams, or sinks having the same associated types into a single type.
Traits§
- Split
Stream ByExt - This extension trait provides the functionality for splitting a
stream by a predicate of type
Fn(&Self::Item) -> bool
. The two resulting streams will both yieldSelf::Item
- Split
Stream ByMap Ext - This extension trait provides the functionality for splitting a
stream by a predicate of type
Fn(Self::Item) -> Either<L,R>
. The resulting streams will yield typesL
andR
respectively