1use std::cell::Ref;
7use std::io::{Result as IoResult, ErrorKind};
8use std::net::SocketAddr;
9use std::rc::Rc;
10use std::time::Duration;
11
12use mio::{Token, Interest};
13use mio::net::TcpListener as MioTcpListener;
14
15use log::warn;
16
17use crate::utilities::Timeout;
18use crate::{TcpConnection, TcpManager, TcpError};
19use crate::manager::{TcpPollContext};
20
21#[derive(Debug)]
34pub struct TcpListener {
35 listener: MioTcpListener,
36 token: Token,
37 manager: Rc<TcpManager>,
38}
39
40impl TcpListener {
41 pub fn bind(manager: &Rc<TcpManager>, addr: SocketAddr) -> IoResult<Self> {
45 let manager = manager.clone();
46 let (listener, token) = Self::initialize(manager.context(), addr)?;
47
48 Ok(Self {
49 listener,
50 token,
51 manager,
52 })
53 }
54
55 fn initialize(context: Ref<TcpPollContext>, addr: SocketAddr) -> IoResult<(MioTcpListener, Token)> {
56 let mut listener = MioTcpListener::bind(addr)?;
57 let token = context.token();
58 context.registry().register(&mut listener, token, Interest::READABLE)?;
59 Ok((listener, token))
60 }
61
62 pub fn accept(&self, timeout: Option<Duration>) -> Result<TcpConnection, TcpError> {
67 if self.manager.cancelled() {
68 return Err(TcpError::Cancelled);
69 }
70
71 let timeout = Timeout::start(timeout);
72
73 match Self::event_accept(&self.listener) {
74 Ok(Some(connection)) => return Ok(connection),
75 Ok(_) => (),
76 Err(error) => return Err(error.into()),
77 }
78
79 let mut context = self.manager.context_mut();
80
81 loop {
82 let remaining = timeout.remaining_time();
83 match context.poll(remaining) {
84 Ok(events) => {
85 for _event in events.iter().filter(|event| event.token() == self.token) {
86 match Self::event_accept(&self.listener) {
87 Ok(Some(connection)) => return Ok(connection),
88 Ok(_) => (),
89 Err(error) => return Err(error.into()),
90 }
91 }
92 },
93 Err(error) => return Err(error.into()),
94 }
95 if self.manager.cancelled() {
96 return Err(TcpError::Cancelled);
97 }
98 if remaining.map(|time| time.is_zero()).unwrap_or(false) {
99 return Err(TcpError::TimedOut);
100 }
101 }
102 }
103
104 fn event_accept(listener: &MioTcpListener) -> IoResult<Option<TcpConnection>> {
105 loop {
106 match listener.accept() {
107 Ok((stream, _addr)) => return Ok(Some(TcpConnection::new(stream))),
108 Err(error) => match error.kind() {
109 ErrorKind::Interrupted => (),
110 ErrorKind::WouldBlock => return Ok(None),
111 _ => return Err(error),
112 },
113 }
114 }
115 }
116
117 pub fn local_addr(&self) -> Option<SocketAddr> {
119 self.listener.local_addr().ok()
120 }
121}
122
123impl Drop for TcpListener {
124 fn drop(&mut self) {
125 let context = self.manager.context();
126 if let Err(error) = context.registry().deregister(&mut self.listener) {
127 warn!("Failed to de-register: {:?}", error);
128 }
129 }
130}