tokio_proto/util/
client_proxy.rs

1//! Utilities for building protocol clients
2//!
3//! Provides a channel that handles details of providing a `Service` client.
4//! Usually, this module does not have to be used directly. Instead it is used
5//! by `pipeline` and `multiplex` in the `connect` fns.
6//!
7//! However, some protocols require implementing the dispatch layer directly,
8//! in which case using client channel is helpful.
9
10// Allow warnings in order to prevent the compiler from outputting an error
11// that seems to be fixed on nightly.
12#![allow(warnings)]
13
14use streaming::Message;
15use tokio_service::Service;
16use futures::{Future, Async, Poll, Stream, AsyncSink, Sink};
17use futures::sync::mpsc;
18use futures::sync::oneshot;
19use std::{fmt, io};
20use std::cell::RefCell;
21
22/// Client `Service` for pipeline or multiplex protocols
23pub struct ClientProxy<R, S, E> {
24    tx: RefCell<mpsc::UnboundedSender<io::Result<Envelope<R, S, E>>>>,
25}
26
27impl<R, S, E> Clone for ClientProxy<R, S, E> {
28    fn clone(&self) -> Self {
29        ClientProxy {
30            tx: RefCell::new(self.tx.borrow().clone()),
31        }
32    }
33}
34/// Response future returned from a client
35pub struct Response<T, E> {
36    inner: oneshot::Receiver<Result<T, E>>,
37}
38
39/// Message used to dispatch requests to the task managing the client
40/// connection.
41type Envelope<R, S, E> = (R, oneshot::Sender<Result<S, E>>);
42
43/// A client / receiver pair
44pub type Pair<R, S, E> = (ClientProxy<R, S, E>, Receiver<R, S, E>);
45
46/// Receive requests submitted to the client
47pub type Receiver<R, S, E> = mpsc::UnboundedReceiver<io::Result<Envelope<R, S, E>>>;
48
49/// Return a client handle and a handle used to receive requests on
50pub fn pair<R, S, E>() -> Pair<R, S, E> {
51    // Create a stream
52    let (tx, rx) = mpsc::unbounded();
53
54    // Use the sender handle to create a `Client` handle
55    let client = ClientProxy { tx: RefCell::new(tx) };
56
57    // Return the pair
58    (client, rx)
59}
60
61impl<R, S, E: From<io::Error>> Service for ClientProxy<R, S, E> {
62    type Request = R;
63    type Response = S;
64    type Error = E;
65    type Future = Response<S, E>;
66
67    fn call(&self, request: R) -> Self::Future {
68        let (tx, rx) = oneshot::channel();
69
70        // If send returns an Err, its because the other side has been dropped.
71        // By ignoring it, we are just dropping the `tx`, which will mean the
72        // rx will return Canceled when polled. In turn, that is translated
73        // into a BrokenPipe, which conveys the proper error.
74        // NOTE: If Service changes to have some sort of `try_call`, it'd
75        // probably be more appropriate to return the Request.
76        let _ = mpsc::UnboundedSender::send(&mut self.tx.borrow_mut(),
77                                            Ok((request, tx)));
78
79        Response { inner: rx }
80    }
81}
82
83impl<R, S, E> fmt::Debug for ClientProxy<R, S, E>
84    where R: fmt::Debug,
85          S: fmt::Debug,
86          E: fmt::Debug,
87{
88    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89        write!(f, "ClientProxy {{ ... }}")
90    }
91}
92
93impl<T, E> Future for Response<T, E>
94    where E: From<io::Error>,
95{
96    type Item = T;
97    type Error = E;
98
99    fn poll(&mut self) -> Poll<T, E> {
100        match self.inner.poll() {
101            Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(v)),
102            Ok(Async::Ready(Err(e))) => Err(e),
103            Ok(Async::NotReady) => Ok(Async::NotReady),
104            Err(_) => {
105                let e = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe");
106                Err(e.into())
107            }
108        }
109    }
110}
111
112impl<T, E> fmt::Debug for Response<T, E>
113    where T: fmt::Debug,
114          E: fmt::Debug,
115{
116    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
117        write!(f, "Response {{ ... }}")
118    }
119}