use std::{mem, thread};
use std::error::Error as StdError;
use std::io::{Error, ErrorKind};
use std::time::Duration;
use std::cell::UnsafeCell;
use std::sync::{Arc, Mutex};
use std::net::{TcpStream, TcpListener};
use std::os::unix::io::{RawFd, AsRawFd, IntoRawFd};
use libc;
use errno::errno;
use threadpool::ThreadPool;
use simple_slab::Slab;
use types::*;
use config::Config;
use super::Handler;
const DEFAULT_EVENTS: i32 = libc::EPOLLIN |
libc::EPOLLRDHUP |
libc::EPOLLET |
libc::EPOLLONESHOT;
const MAX_EVENTS: i32 = 100;
#[allow(non_upper_case_globals)]
static mut epfd: RawFd = 0 as RawFd;
pub fn begin(handler: Box<Handler>, cfg: Config) {
info!("Starting server...");
let event_handler = EventHandler(Box::into_raw(handler));
let new_connection_slab = Arc::new(Mutex::new(Slab::<Connection>::with_capacity(10)));
let mut_slab = MutSlab {
inner: UnsafeCell::new(Slab::<Arc<Connection>>::with_capacity(cfg.pre_allocated))
};
let connection_slab = Arc::new(mut_slab);
let threads = cfg.max_threads;
let eh_clone = event_handler.clone();
let new_connections = new_connection_slab.clone();
unsafe {
thread::Builder::new()
.name("Event Loop".to_string())
.spawn(move || {
event_loop(new_connections, connection_slab, eh_clone, threads)
})
.unwrap();
}
let eh_clone = event_handler.clone();
let listener_thread = unsafe {
thread::Builder::new()
.name("TcpListener Loop".to_string())
.spawn(move || { listener_loop(cfg, new_connection_slab, eh_clone) })
.unwrap()
};
let _ = listener_thread.join();
}
unsafe fn listener_loop(cfg: Config, new_connections: NewConnectionSlab, handler: EventHandler) {
info!("Starting incoming TCP connection listener...");
let listener_result = TcpListener::bind((&cfg.addr[..], cfg.port));
if listener_result.is_err() {
let err = listener_result.unwrap_err();
error!("Creating TcpListener: {}", err);
panic!();
}
let listener = listener_result.unwrap();
setup_listener_options(&listener, handler.clone());
info!("Incoming TCP conecction listener started");
for accept_attempt in listener.incoming() {
match accept_attempt {
Ok(tcp_stream) => handle_new_connection(tcp_stream, &new_connections, handler.clone()),
Err(e) => error!("Accepting connection: {}", e)
};
}
debug!("Dropping TcpListener");
drop(listener);
}
unsafe fn setup_listener_options(listener: &TcpListener, handler: EventHandler) {
info!("Setting up listener options");
let fd = listener.as_raw_fd();
let EventHandler(handler_ptr) = handler;
(*handler_ptr).on_server_created(fd);
}
unsafe fn handle_new_connection(tcp_stream: TcpStream,
new_connections: &NewConnectionSlab,
handler: EventHandler)
{
debug!("New connection received");
let fd = tcp_stream.into_raw_fd();
let EventHandler(handler_ptr) = handler;
let arc_stream = (*handler_ptr).on_new_connection(fd);
let connection = Connection {
fd: fd,
err_mutex: Mutex::new(None),
tx_mutex: Mutex::new(()),
stream: arc_stream
};
let mut slab = match (*new_connections).lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
(&mut *slab).insert(connection);
}
unsafe fn event_loop(new_connections: NewConnectionSlab,
connection_slab: ConnectionSlab,
handler: EventHandler,
threads: usize)
{
info!("Event loop starting...");
const MAX_WAIT: i32 = 1000;
info!("Creating epoll instance...");
let result = libc::epoll_create(1);
if result < 0 {
let err = Error::from_raw_os_error(errno().0 as i32);
error!("Creating epoll instance: {}", err);
panic!();
}
epfd = result;
info!("Epoll instance created with fd: {}", epfd);
info!("Creating I/O threadpool with {} threads", threads);
let thread_pool = ThreadPool::new(threads);
let arc_io_queue = Arc::new(Mutex::new(Vec::<IoPair>::with_capacity(MAX_EVENTS as usize)));
let t_pool_clone = thread_pool.clone();
let handler_clone = handler.clone();
let io_queue = arc_io_queue.clone();
thread::Builder::new()
.name("I/O Sentinel".to_string())
.spawn(move || {
io_sentinel(io_queue, t_pool_clone, handler_clone)
})
.unwrap();
let mut event_buffer = Vec::<libc::epoll_event>::with_capacity(MAX_EVENTS as usize);
event_buffer.set_len(MAX_EVENTS as usize);
info!("Starting epoll_wait loop...");
loop {
remove_stale_connections(&connection_slab, &thread_pool, &handler);
insert_new_connections(&new_connections, &connection_slab);
let result = libc::epoll_wait(epfd, event_buffer.as_mut_ptr(), MAX_EVENTS, MAX_WAIT);
if result < 0 {
let err = Error::from_raw_os_error(errno().0 as i32);
error!("During epoll_wait: {}", err);
panic!();
}
let num_events = result as usize;
update_io_events(&connection_slab, &arc_io_queue, &event_buffer[0..num_events]);
}
}
unsafe fn remove_stale_connections(connection_slab: &ConnectionSlab,
thread_pool: &ThreadPool,
handler: &EventHandler)
{
let slab_ptr = (*connection_slab).inner.get();
let mut x: isize = 0;
while x < (*slab_ptr).len() as isize {
let err_state = {
let state = (*slab_ptr)[x as usize].err_mutex.lock().unwrap();
if state.is_some() {
let err_kind = (*state).as_ref().unwrap().kind();
let err_desc = (*state).as_ref().unwrap().description();
Some(Error::new(err_kind, err_desc))
} else {
None
}
};
err_state.map(|e| {
trace!("Found stale connection");
let arc_connection = (*slab_ptr).remove(x as usize);
close_connection(&arc_connection);
let fd = arc_connection.fd;
let handler_clone = (*handler).clone();
thread_pool.execute(move || {
let EventHandler(ptr) = handler_clone;
(*ptr).on_connection_removed(fd, e);
});
x -= 1;
});
x += 1;
}
}
unsafe fn close_connection(connection: &Arc<Connection>) {
let fd = (*connection).fd;
debug!("Closing fd: {}", fd);
let result = libc::close(fd);
if result < 0 {
let err = Error::from_raw_os_error(errno().0 as i32);
error!("Closing fd: {} {}", fd, err);
}
}
unsafe fn insert_new_connections(new_connections: &NewConnectionSlab,
connection_slab: &ConnectionSlab)
{
let mut new_slab = match new_connections.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
let num_connections = (&mut *new_slab).len();
let arc_main_slab = (*connection_slab).inner.get();
for _ in 0..num_connections {
let connection = (&mut *new_slab).remove(0);
let arc_connection = Arc::new(connection);
(*arc_main_slab).insert(arc_connection.clone());
add_connection_to_epoll(&arc_connection);
}
}
unsafe fn add_connection_to_epoll(arc_connection: &Arc<Connection>) {
let fd = (*arc_connection).fd;
debug!("Adding fd {} to epoll", fd);
let result = libc::epoll_ctl(epfd,
libc::EPOLL_CTL_ADD,
fd,
&mut libc::epoll_event {
events: DEFAULT_EVENTS as u32,
u64: fd as u64
});
if result < 0 {
let err = Error::from_raw_os_error(errno().0 as i32);
error!("Adding fd: {} to epoll: {}", fd, err);
let mut err_state = match arc_connection.err_mutex.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
*err_state = Some(err);
}
}
unsafe fn rearm_connection_in_epoll(arc_connection: &Arc<Connection>, flags: i32) {
let fd = arc_connection.fd;
let events = DEFAULT_EVENTS | flags;
trace!("EPOLL_CTL_MOD fd: {} flags: {:#b}", fd, (flags as u32));
let result = libc::epoll_ctl(epfd,
libc::EPOLL_CTL_MOD,
fd,
&mut libc::epoll_event { events: events as u32, u64: fd as u64 });
if result < 0 {
let err = Error::from_raw_os_error(errno().0 as i32);
error!("EPOLL_CTL_MOD fd: {} {}", fd, err);
let mut err_state = match arc_connection.err_mutex.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
*err_state = Some(err);
}
}
unsafe fn update_io_events(connection_slab: &ConnectionSlab,
arc_io_queue: &IoQueue,
events: &[libc::epoll_event])
{
const READ_EVENT: u32 = libc::EPOLLIN as u32;
const WRITE_EVENT: u32 = libc::EPOLLOUT as u32;
const CLOSE_EVENT: u32 = (libc::EPOLLRDHUP | libc::EPOLLERR | libc::EPOLLHUP) as u32;
for event in events.iter() {
let fd = event.u64 as RawFd;
trace!("Epoll event for fd: {} flags: {:#b}", fd, event.events);
let find_result = find_connection_from_fd(fd, connection_slab);
if find_result.is_err() {
error!("Unable to find fd {} in ConnectionSlab", fd);
continue;
}
let arc_connection = find_result.unwrap();
let close_event = (event.events & CLOSE_EVENT) > 0;
if close_event {
{ let mut err_opt = match arc_connection.err_mutex.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
*err_opt = Some(Error::new(ErrorKind::ConnectionAborted, "ConnectionAborted"));
} continue;
}
let read_available = (event.events & READ_EVENT) > 0;
let write_available = (event.events & WRITE_EVENT) > 0;
if !read_available && !write_available {
trace!("Event was neither read nor write: assuming hangup");
{ let mut err_opt = match arc_connection.err_mutex.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
*err_opt = Some(Error::new(ErrorKind::ConnectionAborted, "ConnectionAborted"));
} continue;
}
let io_event = if read_available && write_available {
trace!("Event: RW");
IoEvent::ReadWriteAvailable
} else if read_available {
trace!("Event: R");
IoEvent::ReadAvailable
} else {
trace!("Event W");
IoEvent::WriteAvailable
};
let io_pair = IoPair {
event: io_event,
arc_connection: arc_connection
};
trace!("Adding event to queue");
{ let mut io_queue = match arc_io_queue.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
(*io_queue).push(io_pair);
} }
}
unsafe fn find_connection_from_fd(fd: RawFd,
connection_slab: &ConnectionSlab)
-> Result<Arc<Connection>, ()>
{
let slab_ptr = (*connection_slab).inner.get();
for ref arc_connection in (*slab_ptr).iter() {
if (*arc_connection).fd == fd {
return Ok((*arc_connection).clone());
}
}
Err(())
}
unsafe fn io_sentinel(arc_io_queue: IoQueue, thread_pool: ThreadPool, handler: EventHandler) {
info!("Starting I/O Sentinel");
let _100ms = 1000000 * 100;
let wait_interval = Duration::new(0, _100ms);
loop {
thread::sleep(wait_interval);
let io_queue;
{ let mut queue = match arc_io_queue.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
let empty_queue = Vec::<IoPair>::with_capacity(MAX_EVENTS as usize);
io_queue = mem::replace(&mut (*queue), empty_queue);
}
if io_queue.len() > 0 {
trace!("Processing {} I/O events", io_queue.len());
}
for ref io_pair in io_queue.iter() {
let io_event = io_pair.event.clone();
let handler_clone = handler.clone();
let arc_connection = io_pair.arc_connection.clone();
thread_pool.execute(move || {
let mut rearm_events = 0i32;
if io_event == IoEvent::WriteAvailable
|| io_event == IoEvent::ReadWriteAvailable
{
let flags = handle_write_event(arc_connection.clone());
if flags == -1 {
return;
}
rearm_events |= flags;
}
if io_event == IoEvent::ReadAvailable
|| io_event == IoEvent::ReadWriteAvailable
{
let flags = handle_read_event(arc_connection.clone(), handler_clone);
if flags == -1 {
return;
}
rearm_events |= flags;
}
rearm_connection_in_epoll(&arc_connection, rearm_events);
});
}
}
}
unsafe fn handle_write_event(arc_connection: Arc<Connection>) -> i32 {
debug!("Handling a write backlog event...");
let err;
{ let _ = match arc_connection.tx_mutex.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
let stream_ptr = arc_connection.stream.get();
let empty = Vec::<u8>::new();
let write_result = (*stream_ptr).send(&empty[..]);
if write_result.is_ok() {
debug!("Cleared backlog");
return 0i32;
}
err = write_result.unwrap_err();
if err.kind() == ErrorKind::WouldBlock {
debug!("Backlog still not cleared, returning EPOLLOUT flags for fd");
return libc::EPOLLOUT;
}
}
{ let mut err_state = match arc_connection.err_mutex.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
*err_state = Some(err);
}
return -1i32;
}
unsafe fn handle_read_event(arc_connection: Arc<Connection>, handler: EventHandler) -> i32 {
trace!("Handling read event");
let stream_ptr = arc_connection.stream.get();
match (*stream_ptr).recv() {
Ok(mut queue) => {
trace!("Read {} msgs", queue.len());
for msg in queue.drain(..) {
let EventHandler(ptr) = handler;
let hydrogen_socket = HydrogenSocket::new(arc_connection.clone(),
rearm_connection_in_epoll);
(*ptr).on_data_received(hydrogen_socket, msg);
}
return libc::EPOLLIN;
}
Err(err) => {
let kind = err.kind();
if kind == ErrorKind::WouldBlock {
trace!("ErrorKind::WouldBlock");
return libc::EPOLLIN;
}
if kind != ErrorKind::UnexpectedEof
&& kind != ErrorKind::ConnectionReset
&& kind != ErrorKind::ConnectionAborted
{
error!("Unexpected during recv: {}", err);
} else {
debug!("Received during read: {}", err);
}
{ let mut err_state = match arc_connection.err_mutex.lock() {
Ok(g) => g,
Err(p) => p.into_inner()
};
*err_state = Some(err);
} }
};
return -1i32;
}