1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
//! Stream abstraction for rotor
//!
//! The crate provides:
//!
//! * Buffering for network sockets
//! * Simple abstractions like read N bytes, read until '\n'
//! * Persistent (auto-reconnecting) client connections
//! * Abstraction for accepting connections on the server side
//!
//! Assumptions for streams:
//!
//! 1. You read data by length-prefixed or fixed-string-delimited chunks rather
//!    than byte-by-byte
//! 2. Each chunk fits in memory
//! 3. Your data stream is not entirely full-duplex: while you can read and
//!    write simultaneously, when you apply pushback (i.e. waiting for bytes to
//!    be flushed), you can't do reads [*]
//!
//! [*] This matches HTTP perfectly, as well as most bidirectional interactive
//!     workflows (including those based on websockets). But in some cases it
//!     may be hard to work with. One such case is when you need to generate
//!     an output stream that you can't buffer, and have to parse the input
//!     stream at the same time.

extern crate netbuf;
extern crate memchr;
extern crate rotor;
#[macro_use] extern crate log;
#[macro_use] extern crate quick_error;
#[cfg(feature="replaceable")] extern crate rotor_tools;


mod substr;
mod transport;
mod protocol;
mod stream;
mod accept;
mod persistent;
mod trait_impls;
mod intention;
mod extensions;
mod errors;

pub use protocol::{Protocol, Expectation, Exception};
pub use accept::{Accepted};
pub use persistent::{Persistent};
pub use errors::ProtocolStop;

use std::any::Any;
use std::io;
use std::io::{Read, Write};
use std::error::Error;

use rotor::{Evented, Time};
use rotor::mio::{TryAccept};

pub use netbuf::{Buf, MAX_BUF_SIZE};

/// Marker trait for sockets that can be used with the types of this crate
///
/// The trait declares no methods of its own: it only bundles the supertraits
/// `Read`, `Write`, `Evented`, `SocketError`, `Sized` and `Any` under a
/// single name, so that other items can use one bound instead of six.
// Any is needed to use Stream as a Seed for Machine
pub trait StreamSocket: Read + Write + Evented + SocketError + Sized + Any {}

/// Transport is the thing that provides buffered I/O for stream sockets
///
/// This is usually passed to all the `Protocol` handler methods. But in
/// case you manipulate the transport by some external means (like the
/// one stored in `Arc<Mutex<Stream>>`), you may wish to use the
/// `Stream::transport` or `Persistent::transport` methods to get at the
/// transport. Just remember to **wake up** the state machine after
/// manipulating the buffers of the transport.
///
/// A `Transport` owns nothing: all three fields are mutable borrows that
/// live for `'a`.
pub struct Transport<'a, S: StreamSocket> {
    /// The socket itself, mutably borrowed
    sock: &'a mut S,
    /// Input buffer, mutably borrowed (presumably bytes already read from
    /// the socket but not yet consumed -- confirm in `transport.rs`)
    inbuf: &'a mut Buf,
    /// Output buffer, mutably borrowed (presumably bytes queued for writing
    /// to the socket -- confirm in `transport.rs`)
    outbuf: &'a mut Buf,
}

/// Socket acceptor State Machine
///
/// Type parameters:
///
/// * `A` is the accepting socket (`TryAccept`); the sockets it produces
///   (`A::Output`) must implement `StreamSocket`
/// * `M` is the value kept for every accepted connection; it must implement
///   `Accepted` for exactly that socket type
///
/// TODO(tailhook) Currently this panics when there is no slab space when
/// accepting a connection. This may be fixed by sleeping and retrying.
pub enum Accept<M, A: TryAccept+Sized>
    where A::Output: StreamSocket,
          M: Accepted<Socket=A::Output>,
{
    /// The accepting socket together with a value of `M`'s `Seed` type
    /// (NOTE(review): presumably the seed is handed to every connection
    /// created from this acceptor -- confirm in `accept.rs`)
    Server(A, <M as Accepted>::Seed),
    /// State held for a single accepted connection
    Connection(M),
}

