queen_core/service/
connection.rs1use std::io;
2use std::io::{Read, Write};
3use std::io::ErrorKind::WouldBlock;
4use std::io::Cursor;
5use std::mem;
6
7use queen_io::tcp::TcpStream;
8use queen_io::Poll;
9use queen_io::Token;
10use queen_io::Ready;
11use queen_io::PollOpt;
12use queen_io::Evented;
13use queen_io::channel::Sender;
14
15use byteorder::{LittleEndian, ReadBytesExt};
16
17use wire_protocol::message::Message;
18use wire_protocol::header::Header;
19
20use super::bufstream::Stream;
21use super::service::ServiceEvent;
22
23pub struct Connection {
24 socket: TcpStream,
25 token: Token,
26 interest: Ready,
27 stream: Stream,
28}
29
30impl Connection {
31 pub fn new(socket: TcpStream, token: Token) -> io::Result<Connection> {
32 let conn = Connection {
33 socket: socket,
34 token: token,
35 interest: Ready::readable() | Ready::hup(),
36 stream: Stream::new(),
37 };
38
39 Ok(conn)
40 }
41
42 pub fn reader(&mut self, poll: &Poll, tx_out: &Sender<ServiceEvent>) -> io::Result<()> {
43 self.interest.remove(Ready::readable());
44
45 loop {
46 let mut buf = [0; 8 * 1024];
47
48 match self.socket.read(&mut buf) {
49 Ok(size) => {
50 if size == 0 {
51 return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ConnectionAborted"))
52 } else {
53 self.stream.reader.extend_from_slice(&buf[0..size]);
54 }
55 }
56 Err(err) => {
57 if let WouldBlock = err.kind() {
58 break;
59 } else {
60 return Err(err)
61 }
62 }
63 }
64
65 if self.stream.reader.len() > 32 * 1024 * 1024 {
66 break;
67 }
68 }
69
70 self.send_message(tx_out)?;
71
72 if self.stream.reader.len() < 16 * 1024 * 1024 {
73 self.interest.insert(Ready::readable());
74 }
75
76 if self.stream.writer.len() > 0 {
77 self.interest.insert(Ready::writable());
78 }
79
80 self.reregister_insterest(poll);
81
82 Ok(())
83 }
84
85 pub fn writer(&mut self, poll: &Poll) -> io::Result<()>{
86 self.interest.remove(Ready::writable());
87
88 loop {
89 match self.socket.write(&self.stream.writer) {
90 Ok(size) => {
91 if size == 0 {
92 return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ConnectionAborted"))
93 } else {
94 if size < self.stream.writer.len() {
95 self.stream.writer = self.stream.writer.split_off(size);
96 } else {
97 self.stream.writer.clear();
98 break;
99 }
100 }
101 }
102 Err(err) => {
103 if let WouldBlock = err.kind() {
104 break;
105 } else {
106 return Err(err)
107 }
108 }
109 }
110 }
111
112 if self.stream.reader.len() < 16 * 1024 * 1024 {
113 self.interest.insert(Ready::readable());
114 }
115
116 if self.stream.writer.len() > 0 {
117 self.interest.insert(Ready::writable());
118 }
119
120 self.reregister_insterest(poll);
121
122 Ok(())
123 }
124
125 pub fn register_insterest(&self, poll: &Poll) {
126 self.register(
127 poll,
128 self.token,
129 self.interest,
130 PollOpt::edge() | PollOpt::oneshot()
131 ).unwrap();
132 }
133
134 pub fn reregister_insterest(&mut self, poll: &Poll) {
135 self.reregister(
136 poll,
137 self.token,
138 self.interest,
139 PollOpt::edge() | PollOpt::oneshot()
140 ).unwrap();
141 }
142
143 pub fn send_message(&mut self, tx_out: &Sender<ServiceEvent>) -> io::Result<()> {
144 loop {
145
146 if self.stream.reader.len() < mem::size_of::<Header>() {
147 return Ok(())
148 }
149
150 let message_length = {
151 Cursor::new(&self.stream.reader).read_i32::<LittleEndian>()? as usize
152 };
153
154 if self.stream.reader.len() < message_length {
155 return Ok(())
156 }
157
158 let (message, position) = {
159
160 let mut cursor = Cursor::new(&self.stream.reader);
161
162 let message = match Message::read(&mut cursor)? {
163 Some(message) => message,
164 None => return Ok(()),
165 };
166
167 (message, cursor.position())
168 };
169
170 self.stream.reader = self.stream.reader.split_off(position as usize);
171
172 let _ = tx_out.send(ServiceEvent::Message(self.token.into(), message));
173 }
174 }
175
176 pub fn recv_message(&mut self, poll: &Poll, message: Message) -> io::Result<()> {
177 message.write(&mut self.stream.writer)?;
178
179 if self.stream.reader.len() < 16 * 1024 * 1024 {
180 self.interest.insert(Ready::readable());
181 }
182
183 if self.stream.writer.len() > 0 {
184 self.interest.insert(Ready::writable());
185 }
186
187 self.reregister_insterest(poll);
188
189 Ok(())
190 }
191}
192
193impl Evented for Connection {
194 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
195 -> io::Result<()>
196 {
197 self.socket.register(poll, token, interest, opts)
198 }
199
200 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
201 -> io::Result<()>
202 {
203 self.socket.reregister(poll, token, interest, opts)
204 }
205
206 fn deregister(&self, poll: &Poll) -> io::Result<()> {
207 self.socket.deregister(poll)
208 }
209}