queen_core/service/
connection.rs

1use std::io;
2use std::io::{Read, Write};
3use std::io::ErrorKind::WouldBlock;
4use std::io::Cursor;
5use std::mem;
6
7use queen_io::tcp::TcpStream;
8use queen_io::Poll;
9use queen_io::Token;
10use queen_io::Ready;
11use queen_io::PollOpt;
12use queen_io::Evented;
13use queen_io::channel::Sender;
14
15use byteorder::{LittleEndian, ReadBytesExt};
16
17use wire_protocol::message::Message;
18use wire_protocol::header::Header;
19
20use super::bufstream::Stream;
21use super::service::ServiceEvent;
22
23pub struct Connection {
24    socket: TcpStream,
25    token: Token,
26    interest: Ready,
27    stream: Stream,
28}
29
30impl Connection {
31    pub fn new(socket: TcpStream, token: Token) -> io::Result<Connection> {
32        let conn = Connection {
33            socket: socket,
34            token: token,
35            interest: Ready::readable() | Ready::hup(),
36            stream: Stream::new(),
37        };
38
39        Ok(conn)
40    }
41
42    pub fn reader(&mut self, poll: &Poll, tx_out: &Sender<ServiceEvent>) -> io::Result<()> {
43        self.interest.remove(Ready::readable());
44
45        loop {
46            let mut buf = [0; 8 * 1024];
47
48            match self.socket.read(&mut buf) {
49                Ok(size) => {
50                    if size == 0 {
51                        return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ConnectionAborted"))
52                    } else {
53                        self.stream.reader.extend_from_slice(&buf[0..size]);
54                    }
55                }
56                Err(err) => {
57                    if let WouldBlock = err.kind() {
58                        break;
59                    } else {
60                        return Err(err)
61                    }
62                }
63            }
64
65            if self.stream.reader.len() > 32 * 1024 * 1024 {
66                break;
67            }
68        }
69
70        self.send_message(tx_out)?;
71
72        if self.stream.reader.len() < 16 * 1024 * 1024 {
73            self.interest.insert(Ready::readable());
74        }
75
76        if self.stream.writer.len() > 0 {
77            self.interest.insert(Ready::writable());
78        }
79
80        self.reregister_insterest(poll);
81
82        Ok(())
83    }
84
85    pub fn writer(&mut self, poll: &Poll) -> io::Result<()>{
86        self.interest.remove(Ready::writable());
87
88        loop {
89            match self.socket.write(&self.stream.writer) {
90                Ok(size) => {
91                    if size == 0 {
92                        return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ConnectionAborted"))
93                    } else {
94                        if size < self.stream.writer.len() {
95                            self.stream.writer = self.stream.writer.split_off(size);
96                        } else {
97                            self.stream.writer.clear();
98                            break;
99                        }
100                    }
101                }
102                Err(err) => {
103                    if let WouldBlock = err.kind() {
104                        break;
105                    } else {
106                        return Err(err)
107                    }
108                }
109            }
110        }
111
112        if self.stream.reader.len() < 16 * 1024 * 1024 {
113            self.interest.insert(Ready::readable());
114        }
115
116        if self.stream.writer.len() > 0 {
117            self.interest.insert(Ready::writable());
118        }
119
120        self.reregister_insterest(poll);
121
122        Ok(())
123    }
124
125    pub fn register_insterest(&self, poll: &Poll) {
126        self.register(
127            poll,
128            self.token,
129            self.interest,
130            PollOpt::edge() | PollOpt::oneshot()
131        ).unwrap();
132    }
133
134    pub fn reregister_insterest(&mut self, poll: &Poll) {
135        self.reregister(
136            poll,
137            self.token,
138            self.interest,
139            PollOpt::edge() | PollOpt::oneshot()
140        ).unwrap();
141    }
142
143    pub fn send_message(&mut self, tx_out: &Sender<ServiceEvent>) -> io::Result<()> {
144        loop {
145
146            if self.stream.reader.len() < mem::size_of::<Header>() {
147                return Ok(())
148            }
149
150            let message_length = {
151                Cursor::new(&self.stream.reader).read_i32::<LittleEndian>()? as usize
152            };
153
154            if self.stream.reader.len() < message_length {
155                return Ok(())
156            }
157
158            let (message, position) = {
159
160                let mut cursor = Cursor::new(&self.stream.reader);
161
162                let message = match Message::read(&mut cursor)? {
163                    Some(message) => message,
164                    None => return Ok(()),
165                };
166
167                (message, cursor.position())
168            };
169
170            self.stream.reader = self.stream.reader.split_off(position as usize);
171
172            let _ = tx_out.send(ServiceEvent::Message(self.token.into(), message));
173        }
174    }
175
176    pub fn recv_message(&mut self, poll: &Poll, message: Message) -> io::Result<()> {
177        message.write(&mut self.stream.writer)?;
178
179        if self.stream.reader.len() < 16 * 1024 * 1024 {
180            self.interest.insert(Ready::readable());
181        }
182
183        if self.stream.writer.len() > 0 {
184            self.interest.insert(Ready::writable());
185        }
186
187        self.reregister_insterest(poll);
188
189        Ok(())
190    }
191}
192
193impl Evented for Connection {
194    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
195         -> io::Result<()>
196    {
197        self.socket.register(poll, token, interest, opts)
198    }
199
200    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
201         -> io::Result<()>
202    {
203        self.socket.reregister(poll, token, interest, opts)
204    }
205
206    fn deregister(&self, poll: &Poll) -> io::Result<()> {
207        self.socket.deregister(poll)
208    }
209}