Crate split_stream_by

Source
Expand description

This crate offers two futures::Stream extension traits which allow for splitting a Stream into two streams using a predicate function that is checked on each Stream::Item.

 use futures::StreamExt;
 use split_stream_by::SplitStreamByExt;

 tokio::runtime::Runtime::new().unwrap().block_on(async {
     let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
     let (mut even_stream, mut odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);


     tokio::spawn(async move {
     	assert_eq!(vec![0,2,4], even_stream.collect::<Vec<_>>().await);
     });

     assert_eq!(vec![1,3,5], odd_stream.collect::<Vec<_>>().await);
 })

The following is how to use the version that can buffer more than one value. In this case, the buffer size is set to 10 via the BUFSIZE const generic parameter:

 use futures::StreamExt;
 use split_stream_by::SplitStreamByExt;

 tokio::runtime::Runtime::new().unwrap().block_on(async {
     const BUFSIZE: usize = 10;
     let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
     let (mut even_stream, mut odd_stream) = incoming_stream.split_by_buffered::<BUFSIZE>(|&n| n % 2 == 0);


     tokio::spawn(async move {
     	assert_eq!(vec![0,2,4], even_stream.collect::<Vec<_>>().await);
     });

     assert_eq!(vec![1,3,5], odd_stream.collect::<Vec<_>>().await);
 })

A more advanced usage uses split_by_map, which allows for extracting values while splitting:

 use split_stream_by::{Either,SplitStreamByMapExt};
 use futures::StreamExt;

 #[derive(Debug, PartialEq)]
 struct Request;

 #[derive(Debug, PartialEq)]
 struct Response;

 enum Message {
 	Request(Request),
 	Response(Response)
 }

 tokio::runtime::Runtime::new().unwrap().block_on(async {
     let incoming_stream = futures::stream::iter([
     	Message::Request(Request),
     	Message::Response(Response),
     	Message::Response(Response),
     ]);
     let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
     	Message::Request(req) => Either::Left(req),
     	Message::Response(res) => Either::Right(res),
     });

     let requests_fut = tokio::spawn(request_stream.collect::<Vec<_>>());
     let responses_fut = tokio::spawn(response_stream.collect::<Vec<_>>());
     let (requests,responses) = tokio::join!(requests_fut,responses_fut);
    	assert_eq!(vec![Request], requests.unwrap());
     assert_eq!(vec![Response,Response], responses.unwrap());
 })

Structs§

FalseSplitBy
A struct that implements Stream which returns the items where the predicate returns false
FalseSplitByBuffered
A struct that implements Stream which returns the items where the predicate returns false
LeftSplitByMap
A struct that implements Stream which returns the inner values where the predicate returns Either::Left(..) when using split_by_map
LeftSplitByMapBuffered
A struct that implements Stream which returns the inner values where the predicate returns Either::Left(..) when using split_by_map_buffered
RightSplitByMap
A struct that implements Stream which returns the inner values where the predicate returns Either::Right(..) when using split_by_map
RightSplitByMapBuffered
A struct that implements Stream which returns the inner values where the predicate returns Either::Right(..) when using split_by_map_buffered
TrueSplitBy
A struct that implements Stream which returns the items where the predicate returns true
TrueSplitByBuffered
A struct that implements Stream which returns the items where the predicate returns true

Enums§

Either
Combines two different futures, streams, or sinks having the same associated types into a single type.

Traits§

SplitStreamByExt
This extension trait provides the functionality for splitting a stream by a predicate of type Fn(&Self::Item) -> bool. The two resulting streams will both yield Self::Item
SplitStreamByMapExt
This extension trait provides the functionality for splitting a stream by a predicate of type Fn(Self::Item) -> Either<L,R>. The resulting streams will yield types L and R respectively