Trait postage::stream::Stream[][src]

#[must_use = "streams do nothing unless polled"]
pub trait Stream {
    type Item;
    fn poll_recv(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> PollRecv<Self::Item>;
fn recv(&mut self) -> RecvFuture<'_, Self>

Notable traits for RecvFuture<'s, S>

impl<'s, S: ?Sized> Future for RecvFuture<'s, S> where
    S: Stream + Unpin
type Output = Option<S::Item>;

    where
        Self: Unpin
, { ... }
fn try_recv(&mut self) -> Result<Self::Item, TryRecvError>
    where
        Self: Unpin
, { ... }
fn blocking_recv(&mut self) -> Option<Self::Item>
    where
        Self: Unpin
, { ... }
fn map<Map, Into>(self, map: Map) -> MapStream<Self, Map, Into>
    where
        Map: Fn(Self::Item) -> Into,
        Self: Sized
, { ... }
fn filter<Filter>(self, filter: Filter) -> FilterStream<Self, Filter>
    where
        Self: Sized + Unpin,
        Filter: FnMut(&Self::Item) -> bool + Unpin
, { ... }
fn merge<Other>(self, other: Other) -> MergeStream<Self, Other>
    where
        Other: Stream<Item = Self::Item>,
        Self: Sized
, { ... }
fn chain<Other>(self, other: Other) -> ChainStream<Self, Other>
    where
        Other: Stream<Item = Self::Item>,
        Self: Sized
, { ... }
fn find<Condition>(
        self,
        condition: Condition
    ) -> FindStream<Self, Condition>
    where
        Self: Sized + Unpin,
        Condition: Fn(&Self::Item) -> bool + Unpin
, { ... }
fn log(self, level: Level) -> StreamLog<Self>
    where
        Self: Sized,
        Self::Item: Debug
, { ... } }

An asynchronous stream, which produces a series of messages until closed.

Streams implement poll_recv, a poll-based method very similar to std::future::Future::poll.

Streams can be used in async code with stream.recv().await, or with stream.try_recv().

use postage::mpsc::channel;
use postage::sink::Sink;
use postage::stream::Stream;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = channel(16);
    tx.send(true).await;
    tx.send(true).await;
    drop(tx);

    while let Some(_v) = rx.recv().await {
        println!("Value received!");
        if let Ok(_v) = rx.try_recv() {
            println!("Extra value received!");
        }
    }
}

Streams also support combinators, such as map, filter, find, and log.

use postage::mpsc::channel;
use postage::sink::Sink;
use postage::stream::{Stream, TryRecvError};

#[tokio::main]
async fn main() {
    let (mut tx, rx) = channel(16);

    tx.send(1usize).await;
    tx.send(2usize).await;
    tx.send(3usize).await;
    drop(tx);

    let mut rx = rx
        .map(|i| i * 2)
        .filter(|i| *i >= 4)
        .find(|i| *i == 6);

    // The `logging` feature enables a combinator that logs values using the Debug trait.
    #[cfg(feature = "logging")]
    let mut rx = rx
        .log(log::Level::Info);

    assert_eq!(Ok(6), rx.try_recv());
    assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
}

Associated Types

Loading content...

Required methods

fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item>[src]

Attempts to retrieve an item from the stream, without blocking.

Returns:

  • PollRecv::Ready(value) if a message is ready.
  • PollRecv::Pending if the stream is open, but no message is currently available.
  • PollRecv::Closed if the stream is closed, and no messages are expected.
Loading content...

Provided methods

fn recv(&mut self) -> RecvFuture<'_, Self>

Notable traits for RecvFuture<'s, S>

impl<'s, S: ?Sized> Future for RecvFuture<'s, S> where
    S: Stream + Unpin
type Output = Option<S::Item>;
where
    Self: Unpin
[src]

Retrieves a message from the stream.

Returns:

  • Some(value) if the stream is open and a message is received.
  • None if the stream is closed, and no further messages are expected.

fn try_recv(&mut self) -> Result<Self::Item, TryRecvError> where
    Self: Unpin
[src]

Attempts to retrieve a message from the stream, without blocking.

Returns:

  • Ok(value) if a message is ready.
  • Err(TryRecvError::Pending) if the stream is open, but no messages are available.
  • Err(TryRecvError::Closed) if the stream has been closed, and no items are expected.

fn blocking_recv(&mut self) -> Option<Self::Item> where
    Self: Unpin
[src]

Retrieves a message from the stream, blocking the current thread until one is available.

Returns:

  • Some(value) if the stream is open and a message is received.
  • None if the stream is closed, and no further messages are expected.

fn map<Map, Into>(self, map: Map) -> MapStream<Self, Map, Into> where
    Map: Fn(Self::Item) -> Into,
    Self: Sized
[src]

Transforms the stream with a map function.

fn filter<Filter>(self, filter: Filter) -> FilterStream<Self, Filter> where
    Self: Sized + Unpin,
    Filter: FnMut(&Self::Item) -> bool + Unpin
[src]

Filters messages returned by the stream, ignoring messages where filter returns false.

fn merge<Other>(self, other: Other) -> MergeStream<Self, Other> where
    Other: Stream<Item = Self::Item>,
    Self: Sized
[src]

Merges two streams, returning values from both at once, until both are closed.

fn chain<Other>(self, other: Other) -> ChainStream<Self, Other> where
    Other: Stream<Item = Self::Item>,
    Self: Sized
[src]

Chains two streams, returning values from self until it is closed, and then returning values from other.

fn find<Condition>(self, condition: Condition) -> FindStream<Self, Condition> where
    Self: Sized + Unpin,
    Condition: Fn(&Self::Item) -> bool + Unpin
[src]

Finds a message matching a condition. When the condition is matched, a single value will be returned. Then the stream will be closed.

fn log(self, level: Level) -> StreamLog<Self> where
    Self: Sized,
    Self::Item: Debug
[src]

Logs messages that are produced by the stream using the Debug trait, at the provided log level.

Requires the logging feature.

Loading content...

Implementations on Foreign Types

impl<S: ?Sized> Stream for &mut S where
    S: Stream + Unpin
[src]

type Item = S::Item

impl<P, S: ?Sized> Stream for Pin<P> where
    P: DerefMut<Target = S> + Unpin,
    S: Stream + Unpin
[src]

type Item = <S as Stream>::Item

Loading content...

Implementors

impl Stream for postage::barrier::Receiver[src]

type Item = ()

impl<T> Stream for postage::broadcast::Receiver<T> where
    T: Clone
[src]

type Item = T

impl<T> Stream for postage::dispatch::Receiver<T>[src]

type Item = T

impl<T> Stream for postage::mpsc::Receiver<T>[src]

type Item = T

impl<T> Stream for postage::oneshot::Receiver<T>[src]

type Item = T

impl<T> Stream for postage::watch::Receiver<T> where
    T: Clone
[src]

type Item = T

Loading content...