1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::io;
use std::net::SocketAddr;
use local_addr::LocalAddr;
use futures::Poll;
use futures::future::Future;
use futures::stream::Stream;
use tokio_core::net::{TcpStream, TcpStreamNew, Incoming, TcpListener};
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
pub trait Transport {
type Socket: AsyncRead + AsyncWrite + 'static;
type FutureSocket: Future<Item=Self::Socket, Error=io::Error> + 'static;
type Listener: Stream<Item=(Self::Socket, SocketAddr), Error=io::Error> + LocalAddr + 'static;
fn connect(addr: &SocketAddr, handle: &Handle) -> io::Result<Self::FutureSocket>;
fn listen(addr: &SocketAddr, handle: &Handle) -> io::Result<Self::Listener>;
}
pub struct TcpTransport;
impl Transport for TcpTransport {
type Socket = TcpStream;
type FutureSocket = TcpStreamNew;
type Listener = TcpListenerStream<Incoming>;
fn connect(addr: &SocketAddr, handle: &Handle) -> io::Result<Self::FutureSocket> {
Ok(TcpStream::connect(addr, handle))
}
fn listen(addr: &SocketAddr, handle: &Handle) -> io::Result<Self::Listener> {
let listener = try!(TcpListener::bind(addr, handle));
let listen_addr = try!(listener.local_addr());
Ok(TcpListenerStream::new(listen_addr, listener.incoming()))
}
}
/// Couples a listener-like value with the local socket address recorded for it.
pub struct TcpListenerStream<L> {
    /// Address handed back by `local_addr()`.
    listen_addr: SocketAddr,
    /// Wrapped listener; polling is forwarded to it.
    listener: L,
}

impl<L> TcpListenerStream<L> {
    /// Builds a wrapper around `listener` that remembers `listen_addr`.
    fn new(listen_addr: SocketAddr, listener: L) -> Self {
        TcpListenerStream {
            listen_addr: listen_addr,
            listener: listener,
        }
    }
}
/// The wrapper is a stream whenever the wrapped listener is one; items and
/// errors are passed through unchanged.
impl<L> Stream for TcpListenerStream<L>
where
    L: Stream,
{
    type Item = L::Item;
    type Error = L::Error;

    /// Polls the wrapped stream and returns its result as-is.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let inner = &mut self.listener;
        inner.poll()
    }
}
impl<L> LocalAddr for TcpListenerStream<L> {
    /// Returns a copy of the address stored when the wrapper was built.
    ///
    /// This implementation never produces an `Err`.
    fn local_addr(&self) -> io::Result<SocketAddr> {
        let stored = self.listen_addr;
        Ok(stored)
    }
}
/// In-memory `Transport` stand-ins, compiled only for test builds.
#[cfg(test)]
pub mod test_transports {
    use std::io::{self, Cursor};
    use std::net::SocketAddr;

    use futures::future::{self, FutureResult};
    use futures::stream::{self, Empty, Stream};
    use futures::Poll;
    use local_addr::LocalAddr;
    use tokio_core::reactor::Handle;

    use super::Transport;

    /// Transport whose sockets are empty in-memory cursors and whose listener
    /// wraps an empty stream.
    pub struct MockTransport;

    impl Transport for MockTransport {
        type Socket = Cursor<Vec<u8>>;
        type FutureSocket = FutureResult<Self::Socket, io::Error>;
        type Listener = MockListener;

        /// Returns an already-successful future holding a cursor over an
        /// empty byte vector; both arguments are ignored.
        fn connect(_addr: &SocketAddr, _handle: &Handle) -> io::Result<Self::FutureSocket> {
            let socket = Cursor::new(Vec::new());
            Ok(future::ok(socket))
        }

        /// Returns a `MockListener` that reports a copy of `addr` as its
        /// local address; the reactor handle is ignored.
        fn listen(addr: &SocketAddr, _handle: &Handle) -> io::Result<Self::Listener> {
            let listener = MockListener::new(*addr);
            Ok(listener)
        }
    }

    /// Listener stand-in: remembers an address and delegates polling to an
    /// empty stream.
    pub struct MockListener {
        /// Address reported by `local_addr()`.
        addr: SocketAddr,
        /// Stream created with `stream::empty()`; `poll` is forwarded to it.
        empty: Empty<(Cursor<Vec<u8>>, SocketAddr), io::Error>,
    }

    impl MockListener {
        /// Creates a listener that reports `addr` as its local address.
        fn new(addr: SocketAddr) -> Self {
            MockListener {
                addr: addr,
                empty: stream::empty(),
            }
        }
    }

    impl LocalAddr for MockListener {
        /// Returns the address supplied to `new`; never produces an `Err`.
        fn local_addr(&self) -> io::Result<SocketAddr> {
            let stored = self.addr;
            Ok(stored)
        }
    }

    impl Stream for MockListener {
        type Item = (Cursor<Vec<u8>>, SocketAddr);
        type Error = io::Error;

        /// Forwards to the wrapped empty stream.
        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            let inner = &mut self.empty;
            inner.poll()
        }
    }
}