rotor_stream/
stream.rs

1use std::io;
2use std::error::Error;
3use std::io::ErrorKind::{WouldBlock, BrokenPipe, WriteZero, ConnectionReset};
4
5use rotor::{Response, Scope, Machine, EventSet, PollOpt, Time};
6use rotor::void::{Void, unreachable};
7
8use substr::find_substr;
9use extensions::{ScopeExt, ResponseExt};
10use {Expectation, Protocol, StreamSocket, Stream, StreamImpl};
11use {Buf, Transport, Accepted, Exception, Intent};
12use {ProtocolStop, SocketError};
13
14
/// Outcome of a single non-blocking `read()` or `write()` attempt made by
/// `StreamImpl`.
#[derive(Debug)]
enum IoOp {
    /// Progress was made: `read()` got some bytes, or `write()` found the
    /// output buffer empty / drained it completely.
    Done,
    /// The socket reported `WouldBlock`; nothing more can be done until the
    /// next readiness event.
    NoOp,
    /// The peer is gone: a zero-length read/write, or a `BrokenPipe` /
    /// `ConnectionReset` error.
    Eos,
    /// Any other I/O error, passed through unchanged.
    Error(io::Error),
}
22
23fn to_result<P: Protocol>(intent: Intent<P>)
24    -> Result<(P, Expectation, Option<Time>), Option<Box<Error>>>
25{
26    match intent.0 {
27        Ok(x) => Ok((x, intent.1, intent.2)),
28        Err(e) => Err(e),
29    }
30}
31
32impl<S: StreamSocket> StreamImpl<S> {
33    fn transport(&mut self) -> Transport<S> {
34        Transport {
35            sock: &mut self.socket,
36            inbuf: &mut self.inbuf,
37            outbuf: &mut self.outbuf,
38        }
39    }
40    fn action<M>(self, intent: Intent<M>,
41        scope: &mut Scope<M::Context>)
42        -> Response<Stream<M>, Void>
43        where M: Protocol<Socket=S>
44    {
45        match self._action(intent, scope) {
46            Ok(stream) => {
47                let dline = stream.deadline;
48                Response::ok(stream).deadline_opt(dline)
49            }
50            Err(Some(e)) => Response::error(e),
51            Err(None) => Response::done(),
52        }
53    }
54    fn _action<P>(mut self, intent: Intent<P>,
55        scope: &mut Scope<P::Context>)
56        -> Result<Stream<P>, Option<Box<Error>>>
57        where P: Protocol<Socket=S>
58    {
59        use Expectation::*;
60        let mut intent = try!(to_result(intent));
61        let mut can_write = match self.write() {
62            IoOp::Done => true,
63            IoOp::NoOp => false,
64            IoOp::Eos => {
65                self.outbuf.remove_range(..);
66                return Err(intent.0.fatal(
67                    Exception::WriteError(io::Error::new(
68                        WriteZero, "failed to write whole buffer")),
69                    scope));
70            }
71            IoOp::Error(e) => {
72                return Err(intent.0.fatal(
73                    Exception::WriteError(e),
74                    scope));
75            }
76        };
77        'outer: loop {
78            if can_write {
79                can_write = match self.write() {
80                    IoOp::Done => true,
81                    IoOp::NoOp => false,
82                    IoOp::Eos => {
83                        self.outbuf.remove_range(..);
84                        return Err(intent.0.fatal(
85                            Exception::WriteError(io::Error::new(
86                                WriteZero, "failed to write whole buffer")),
87                            scope));
88                    }
89                    IoOp::Error(e) => {
90                        self.outbuf.remove_range(..);
91                        return Err(intent.0.fatal(
92                            Exception::WriteError(e),
93                            scope));
94                    }
95                };
96            }
97            match intent.1 {
98                Bytes(num) => {
99                    loop {
100                        if self.inbuf.len() >= num {
101                            intent = try!(to_result(intent.0.bytes_read(
102                                &mut self.transport(),
103                                num, scope)));
104                            continue 'outer;
105                        }
106                        match self.read() {
107                            IoOp::Done => {}
108                            IoOp::NoOp => {
109                                return Ok(Stream::compose(self, intent));
110                            }
111                            IoOp::Eos => {
112                                intent = try!(to_result(intent.0.exception(
113                                    &mut self.transport(),
114                                    Exception::EndOfStream,
115                                    scope)));
116                                continue 'outer;
117                            }
118                            IoOp::Error(e) => {
119                                intent = try!(to_result(intent.0.exception(
120                                    &mut self.transport(),
121                                    Exception::ReadError(e),
122                                    scope)));
123                                continue 'outer;
124                            }
125                        }
126                    }
127                }
128                Delimiter(min, delim, max) => {
129                    loop {
130                        if self.inbuf.len() > min {
131                            let opt = find_substr(&self.inbuf[min..], delim);
132                            if let Some(num) = opt {
133                                intent = try!(to_result(intent.0.bytes_read(
134                                    &mut self.transport(),
135                                    num, scope)));
136                                continue 'outer;
137                            }
138                        }
139                        if self.inbuf.len() > max {
140                            intent = try!(to_result(intent.0.exception(
141                                &mut self.transport(),
142                                Exception::LimitReached,
143                                scope)));
144                            continue 'outer;
145                        }
146                        match self.read() {
147                            IoOp::Done => {}
148                            IoOp::NoOp => {
149                                return Ok(Stream::compose(self, intent));
150                            }
151                            IoOp::Eos => {
152                                intent = try!(to_result(intent.0.exception(
153                                    &mut self.transport(),
154                                    Exception::EndOfStream,
155                                    scope)));
156                                continue 'outer;
157                            }
158                            IoOp::Error(e) => {
159                                intent = try!(to_result(intent.0.exception(
160                                    &mut self.transport(),
161                                    Exception::ReadError(e),
162                                    scope)));
163                                continue 'outer;
164                            }
165                        }
166                    }
167                }
168                Flush(num) => {
169                    if self.outbuf.len() <= num {
170                        intent = try!(to_result(intent.0.bytes_flushed(
171                            &mut self.transport(), scope)));
172                    } else {
173                        return Ok(Stream::compose(self, intent));
174                    }
175                }
176                Sleep => {
177                    return Ok(Stream::compose(self, intent));
178                }
179            }
180        }
181    }
182    // Returns Ok(true) to if we have read something, does not loop for reading
183    // because this might use whole memory, and we may parse and consume the
184    // input instead of buffering it whole.
185    fn read(&mut self) -> IoOp {
186        match self.inbuf.read_from(&mut self.socket) {
187            Ok(0) => IoOp::Eos,
188            Ok(_) => IoOp::Done,
189            Err(ref e) if e.kind() == BrokenPipe
190                       || e.kind() == ConnectionReset
191            => return IoOp::Eos,
192            Err(ref e) if e.kind() == WouldBlock => IoOp::NoOp,
193            Err(e) => IoOp::Error(e),
194        }
195    }
196    fn write(&mut self) -> IoOp {
197        loop {
198            if self.outbuf.len() == 0 {
199                return IoOp::Done;
200            }
201            match self.outbuf.write_to(&mut self.socket) {
202                Ok(0) => return IoOp::Eos,
203                Ok(_) => continue,
204                Err(ref e) if e.kind() == BrokenPipe
205                           || e.kind() == ConnectionReset
206                => return IoOp::Eos,
207                Err(ref e) if e.kind() == WouldBlock => return IoOp::NoOp,
208                Err(e) => return IoOp::Error(e),
209            }
210        }
211    }
212}
213
214impl<P: Protocol> Accepted for Stream<P>
215    where <P as Protocol>::Seed: Clone
216{
217    type Seed = <P as Protocol>::Seed;
218    type Socket = P::Socket;
219    fn accepted(sock: P::Socket, seed: <P as Protocol>::Seed,
220        scope: &mut Scope<Self::Context>)
221        -> Response<Self, Void>
222    {
223        Self::new(sock, seed, scope)
224    }
225}
226
227impl<P: Protocol> Stream<P> {
228    /// Get a `Transport` object for the stream
229    ///
230    /// This method is only useful  if you want to manipulate buffers
231    /// externally (like pushing to the buffer from another thread). Just be
232    /// sure to **wake up** state machine after manipulating buffers.
233    pub fn transport(&mut self) -> Transport<P::Socket> {
234        Transport {
235            sock: &mut self.socket,
236            inbuf: &mut self.inbuf,
237            outbuf: &mut self.outbuf,
238        }
239    }
240    fn decompose(self) -> (P, Expectation, Option<Time>, StreamImpl<P::Socket>)
241    {
242        (self.fsm, self.expectation, self.deadline, StreamImpl {
243            socket: self.socket,
244            connected: self.connected,
245            inbuf: self.inbuf,
246            outbuf: self.outbuf,
247        })
248    }
249    fn compose(implem: StreamImpl<P::Socket>,
250        (fsm, exp, dline): (P, Expectation, Option<Time>))
251        -> Stream<P>
252    {
253        Stream {
254            fsm: fsm,
255            socket: implem.socket,
256            expectation: exp,
257            connected: implem.connected,
258            deadline: dline,
259            inbuf: implem.inbuf,
260            outbuf: implem.outbuf,
261        }
262    }
263    pub fn new(mut sock: P::Socket, seed: P::Seed,
264        scope: &mut Scope<P::Context>)
265        -> Response<Self, Void>
266    {
267        // Always register everything in edge-triggered mode.
268        // This allows to never reregister socket.
269        //
270        // The no-reregister strategy is not a goal (although, it's expected
271        // to lower number of syscalls for many request-reply-like protocols)
272        // but it allows to have single source of truth for
273        // readable()/writable() mask (no duplication in kernel space)
274        if let Err(e) = scope.register(&sock,
275            EventSet::readable() | EventSet::writable(), PollOpt::edge())
276        {
277            // TODO(tailhook) wrap it to more clear error
278            return Response::error(Box::new(e));
279        }
280        let Intent(m, exp, dline) = P::create(seed, &mut sock, scope);
281        match m {
282            Err(None) => Response::error(Box::new(ProtocolStop)),
283            Err(Some(e)) => Response::error(e),
284            Ok(m) => {
285                Response::ok(Stream {
286                    socket: sock,
287                    expectation: exp,
288                    connected: false,
289                    deadline: dline,
290                    fsm: m,
291                    inbuf: Buf::new(),
292                    outbuf: Buf::new(),
293                }).deadline_opt(dline)
294            }
295        }
296    }
297    pub fn connected(mut sock: P::Socket, seed: P::Seed,
298        scope: &mut Scope<P::Context>)
299        -> Response<Self, Void>
300    {
301        // Always register everything in edge-triggered mode.
302        // This allows to never reregister socket.
303        //
304        // The no-reregister strategy is not a goal (although, it's expected
305        // to lower number of syscalls for many request-reply-like protocols)
306        // but it allows to have single source of truth for
307        // readable()/writable() mask (no duplication in kernel space)
308        //
309        // We reregister here, because we assume that higher level abstraction
310        // has the socket already registered (perhaps `Persistent` machine)
311        if let Err(e) = scope.reregister(&sock,
312            EventSet::readable() | EventSet::writable(), PollOpt::edge())
313        {
314            // TODO(tailhook) wrap it to more clear error
315            return Response::error(Box::new(e));
316        }
317        let Intent(m, exp, dline) = P::create(seed, &mut sock, scope);
318        match m {
319            Err(None) => Response::error(Box::new(ProtocolStop)),
320            Err(Some(e)) => Response::error(e),
321            Ok(m) => {
322                Response::ok(Stream {
323                    socket: sock,
324                    expectation: exp,
325                    connected: true,
326                    deadline: dline,
327                    fsm: m,
328                    inbuf: Buf::new(),
329                    outbuf: Buf::new(),
330                }).deadline_opt(dline)
331            }
332        }
333    }
334}
335
336impl<P: Protocol> Machine for Stream<P>
337{
338    type Context = P::Context;
339    type Seed = Void;
340    fn create(void: Void, _scope: &mut Scope<Self::Context>)
341        -> Response<Self, Void>
342    {
343        unreachable(void);
344    }
345    fn ready(self, events: EventSet, scope: &mut Scope<Self::Context>)
346        -> Response<Self, Self::Seed>
347    {
348        // TODO(tailhook) use `events` to optimize reading
349        let (fsm, exp, dline, mut imp) = self.decompose();
350        if !imp.connected && events.is_writable() {
351            match imp.socket.take_socket_error() {
352                Ok(()) => {}
353                Err(e) => {
354                    match fsm.fatal(Exception::ConnectError(e), scope) {
355                        Some(e) => return Response::error(e),
356                        None => return Response::done(),
357                    }
358                }
359            }
360            imp.connected = true;
361        }
362        imp.action(Intent(Ok(fsm), exp, dline), scope)
363    }
364    fn spawned(self, _scope: &mut Scope<Self::Context>)
365        -> Response<Self, Self::Seed>
366    {
367        unreachable!();
368    }
369    fn timeout(self, scope: &mut Scope<Self::Context>)
370        -> Response<Self, Self::Seed>
371    {
372        if scope.reached(self.deadline) {
373            let (fsm, _exp, _dline, mut imp) = self.decompose();
374            let res = fsm.timeout(&mut imp.transport(), scope);
375            imp.action(res, scope)
376        } else {
377            // TODO(tailhook) in rotor 0.6 should be no spurious timeouts
378            // anymore, but let's keep it until we remove Scope::timeout_ms()
379            Response::ok(self)
380        }
381    }
382    fn wakeup(self, scope: &mut Scope<Self::Context>)
383        -> Response<Self, Self::Seed>
384    {
385        let (fsm, _exp, _dline, mut imp) = self.decompose();
386        let res = fsm.wakeup(&mut imp.transport(), scope);
387        imp.action(res, scope)
388    }
389}