use std::fmt;
use std::boxed::FnBox;
use std::sync::{Mutex, Condvar};
use std::sync::atomic::{Ordering, AtomicBool, AtomicUsize};
use std::collections::VecDeque;
use unsafe_cell::{UnsafeRefCell};
use error::{READY, ECANCELED};
use super::{IoService, Reactor, TimerQueue, Control, CallStack, ThreadInfo};
type Callback = Box<FnBox(*const IoService) + Send + 'static>;
pub struct IoServiceImpl {
mutex: Mutex<VecDeque<Callback>>,
condvar: Condvar,
stopped: AtomicBool,
outstanding_work: AtomicUsize,
nthreads: AtomicUsize,
pub react: Reactor,
pub queue: TimerQueue,
pub ctrl: Control,
}
impl IoServiceImpl {
pub fn new() -> IoServiceImpl {
IoServiceImpl {
mutex: Mutex::new(VecDeque::new()),
condvar: Condvar::new(),
stopped: AtomicBool::new(false),
outstanding_work: AtomicUsize::new(0),
nthreads: AtomicUsize::new(0),
react: Reactor::new(),
queue: TimerQueue::new(),
ctrl: Control::new(),
}
}
pub fn running_in_this_thread(&self) -> bool {
CallStack::contains()
}
fn count(&self) -> usize {
let task = self.mutex.lock().unwrap();
task.len()
}
pub fn stopped(&self) -> bool {
self.stopped.load(Ordering::Relaxed)
}
pub fn stop(&self) {
if !self.stopped.swap(true, Ordering::SeqCst) {
let mut _task = self.mutex.lock().unwrap();
self.ctrl.interrupt();
self.condvar.notify_all();
}
}
pub fn reset(&self) {
self.stopped.store(false, Ordering::SeqCst);
}
pub fn dispatch<F>(&self, io: &IoService, func: F)
where F: FnOnce(&IoService) + Send + 'static
{
if self.running_in_this_thread() {
func(io);
} else {
self.post(func)
}
}
pub fn post<F>(&self, func: F)
where F: FnOnce(&IoService) + Send + 'static
{
let mut task = self.mutex.lock().unwrap();
task.push_back(Box::new(move |io: *const IoService| func(unsafe { &*io })));
self.condvar.notify_one();
}
fn wait(&self) -> Option<Callback> {
let mut task = self.mutex.lock().unwrap();
loop {
let stoppable = self.outstanding_work.load(Ordering::Relaxed) == 0
|| self.stopped.load(Ordering::Relaxed);
if let Some(callback) = task.pop_front() {
return Some(callback);
} else if stoppable {
return None
}
task = self.condvar.wait(task).unwrap();
}
}
fn event_loop(io: &IoService) {
if io.stopped() {
io.0.react.cancel_all(io);
io.0.queue.cancel_all(io);
io.0.ctrl.stop(io);
} else {
io.post(move |io| {
let mut count = io.0.outstanding_work.load(Ordering::Relaxed);
let timeout = if count > 0 && io.0.nthreads.load(Ordering::Relaxed) > 1 {
Some(io.0.ctrl.wait_duration(200000))
} else {
None
};
count += io.0.react.poll(timeout, io);
count += io.0.queue.ready_expired(io);
if count == 0 && io.0.count() == 0 {
io.0.stop();
}
Self::event_loop(io);
});
}
}
pub fn run(&self, io: &IoService) {
if self.stopped() {
return;
}
let thread_info = match ThreadInfo::new() {
None => return,
Some(thread_info) => thread_info,
};
self.nthreads.fetch_add(1, Ordering::SeqCst);
if self.ctrl.start(io) {
Self::event_loop(io);
}
while let Some(func) = self.wait() {
func(io);
}
self.nthreads.fetch_sub(1, Ordering::SeqCst);
}
pub fn work_started(&self) {
self.outstanding_work.fetch_add(1, Ordering::SeqCst);
}
pub fn work_finished(&self) -> bool {
self.outstanding_work.fetch_sub(1, Ordering::SeqCst) == 1
}
}
impl fmt::Debug for IoServiceImpl {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "TaskIoService")
}
}