tokio_proto/simple/multiplex/
client.rs1use BindClient;
2use super::{RequestId, Multiplex};
3use super::lift::{LiftBind, LiftTransport};
4use simple::LiftProto;
5
6use std::{fmt, io};
7
8use streaming::{self, Message};
9use streaming::multiplex::StreamingMultiplex;
10use tokio_core::reactor::Handle;
11use tokio_service::Service;
12use futures::{stream, Stream, Sink, Future, IntoFuture, Poll};
13
14type MyStream<E> = stream::Empty<(), E>;
15
16pub trait ClientProto<T: 'static>: 'static {
25 type Request: 'static;
27
28 type Response: 'static;
30
31 type Transport: 'static +
37 Stream<Item = (RequestId, Self::Response), Error = io::Error> +
38 Sink<SinkItem = (RequestId, Self::Request), SinkError = io::Error>;
39
40 type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
44
45 fn bind_transport(&self, io: T) -> Self::BindTransport;
52}
53
54impl<T: 'static, P: ClientProto<T>> BindClient<Multiplex, T> for P {
55 type ServiceRequest = P::Request;
56 type ServiceResponse = P::Response;
57 type ServiceError = io::Error;
58
59 type BindClient = ClientService<T, P>;
60
61 fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
62 ClientService {
63 inner: BindClient::<StreamingMultiplex<MyStream<io::Error>>, T>::bind_client(
64 LiftProto::from_ref(self), handle, io
65 )
66 }
67 }
68}
69
70impl<T, P> streaming::multiplex::ClientProto<T> for LiftProto<P> where
71 T: 'static, P: ClientProto<T>
72{
73 type Request = P::Request;
74 type RequestBody = ();
75
76 type Response = P::Response;
77 type ResponseBody = ();
78
79 type Error = io::Error;
80
81 type Transport = LiftTransport<P::Transport, io::Error>;
82 type BindTransport = LiftBind<T, <P::BindTransport as IntoFuture>::Future, io::Error>;
83
84 fn bind_transport(&self, io: T) -> Self::BindTransport {
85 LiftBind::lift(ClientProto::bind_transport(self.lower(), io).into_future())
86 }
87}
88
89pub struct ClientService<T, P> where T: 'static, P: ClientProto<T> {
91 inner: <LiftProto<P> as BindClient<StreamingMultiplex<MyStream<io::Error>>, T>>::BindClient
92}
93
94impl<T, P> Service for ClientService<T, P> where T: 'static, P: ClientProto<T> {
95 type Request = P::Request;
96 type Response = P::Response;
97 type Error = io::Error;
98 type Future = ClientFuture<T, P>;
99
100 fn call(&self, req: P::Request) -> Self::Future {
101 ClientFuture {
102 inner: self.inner.call(Message::WithoutBody(req))
103 }
104 }
105}
106
107impl<T, P> Clone for ClientService<T, P> where T: 'static, P: ClientProto<T> {
108 fn clone(&self) -> Self {
109 ClientService {
110 inner: self.inner.clone(),
111 }
112 }
113}
114
115impl<T, P> fmt::Debug for ClientService<T, P>
116 where T: 'static + fmt::Debug,
117 P: ClientProto<T> + fmt::Debug
118{
119 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
120 write!(f, "ClientService {{ ... }}")
121 }
122}
123
124pub struct ClientFuture<T, P> where T: 'static, P: ClientProto<T> {
125 inner: <<LiftProto<P> as BindClient<StreamingMultiplex<MyStream<io::Error>>, T>>::BindClient
126 as Service>::Future
127}
128
129impl<T, P> Future for ClientFuture<T, P> where T: 'static, P: ClientProto<T> {
130 type Item = P::Response;
131 type Error = io::Error;
132
133 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
134 match try_ready!(self.inner.poll()) {
135 Message::WithoutBody(msg) => Ok(msg.into()),
136 Message::WithBody(..) => panic!("bodies not supported"),
137 }
138 }
139}
140
141impl<T, P> fmt::Debug for ClientFuture<T, P>
142 where T: 'static + fmt::Debug,
143 P: ClientProto<T> + fmt::Debug
144{
145 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
146 write!(f, "ClientService {{ ... }}")
147 }
148}