tokio_proto/util/
client_proxy.rs1#![allow(warnings)]
13
14use streaming::Message;
15use tokio_service::Service;
16use futures::{Future, Async, Poll, Stream, AsyncSink, Sink};
17use futures::sync::mpsc;
18use futures::sync::oneshot;
19use std::{fmt, io};
20use std::cell::RefCell;
21
22pub struct ClientProxy<R, S, E> {
24 tx: RefCell<mpsc::UnboundedSender<io::Result<Envelope<R, S, E>>>>,
25}
26
27impl<R, S, E> Clone for ClientProxy<R, S, E> {
28 fn clone(&self) -> Self {
29 ClientProxy {
30 tx: RefCell::new(self.tx.borrow().clone()),
31 }
32 }
33}
34pub struct Response<T, E> {
36 inner: oneshot::Receiver<Result<T, E>>,
37}
38
39type Envelope<R, S, E> = (R, oneshot::Sender<Result<S, E>>);
42
43pub type Pair<R, S, E> = (ClientProxy<R, S, E>, Receiver<R, S, E>);
45
46pub type Receiver<R, S, E> = mpsc::UnboundedReceiver<io::Result<Envelope<R, S, E>>>;
48
49pub fn pair<R, S, E>() -> Pair<R, S, E> {
51 let (tx, rx) = mpsc::unbounded();
53
54 let client = ClientProxy { tx: RefCell::new(tx) };
56
57 (client, rx)
59}
60
61impl<R, S, E: From<io::Error>> Service for ClientProxy<R, S, E> {
62 type Request = R;
63 type Response = S;
64 type Error = E;
65 type Future = Response<S, E>;
66
67 fn call(&self, request: R) -> Self::Future {
68 let (tx, rx) = oneshot::channel();
69
70 let _ = mpsc::UnboundedSender::send(&mut self.tx.borrow_mut(),
77 Ok((request, tx)));
78
79 Response { inner: rx }
80 }
81}
82
83impl<R, S, E> fmt::Debug for ClientProxy<R, S, E>
84 where R: fmt::Debug,
85 S: fmt::Debug,
86 E: fmt::Debug,
87{
88 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89 write!(f, "ClientProxy {{ ... }}")
90 }
91}
92
93impl<T, E> Future for Response<T, E>
94 where E: From<io::Error>,
95{
96 type Item = T;
97 type Error = E;
98
99 fn poll(&mut self) -> Poll<T, E> {
100 match self.inner.poll() {
101 Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(v)),
102 Ok(Async::Ready(Err(e))) => Err(e),
103 Ok(Async::NotReady) => Ok(Async::NotReady),
104 Err(_) => {
105 let e = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe");
106 Err(e.into())
107 }
108 }
109 }
110}
111
112impl<T, E> fmt::Debug for Response<T, E>
113 where T: fmt::Debug,
114 E: fmt::Debug,
115{
116 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
117 write!(f, "Response {{ ... }}")
118 }
119}