use std::sync::Arc;
use std::boxed::FnBox;
use error::ErrCode;
pub use std::os::unix::io::{RawFd, AsRawFd};
pub type Callback = Box<FnBox(*const IoService, ErrCode) + Send + 'static>;
#[cfg(all(feature = "epoll", target_os = "linux"))] mod epoll_reactor;
#[cfg(all(feature = "epoll", target_os = "linux"))] pub use self::epoll_reactor::{Reactor, IoActor, IntrActor};
#[cfg(all(feature = "kqueue", target_os = "macos"))] mod kqueue_reactor;
#[cfg(all(feature = "kqueue", target_os = "macos"))] pub use self::kqueue_reactor::{Reactor, IoActor, IntrActor};
#[cfg(not(any(all(feature = "epoll", target_os = "linux"), all(feature = "kqueue", target_os = "macos"))))] mod null_reactor;
#[cfg(not(any(all(feature = "epoll", target_os = "linux"), all(feature = "kqueue", target_os = "macos"))))] pub use self::null_reactor::{Reactor, IntrActor, IoActor};
#[cfg(all(feature = "timerfd", target_os = "linux"))] mod timerfd_control;
#[cfg(all(feature = "timerfd", target_os = "linux"))] pub use self::timerfd_control::Control;
#[cfg(all(unix, not(all(feature = "timerfd", target_os = "linux"))))] mod pipe_control;
#[cfg(all(unix, not(all(feature = "timerfd", target_os = "linux"))))] pub use self::pipe_control::Control;
mod thread_info;
pub use self::thread_info::{CallStack, ThreadInfo};
mod task_io_service;
use self::task_io_service::IoServiceImpl;
mod timer_queue;
pub use self::timer_queue::{TimerQueue, TimerActor};
pub unsafe trait IoObject : Sized {
fn io_service(&self) -> &IoService;
}
pub trait FromRawFd<P> : AsRawFd + Send + 'static {
unsafe fn from_raw_fd(io: &IoService, pro: P, fd: RawFd) -> Self;
}
#[derive(Clone, Debug)]
pub struct IoService(Arc<IoServiceImpl>);
impl IoService {
pub fn new() -> IoService {
IoService(Arc::new(IoServiceImpl::new()))
}
pub fn dispatch<F>(&self, func: F)
where F: FnOnce(&IoService) + Send + 'static
{
self.0.dispatch(self, func)
}
pub fn post<F>(&self, func: F)
where F: FnOnce(&IoService) + Send + 'static
{
self.0.post(func);
}
pub fn reset(&self) {
self.0.reset()
}
pub fn run(&self) {
self.0.run(self)
}
pub fn stop(&self) {
self.0.stop()
}
pub fn stopped(&self) -> bool {
self.0.stopped()
}
pub fn work(io: &IoService) -> IoServiceWork {
io.0.work_started();
IoServiceWork { io: io.clone() }
}
pub fn strand<T>(io: &IoService, data: T) -> StrandImmutable<T> {
strand_new(io, data)
}
#[cfg(feature = "context")]
pub fn spawn<F>(io: &IoService, func: F)
where F: FnOnce(Coroutine) + Send + 'static,
{
spawn(io, func);
}
}
unsafe impl IoObject for IoService {
fn io_service(&self) -> &IoService {
self
}
}
pub struct IoServiceWork {
io: IoService,
}
unsafe impl IoObject for IoServiceWork {
fn io_service(&self) -> &IoService {
&self.io
}
}
impl Drop for IoServiceWork {
fn drop(&mut self) {
if self.io.0.work_finished() {
self.io.stop();
}
}
}
mod handler;
pub use self::handler::{Handler, AsyncResult, NoAsyncResult, wrap};
mod strand;
pub use self::strand::{Strand, StrandImmutable, StrandHandler, strand_clone, strand_new};
#[cfg(feature = "context")] mod coroutine;
#[cfg(feature = "context")] pub use self::coroutine::{Coroutine, spawn};
#[test]
fn test_multithread_working() {
use std::thread;
use std::sync::atomic::{Ordering, AtomicUsize, ATOMIC_USIZE_INIT};
static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
let io = &IoService::new();
let _work = IoService::work(io);
let mut thrds = Vec::new();
for _ in 0..10 {
let io = io.clone();
thrds.push(thread::spawn(move || io.run()));
}
for _ in 0..100 {
io.post(move |io| {
if COUNT.fetch_add(1, Ordering::SeqCst) == 99 {
io.stop();
}
});
}
io.run();
for thrd in thrds {
thrd.join().unwrap();
}
assert_eq!(COUNT.load(Ordering::Relaxed), 100);
}