1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::io;
use std::net::SocketAddr;

use local_addr::LocalAddr;

use futures::Poll;
use futures::future::Future;
use futures::stream::Stream;
use tokio_core::net::{TcpStream, TcpStreamNew, Incoming, TcpListener};
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};

/// Trait for initializing connections over an abstract `Transport`.
pub trait Transport {
    /// Concrete socket.
    type Socket: AsyncRead + AsyncWrite + 'static;

    /// Future `Self::Socket`.
    type FutureSocket: Future<Item=Self::Socket, Error=io::Error> + 'static;

    /// Concrete listener.
    type Listener: Stream<Item=(Self::Socket, SocketAddr), Error=io::Error> + LocalAddr + 'static;

    /// Connect to the given address over this transport, using the supplied `Handle`.
    fn connect(addr: &SocketAddr, handle: &Handle) -> io::Result<Self::FutureSocket>;

    /// Listen to the given address for this transport, using the supplied `Handle`.
    fn listen(addr: &SocketAddr, handle: &Handle) -> io::Result<Self::Listener>;
}

//----------------------------------------------------------------------------------//

/// Defines a `Transport` operating over TCP.
pub struct TcpTransport;

impl Transport for TcpTransport {
    type Socket = TcpStream;
    type FutureSocket = TcpStreamNew;
    type Listener = TcpListenerStream<Incoming>;

    fn connect(addr: &SocketAddr, handle: &Handle) -> io::Result<Self::FutureSocket> {
        Ok(TcpStream::connect(addr, handle))
    }

    fn listen(addr: &SocketAddr, handle: &Handle) -> io::Result<Self::Listener> {
        let listener = try!(TcpListener::bind(addr, handle));
        let listen_addr = try!(listener.local_addr());

        Ok(TcpListenerStream::new(listen_addr, listener.incoming()))
    }
}

/// Convenient object that wraps a listener stream `L`, and also implements `LocalAddr`.
pub struct TcpListenerStream<L> {
    listen_addr: SocketAddr,
    listener:    L
}

impl<L> TcpListenerStream<L> {
    fn new(listen_addr: SocketAddr, listener: L) -> TcpListenerStream<L> {
        TcpListenerStream{ listen_addr: listen_addr, listener: listener }
    }
}

impl<L> Stream for TcpListenerStream<L> where L: Stream {
    type Item = L::Item;
    type Error = L::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        self.listener.poll()
    }
}

impl<L> LocalAddr for TcpListenerStream<L> {
    fn local_addr(&self) -> io::Result<SocketAddr> {
        Ok(self.listen_addr)
    }
}

//----------------------------------------------------------------------------------//

#[cfg(test)]
pub mod test_transports {
    use std::io::{self, Cursor};
    use std::net::SocketAddr;

    use super::Transport;
    use local_addr::LocalAddr;

    use futures::{Poll};
    use futures::future::{self, FutureResult};
    use futures::stream::{self, Stream, Empty};
    use tokio_core::reactor::Handle;

    pub struct MockTransport;

    impl Transport for MockTransport {
        type Socket       = Cursor<Vec<u8>>;
        type FutureSocket = FutureResult<Self::Socket, io::Error>;
        type Listener     = MockListener;

        fn connect(_addr: &SocketAddr, _handle: &Handle) -> io::Result<Self::FutureSocket> {
            Ok(future::ok(Cursor::new(Vec::new())))
        }

        fn listen(addr: &SocketAddr, _handle: &Handle) -> io::Result<Self::Listener> {
            Ok(MockListener::new(*addr))
        }
    }

    //----------------------------------------------------------------------------------//

    pub struct MockListener {
        addr: SocketAddr,
        empty: Empty<(Cursor<Vec<u8>>, SocketAddr), io::Error>
    }

    impl MockListener {
        fn new(addr: SocketAddr) -> MockListener {
            MockListener{ addr: addr, empty: stream::empty() }
        }
    }

    impl LocalAddr for MockListener {
        fn local_addr(&self) -> io::Result<SocketAddr> {
            Ok(self.addr)
        }
    }

    impl Stream for MockListener {
        type Item = (Cursor<Vec<u8>>, SocketAddr);
        type Error = io::Error;

        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            self.empty.poll()
        }
    }
}