/// The main stream state machine abstraction
///
/// You may use the `Stream` directly. But it's recommended to either use
/// `Persistent` for client connections or `Accept` for server-side
/// connection processing.
///
/// Unlike `Transport`, which only borrows them, a `Stream` owns its socket
/// and both buffers.
#[derive(Debug)]
pub struct Stream<P: Protocol> {
    /// The socket, of the type chosen by the protocol (`P::Socket`)
    socket: P::Socket,
    /// The user-provided protocol value (state machine)
    fsm: P,
    /// The `Expectation` currently stored for the protocol (presumably what
    /// it waits for next -- confirm in `stream.rs`)
    expectation: Expectation,
    /// Connection flag (NOTE(review): exact meaning of `false` -- e.g.
    /// "connect still in progress" -- is not visible here; confirm)
    connected: bool,
    /// Optional point in time (presumably the timeout deadline that goes
    /// with `expectation` -- confirm in `stream.rs`)
    deadline: Option<Time>,
    /// Owned input buffer
    inbuf: Buf,
    /// Owned output buffer
    outbuf: Buf,
}

// Private bundle of a socket, its connection flag and both buffers.
//
// These are the same fields as in `Stream`, minus the protocol state machine,
// the expectation and the deadline.
// NOTE(review): where this type is constructed and consumed is not visible
// in this file -- check `stream.rs` before changing it.
struct StreamImpl<S: StreamSocket> {
    // The owned socket
    socket: S,
    // Connection flag, same role as `Stream::connected` (presumably)
    connected: bool,
    // Owned input buffer
    inbuf: Buf,
    // Owned output buffer
    outbuf: Buf,
}

/// A stream socket that can be created by connecting to an address
///
/// NOTE(review): presumably this is what lets `Persistent` establish (and
/// re-establish) client connections -- confirm in `persistent.rs`.
pub trait ActiveStream: StreamSocket {
    /// The type of address this kind of socket connects to
    type Address;
    /// Creates a new socket connected (or connecting) to `addr`
    ///
    /// Returns the socket on success and an `io::Error` otherwise.
    /// NOTE(review): whether implementations block until the connection is
    /// established is not specified here -- check each implementation.
    fn connect(addr: &Self::Address) -> io::Result<Self>;
}

/// Access to an error condition stored on a socket
///
/// This is one of the supertraits of `StreamSocket`, so every socket used
/// with this crate has to implement it.
pub trait SocketError {
    /// Reports the socket's error state as an `io::Result`
    ///
    /// `Ok(())` means no error; `Err(e)` carries the error.
    /// NOTE(review): judging by the name, this presumably retrieves and
    /// clears the pending socket error (as `SO_ERROR` does) -- confirm
    /// against the implementations in `trait_impls.rs`.
    fn take_socket_error(&self) -> io::Result<()>;
}

/// A structure that encapsulates a state machine and an expectation
///
/// It's usually built with a builder that starts with `Intent::of(machine)`.
///
/// Then you add an expectation:
///
/// ```ignore
/// Intent::of(machine).expect_delimiter("\n", 1024)
/// ```
///
/// And then you may add a timeout (in the form of a "deadline", i.e. an
/// absolute time):
///
/// ```ignore
/// Intent::of(machine).expect_bytes(100)
///     .deadline(scope.now() + Duration::new(10, 0))
/// ```
// Tuple fields, in order:
//   0: `Ok(machine)`, or `Err` with an optional boxed error in place of the
//      machine (NOTE(review): the meaning of `Err(None)` is not visible
//      here -- confirm in `intention.rs`)
//   1: the expectation
//   2: the optional deadline
#[derive(Debug)]
pub struct Intent<M>(Result<M, Option<Box<Error>>>, Expectation, Option<Time>);

/// A helper type returned from `Intent::of()`
///
/// It wraps the state machine `M` until an expectation is added. See the
/// documentation of `Intent` for a guide.
#[derive(Debug)]
pub struct IntentBuilder<M>(M);