1use protocol::Protocol;
2use reactor::{Token, Configurer, ReactorError};
3use mio::tcp::{TcpStream, TcpListener};
4
5pub trait TcpProtocol: Protocol {
7 fn on_connect(&mut self, configurer: &mut Configurer<Self::Socket>, socket: TcpStream);
12}
13
14pub struct ReactorProtocol<P> {
15 protocol: P,
16 listener: TcpListener,
17}
18
19impl<P> ReactorProtocol<P> {
20 pub fn new(proto: P, listener: TcpListener) -> ReactorProtocol<P> {
21 ReactorProtocol{
22 protocol: proto,
23 listener: listener,
24 }
25 }
26}
27
28impl<P: TcpProtocol> TcpProtocol for ReactorProtocol<P> {
29 fn on_connect(&mut self, configurer: &mut Configurer<Self::Socket>, socket: TcpStream) {
30 self.protocol.on_connect(configurer, socket);
31 }
32}
33
34impl<P: TcpProtocol> Protocol for ReactorProtocol<P> {
35 type Socket = P::Socket;
36
37 fn on_readable(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
38 self.protocol.on_readable(configurer, socket, token)
39 }
40
41 fn on_writable(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
42 self.protocol.on_writable(configurer, socket, token)
43 }
44
45 fn on_timeout(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
46 self.protocol.on_timeout(configurer, socket, token)
47 }
48
49 fn on_disconnect(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
50 self.protocol.on_disconnect(configurer, socket, token)
51 }
52
53 fn on_socket_error(&mut self, configurer: &mut Configurer<Self::Socket>, socket: &mut Self::Socket, token: Token) {
54 self.protocol.on_socket_error(configurer, socket, token)
55 }
56
57 fn on_event_loop_error(&mut self, error: ReactorError<Self::Socket>) {
58 self.protocol.on_event_loop_error(error)
59 }
60
61 fn tick(&mut self, configurer: &mut Configurer<Self::Socket>) {
62 loop {
63 let l = &mut self.listener;
64 let proto = &mut self.protocol;
65 match l.accept() {
66 Ok(Some((skt, _addr))) => {
67 proto.on_connect(configurer, skt);
68 },
69 Ok(None) => break,
70 Err(e) => {
71 error!("error accepting connections: {:?}", e);
72 configurer.shutdown(e);
73 return
74 },
75 }
76 }
77 }
78}
79
#[cfg(test)]
mod tests {
    use super::ReactorProtocol;

    use mio::tcp::{TcpListener, TcpStream};
    use protocol::Protocol;
    use reactor::configurer::ProtocolConfigurer;
    use std::thread;
    use test_helpers::FakeTcpProtocol;

    /// Two client connections made before `tick` must both be reported to the
    /// wrapped protocol by a single `tick` call.
    #[test]
    fn test_adding_new_connections() {
        // Port 0 lets the OS choose a free port; read it back for the clients.
        let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
        let server_addr = listener.local_addr().unwrap();

        // NOTE(review): presumably clones of `FakeTcpProtocol` share their
        // connect counter, since the count is read from `fake` while the
        // clone is what gets driven — confirm in test_helpers.
        let fake = FakeTcpProtocol::new();
        let mut wrapper = ReactorProtocol::new(fake.clone(), listener);
        let mut configurer = ProtocolConfigurer::new();

        // Open both client connections on a helper thread and wait for it to
        // finish before ticking.
        thread::spawn(move || {
            let _first = TcpStream::connect(&server_addr).unwrap();
            let _second = TcpStream::connect(&server_addr).unwrap();
        })
        .join()
        .unwrap();

        wrapper.tick(&mut configurer);

        assert_eq!(fake.connect_count(), 2);
    }
}