1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use std::io;
use std::mem;
use std::fmt;
use std::error;
use std::net::SocketAddr;
use mio;
use futures::{Poll, Async, Future};
pub use self::udp::UdpSocket;
pub use self::tcp::{TcpListener, TcpStream};
use fiber;
use internal::io_poll::{EventedHandle, Register};
pub mod futures {
pub use super::udp::{UdpSocketBind, SendTo, RecvFrom};
pub use super::tcp::{TcpListenerBind, Connect, Connected};
}
pub mod streams {
pub use super::tcp::Incoming;
}
mod udp;
mod tcp;
enum Bind<F, T> {
Bind(SocketAddr, F),
Registering(Register<T>),
Polled,
}
impl<F, T> Future for Bind<F, T>
where F: FnOnce(&SocketAddr) -> io::Result<T>,
T: mio::Evented + Send + 'static
{
type Item = EventedHandle<T>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match mem::replace(self, Bind::Polled) {
Bind::Bind(addr, bind) => {
let socket = bind(&addr)?;
let register =
assert_some!(fiber::with_current_context(|mut c| c.poller().register(socket)));
*self = Bind::Registering(register);
self.poll()
}
Bind::Registering(mut future) => {
if let Async::Ready(handle) = future.poll().map_err(into_io_error)? {
Ok(Async::Ready(handle))
} else {
*self = Bind::Registering(future);
Ok(Async::NotReady)
}
}
Bind::Polled => panic!("Cannot poll Bind twice"),
}
}
}
impl<F, T> fmt::Debug for Bind<F, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Bind::Bind(addr, _) => write!(f, "Bind::Bind({:?}, _)", addr),
Bind::Registering(_) => write!(f, "Bind::Registering(_)"),
Bind::Polled => write!(f, "Bind::Polled"),
}
}
}
fn into_io_error<E: error::Error + Send + Sync + 'static>(error: E) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(error))
}