tokio_proto/simple/pipeline/
client.rs1use BindClient;
2use super::Pipeline;
3use super::lift::{LiftBind, LiftTransport};
4use simple::LiftProto;
5
6use streaming::{self, Message};
7use streaming::pipeline::StreamingPipeline;
8use tokio_core::reactor::Handle;
9use tokio_service::Service;
10use futures::{stream, Stream, Sink, Future, Poll, IntoFuture};
11use std::{fmt, io};
12
13type MyStream<E> = stream::Empty<(), E>;
14
15pub trait ClientProto<T: 'static>: 'static {
24 type Request: 'static;
26
27 type Response: 'static;
29
30 type Transport: 'static +
36 Stream<Item = Self::Response, Error = io::Error> +
37 Sink<SinkItem = Self::Request, SinkError = io::Error>;
38
39 type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
43
44 fn bind_transport(&self, io: T) -> Self::BindTransport;
51}
52
53impl<T: 'static, P: ClientProto<T>> BindClient<Pipeline, T> for P {
54 type ServiceRequest = P::Request;
55 type ServiceResponse = P::Response;
56 type ServiceError = io::Error;
57
58 type BindClient = ClientService<T, P>;
59
60 fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
61 ClientService {
62 inner: BindClient::<StreamingPipeline<MyStream<io::Error>>, T>::bind_client(
63 LiftProto::from_ref(self), handle, io
64 )
65 }
66 }
67}
68
69impl<T, P> streaming::pipeline::ClientProto<T> for LiftProto<P> where
70 T: 'static, P: ClientProto<T>
71{
72 type Request = P::Request;
73 type RequestBody = ();
74
75 type Response = P::Response;
76 type ResponseBody = ();
77
78 type Error = io::Error;
79
80 type Transport = LiftTransport<P::Transport, io::Error>;
81 type BindTransport = LiftBind<T, <P::BindTransport as IntoFuture>::Future, io::Error>;
82
83 fn bind_transport(&self, io: T) -> Self::BindTransport {
84 LiftBind::lift(ClientProto::bind_transport(self.lower(), io).into_future())
85 }
86}
87
88pub struct ClientService<T, P> where T: 'static, P: ClientProto<T> {
90 inner: <LiftProto<P> as BindClient<StreamingPipeline<MyStream<io::Error>>, T>>::BindClient
91}
92
93impl<T, P> Clone for ClientService<T, P> where T: 'static, P: ClientProto<T> {
94 fn clone(&self) -> Self {
95 ClientService {
96 inner: self.inner.clone(),
97 }
98 }
99}
100
101impl<T, P> Service for ClientService<T, P> where T: 'static, P: ClientProto<T> {
102 type Request = P::Request;
103 type Response = P::Response;
104 type Error = io::Error;
105 type Future = ClientFuture<T, P>;
106
107 fn call(&self, req: P::Request) -> Self::Future {
108 ClientFuture {
109 inner: self.inner.call(Message::WithoutBody(req))
110 }
111 }
112}
113
114impl<T, P> fmt::Debug for ClientService<T, P>
115 where T: 'static + fmt::Debug,
116 P: ClientProto<T> + fmt::Debug,
117{
118 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
119 write!(f, "ClientService {{ ... }}")
120 }
121}
122
123pub struct ClientFuture<T, P> where T: 'static, P: ClientProto<T> {
124 inner: <<LiftProto<P> as BindClient<StreamingPipeline<MyStream<io::Error>>, T>>::BindClient
125 as Service>::Future
126}
127
128impl<T, P> Future for ClientFuture<T, P> where P: ClientProto<T> {
129 type Item = P::Response;
130 type Error = io::Error;
131
132 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
133 match try_ready!(self.inner.poll()) {
134 Message::WithoutBody(msg) => Ok(msg.into()),
135 Message::WithBody(..) => panic!("bodies not supported"),
136 }
137 }
138}
139
140impl<T, P> fmt::Debug for ClientFuture<T, P>
141 where T: 'static + fmt::Debug,
142 P: ClientProto<T> + fmt::Debug,
143{
144 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145 write!(f, "ClientFuture {{ ... }}")
146 }
147}