rotor_stream/lib.rs
//! Stream abstraction for rotor
//!
//! The crate provides:
//!
//! * Buffering for network sockets
//! * Simple abstractions like read N bytes, read until '\n'
//! * Persistent (auto-reconnecting) client connections
//! * Abstraction for accepting connections on the server side
//!
//! Assumptions for streams:
//!
//! 1. You read data in length-prefixed or fixed-string-delimited chunks
//!    rather than byte-by-byte
//! 2. Each chunk fits in memory
//! 3. Your data stream is not entirely full-duplex: while you can read and
//!    write simultaneously, when you apply pushback (i.e. waiting for bytes to
//!    be flushed), you can't do reads [*]
//!
//! [*] This matches HTTP perfectly, and most bidirectional interactive
//!     workflows (including those based on websockets). But some cases may be
//!     hard to implement. One such case is when you need to generate some
//!     output stream (you can't buffer it), and have to parse the input stream
//!     at the same time.
24
25extern crate netbuf;
26extern crate memchr;
27extern crate rotor;
28#[macro_use] extern crate log;
29#[macro_use] extern crate quick_error;
30#[cfg(feature="replaceable")] extern crate rotor_tools;
31
32
33mod substr;
34mod transport;
35mod protocol;
36mod stream;
37mod accept;
38mod persistent;
39mod trait_impls;
40mod intention;
41mod extensions;
42mod errors;
43
44pub use protocol::{Protocol, Expectation, Exception};
45pub use accept::{Accepted};
46pub use persistent::{Persistent};
47pub use errors::ProtocolStop;
48
49use std::any::Any;
50use std::io;
51use std::io::{Read, Write};
52use std::error::Error;
53
54use rotor::{Evented, Time};
55use rotor::mio::{TryAccept};
56
57pub use netbuf::{Buf, MAX_BUF_SIZE};
58
59// Any is needed to use Stream as a Seed for Machine
60pub trait StreamSocket: Read + Write + Evented + SocketError + Sized + Any {}
61
62/// Transport is thing that provides buffered I/O for stream sockets
63///
64/// This is usually passed in all the `Protocol` handler methods. But in
65/// case you manipulate the transport by some external methods (like the
66/// one stored in `Arc<Mutex<Stream>>` you may wish to use `Stream::transport`
67/// or `Persistent::transport` methods to manipulate tranpsport. Just remember
68/// to **wake up** the state machine after manipulating buffers of transport.
69pub struct Transport<'a, S: StreamSocket> {
70 sock: &'a mut S,
71 inbuf: &'a mut Buf,
72 outbuf: &'a mut Buf,
73}
74
75/// Socket acceptor State Machine
76///
77/// TODO(tailhook) Currently this panics when there is no slab space when
78/// accepting a connection. This may be fixed by sleeping and retrying
79pub enum Accept<M, A: TryAccept+Sized>
80 where A::Output: StreamSocket,
81 M: Accepted<Socket=A::Output>,
82{
83 Server(A, <M as Accepted>::Seed),
84 Connection(M),
85}
86
87/// A main stream state machine abstaction
88///
89/// You may use the `Stream` directly. But it's recommented to either use
90/// `Persistent` for client connections or `Accept` for server-side
91/// connection processing.
92#[derive(Debug)]
93pub struct Stream<P: Protocol> {
94 socket: P::Socket,
95 fsm: P,
96 expectation: Expectation,
97 connected: bool,
98 deadline: Option<Time>,
99 inbuf: Buf,
100 outbuf: Buf,
101}
102
103struct StreamImpl<S: StreamSocket> {
104 socket: S,
105 connected: bool,
106 inbuf: Buf,
107 outbuf: Buf,
108}
109
110pub trait ActiveStream: StreamSocket {
111 type Address;
112 fn connect(addr: &Self::Address) -> io::Result<Self>;
113}
114
/// Access to the error state of a socket
///
/// This is one of the supertraits of `StreamSocket`.
pub trait SocketError {
    /// Returns `Ok(())` or an `io::Error` describing the socket's error
    // NOTE(review): the name suggests the pending error is cleared by the
    // call (as with `SO_ERROR`) -- confirm against implementations
    fn take_socket_error(&self) -> io::Result<()>;
}
118
119/// A structure that encapsulates a state machine and an expectation
120///
121/// It's usually built with a builder that starts with `Intent::of(machine)`.
122///
123/// Then you add an expectation:
124///
125/// ```ignore
126/// Intent::of(machine).expect_delimiter("\n", 1024)
127/// ```
128///
129/// And then you may add a timeout (in form of "deadline" or "absolute time"):
130///
131/// ```ignore
132/// Intent::of(machine).expect_bytes(100)
133/// .deadline(scope.now() + Duration::new(10, 0))
134/// ```
135///
136#[derive(Debug)]
137pub struct Intent<M>(Result<M, Option<Box<Error>>>, Expectation, Option<Time>);
138
/// A helper type returned from `Intent::of()`
///
/// It wraps the machine value until an expectation is added. See the
/// documentation of `Intent` for a guide.
#[derive(Debug)]
pub struct IntentBuilder<M>(M);