hydrogen
Documentation
hydrogen is a non-blocking socket server framework built atop epoll.
It takes care of the tedious connection and I/O marshaling across threads, and
leaves the specifics of I/O reading and writing up the consumer, through trait
implementations.
Streams
hydrogen works with Stream
trait Objects so any custom type can be used.
simple-stream was built in conjunction and offers several
stream abstractions and types including Plain and Secured streams with basic
and WebSocket framing.
Multithreaded
hydrogen is multithreaded. It uses one thread for accepting incoming
connections, one for updating epoll reported event, and one used for
marshalling I/O into a threadpool of a user specified size.
Slab allocation
The connection pool is managed as a slab, which means traversal times are
similar to traversing a Vector, with an insertion and removal time of O(1).
Examples
extern crate hydrogen;
extern crate simple_stream as ss;
use hydrogen;
use hydrogen::{Stream as HydrogenStream, HydrogenSocket};
use ss::frame::Frame;
use ss::frame::simple::{SimpleFrame, SimpleFrameBuilder};
use ss::{Socket, Plain, NonBlocking, SocketOptions};
#[derive(Clone)]
pub struct Stream {
inner: Plain<Socket, SimpleFrameBuilder>
}
impl HydrogenStream for Stream {
fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
match self.inner.nb_recv() {
Ok(frame_vec) => {
let mut ret_buf = Vec::<Vec<u8>>::with_capacity(frame_vec.len());
for frame in frame_vec.iter() {
ret_buf.push(frame.payload());
}
Ok(ret_buf)
}
Err(e) => Err(e)
}
}
fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
let frame = SimpleFrame::new(buf);
self.inner.nb_send(&frame)
}
fn shutdown(&mut self) -> Result<(), Error> {
self.inner.shutdown()
}
}
impl AsRawFd for Stream {
fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}
struct Server;
impl hydrogen::Handler for Server {
fn on_server_created(&mut self, fd: RawFd) {
}
fn on_new_connection(&mut self, fd: RawFd) -> Arc<UnsafeCell<HydrogenStream>> {
}
fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec<u8>) {
}
fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
}
}
fn main() {
hydrogen::begin(Server, hydrogen::Config {
addr: "0.0.0.0".to_string(),
port: 1337,
max_threads: 8,
pre_allocated: 100000
});
}
std::net::TcpStream
extern crate hydrogen;
use std::cell::UnsafeCell;
use std::io::{Read, Write, Error, ErrorKind};
use std::net::{TcpStream, Shutdown};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use hydrogen::{Stream as HydrogenStream, HydrogenSocket};
pub struct Stream {
inner: TcpStream
}
impl Stream {
pub fn from_tcp_stream(tcp_stream: TcpStream) -> Stream {
tcp_stream.set_nonblocking(true);
Stream {
inner: tcp_stream
}
}
}
impl HydrogenStream for Stream {
fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
let mut msgs = Vec::<Vec<u8>>::new();
let mut total_read = Vec::<u8>::new();
loop {
let mut buf = [0u8; 4098];
let read_result = self.inner.read(&mut buf);
if read_result.is_err() {
let err = read_result.unwrap_err();
if err.kind() == ErrorKind::WouldBlock {
break;
}
return Err(err);
}
let num_read = read_result.unwrap();
total_read.extend_from_slice(&buf[0..num_read]);
}
msgs.push(total_read);
return Ok(msgs);
}
fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
self.inner.write_all(buf)
}
fn shutdown(&mut self) -> Result<(), Error> {
self.inner.shutdown(Shutdown::Both)
}
}
impl AsRawFd for Stream {
fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}
struct Server;
impl hydrogen::Handler for Server {
fn on_server_created(&mut self, fd: RawFd) {
}
fn on_new_connection(&mut self, fd: RawFd) -> Arc<UnsafeCell<HydrogenStream>> {
}
fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec<u8>) {
}
fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
}
}
fn main() {
hydrogen::begin(Server, hydrogen::Config {
addr: "0.0.0.0".to_string(),
port: 1337,
max_threads: 8,
pre_allocated: 100000
});
}
Author
Nathan Sizemore, nathanrsizemore@gmail.com
License
hydrogen is available under the MPL-2.0 license. See the LICENSE file for more info.