nexus/tcp/
protocol.rs

1use protocol::Protocol;
2use reactor::{Token, Configurer, ReactorError};
3use mio::tcp::{TcpStream, TcpListener};
4
5/// Trait used with a TCP reactor.
6pub trait TcpProtocol: Protocol {
7    /// Event called when a new tcp connections is received.
8    ///
9    /// The socket should be transformed into `Self::Socket` type and added to the reactor with the
10    /// configurer.
11    fn on_connect(&mut self, configurer: &mut Configurer<Self::Socket>, socket: TcpStream);
12}
13
14pub struct ReactorProtocol<P> {
15    protocol: P,
16    listener: TcpListener,
17}
18
19impl<P> ReactorProtocol<P> {
20    pub fn new(proto: P, listener: TcpListener) -> ReactorProtocol<P> {
21        ReactorProtocol{
22            protocol: proto,
23            listener: listener,
24        }
25    }
26}
27
28impl<P: TcpProtocol> TcpProtocol for ReactorProtocol<P> {
29    fn on_connect(&mut self, configurer: &mut Configurer<Self::Socket>, socket: TcpStream) {
30        self.protocol.on_connect(configurer, socket);
31    }
32}
33
34impl<P: TcpProtocol> Protocol for ReactorProtocol<P> {
35    type Socket = P::Socket;
36
37    fn on_readable(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
38        self.protocol.on_readable(configurer, socket, token)
39    }
40
41    fn on_writable(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
42        self.protocol.on_writable(configurer, socket, token)
43    }
44
45    fn on_timeout(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
46        self.protocol.on_timeout(configurer, socket, token)
47    }
48
49    fn on_disconnect(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
50        self.protocol.on_disconnect(configurer, socket, token)
51    }
52
53    fn on_socket_error(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
54        self.protocol.on_socket_error(configurer, socket, token)
55    }
56
57    fn on_event_loop_error(&mut self, error: ReactorError<Self::Socket>) {
58        self.protocol.on_event_loop_error(error)
59    }
60
61    fn tick(&mut self, configurer: &mut Configurer<Self::Socket>) {
62        loop {
63            let l = &mut self.listener;
64            let proto = &mut self.protocol;
65            match l.accept() {
66                Ok(Some((skt, _addr))) => {
67                    proto.on_connect(configurer, skt);
68                },
69                Ok(None) => break,
70                Err(e) => {
71                    error!("error accepting connections: {:?}", e);
72                    configurer.shutdown(e);
73                    return
74                },
75            }
76        }
77    }
78}
79
80#[cfg(test)]
81mod tests {
82    use mio::tcp::{TcpListener, TcpStream};
83    use test_helpers::{FakeTcpProtocol};
84    use protocol::Protocol;
85    use std::thread;
86    use super::ReactorProtocol;
87    use reactor::configurer::{ProtocolConfigurer};
88
89    #[test]
90    fn test_adding_new_connections() {
91        let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
92        let addr = l.local_addr().unwrap();
93        let proto = FakeTcpProtocol::new();
94        let mut reactor_proto = ReactorProtocol::new(proto.clone(), l);
95        let mut configurer = ProtocolConfigurer::new();
96
97        let handle = thread::spawn(move || {
98            let _stream1 = TcpStream::connect(&addr).unwrap();
99            let _stream2 = TcpStream::connect(&addr).unwrap();
100        });
101
102        handle.join().unwrap();
103        reactor_proto.tick(&mut configurer);
104
105        assert_eq!(proto.connect_count(), 2);
106    }
107}