rust_asio 0.1.0

Asynchronous I/O Library
Documentation
use std::io;
use std::boxed::FnBox;
use std::sync::Mutex;
use IoService;
use ops::*;

pub enum HandlerResult {
    Ready,
    Canceled,
}

pub type Handler = Box<FnBox(*const IoService, HandlerResult) + Send + 'static>;

pub type TaskHandler = Box<FnBox(*const IoService) + Send + 'static>;

mod expiry;
pub use self::expiry::*;

mod task_executor;
pub use self::task_executor::*;

mod timer_queue;
pub use self::timer_queue::*;

mod epoll_reactor;
pub use self::epoll_reactor::*;

struct BackboneCache {
    handler_vec: Vec<(usize, Handler)>,
}

struct BackboneCtrl {
    polling: bool,
    event_fd: EpollIntrActor,
    timer_fd: EpollIntrActor,
}

pub struct Backbone {
    pub task: TaskExecutor,
    queue: TimerQueue,
    epoll: EpollReactor,
    ctrl: Mutex<BackboneCtrl>,
}

impl Backbone {
    pub fn new() -> io::Result<Backbone> {
        let event_fd = {
            let fd = try!(eventfd(0));
            EpollIntrActor::new(fd)
        };
        let timer_fd = {
            let fd = try!(timerfd_create(CLOCK_MONOTONIC));
            EpollIntrActor::new(fd)
        };
        Ok(Backbone {
            task: TaskExecutor::new(),
            queue: TimerQueue::new(),
            epoll: try!(EpollReactor::new()),
            ctrl: Mutex::new(BackboneCtrl {
                polling: false,
                event_fd: event_fd,
                timer_fd: timer_fd,
            }),
        })
    }

    pub fn stop(&self) {
        self.task.stop();
        self.interrupt();
    }

    fn interrupt(&self) {
        let ctrl = self.ctrl.lock().unwrap();
        if ctrl.polling {
            write(&ctrl.event_fd, &[1,0,0,0,0,0,0,0]).unwrap();
        }
    }

    fn reset_timeout(&self, expiry: Expiry) {
        let ctrl = self.ctrl.lock().unwrap();
        if ctrl.polling {
            let new_value = itimerspec {
                it_interval: timespec { tv_sec: 0, tv_nsec: 0 },
                it_value: expiry.wait_monotonic_timespec(),
            };
            timerfd_settime(&ctrl.timer_fd, TFD_TIMER_ABSTIME, &new_value).unwrap();
        }
    }

    pub fn post(&self, id: usize, callback: TaskHandler) {
        self.task.post(id, callback);
    }

    pub fn run(io: &IoService) {
        if {
            let mut ctrl = io.0.ctrl.lock().unwrap();
            if ctrl.polling {
                false
            } else {
                ctrl.event_fd.set_intr(io);
                ctrl.timer_fd.set_intr(io);
                ctrl.polling = true;
                true
            }
        } {
            Self::dispatch(io, Box::new(BackboneCache {
                handler_vec: Vec::new(),
            }));
        }

        while let Some((id, callback)) = io.0.task.do_run_one() {
            callback(io);
            io.0.task.pop(id);
        }
    }

    fn dispatch(io: &IoService, mut data: Box<BackboneCache>) {
        if io.stopped() {
            io.0.epoll.drain_all(&mut data.handler_vec);
            io.0.queue.drain_all(&mut data.handler_vec);
            for (id, callback) in data.handler_vec.drain(..) {
                io.0.task.post(id, Box::new(move |io| callback(io, HandlerResult::Canceled)));
            }

            let mut ctrl = io.0.ctrl.lock().unwrap();
            ctrl.polling = false;
            ctrl.event_fd.unset_intr(&io);
            ctrl.timer_fd.unset_intr(&io);
        } else {
            io.post(move |io| {
                let block = io.0.task.is_work();
                let mut count = io.0.epoll.poll(block, &mut data.handler_vec);
                count += io.0.queue.drain_expired(&mut data.handler_vec);
                count += data.handler_vec.len();
                for (id, callback) in data.handler_vec.drain(..) {
                    io.0.task.post(id, Box::new(move |io| callback(io, HandlerResult::Ready)));
                }

                if !block && count == 0 && io.0.task.count() == 0 {
                    io.0.task.stop();
                }

                Self::dispatch(&io, data);
            });
        }
    }
}