rust_asio 0.6.0

Asynchronous I/O library
Documentation
use unsafe_cell::UnsafeBoxedCell;
use ffi::{AsRawFd};
use core::{Reactor, ThreadIoContext, TimerContext, Expiry, TimerQueue, Operation, IntrFd};

use std::io;
use std::ptr;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use libc::{c_int, timespec, CLOCK_MONOTONIC};

#[repr(C)]
pub struct itimerspec {
    pub it_interval: timespec,
    pub it_value: timespec,
}

const TFD_CLOEXEC: c_int = 0o2000000;
const TFD_NONBLOCK: c_int = 0o4000;
const TFD_TIMER_ABSTIME: c_int = 1 << 0;

extern {
    fn timerfd_create(
        clkid: c_int,
        flags: c_int
    ) -> c_int;

    fn timerfd_settime(
        fd: c_int,
        flags: c_int,
        new_value: *const itimerspec,
        old_value: *mut itimerspec
    ) -> c_int;

    fn timerfd_gettime(
        fd: c_int,
        curr_value:
        *mut itimerspec
    ) -> c_int;
}

pub struct TimerFdScheduler {
    tfd: IntrFd,
    mutex: Mutex<TimerQueue>,
    outstanding_work: Arc<AtomicUsize>,
}

impl TimerFdScheduler {
    pub fn new(outstanding_work: Arc<AtomicUsize>) -> io::Result<Self> {
        let tfd = libc_try!(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC));
        Ok(TimerFdScheduler {
            tfd: IntrFd::new::<Self>(tfd),
            mutex: Default::default(),
            outstanding_work: outstanding_work,
        })
    }

    pub fn startup(&self, ctx: &Reactor) {
        ctx.register_intr_fd(&self.tfd)
    }

    pub fn cleanup(&self, ctx: &Reactor) {
        ctx.deregister_intr_fd(&self.tfd)
    }

    pub fn wait_duration(&self, max: Duration) -> Duration {
        max
    }

    pub fn timer_queue_insert(&self, mut timer: UnsafeBoxedCell<TimerContext>, op: Operation) -> Option<Operation> {
        let (old_op, expiry) = {
            let mut tq = self.mutex.lock().unwrap();
            let old_op = timer.op.take();
            timer.op = Some(op);
            (old_op, tq.insert(&timer))
        };

        if let Some(expiry) = expiry {
            do_reset_timeout(&self.tfd, expiry)
        }

        if old_op.is_none() {
            self.outstanding_work.fetch_add(1, Ordering::SeqCst);
        }
        old_op
    }

    pub fn timer_queue_remove(&self, mut timer: UnsafeBoxedCell<TimerContext>) -> Option<Operation> {
        let mut tq = self.mutex.lock().unwrap();
        let old_op = timer.op.take();
        if let Some(expiry) = tq.erase(&timer) {
            do_reset_timeout(&self.tfd, expiry);
        }
        if old_op.is_some() {
            self.outstanding_work.fetch_sub(1, Ordering::SeqCst);
        }
        old_op
    }

    pub fn cancel_all_timers(&self, this: &mut ThreadIoContext) {
        self.mutex.lock().unwrap().cancel_all_timers(this)
    }

    pub fn get_ready_timers(&self, this: &mut ThreadIoContext) {
        let len = this.len();
        if let Some(expiry) = {
            let mut tq = self.mutex.lock().unwrap();
            tq.get_ready_timers(this, Expiry::now());
            tq.front()
        } {
            do_reset_timeout(&self.tfd, expiry);
        }
        self.outstanding_work.fetch_sub(this.len() - len, Ordering::SeqCst);
    }
}

fn do_reset_timeout<T>(t: &T, expiry: Expiry)
    where T: AsRawFd,
{
    let expiry = expiry.abs();
    let new_value = itimerspec {
        it_interval: timespec { tv_sec: 0, tv_nsec: 0 },
        it_value: timespec {
            tv_sec: expiry.as_secs() as i64,
            tv_nsec: expiry.subsec_nanos() as i64,
        },
    };
    libc_ign!(timerfd_settime(
        t.as_raw_fd(),
        TFD_TIMER_ABSTIME,
        &new_value, ptr::null_mut()
    ));
}