1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use std::cell::Ref;
use std::io::{Result as IoResult, ErrorKind};
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::Duration;
use mio::{Token, Interest};
use mio::net::TcpListener as MioTcpListener;
use log::warn;
use crate::utilities::Timeout;
use crate::{TcpConnection, TcpManager, TcpError};
use crate::manager::{TcpPollContext};
#[derive(Debug)]
pub struct TcpListener {
listener: MioTcpListener,
token: Token,
manager: Rc<TcpManager>,
}
impl TcpListener {
pub fn bind(manager: &Rc<TcpManager>, addr: SocketAddr) -> IoResult<Self> {
let manager = manager.clone();
let (listener, token) = Self::initialize(manager.context(), addr)?;
Ok(Self {
listener,
token,
manager,
})
}
fn initialize(context: Ref<TcpPollContext>, addr: SocketAddr) -> IoResult<(MioTcpListener, Token)> {
let mut listener = MioTcpListener::bind(addr)?;
let token = context.token();
context.registry().register(&mut listener, token, Interest::READABLE)?;
Ok((listener, token))
}
pub fn accept(&self, timeout: Option<Duration>) -> Result<TcpConnection, TcpError> {
if self.manager.cancelled() {
return Err(TcpError::Cancelled);
}
let timeout = Timeout::start(timeout);
match Self::event_accept(&self.listener) {
Ok(Some(connection)) => return Ok(connection),
Ok(_) => (),
Err(error) => return Err(error.into()),
}
let mut context = self.manager.context_mut();
loop {
let remaining = timeout.remaining_time();
match context.poll(remaining) {
Ok(events) => {
for _event in events.iter().filter(|event| event.token() == self.token) {
match Self::event_accept(&self.listener) {
Ok(Some(connection)) => return Ok(connection),
Ok(_) => (),
Err(error) => return Err(error.into()),
}
}
},
Err(error) => return Err(error.into()),
}
if self.manager.cancelled() {
return Err(TcpError::Cancelled);
}
if remaining.map(|time| time.is_zero()).unwrap_or(false) {
return Err(TcpError::TimedOut);
}
}
}
fn event_accept(listener: &MioTcpListener) -> IoResult<Option<TcpConnection>> {
loop {
match listener.accept() {
Ok((stream, _addr)) => return Ok(Some(TcpConnection::new(stream))),
Err(error) => match error.kind() {
ErrorKind::Interrupted => (),
ErrorKind::WouldBlock => return Ok(None),
_ => return Err(error),
},
}
}
}
pub fn local_addr(&self) -> Option<SocketAddr> {
self.listener.local_addr().ok()
}
}
impl Drop for TcpListener {
fn drop(&mut self) {
let context = self.manager.context();
if let Err(error) = context.registry().deregister(&mut self.listener) {
warn!("Failed to de-register: {:?}", error);
}
}
}