tokio_proto/simple/pipeline/
server.rs

1use std::io;
2use std::marker;
3
4use BindServer;
5use super::Pipeline;
6use super::lift::{LiftBind, LiftTransport};
7use simple::LiftProto;
8
9use streaming::{self, Message};
10use streaming::pipeline::StreamingPipeline;
11use tokio_core::reactor::Handle;
12use tokio_service::Service;
13use futures::{stream, Stream, Sink, Future, IntoFuture, Poll};
14
15type MyStream<E> = stream::Empty<(), E>;
16
17/// A pipelined server protocol.
18///
19/// The `T` parameter is used for the I/O object used to communicate, which is
20/// supplied in `bind_transport`.
21///
22/// For simple protocols, the `Self` type is often a unit struct. In more
23/// advanced cases, `Self` may contain configuration information that is used
24/// for setting up the transport in `bind_transport`.
25pub trait ServerProto<T: 'static>: 'static {
26    /// Request messages.
27    type Request: 'static;
28
29    /// Response messages.
30    type Response: 'static;
31
32    /// The message transport, which works with I/O objects of type `T`.
33    ///
34    /// An easy way to build a transport is to use `tokio_core::io::Framed`
35    /// together with a `Codec`; in that case, the transport type is
36    /// `Framed<T, YourCodec>`. See the crate docs for an example.
37    type Transport: 'static +
38        Stream<Item = Self::Request, Error = io::Error> +
39        Sink<SinkItem = Self::Response, SinkError = io::Error>;
40
41    /// A future for initializing a transport from an I/O object.
42    ///
43    /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices.
44    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
45
46    /// Build a transport from the given I/O object, using `self` for any
47    /// configuration.
48    ///
49    /// An easy way to build a transport is to use `tokio_core::io::Framed`
50    /// together with a `Codec`; in that case, `bind_transport` is just
51    /// `io.framed(YourCodec)`. See the crate docs for an example.
52    fn bind_transport(&self, io: T) -> Self::BindTransport;
53}
54
55impl<T: 'static, P: ServerProto<T>> BindServer<Pipeline, T> for P {
56    type ServiceRequest = P::Request;
57    type ServiceResponse = P::Response;
58    type ServiceError = io::Error;
59
60    fn bind_server<S>(&self, handle: &Handle, io: T, service: S)
61        where S: Service<Request = Self::ServiceRequest,
62                         Response = Self::ServiceResponse,
63                         Error = io::Error> + 'static
64    {
65        BindServer::<StreamingPipeline<MyStream<io::Error>>, T>::bind_server(
66            LiftProto::from_ref(self), handle, io, LiftService(service)
67        )
68    }
69}
70
71impl<T, P> streaming::pipeline::ServerProto<T> for LiftProto<P> where
72    T: 'static, P: ServerProto<T>
73{
74    type Request = P::Request;
75    type RequestBody = ();
76
77    type Response = P::Response;
78    type ResponseBody = ();
79
80    type Error = io::Error;
81
82    type Transport = LiftTransport<P::Transport, io::Error>;
83    type BindTransport = LiftBind<T, <P::BindTransport as IntoFuture>::Future, io::Error>;
84
85    fn bind_transport(&self, io: T) -> Self::BindTransport {
86        LiftBind::lift(ServerProto::bind_transport(self.lower(), io).into_future())
87    }
88}
89
90struct LiftService<S>(S);
91
92impl<S: Service> Service for LiftService<S> {
93    type Request = streaming::Message<S::Request, streaming::Body<(), S::Error>>;
94    type Response = streaming::Message<S::Response, MyStream<S::Error>>;
95    type Error = S::Error;
96    type Future = LiftFuture<S::Future, stream::Empty<(), S::Error>>;
97
98    fn call(&self, req: Self::Request) -> Self::Future {
99        match req {
100            Message::WithoutBody(msg) => {
101                LiftFuture(self.0.call(msg), marker::PhantomData)
102            }
103            Message::WithBody(..) => panic!("bodies not supported"),
104        }
105    }
106}
107
108struct LiftFuture<F, T>(F, marker::PhantomData<fn() -> T>);
109
110impl<F: Future, T> Future for LiftFuture<F, T> {
111    type Item = Message<F::Item, T>;
112    type Error = F::Error;
113
114    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
115        let item = try_ready!(self.0.poll());
116        Ok(Message::WithoutBody(item).into())
117    }
118}