#![allow(deprecated)]
#[cfg(feature = "mio")]
mod service_mio;
#[cfg(not(feature = "mio"))]
mod service_non_mio;
#[cfg(feature = "mio")]
mod worker;
use std::cell::Cell;
use std::{fmt, error};
#[cfg(feature = "mio")]
use mio::deprecated::{EventLoop, NotifyError};
#[cfg(feature = "mio")]
use mio::Token;
thread_local! {
    /// Per-thread stack size setting (presumably in bytes, matching the
    /// meaning of `RUST_MIN_STACK` in the standard library).
    ///
    /// The value is computed lazily the first time a thread touches it: the
    /// `RUST_MIN_STACK` environment variable is used when it is set, valid
    /// Unicode and parses as a `usize`; in every other case the value falls
    /// back to 2 MiB. It lives in a `Cell`, so the owning thread can replace it.
    pub static LOCAL_STACK_SIZE: Cell<usize> = {
        let from_env = ::std::env::var("RUST_MIN_STACK")
            .ok()
            .and_then(|raw| raw.parse::<usize>().ok());
        Cell::new(from_env.unwrap_or(2 * 1024 * 1024))
    };
}
/// Error type of the IO layer.
///
/// Both variants carry a standard-library `io::Error`; the variant records
/// which path produced it.
#[derive(Debug)]
pub enum IoError {
    /// Error attributed to the mio backend. Only exists when the `mio`
    /// feature is enabled; the `From<NotifyError<..>>` conversion in this
    /// file produces this variant.
    #[cfg(feature = "mio")]
    Mio(::std::io::Error),
    /// A plain standard-library IO error; this is what
    /// `From<std::io::Error>` produces.
    StdIo(::std::io::Error),
}
/// Human-readable rendering of an [`IoError`].
///
/// The output is exactly the `Display` output of the wrapped
/// `std::io::Error`; the variant itself adds no text.
impl fmt::Display for IoError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the wrapped error first, then format it in one place.
        let inner: &::std::io::Error = match self {
            #[cfg(feature = "mio")]
            IoError::Mio(err) => err,
            IoError::StdIo(err) => err,
        };
        fmt::Display::fmt(inner, f)
    }
}
/// Marks [`IoError`] as a standard error type.
///
/// Only the legacy `description` method is overridden, and it returns the
/// same fixed text for every variant; the detailed message is available
/// through the `Display` implementation.
impl error::Error for IoError {
    /// Fixed, variant-independent summary.
    fn description(&self) -> &str {
        "IO error"
    }
}
impl From<::std::io::Error> for IoError {
fn from(err: ::std::io::Error) -> IoError {
IoError::StdIo(err)
}
}
/// Converts a mio notification failure into [`IoError::Mio`] (mio builds only).
///
/// The incoming error is not inspected: every `NotifyError` maps to the same
/// `ConnectionAborted` IO error with a fixed message.
#[cfg(feature = "mio")]
impl<Message> From<NotifyError<service_mio::IoMessage<Message>>> for IoError
where
    Message: Send,
{
    fn from(_err: NotifyError<service_mio::IoMessage<Message>>) -> IoError {
        let kind = ::std::io::ErrorKind::ConnectionAborted;
        IoError::Mio(::std::io::Error::new(kind, "Network IO notification error"))
    }
}
/// Callback interface implemented by users of the IO service.
///
/// Every method has an empty default body, so an implementation overrides
/// only the events it cares about. Handlers must be `Send + Sync`, and the
/// message type must be `Send + Sync + 'static`. The stream-related callbacks
/// exist only when the `mio` feature is enabled.
pub trait IoHandler<Message>: Send + Sync
where
    Message: Send + Sync + 'static,
{
    /// Start-up hook; the tests in this file use it to register timers.
    /// NOTE(review): presumably invoked once when the handler is registered
    /// with the service — confirm in the service module.
    fn initialize(&self, _io: &IoContext<Message>) {}

    /// Timer callback. `_timer` identifies the timer; in this file's tests it
    /// is the token that was passed when the timer was registered.
    fn timeout(&self, _io: &IoContext<Message>, _timer: TimerToken) {}

    /// Message callback. In this file's tests it receives the message posted
    /// with `IoService::send_message`.
    fn message(&self, _io: &IoContext<Message>, _message: &Message) {}

    /// Stream hang-up callback (mio only).
    /// NOTE(review): presumably fired when `_stream` is closed by the peer — confirm.
    #[cfg(feature = "mio")]
    fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}

    /// Stream readable callback (mio only).
    /// NOTE(review): presumably fired when `_stream` has data to read — confirm.
    #[cfg(feature = "mio")]
    fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}

    /// Stream writable callback (mio only).
    /// NOTE(review): presumably fired when `_stream` can accept writes — confirm.
    #[cfg(feature = "mio")]
    fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}

    /// Hook for registering `_stream` with the event loop under token `_reg`
    /// (mio only). NOTE(review): exact calling protocol is defined by the
    /// service module — confirm there.
    #[cfg(feature = "mio")]
    fn register_stream(
        &self,
        _stream: StreamToken,
        _reg: Token,
        _event_loop: &mut EventLoop<IoManager<Message>>,
    ) {
    }

    /// Hook for updating the event-loop registration of `_stream` under token
    /// `_reg` (mio only). NOTE(review): confirm semantics in the service module.
    #[cfg(feature = "mio")]
    fn update_stream(
        &self,
        _stream: StreamToken,
        _reg: Token,
        _event_loop: &mut EventLoop<IoManager<Message>>,
    ) {
    }

    /// Hook for removing `_stream` from the event loop (mio only).
    /// NOTE(review): confirm semantics in the service module.
    #[cfg(feature = "mio")]
    fn deregister_stream(
        &self,
        _stream: StreamToken,
        _event_loop: &mut EventLoop<IoManager<Message>>,
    ) {
    }
}
#[cfg(feature = "mio")]
pub use service_mio::{TimerToken, StreamToken, IoContext, IoService, IoChannel, IoManager, TOKENS_PER_HANDLER};
#[cfg(not(feature = "mio"))]
pub use crate::service_non_mio::{TimerToken, IoContext, IoService, IoChannel, TOKENS_PER_HANDLER};
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// A message posted through the service must reach the registered handler.
    /// Ignored when the `mio` feature is enabled.
    #[test]
    #[cfg_attr(feature = "mio", ignore)]
    fn send_message_to_handler() {
        #[derive(Clone)]
        struct Payload {
            data: u32,
        }

        /// Records whether `message` was invoked.
        struct FlagHandler {
            delivered: AtomicBool,
        }

        impl IoHandler<Payload> for FlagHandler {
            fn message(&self, _io: &IoContext<Payload>, message: &Payload) {
                assert_eq!(message.data, 5);
                self.delivered.store(true, Ordering::SeqCst);
            }
        }

        let handler = Arc::new(FlagHandler {
            delivered: AtomicBool::new(false),
        });
        let service = IoService::<Payload>::start().expect("Error creating network service");
        service.register_handler(handler.clone()).unwrap();
        service.send_message(Payload { data: 5 }).unwrap();

        // Give the service time to dispatch the message.
        thread::sleep(Duration::from_secs(1));
        assert!(handler.delivered.load(Ordering::SeqCst));
    }

    /// A one-shot timer registered in `initialize` fires with its own token.
    #[test]
    fn timeout_working() {
        #[derive(Clone)]
        struct Payload {
            data: u32,
        }

        /// Records that the one-shot timer fired.
        struct OneShotHandler {
            fired: AtomicBool,
        }

        impl IoHandler<Payload> for OneShotHandler {
            fn initialize(&self, io: &IoContext<Payload>) {
                io.register_timer_once(1234, Duration::from_millis(500)).unwrap();
            }

            fn timeout(&self, _io: &IoContext<Payload>, timer: TimerToken) {
                assert_eq!(timer, 1234);
                // The flag must not have been set by an earlier firing.
                let fired_before = self.fired.swap(true, Ordering::SeqCst);
                assert!(!fired_before);
            }
        }

        let handler = Arc::new(OneShotHandler {
            fired: AtomicBool::new(false),
        });
        let service = IoService::<Payload>::start().expect("Error creating network service");
        service.register_handler(handler.clone()).unwrap();

        // The timer is set for 500 ms; wait well past that.
        thread::sleep(Duration::from_secs(2));
        assert!(handler.fired.load(Ordering::SeqCst));
    }

    /// A repeating timer registered in `initialize` fires at least twice
    /// within the waiting period.
    #[test]
    fn multi_timeout_working() {
        #[derive(Clone)]
        struct Payload {
            data: u32,
        }

        /// Counts how many times the timer fired.
        struct CountingHandler {
            ticks: AtomicUsize,
        }

        impl IoHandler<Payload> for CountingHandler {
            fn initialize(&self, io: &IoContext<Payload>) {
                io.register_timer(1234, Duration::from_millis(500)).unwrap();
            }

            fn timeout(&self, _io: &IoContext<Payload>, timer: TimerToken) {
                assert_eq!(timer, 1234);
                self.ticks.fetch_add(1, Ordering::SeqCst);
            }
        }

        let handler = Arc::new(CountingHandler {
            ticks: AtomicUsize::new(0),
        });
        let service = IoService::<Payload>::start().expect("Error creating network service");
        service.register_handler(handler.clone()).unwrap();

        // 2 s at a 500 ms period leaves room for several ticks; require two.
        thread::sleep(Duration::from_secs(2));
        let observed = handler.ticks.load(Ordering::SeqCst);
        assert!(observed >= 2);
    }
}