1use std::io;
2use std::error::Error;
3use std::io::ErrorKind::{WouldBlock, BrokenPipe, WriteZero, ConnectionReset};
4
5use rotor::{Response, Scope, Machine, EventSet, PollOpt, Time};
6use rotor::void::{Void, unreachable};
7
8use substr::find_substr;
9use extensions::{ScopeExt, ResponseExt};
10use {Expectation, Protocol, StreamSocket, Stream, StreamImpl};
11use {Buf, Transport, Accepted, Exception, Intent};
12use {ProtocolStop, SocketError};
13
14
15#[derive(Debug)]
16enum IoOp {
17 Done,
18 NoOp,
19 Eos,
20 Error(io::Error),
21}
22
23fn to_result<P: Protocol>(intent: Intent<P>)
24 -> Result<(P, Expectation, Option<Time>), Option<Box<Error>>>
25{
26 match intent.0 {
27 Ok(x) => Ok((x, intent.1, intent.2)),
28 Err(e) => Err(e),
29 }
30}
31
32impl<S: StreamSocket> StreamImpl<S> {
33 fn transport(&mut self) -> Transport<S> {
34 Transport {
35 sock: &mut self.socket,
36 inbuf: &mut self.inbuf,
37 outbuf: &mut self.outbuf,
38 }
39 }
40 fn action<M>(self, intent: Intent<M>,
41 scope: &mut Scope<M::Context>)
42 -> Response<Stream<M>, Void>
43 where M: Protocol<Socket=S>
44 {
45 match self._action(intent, scope) {
46 Ok(stream) => {
47 let dline = stream.deadline;
48 Response::ok(stream).deadline_opt(dline)
49 }
50 Err(Some(e)) => Response::error(e),
51 Err(None) => Response::done(),
52 }
53 }
54 fn _action<P>(mut self, intent: Intent<P>,
55 scope: &mut Scope<P::Context>)
56 -> Result<Stream<P>, Option<Box<Error>>>
57 where P: Protocol<Socket=S>
58 {
59 use Expectation::*;
60 let mut intent = try!(to_result(intent));
61 let mut can_write = match self.write() {
62 IoOp::Done => true,
63 IoOp::NoOp => false,
64 IoOp::Eos => {
65 self.outbuf.remove_range(..);
66 return Err(intent.0.fatal(
67 Exception::WriteError(io::Error::new(
68 WriteZero, "failed to write whole buffer")),
69 scope));
70 }
71 IoOp::Error(e) => {
72 return Err(intent.0.fatal(
73 Exception::WriteError(e),
74 scope));
75 }
76 };
77 'outer: loop {
78 if can_write {
79 can_write = match self.write() {
80 IoOp::Done => true,
81 IoOp::NoOp => false,
82 IoOp::Eos => {
83 self.outbuf.remove_range(..);
84 return Err(intent.0.fatal(
85 Exception::WriteError(io::Error::new(
86 WriteZero, "failed to write whole buffer")),
87 scope));
88 }
89 IoOp::Error(e) => {
90 self.outbuf.remove_range(..);
91 return Err(intent.0.fatal(
92 Exception::WriteError(e),
93 scope));
94 }
95 };
96 }
97 match intent.1 {
98 Bytes(num) => {
99 loop {
100 if self.inbuf.len() >= num {
101 intent = try!(to_result(intent.0.bytes_read(
102 &mut self.transport(),
103 num, scope)));
104 continue 'outer;
105 }
106 match self.read() {
107 IoOp::Done => {}
108 IoOp::NoOp => {
109 return Ok(Stream::compose(self, intent));
110 }
111 IoOp::Eos => {
112 intent = try!(to_result(intent.0.exception(
113 &mut self.transport(),
114 Exception::EndOfStream,
115 scope)));
116 continue 'outer;
117 }
118 IoOp::Error(e) => {
119 intent = try!(to_result(intent.0.exception(
120 &mut self.transport(),
121 Exception::ReadError(e),
122 scope)));
123 continue 'outer;
124 }
125 }
126 }
127 }
128 Delimiter(min, delim, max) => {
129 loop {
130 if self.inbuf.len() > min {
131 let opt = find_substr(&self.inbuf[min..], delim);
132 if let Some(num) = opt {
133 intent = try!(to_result(intent.0.bytes_read(
134 &mut self.transport(),
135 num, scope)));
136 continue 'outer;
137 }
138 }
139 if self.inbuf.len() > max {
140 intent = try!(to_result(intent.0.exception(
141 &mut self.transport(),
142 Exception::LimitReached,
143 scope)));
144 continue 'outer;
145 }
146 match self.read() {
147 IoOp::Done => {}
148 IoOp::NoOp => {
149 return Ok(Stream::compose(self, intent));
150 }
151 IoOp::Eos => {
152 intent = try!(to_result(intent.0.exception(
153 &mut self.transport(),
154 Exception::EndOfStream,
155 scope)));
156 continue 'outer;
157 }
158 IoOp::Error(e) => {
159 intent = try!(to_result(intent.0.exception(
160 &mut self.transport(),
161 Exception::ReadError(e),
162 scope)));
163 continue 'outer;
164 }
165 }
166 }
167 }
168 Flush(num) => {
169 if self.outbuf.len() <= num {
170 intent = try!(to_result(intent.0.bytes_flushed(
171 &mut self.transport(), scope)));
172 } else {
173 return Ok(Stream::compose(self, intent));
174 }
175 }
176 Sleep => {
177 return Ok(Stream::compose(self, intent));
178 }
179 }
180 }
181 }
182 fn read(&mut self) -> IoOp {
186 match self.inbuf.read_from(&mut self.socket) {
187 Ok(0) => IoOp::Eos,
188 Ok(_) => IoOp::Done,
189 Err(ref e) if e.kind() == BrokenPipe
190 || e.kind() == ConnectionReset
191 => return IoOp::Eos,
192 Err(ref e) if e.kind() == WouldBlock => IoOp::NoOp,
193 Err(e) => IoOp::Error(e),
194 }
195 }
196 fn write(&mut self) -> IoOp {
197 loop {
198 if self.outbuf.len() == 0 {
199 return IoOp::Done;
200 }
201 match self.outbuf.write_to(&mut self.socket) {
202 Ok(0) => return IoOp::Eos,
203 Ok(_) => continue,
204 Err(ref e) if e.kind() == BrokenPipe
205 || e.kind() == ConnectionReset
206 => return IoOp::Eos,
207 Err(ref e) if e.kind() == WouldBlock => return IoOp::NoOp,
208 Err(e) => return IoOp::Error(e),
209 }
210 }
211 }
212}
213
214impl<P: Protocol> Accepted for Stream<P>
215 where <P as Protocol>::Seed: Clone
216{
217 type Seed = <P as Protocol>::Seed;
218 type Socket = P::Socket;
219 fn accepted(sock: P::Socket, seed: <P as Protocol>::Seed,
220 scope: &mut Scope<Self::Context>)
221 -> Response<Self, Void>
222 {
223 Self::new(sock, seed, scope)
224 }
225}
226
227impl<P: Protocol> Stream<P> {
228 pub fn transport(&mut self) -> Transport<P::Socket> {
234 Transport {
235 sock: &mut self.socket,
236 inbuf: &mut self.inbuf,
237 outbuf: &mut self.outbuf,
238 }
239 }
240 fn decompose(self) -> (P, Expectation, Option<Time>, StreamImpl<P::Socket>)
241 {
242 (self.fsm, self.expectation, self.deadline, StreamImpl {
243 socket: self.socket,
244 connected: self.connected,
245 inbuf: self.inbuf,
246 outbuf: self.outbuf,
247 })
248 }
249 fn compose(implem: StreamImpl<P::Socket>,
250 (fsm, exp, dline): (P, Expectation, Option<Time>))
251 -> Stream<P>
252 {
253 Stream {
254 fsm: fsm,
255 socket: implem.socket,
256 expectation: exp,
257 connected: implem.connected,
258 deadline: dline,
259 inbuf: implem.inbuf,
260 outbuf: implem.outbuf,
261 }
262 }
263 pub fn new(mut sock: P::Socket, seed: P::Seed,
264 scope: &mut Scope<P::Context>)
265 -> Response<Self, Void>
266 {
267 if let Err(e) = scope.register(&sock,
275 EventSet::readable() | EventSet::writable(), PollOpt::edge())
276 {
277 return Response::error(Box::new(e));
279 }
280 let Intent(m, exp, dline) = P::create(seed, &mut sock, scope);
281 match m {
282 Err(None) => Response::error(Box::new(ProtocolStop)),
283 Err(Some(e)) => Response::error(e),
284 Ok(m) => {
285 Response::ok(Stream {
286 socket: sock,
287 expectation: exp,
288 connected: false,
289 deadline: dline,
290 fsm: m,
291 inbuf: Buf::new(),
292 outbuf: Buf::new(),
293 }).deadline_opt(dline)
294 }
295 }
296 }
297 pub fn connected(mut sock: P::Socket, seed: P::Seed,
298 scope: &mut Scope<P::Context>)
299 -> Response<Self, Void>
300 {
301 if let Err(e) = scope.reregister(&sock,
312 EventSet::readable() | EventSet::writable(), PollOpt::edge())
313 {
314 return Response::error(Box::new(e));
316 }
317 let Intent(m, exp, dline) = P::create(seed, &mut sock, scope);
318 match m {
319 Err(None) => Response::error(Box::new(ProtocolStop)),
320 Err(Some(e)) => Response::error(e),
321 Ok(m) => {
322 Response::ok(Stream {
323 socket: sock,
324 expectation: exp,
325 connected: true,
326 deadline: dline,
327 fsm: m,
328 inbuf: Buf::new(),
329 outbuf: Buf::new(),
330 }).deadline_opt(dline)
331 }
332 }
333 }
334}
335
336impl<P: Protocol> Machine for Stream<P>
337{
338 type Context = P::Context;
339 type Seed = Void;
340 fn create(void: Void, _scope: &mut Scope<Self::Context>)
341 -> Response<Self, Void>
342 {
343 unreachable(void);
344 }
345 fn ready(self, events: EventSet, scope: &mut Scope<Self::Context>)
346 -> Response<Self, Self::Seed>
347 {
348 let (fsm, exp, dline, mut imp) = self.decompose();
350 if !imp.connected && events.is_writable() {
351 match imp.socket.take_socket_error() {
352 Ok(()) => {}
353 Err(e) => {
354 match fsm.fatal(Exception::ConnectError(e), scope) {
355 Some(e) => return Response::error(e),
356 None => return Response::done(),
357 }
358 }
359 }
360 imp.connected = true;
361 }
362 imp.action(Intent(Ok(fsm), exp, dline), scope)
363 }
364 fn spawned(self, _scope: &mut Scope<Self::Context>)
365 -> Response<Self, Self::Seed>
366 {
367 unreachable!();
368 }
369 fn timeout(self, scope: &mut Scope<Self::Context>)
370 -> Response<Self, Self::Seed>
371 {
372 if scope.reached(self.deadline) {
373 let (fsm, _exp, _dline, mut imp) = self.decompose();
374 let res = fsm.timeout(&mut imp.transport(), scope);
375 imp.action(res, scope)
376 } else {
377 Response::ok(self)
380 }
381 }
382 fn wakeup(self, scope: &mut Scope<Self::Context>)
383 -> Response<Self, Self::Seed>
384 {
385 let (fsm, _exp, _dline, mut imp) = self.decompose();
386 let res = fsm.wakeup(&mut imp.transport(), scope);
387 imp.action(res, scope)
388 }
389}