rust_asio 0.5.3

Asynchronous I/O library
Documentation
use std::mem;
use std::cmp::Ordering;
use std::sync::Mutex;
use unsafe_cell::{UnsafeBoxedCell};
use error::{ECANCELED, READY, ErrCode};
use clock::Expiry;
use super::{IoObject, IoService, Callback, ThreadInfo};

struct Op {
    expiry: Expiry,
    callback: Callback,
}

struct Entry {
    op: Option<Op>,
}

struct EntryPtr(*mut Entry);

impl Eq for EntryPtr {
}

impl PartialEq for EntryPtr {
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}

impl Ord for EntryPtr {
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs = &unsafe { &*self.0 }.op.as_ref().unwrap().expiry;
        let rhs = &unsafe { &*other.0 }.op.as_ref().unwrap().expiry;

        match lhs.cmp(rhs) {
            Ordering::Equal => self.0.cmp(&other.0),
            cmp => cmp,
        }
    }
}

impl PartialOrd for EntryPtr {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

unsafe impl Send for EntryPtr {
}

unsafe impl Sync for EntryPtr {
}

fn insert(queue: &mut Vec<EntryPtr>, ptr: *mut Entry) -> bool {
    debug_assert!(unsafe { &*ptr }.op.is_some());

    let key = EntryPtr(ptr);
    let idx = match queue.binary_search(&key) {
        Ok(len) => len + 1,
        Err(len) => len,
    };
    queue.insert(idx, key);
    idx == 0
}

fn remove(queue: &mut Vec<EntryPtr>, ptr: *mut Entry) -> bool {
    debug_assert!(unsafe { &*ptr }.op.is_some());

    let key = EntryPtr(ptr);
    let idx = queue.binary_search(&key).unwrap();
    queue.remove(idx);
    idx == 0
}

fn find_timeout(queue: &Vec<EntryPtr>, expiry: Expiry) -> usize {
    for (i, ptr) in queue.iter().enumerate() {
        if unsafe { &*ptr.0 }.op.as_ref().unwrap().expiry > expiry {
            return i;
        }
    }
    queue.len()
}

fn drain(queue: &mut Vec<EntryPtr>, len: usize, io: &IoService, ec: ErrCode) {
    for ptr in queue.drain(..len) {
        let Op { expiry:_, callback } = unsafe { &mut *ptr.0 }.op.take().unwrap();
        io.post(move |io| callback(io, ec));
    }
}

pub struct TimerQueue {
    mutex: Mutex<Vec<EntryPtr>>
}

impl TimerQueue {
    pub fn new() -> TimerQueue {
        TimerQueue {
            mutex: Mutex::new(Vec::new()),
        }
    }

    fn set(&self, ptr: *mut Entry, mut op: Op, is_first: &mut bool) -> Option<Callback> {
        let mut queue = self.mutex.lock().unwrap();
        if let Some(old_op) = unsafe { &mut *ptr }.op.as_mut() {
            remove(&mut queue, ptr);
            mem::swap(old_op, &mut op);
            *is_first = insert(&mut queue, ptr);
            Some(op.callback)
        } else {
            unsafe { &mut *ptr }.op = Some(op);
            *is_first = insert(&mut queue, ptr);
            None
        }
    }

    fn unset(&self, ptr: *mut Entry, expiry_opt: &mut Option<Expiry>) -> Option<Callback> {
        let mut queue = self.mutex.lock().unwrap();
        if let Some(_) = unsafe { &mut *ptr }.op.as_mut() {
            if remove(&mut queue, ptr) {
                *expiry_opt = Some(
                    if let Some(ptr) = queue.first() {
                        unsafe { &*ptr.0 }.op.as_ref().unwrap().expiry
                    } else {
                        Default::default()
                    }
                )
            }
            let Op { expiry:_, callback } = unsafe { &mut *ptr }.op.take().unwrap();
            Some(callback)
        } else {
            None
        }
    }

    pub fn cancel_all(&self, io: &IoService) {
        let mut queue = self.mutex.lock().unwrap();
        let len = queue.len();
        drain(&mut queue, len, io, ECANCELED);
    }

    pub fn ready_expired(&self, io: &IoService) -> usize {
        let mut queue = self.mutex.lock().unwrap();
        let len = find_timeout(&queue, Expiry::now());
        drain(&mut queue, len, io, READY);
        queue.len()
    }
}

pub struct TimerActor {
    io: IoService,
    ptr: UnsafeBoxedCell<Entry>,
}

impl TimerActor {
    pub fn new(io: &IoService) -> TimerActor {
        TimerActor {
            io: io.clone(),
            ptr: UnsafeBoxedCell::new(Entry { op: None }),
        }
    }

    pub fn set_wait(&self, expiry: Expiry, callback: Callback) {
        let mut is_first = false;
        let op = Op { expiry: expiry, callback: callback };
        if let Some(callback) = self.io.0.queue.set(unsafe { self.ptr.get() }, op, &mut is_first) {
            self.io.post(|io| callback(io, ECANCELED));
        }
        if is_first {
            self.io.0.ctrl.reset_timeout(expiry)
        }
    }

    pub fn unset_wait(&self) -> Option<Callback> {
        let mut expiry_opt = None;
        let callback_opt = self.io.0.queue.unset(unsafe { self.ptr.get() }, &mut expiry_opt);
        if let Some(expiry) = expiry_opt {
            self.io.0.ctrl.reset_timeout(expiry);
        }
        callback_opt
    }
}

unsafe impl IoObject for TimerActor {
    fn io_service(&self) -> &IoService {
        &self.io
    }
}

#[test]
fn test_timer_set_unset() {
    let io = &IoService::new();
    let act = TimerActor::new(io);
    act.set_wait(Expiry::now(), Box::new(|_,_| {}));
    assert!(act.unset_wait().is_some());
}