use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use log::debug;
use mio::event::*;
use mio::*;
use quick_error::*;
pub const EVENT_MANAGER_TICK: u64 = 10;
quick_error! {
#[derive(Debug)]
pub enum EventError {
InvalidEvent {
description("Invalid event")
display(r#"Invalid event"#)
}
RegistryError {
description("Registry error")
display(r#"Registry error"#)
}
PollError {
description("Poll error")
display(r#"Poll error"#)
}
NotReadable {
description("Not readable")
display(r#"Not readable"#)
}
NotWritable {
description("Not writable")
display(r#"Not writable"#)
}
ConnectError(s: String) {
description("Connect error")
display(r#"Connect error {}"#, s)
}
ReadError(s: String) {
description("Read Error")
display(r#"Read Error {}"#, s)
}
WriteError(s: String) {
description("Write error")
display(r#"Write error {}"#, s)
}
NoStream {
description("No stream")
display(r#"No stream"#)
}
UdsServerError(s: String) {
description("UdsServerError")
display(r#"UdsServerError {}"#, s)
}
ChannelError(s: String) {
description("Channel error")
display(r#"Channel error {}"#, s)
}
SystemShutdown {
description("System shutdown")
display(r#"System shutdown"#)
}
}
}
pub enum EventType {
SimpleEvent,
ReadEvent,
WriteEvent,
TimerEvent,
ChannelEvent,
ErrorEvent,
}
impl EventType {
pub fn to_string(&self) -> &str {
match *self {
EventType::SimpleEvent => "Simple Event",
EventType::ReadEvent => "Read Event",
EventType::WriteEvent => "Write Event",
EventType::TimerEvent => "Timer Event",
EventType::ChannelEvent => "Channel Event",
EventType::ErrorEvent => "Error Event",
}
}
}
impl fmt::Debug for EventType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.to_string())
}
}
pub trait EventHandler
where
Self: Send,
Self: Sync,
{
fn handle(&self, event_type: EventType) -> Result<(), EventError>;
fn set_token(&self, _token: Token) {
}
fn get_token(&self) -> Token {
Token(0)
}
}
pub struct EventManager {
simple: RefCell<SimplePoller>,
fdesc: RefCell<FdescPoller>,
timer: RefCell<TimerPoller>,
channel: RefCell<ChannelPoller>,
}
impl Drop for EventManager {
fn drop(&mut self) {
println!("Drop EventManager");
}
}
impl EventManager {
pub fn new() -> EventManager {
EventManager {
simple: RefCell::new(SimplePoller::new()),
fdesc: RefCell::new(FdescPoller::new()),
timer: RefCell::new(TimerPoller::new()),
channel: RefCell::new(ChannelPoller::new()),
}
}
pub fn shutdown(&self) {
self.simple.borrow_mut().release();
self.fdesc.borrow_mut().release();
self.timer.borrow_mut().release();
self.channel.borrow_mut().release();
}
pub fn register_low(&self, handler: Arc<dyn EventHandler>) {
self.simple.borrow_mut().register_low(handler);
}
pub fn register_high(&self, handler: Arc<dyn EventHandler>) {
self.simple.borrow_mut().register_high(handler);
}
pub fn poll_low(&self) -> Vec<(EventType, Arc<dyn EventHandler>)> {
let mut simple = self.simple.borrow_mut();
simple
.low
.drain(..)
.map(|h| (EventType::SimpleEvent, h))
.collect()
}
pub fn poll_high(&self) -> Vec<(EventType, Arc<dyn EventHandler>)> {
let mut simple = self.simple.borrow_mut();
simple
.high
.drain(..)
.map(|h| (EventType::SimpleEvent, h))
.collect()
}
pub fn register_listen(
&self,
fd: &mut dyn Source,
handler: Arc<dyn EventHandler>,
) -> Result<(), EventError> {
let mut fdesc = self.fdesc.borrow_mut();
let index = fdesc.index;
let token = Token(index);
fdesc.handlers.insert(token, handler);
fdesc.index += 1;
if let Err(_) = fdesc
.poll
.registry()
.register(fd, token, Interest::READABLE)
{
Err(EventError::RegistryError)
} else {
Ok(())
}
}
pub fn register_read_write(
&self,
fd: &mut dyn Source,
handler: Arc<dyn EventHandler>,
) -> Result<(), EventError> {
debug!("register_read_write");
let mut fdesc = self.fdesc.borrow_mut();
let index = fdesc.index;
let token = Token(index);
handler.set_token(token);
fdesc.handlers.insert(token, handler);
fdesc.index += 1;
if let Err(_) =
fdesc
.poll
.registry()
.register(fd, token, Interest::READABLE | Interest::WRITABLE)
{
Err(EventError::RegistryError)
} else {
Ok(())
}
}
pub fn register_read(
&self,
fd: &mut dyn Source,
handler: Arc<dyn EventHandler>,
) -> Result<(), EventError> {
debug!("register_read");
let mut fdesc = self.fdesc.borrow_mut();
let index = fdesc.index;
let token = Token(index);
handler.set_token(token);
fdesc.handlers.insert(token, handler);
fdesc.index += 1;
if let Err(_) = fdesc
.poll
.registry()
.register(fd, token, Interest::READABLE)
{
Err(EventError::RegistryError)
} else {
Ok(())
}
}
pub fn register_write(
&self,
fd: &mut dyn Source,
handler: Arc<dyn EventHandler>,
) -> Result<(), EventError> {
debug!("register_write");
let mut fdesc = self.fdesc.borrow_mut();
let index = fdesc.index;
let token = Token(index);
handler.set_token(token);
fdesc.handlers.insert(token, handler);
fdesc.index += 1;
if let Err(_) = fdesc
.poll
.registry()
.register(fd, token, Interest::WRITABLE)
{
Err(EventError::RegistryError)
} else {
Ok(())
}
}
pub fn unregister_read(&self, fd: &mut dyn Source, token: Token) {
let mut fdesc = self.fdesc.borrow_mut();
let _e = fdesc.handlers.remove(&token);
fdesc.poll.registry().deregister(fd).unwrap();
}
pub fn poll_get_events(&self) -> Events {
let mut fdesc = self.fdesc.borrow_mut();
let mut events = Events::with_capacity(1024);
let timeout = fdesc.timeout;
fdesc.poll.poll(&mut events, Some(timeout)).unwrap();
events
}
pub fn poll_get_handler(&self, event: &Event) -> Option<Arc<dyn EventHandler>> {
let fdesc = self.fdesc.borrow_mut();
match fdesc.handlers.get(&event.token()) {
Some(handler) => Some(handler.clone()),
None => None,
}
}
pub fn poll_fd(&self) -> Vec<(EventType, Arc<dyn EventHandler>)> {
let events = self.poll_get_events();
let mut vec = Vec::new();
for event in events.iter() {
if let Some(handler) = self.poll_get_handler(event) {
if event.is_readable() {
vec.push((EventType::ReadEvent, handler));
} else if event.is_writable() {
vec.push((EventType::WriteEvent, handler));
} else {
vec.push((EventType::ErrorEvent, handler));
};
}
}
vec
}
pub fn register_timer(&self, d: Duration, handler: Arc<dyn EventHandler>) {
debug!("register_timer");
let timers = self.timer.borrow();
timers.register(d, handler);
}
pub fn poll_timer(&self) -> Vec<(EventType, Arc<dyn EventHandler>)> {
let mut vec = Vec::new();
let timer = self.timer.borrow_mut();
while let Some(handler) = timer.poll() {
vec.push((EventType::TimerEvent, handler));
}
vec
}
pub fn register_channel(&self, handler: Box<dyn ChannelHandler>) {
debug!("register_channel");
self.channel.borrow_mut().register_handler(handler);
}
pub fn poll_channel(&self) -> Vec<(EventType, Arc<dyn EventHandler>)> {
self.channel.borrow_mut().poll_channel()
}
pub fn poll(&self) -> Vec<(EventType, Arc<dyn EventHandler>)> {
let mut vec = Vec::new();
vec.append(&mut self.poll_high());
vec.append(&mut self.poll_fd());
vec.append(&mut self.poll_channel());
vec.append(&mut self.poll_timer());
vec.append(&mut self.poll_low());
vec
}
}
pub trait EventRunner {
fn run(&self, events: Vec<(EventType, Arc<dyn EventHandler>)>) -> Result<(), EventError>;
}
pub struct SimpleRunner {}
impl SimpleRunner {
pub fn new() -> SimpleRunner {
SimpleRunner {}
}
pub fn sleep(&self) {
thread::sleep(Duration::from_millis(EVENT_MANAGER_TICK));
}
}
impl EventRunner for SimpleRunner {
fn run(&self, events: Vec<(EventType, Arc<dyn EventHandler>)>) -> Result<(), EventError> {
let mut result = Ok(());
for (event_type, handler) in events {
debug!("Event {:?}", event_type);
result = handler.handle(event_type);
}
result
}
}
pub fn poll_and_run(manager: &mut EventManager, runner: Box<dyn EventRunner>) {
loop {
let events = manager.poll();
let result = runner.run(events);
match result {
Err(EventError::SystemShutdown) => break,
_ => {}
}
}
}
struct SimplePoller {
high: Vec<Arc<dyn EventHandler>>,
low: Vec<Arc<dyn EventHandler>>,
}
impl SimplePoller {
pub fn new() -> SimplePoller {
SimplePoller {
high: Vec::new(),
low: Vec::new(),
}
}
pub fn release(&mut self) {
self.high.drain(..);
self.low.drain(..);
}
pub fn register_low(&mut self, handler: Arc<dyn EventHandler>) {
self.low.push(handler);
}
pub fn register_high(&mut self, handler: Arc<dyn EventHandler>) {
self.high.push(handler);
}
}
struct FdescPoller {
index: usize,
handlers: HashMap<Token, Arc<dyn EventHandler>>,
poll: Poll,
timeout: Duration,
}
impl FdescPoller {
pub fn new() -> FdescPoller {
FdescPoller {
index: 1,
handlers: HashMap::new(),
poll: Poll::new().unwrap(),
timeout: Duration::from_millis(EVENT_MANAGER_TICK),
}
}
pub fn release(&mut self) {
self.handlers.drain();
}
}
pub struct TimerHandler {
exp: Instant,
handler: Arc<dyn EventHandler>,
}
impl TimerHandler {
pub fn new(d: Duration, handler: Arc<dyn EventHandler>) -> TimerHandler {
TimerHandler {
exp: Instant::now() + d,
handler,
}
}
pub fn expiration(&self) -> Instant {
self.exp
}
}
impl Ord for TimerHandler {
fn cmp(&self, other: &Self) -> Ordering {
other.expiration().cmp(&self.expiration())
}
}
impl PartialOrd for TimerHandler {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for TimerHandler {}
impl PartialEq for TimerHandler {
fn eq(&self, other: &Self) -> bool {
other.expiration() == self.expiration()
}
}
struct TimerPoller {
heap: RefCell<BinaryHeap<TimerHandler>>,
}
impl TimerPoller {
pub fn new() -> TimerPoller {
TimerPoller {
heap: RefCell::new(BinaryHeap::new()),
}
}
pub fn release(&mut self) {
self.heap.borrow_mut().drain();
}
pub fn register(&self, d: Duration, handler: Arc<dyn EventHandler>) {
let timer_handler = TimerHandler::new(d, handler);
self.heap.borrow_mut().push(timer_handler);
}
fn pop_if_expired(&self) -> Option<TimerHandler> {
if match self.heap.borrow_mut().peek() {
Some(handler) if handler.expiration() < Instant::now() => true,
_ => false,
} {
self.heap.borrow_mut().pop()
} else {
None
}
}
pub fn poll(&self) -> Option<Arc<dyn EventHandler>> {
match self.pop_if_expired() {
Some(timer_handler) => Some(timer_handler.handler),
None => None,
}
}
}
struct ChannelPoller {
handlers: RefCell<Vec<Box<dyn ChannelHandler>>>,
}
impl ChannelPoller {
pub fn new() -> ChannelPoller {
ChannelPoller {
handlers: RefCell::new(Vec::new()),
}
}
pub fn release(&mut self) {
self.handlers.borrow_mut().drain(..);
}
pub fn register_handler(&self, handler: Box<dyn ChannelHandler>) {
self.handlers.borrow_mut().push(handler);
}
pub fn poll_channel(&self) -> Vec<(EventType, Arc<dyn EventHandler>)> {
let mut vec = Vec::new();
for handler in self.handlers.borrow().iter() {
let mut tmp = (*handler).poll_channel();
vec.append(&mut tmp);
}
vec
}
}
pub trait ChannelHandler {
fn poll_channel(&self) -> Vec<(EventType, Arc<dyn EventHandler>)>;
}
#[cfg(test)]
mod tests {
use super::*;
use mio::net::UnixListener;
use mio::net::UnixStream;
use std::fs::*;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Mutex;
use std::thread;
use log::error;
struct TestState {
vec: Vec<u32>,
}
impl TestState {
pub fn new() -> TestState {
TestState { vec: Vec::new() }
}
}
struct TestHandler {
priority: u32,
state: Arc<Mutex<TestState>>,
}
impl EventHandler for TestHandler {
fn handle(&self, event_type: EventType) -> Result<(), EventError> {
match event_type {
EventType::SimpleEvent => {
let state = self.state.clone();
let mut state = state.lock().unwrap();
state.vec.push(self.priority);
}
_ => {
assert!(false);
}
}
Ok(())
}
}
#[test]
pub fn test_simple_event() {
let em = EventManager::new();
let state = Arc::new(Mutex::new(TestState::new()));
let handler1 = TestHandler {
state: state.clone(),
priority: 100,
};
let handler2 = TestHandler {
state: state.clone(),
priority: 200,
};
em.register_low(Arc::new(handler1));
em.register_high(Arc::new(handler2));
let runner = SimpleRunner::new();
let events = em.poll();
runner.run(events).unwrap();
let state = state.lock().unwrap();
assert_eq!(state.vec[0], 200);
assert_eq!(state.vec[1], 100);
}
struct TestListener {
accept: Mutex<bool>,
}
impl EventHandler for TestListener {
fn handle(&self, event_type: EventType) -> Result<(), EventError> {
match event_type {
EventType::ReadEvent => {
*self.accept.lock().unwrap() = true;
debug!("Listener got a connection");
}
_ => {
assert!(false);
}
}
Ok(())
}
}
#[test]
pub fn test_fd_event() {
let path = Path::new("/tmp/test_uds.sock");
remove_file(&path).unwrap();
let em = EventManager::new();
let mut listener = UnixListener::bind(path).unwrap();
let eh = Arc::new(TestListener {
accept: Mutex::new(false),
});
em.register_listen(&mut listener, eh.clone()).unwrap();
let _ = thread::spawn(move || {
match UnixStream::connect(&path) {
Ok(_stream) => {
debug!("Stream");
}
Err(_) => {
error!("Connect error");
}
}
()
});
let runner = SimpleRunner::new();
let events = em.poll();
runner.run(events).unwrap();
assert_eq!(*eh.accept.lock().unwrap(), true);
}
struct TimerEntry {
done: Mutex<bool>,
}
impl TimerEntry {
pub fn new() -> TimerEntry {
TimerEntry {
done: Mutex::new(false),
}
}
pub fn done(&self) -> bool {
*self.done.lock().unwrap()
}
}
impl EventHandler for TimerEntry {
fn handle(&self, event_type: EventType) -> Result<(), EventError> {
match event_type {
EventType::TimerEvent => {
*self.done.lock().unwrap() = true;
}
_ => {
assert!(false);
}
}
Ok(())
}
}
#[test]
pub fn test_timer_event() {
let em = EventManager::new();
let d = Duration::from_secs(1);
let tc = Arc::new(TimerEntry::new());
let runner = SimpleRunner::new();
em.register_timer(d, tc.clone());
let events = em.poll();
runner.run(events).unwrap();
assert_eq!(tc.done(), false);
thread::sleep(Duration::from_millis(1100));
let events = em.poll();
runner.run(events).unwrap();
assert_eq!(tc.done(), true);
}
pub enum TestMessage {
Number(i32),
Desc(String),
}
pub struct TestMessageState {
number: Option<i32>,
desc: Option<String>,
}
impl TestMessageState {
pub fn new() -> TestMessageState {
TestMessageState {
number: None,
desc: None,
}
}
}
pub struct TestChannelHandler {
receiver: mpsc::Receiver<TestMessage>,
state: Arc<Mutex<TestMessageState>>,
}
impl TestChannelHandler {
pub fn new(
receiver: mpsc::Receiver<TestMessage>,
state: Arc<Mutex<TestMessageState>>,
) -> TestChannelHandler {
TestChannelHandler {
receiver: receiver,
state: state,
}
}
}
impl ChannelHandler for TestChannelHandler {
fn poll_channel(&self) -> Vec<(EventType, Arc<dyn EventHandler>)> {
let mut vec = Vec::new();
while let Ok(d) = self.receiver.try_recv() {
let handler = TestMessageHandler::new(d, self.state.clone());
vec.push((EventType::ChannelEvent, handler));
}
vec
}
}
pub struct TestMessageHandler {
message: TestMessage,
state: Arc<Mutex<TestMessageState>>,
}
impl TestMessageHandler {
pub fn new(
message: TestMessage,
state: Arc<Mutex<TestMessageState>>,
) -> Arc<dyn EventHandler> {
Arc::new(TestMessageHandler {
message: message,
state: state,
})
}
}
impl EventHandler for TestMessageHandler {
fn handle(&self, event_type: EventType) -> Result<(), EventError> {
match event_type {
EventType::ChannelEvent => match &self.message {
TestMessage::Number(i) => {
let state = self.state.clone();
let mut state = state.lock().unwrap();
(*state).number.replace(*i);
}
TestMessage::Desc(s) => {
let state = self.state.clone();
let mut state = state.lock().unwrap();
(*state).desc.replace(s.clone());
}
},
_ => assert!(false),
}
Ok(())
}
}
#[test]
pub fn test_channel_event() {
let em = EventManager::new();
let runner = SimpleRunner::new();
let (sender, receiver) = mpsc::channel::<TestMessage>();
let state = Arc::new(Mutex::new(TestMessageState::new()));
let channel_handler = TestChannelHandler::new(receiver, state.clone());
em.register_channel(Box::new(channel_handler));
thread::spawn(move || {
sender.send(TestMessage::Number(100)).unwrap();
});
let events = em.poll();
runner.run(events).unwrap();
let state = state.lock().unwrap();
assert_eq!(state.number, Some(100));
assert_eq!(state.desc, None);
}
}