Struct tower::util::CallAll[][src]

pub struct CallAll<Svc, S> where
    Svc: Service<S::Item>,
    S: Stream
{ /* fields omitted */ }
This is supported on crate feature util only.

This is a Stream of responses resulting from calling the wrapped Service for each request received on the wrapped Stream.

use futures::future::{ready, Ready};
use futures::StreamExt;
use futures::channel::mpsc;
use tower_service::Service;
use tower::util::ServiceExt;

// First, we need to have a Service to process our requests.
#[derive(Debug, Eq, PartialEq)]
struct FirstLetter;
impl Service<&'static str> for FirstLetter {
     type Response = &'static str;
     type Error = Box<dyn Error + Send + Sync>;
     type Future = Ready<Result<Self::Response, Self::Error>>;

     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
     }

     fn call(&mut self, req: &'static str) -> Self::Future {
         ready(Ok(&req[..1]))
     }
}

#[tokio::main]
async fn main() {
    // Next, we need a Stream of requests.
    let (mut reqs, rx) = mpsc::unbounded();
    // Note that we have to help Rust out here by telling it what error type to use.
    // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
    let mut rsps = FirstLetter.call_all(rx);

    // Now, let's send a few requests and then check that we get the corresponding responses.
    reqs.unbounded_send("one").unwrap();
    reqs.unbounded_send("two").unwrap();
    reqs.unbounded_send("three").unwrap();
    drop(reqs);

    // We then loop over the response Strem that we get back from call_all.
    let mut i = 0usize;
    while let Some(rsp) = rsps.next().await {
        // Each response is a Result (we could also have used TryStream::try_next)
        match (i + 1, rsp.unwrap()) {
            (1, "o") |
            (2, "t") |
            (3, "t") => {}
            (n, i) => {
                unreachable!("{}. response was '{}'", n, i);
            }
        }
        i += 1;
    }

    // And at the end, we can get the Service back when there are no more requests.
    assert_eq!(rsps.into_inner(), FirstLetter);
}

Implementations

impl<Svc, S> CallAll<Svc, S> where
    Svc: Service<S::Item>,
    Svc::Error: Into<BoxError>,
    S: Stream
[src]

pub fn new(service: Svc, stream: S) -> CallAll<Svc, S>[src]

Create new CallAll combinator.

Each request yielded by stream is passed to svc, and the resulting responses are yielded in the same order by the implementation of Stream for CallAll.

pub fn into_inner(self) -> Svc[src]

Extract the wrapped Service.

Panics

Panics if take_service was already called.

pub fn take_service(self: Pin<&mut Self>) -> Svc[src]

Extract the wrapped Service.

This CallAll can no longer be used after this function has been called.

Panics

Panics if take_service was already called.

pub fn unordered(self) -> CallAllUnordered<Svc, S>[src]

Return responses as they are ready, regardless of the initial order.

This function must be called before the stream is polled.

Panics

Panics if poll was called.

Trait Implementations

impl<Svc: Debug, S: Debug> Debug for CallAll<Svc, S> where
    Svc: Service<S::Item>,
    S: Stream,
    Svc::Future: Debug
[src]

impl<Svc, S> Stream for CallAll<Svc, S> where
    Svc: Service<S::Item>,
    Svc::Error: Into<BoxError>,
    S: Stream
[src]

type Item = Result<Svc::Response, BoxError>

Values yielded by the stream.

impl<'pin, Svc, S> Unpin for CallAll<Svc, S> where
    Svc: Service<S::Item>,
    S: Stream,
    __CallAll<'pin, Svc, S>: Unpin
[src]

Auto Trait Implementations

impl<Svc, S> !RefUnwindSafe for CallAll<Svc, S>[src]

impl<Svc, S> Send for CallAll<Svc, S> where
    S: Send,
    Svc: Send,
    <Svc as Service<<S as Stream>::Item>>::Error: Send,
    <Svc as Service<<S as Stream>::Item>>::Future: Send,
    <Svc as Service<<S as Stream>::Item>>::Response: Send
[src]

impl<Svc, S> Sync for CallAll<Svc, S> where
    S: Sync,
    Svc: Sync,
    <Svc as Service<<S as Stream>::Item>>::Error: Sync,
    <Svc as Service<<S as Stream>::Item>>::Future: Sync,
    <Svc as Service<<S as Stream>::Item>>::Response: Sync
[src]

impl<Svc, S> !UnwindSafe for CallAll<Svc, S>[src]

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<K, S, E, D> Discover for D where
    D: TryStream<Ok = Change<K, S>, Error = E> + ?Sized,
    K: Eq
[src]

type Key = K

This is supported on crate feature discover only.

A unique identifier for each active service. Read more

type Service = S

This is supported on crate feature discover only.

The type of Service yielded by this Discover. Read more

type Error = E

This is supported on crate feature discover only.

Error produced during discovery

impl<T> From<T> for T[src]

impl<T> Instrument for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<T> StreamExt for T where
    T: Stream + ?Sized
[src]

impl<St> StreamExt for St where
    St: Stream + ?Sized
[src]

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

impl<S, T, E> TryStream for S where
    S: Stream<Item = Result<T, E>> + ?Sized
[src]

type Ok = T

The type of successful values yielded by this future

type Error = E

The type of failures yielded by this future

impl<S> TryStreamExt for S where
    S: TryStream + ?Sized
[src]

impl<V, T> VZip<V> for T where
    V: MultiLane<T>,