pub type IndexedStreamsUnordered<T> = Unordered<T, IndexedStreams>;
Available on crate feature futures-rs only.
Expand description

A container for an unordered collection of Streams, which also yields the index that produced the next item.

§Examples

use tokio::{net::TcpListener, time};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
    let mut clients = unicycle::IndexedStreamsUnordered::new();

    loop {
        tokio::select! {
            result = listener.accept() => {
                let (stream, _) = result?;
                clients.push(Framed::new(stream, LengthDelimitedCodec::new()));
            },
            Some((index, frame)) = clients.next() => {
                match frame {
                    Some(frame) => println!("{}: received frame: {:?}", index, frame),
                    None => println!("{}: client disconnected", index),
                }
            }
        }
    }
}

Aliased Type§

struct IndexedStreamsUnordered<T> { /* private fields */ }

Implementations§

source§

impl<T> IndexedStreamsUnordered<T>

source

pub fn new() -> Self

Construct a new, empty IndexedStreamsUnordered.

This is the same as StreamsUnordered, except that it yields the index of the stream who’se value was just yielded, alongside the yielded value.

§Examples
use tokio_stream::iter;
use unicycle::IndexedStreamsUnordered;

#[tokio::main]
async fn main() {
    let mut streams = IndexedStreamsUnordered::new();
    assert!(streams.is_empty());

    streams.push(iter(vec![1, 2]));
    streams.push(iter(vec![5, 6]));

    let mut received = Vec::new();

    while let Some(value) = streams.next().await {
        received.push(value);
    }

    assert_eq!(
        vec![
            (1, Some(5)),
            (0, Some(1)),
            (1, Some(6)),
            (0, Some(2)),
            (1, None),
            (0, None)
        ],
        received
    );
}

Trait Implementations§

source§

impl<T> PollNext for IndexedStreamsUnordered<T>
where T: Stream,

§

type Item = (usize, Option<<T as Stream>::Item>)

The output of the poll.
source§

fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>

Poll the stream for the next item. Read more