1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
use codec::ServerCodec;
use futures::{stream, Poll, Stream, Sink, StartSend};
use std::io;
use tokio_core::io::{EasyBuf, Io, Framed};
use tokio_proto::streaming::pipeline::{ServerProto, Transport};
use types::{GopherRequest, GopherResponse, Void};

pub struct GopherServer;

impl<T: Io + 'static> ServerProto<T> for GopherServer {
    type Request = GopherRequest;
    type RequestBody = Void;

    type Response = GopherResponse;
    type ResponseBody = EasyBuf;

    type Error = io::Error;

    type Transport = OneShot<Framed<T, ServerCodec>>;
    type BindTransport = Result<Self::Transport, io::Error>;

    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(OneShot::new(io.framed(ServerCodec)))
    }
}

/// Transport that closes the stream after one request.
pub struct OneShot<S>(stream::Take<S>);

impl<S: Stream> OneShot<S> {
    fn new(stream: S) -> Self {
        OneShot(stream.take(1))
    }
}

impl<S: Stream> Stream for OneShot<S> {
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        self.0.poll()
    }
}

impl<S: Sink + Stream> Sink for OneShot<S> {
    type SinkItem = S::SinkItem;
    type SinkError = S::SinkError;

    fn start_send(&mut self, item: S::SinkItem) -> StartSend<S::SinkItem, S::SinkError> {
        self.0.start_send(item)
    }

    fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
        self.0.poll_complete()
    }
}

impl<S> Transport for OneShot<S> where S: 'static +
                                          Stream<Error=io::Error> +
                                          Sink<SinkError = io::Error> {}