mtcp_rs/
listener.rs

1/*
2 * mtcp - TcpListener/TcpStream *with* timeout/cancellation support
3 * This is free and unencumbered software released into the public domain.
4 */
5
6use std::cell::Ref;
7use std::io::{Result as IoResult, ErrorKind};
8use std::net::SocketAddr;
9use std::rc::Rc;
10use std::time::Duration;
11
12use mio::{Token, Interest};
13use mio::net::TcpListener as MioTcpListener;
14
15use log::warn;
16
17use crate::utilities::Timeout;
18use crate::{TcpConnection, TcpManager, TcpError};
19use crate::manager::{TcpPollContext};
20
21/// A TCP socket server, listening for connections, akin to
22/// [`std::net::TcpListener`](std::net::TcpListener)
23///
24/// All I/O operations provided by `mtcp_rs::TcpListener` are "blocking", but –
25/// unlike the `std::net` implementation – proper ***timeout*** and
26/// ***cancellation*** support is available. The `mtcp_rs::TcpListener` is tied
27/// to an [`mtcp_rs::TcpManager`](crate::TcpManager) instance.
28/// 
29/// If the `timeout` parameter was set to `Some(Duration)` and if the I/O
30/// operation does **not** complete before the specified timeout period
31/// expires, then the pending I/O operation will be aborted and fail with an
32/// [`TcpError::TimedOut`](crate::TcpError::TimedOut) error.
33#[derive(Debug)]
34pub struct TcpListener {
35    listener: MioTcpListener,
36    token: Token,
37    manager: Rc<TcpManager>,
38}
39
40impl TcpListener {
41    /// Creates a new `TcpListener` which will be bound to the specified socket address.
42    /// 
43    /// The new `TcpListener` is tied to the specified `TcpManager` instance.
44    pub fn bind(manager: &Rc<TcpManager>, addr: SocketAddr) -> IoResult<Self> {
45        let manager = manager.clone();
46        let (listener, token) = Self::initialize(manager.context(), addr)?;
47
48        Ok(Self {
49            listener,
50            token,
51            manager,
52        })
53    }
54
55    fn initialize(context: Ref<TcpPollContext>, addr: SocketAddr) -> IoResult<(MioTcpListener, Token)> {
56        let mut listener = MioTcpListener::bind(addr)?;
57        let token = context.token();
58        context.registry().register(&mut listener, token, Interest::READABLE)?;
59        Ok((listener, token))
60    }
61
62    /// Accept a new incoming TCP connection from this listener.
63    /// 
64    /// An optional ***timeout*** can be specified, after which the operation
65    /// is going to fail, if there is **no** incoming connection yet. 
66    pub fn accept(&self, timeout: Option<Duration>) -> Result<TcpConnection, TcpError> {
67        if self.manager.cancelled() {
68            return Err(TcpError::Cancelled);
69        }
70
71        let timeout = Timeout::start(timeout);
72
73        match Self::event_accept(&self.listener) {
74            Ok(Some(connection)) => return Ok(connection),
75            Ok(_) => (),
76            Err(error) => return Err(error.into()),
77        }
78
79        let mut context = self.manager.context_mut();
80
81        loop {
82            let remaining = timeout.remaining_time();
83            match context.poll(remaining) {
84                Ok(events) => {
85                    for _event in events.iter().filter(|event| event.token() == self.token) {
86                        match Self::event_accept(&self.listener) {
87                            Ok(Some(connection)) => return Ok(connection),
88                            Ok(_) => (),
89                            Err(error) => return Err(error.into()),
90                        }
91                    }
92                },
93                Err(error) => return Err(error.into()),
94            }
95            if self.manager.cancelled() {
96                return Err(TcpError::Cancelled);
97            }
98            if remaining.map(|time| time.is_zero()).unwrap_or(false) {
99                return Err(TcpError::TimedOut);
100            }
101        }
102    }
103
104    fn event_accept(listener: &MioTcpListener) -> IoResult<Option<TcpConnection>> {
105        loop {
106            match listener.accept() {
107                Ok((stream, _addr)) => return Ok(Some(TcpConnection::new(stream))),
108                Err(error) => match error.kind() {
109                    ErrorKind::Interrupted => (),
110                    ErrorKind::WouldBlock => return Ok(None),
111                    _ => return Err(error),
112                },
113            }
114        }
115    }
116
117    /// Get the *local* socket address to which this `TcpListener` is bound.
118    pub fn local_addr(&self) -> Option<SocketAddr> {
119        self.listener.local_addr().ok()
120    }
121}
122
123impl Drop for TcpListener {
124    fn drop(&mut self) {
125        let context = self.manager.context();
126        if let Err(error) = context.registry().deregister(&mut self.listener) {
127            warn!("Failed to de-register: {:?}", error);
128        }
129    }
130}