use prelude::Protocol;
use ffi::{RawFd, AsRawFd};
use error::ErrCode;
use std::io;
use std::sync::Arc;
/// Access to the `IoContext` an object is associated with.
///
/// # Safety
/// Marked `unsafe` to implement; the exact invariant is not visible in this
/// file — presumably implementors must return the context the object was
/// registered on, for the object's whole lifetime. TODO confirm against the
/// implementation modules (`task_ctx`, `reactor`).
pub unsafe trait AsIoContext {
/// Returns the context this object belongs to.
fn as_ctx(&self) -> &IoContext;
}
/// Shared handle to an I/O execution context.
///
/// Cloning is cheap — it only bumps the inner `Arc` refcount — and every
/// clone refers to the same underlying context state (`Impl`, the
/// `TaskIoContext` re-exported from the `task_ctx` module).
#[derive(Clone, Debug)]
pub struct IoContext(Arc<Impl>);
/// Thin facade over the platform implementation; every method delegates to
/// `Impl` (alias for `TaskIoContext`, see the `task_ctx` module).
impl IoContext {
/// Creates a new I/O execution context.
///
/// # Errors
/// Returns an `io::Error` when the underlying implementation cannot be
/// initialized.
pub fn new() -> io::Result<IoContext> {
Impl::new()
}
/// Requests the context to invoke `func`.
///
/// NOTE(review): in asio-style APIs `dispatch` may run the handler
/// immediately when called from a thread already running the context,
/// while `post` always defers — TODO confirm against `Impl::do_dispatch`.
pub fn dispatch<F>(&self, func: F)
where F: FnOnce(&IoContext) + Send + 'static
{
// Adapt the public one-argument closure to the internal two-argument
// form by discarding the per-thread state. The explicit parameter types
// help closure/HRTB inference against the generic bound below.
self.do_dispatch(move|ctx: &IoContext, _: &mut ThreadIoContext| func(ctx))
}
/// Internal dispatch entry point that also receives the per-thread state.
#[doc(hidden)]
pub fn do_dispatch<F>(&self, func: F)
where F: FnOnce(&IoContext, &mut ThreadIoContext) + Send + 'static
{
Impl::do_dispatch(self, func)
}
/// Queues `func` for later invocation by a thread running this context.
pub fn post<F>(&self, func: F)
where F: FnOnce(&IoContext) + Send + 'static
{
// Same adaptation as in `dispatch`: drop the `ThreadIoContext` argument.
self.do_post(move|ctx: &IoContext, _: &mut ThreadIoContext| func(ctx))
}
/// Internal post entry point that also receives the per-thread state.
#[doc(hidden)]
pub fn do_post<F>(&self, func: F)
where F: FnOnce(&IoContext, &mut ThreadIoContext) + Send + 'static
{
Impl::do_post(self, func)
}
/// Prepares the context for a subsequent `run` after it has stopped.
/// The meaning of the returned `bool` is defined by `Impl::restart` —
/// TODO confirm (presumably whether the context was in the stopped state).
pub fn restart(&self) -> bool {
Impl::restart(self)
}
/// Runs the event processing loop until the context is stopped or runs out
/// of work. The returned count comes from `Impl::run` — presumably the
/// number of handlers executed; TODO confirm.
pub fn run(&self) -> usize {
Impl::run(self)
}
/// Runs the event processing loop for at most one unit of work; see
/// `run` for the meaning of the returned count.
pub fn run_one(&self) -> usize {
Impl::run_one(self)
}
/// Signals the context to stop; `run`/`run_one` calls return as soon as
/// possible afterwards.
pub fn stop(&self) {
Impl::stop(self)
}
/// Returns whether the context has been stopped.
pub fn stopped(&self) -> bool {
self.0.stopped()
}
/// Registers outstanding work on `ctx` and returns an RAII guard; the
/// matching `work_finished` happens when the guard is dropped.
pub fn work(ctx: &IoContext) -> IoContextWork {
ctx.0.work_started();
IoContextWork(ctx.clone())
}
}
// A context trivially is its own associated context.
unsafe impl AsIoContext for IoContext {
fn as_ctx(&self) -> &IoContext {
self
}
}
// SAFETY(review): `IoContext` wraps `Arc<Impl>`, and `Arc<T>` is `Send` only
// when `T: Send + Sync` — so this manual impl asserts cross-thread safety
// that `Impl` apparently does not provide automatically. Verify that
// `Impl`'s interior (queues, reactor state) is actually safe to access from
// multiple threads — TODO confirm against the `task_ctx` module.
unsafe impl Send for IoContext { }
/// RAII guard representing outstanding work on a context; obtained via
/// `IoContext::work`. While alive it keeps `run` from returning for lack of
/// work; dropping it releases the work registration.
pub struct IoContextWork(IoContext);
impl Drop for IoContextWork {
/// Releases the work registration taken in `IoContext::work`, then stops
/// the context.
///
/// NOTE(review): `stop()` is called unconditionally, so dropping *any*
/// work guard halts the context even if other guards are still alive —
/// confirm this is intended (asio-style semantics would stop only when
/// the outstanding-work count reaches zero).
fn drop(&mut self) {
(self.0).0.work_finished();
self.0.stop();
}
}
// A work guard exposes the context it was created on.
unsafe impl AsIoContext for IoContextWork {
fn as_ctx(&self) -> &IoContext {
&self.0
}
}
/// A socket bound to an execution context and a protocol `P`.
pub trait Socket<P: Protocol> : AsIoContext + AsRawFd + Send + 'static {
/// Constructs a socket object from an already-open file descriptor.
///
/// # Safety
/// `fd` must be a valid open descriptor consistent with protocol `pro`;
/// presumably the new object takes ownership of `fd` (closing it on drop)
/// — TODO confirm against the concrete socket implementations.
unsafe fn from_raw_fd(&IoContext, pro: P, fd: RawFd) -> Self;
/// Returns the protocol value this socket was constructed with.
fn protocol(&self) -> P;
}
/// Conversion of a boxed concrete type into a boxed (usually trait-object)
/// type `T`, consuming the box.
pub trait Upcast<T: ?Sized> {
fn upcast(self: Box<Self>) -> Box<T>;
}
/// A queued, boxed operation invoked by the context with the per-thread
/// state and an error code describing the completion status.
pub trait FnOp {
fn call_op(self: Box<Self>, ctx: &IoContext, this: &mut ThreadIoContext, ec: ErrCode);
}
// Type-erased, sendable operation as stored in the context's queues.
type Operation = Box<FnOp + Send>;
// Core task-based context implementation; `TaskIoContext` is re-exported as
// `Impl`, the backend for all `IoContext` methods above.
mod task_ctx;
pub use self::task_ctx::{TaskIoContext as Impl, ThreadIoContext, workplace};
// One-time initialization support.
mod init;
pub use self::init::Init;
// Per-thread call-stack tracking (used to detect running-on-context threads).
mod callstack;
pub use self::callstack::ThreadCallStack;
// OS event demultiplexer (epoll/kqueue-style reactor).
mod reactor;
pub use self::reactor::*;
// Mechanism for waking a blocked reactor from another thread.
mod interrupter;
pub use self::interrupter::*;
// Timer/task scheduling support.
mod scheduler;
pub use self::scheduler::*;
#[test]
fn test_new() {
    // Constructing a fresh context must succeed.
    IoContext::new().expect("failed to create IoContext");
}
#[test]
fn test_run() {
    // With no work queued, `run` returns and leaves the context stopped.
    let ctx = IoContext::new().unwrap();
    ctx.run();
    assert!(ctx.stopped());
}
#[test]
fn test_run_one() {
    // With no work queued, a single-step run also stops the context.
    let ctx = IoContext::new().unwrap();
    ctx.run_one();
    assert!(ctx.stopped());
}
#[test]
fn test_work() {
    // Dropping the work guard stops the context.
    let ctx = IoContext::new().unwrap();
    let work = IoContext::work(&ctx);
    drop(work);
    assert!(ctx.stopped());
}
#[test]
fn test_multithread_working() {
    use std::thread;
    use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT};

    // Counter incremented exactly once per posted task.
    static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;

    let ctx = &IoContext::new().unwrap();
    // Keep the context alive while the workers start up.
    let _work = IoContext::work(ctx);

    // Ten worker threads, each running the same context.
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let ctx = ctx.clone();
            thread::spawn(move || ctx.run())
        })
        .collect();

    // Post 100 tasks; whichever task performs the 100th increment stops
    // the context so every `run` call can return.
    for _ in 0..100 {
        ctx.post(move |ctx| {
            if COUNT.fetch_add(1, Ordering::SeqCst) == 99 {
                ctx.stop();
            }
        })
    }

    // The current thread participates in running the context as well.
    ctx.run();
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(COUNT.load(Ordering::Relaxed), 100);
}