[][src]Crate tokio_tower

This crate provides utilities for using protocols that follow certain common patterns on top of Tokio and Tower.

Protocols

At a high level, a protocol is a mechanism that lets you take a bunch of requests and turn them into responses. Tower provides the Service trait, which is an interface for mapping requests into responses, but it does not deal with how those requests are sent between clients and servers. Tokio, on the other hand, provides asynchronous communication primitives, but it does not deal with high-level abstractions like services. This crate attempts to bridge that gap.

There are many types of protocols in the wild, but they generally come in two forms: pipelining and multiplexing. A pipelining protocol sends requests and responses in-order between the consumer and provider of a service, and processes requests one at a time. A multiplexing protocol on the other hand constructs requests in such a way that they can be handled and responded to in any order while still allowing the client to know which response is for which request. Pipelining and multiplexing both have their advantages and disadvantages; see the module-level documentation for pipeline and multiplex for details. There is also good deal of discussion in this StackOverflow answer.

Transports

A key part of any protocol is its transport, which is the way that it transmits requests and responses. In general, tokio-tower leaves the on-the-wire implementations of protocols to other crates (like tokio-codec or async-bincode) and instead operates at the level of Sinks and Streams.

At its core, tokio-tower wraps a type that is Sink + Stream. On the client side, the Sink is used to send requests, and the Stream is used to receive responses (from the server) to those requests. On the server side, the Stream is used to receive requests, and the Sink is used to send the responses.

Servers and clients

This crate provides utilities that make writing both clients and servers easier. You'll find the client helper as Client in the protocol module you're working with (e.g., pipeline::Client), and the server helper as Server in the same place.

Example

type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// A transport implemented using a pair of `mpsc` channels.
///
/// `mpsc::Sender` and `mpsc::Receiver` are both unidirectional. So, if we want to use `mpsc`
/// to send requests and responses between a client and server, we need *two* channels, one
/// that lets requests flow from the client to the server, and one that lets responses flow the
/// other way.
///
/// In this echo server example, requests and responses are both of type `T`, but for "real"
/// services, the two types are usually different.
struct ChannelTransport<T> {
    rcv: mpsc::Receiver<T>,
    snd: mpsc::Sender<T>,
}

impl<T: Debug> futures_sink::Sink<T> for ChannelTransport<T> {
    type Error = StdError;

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        self.snd.poll_ready(cx).map_err(|e| e.into())
    }

    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // unwrap ok because of poll_ready()
        self.snd.try_send(item).unwrap();
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(())) // no-op because all sends succeed immediately
    }

    fn poll_close( self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(())) // no-op because channel is closed on drop and flush is no-op
    }
}

impl<T> futures_util::stream::Stream for ChannelTransport<T> {
    type Item = Result<T, StdError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.rcv.poll_recv(cx).map(|s| s.map(Ok))
    }
}

/// A service that tokio-tower should serve over the transport.
/// This one just echoes whatever it gets.
struct Echo;

impl<T> tower_service::Service<T> for Echo {
    type Response = T;
    type Error = Never;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: T) -> Self::Future {
        ready(Ok(req))
    }
}

#[tokio::main]
async fn main() {
    let (s1, r1) = mpsc::channel(2);
    let (s2, r2) = mpsc::channel(2);
    let pair1 = ChannelTransport{snd: s1, rcv: r2};
    let pair2 = ChannelTransport{snd: s2, rcv: r1};

    tokio::spawn(pipeline::Server::new(pair1, Echo));
    let mut client = pipeline::Client::<_, tokio_tower::Error<_, _>, _>::new(pair2);

    use tower_service::Service;
    poll_fn(|cx| client.poll_ready(cx)).await;

    let msg = "Hello, tokio-tower";
    let resp = client.call(String::from(msg)).await.expect("client call");
    assert_eq!(resp, msg);
}

Modules

multiplex

In a multiplexed protocol, the server responds to client requests in the order they complete. Request IDs (TagStore::Tag) are used to match up responses with the request that triggered them. This allows the server to process requests out-of-order, and eliminates the application-level head-of-line blocking that pipelined protocols suffer from. Example multiplexed protocols include SSH, HTTP/2, and AMQP. This page has some further details about how multiplexing protocols operate.

pipeline

In a pipelined protocol, the server responds to client requests in the order they were sent. Many requests can be in flight at the same time, but no request sees a response until all previous requests have been satisfied. Pipelined protocols can experience head-of-line blocking wherein a slow-to-process request prevents any subsequent request from being processed, but are often to easier to implement on the server side, and provide clearer request ordering semantics. Example pipelined protocols include HTTP/1.1, MySQL, and Redis.

Enums

Error

An error that occurred while servicing a request.

Traits

MakeTransport

Creates new Transport (i.e., Sink + Stream) instances.