#![feature(test)]
#![feature(fnbox)]
extern crate test;
extern crate libc;
extern crate time;
#[cfg(feature = "developer")] pub mod ops;
#[cfg(feature = "developer")] pub mod backbone;
#[cfg(not(feature = "developer"))] mod ops;
#[cfg(not(feature = "developer"))] mod backbone;
use backbone::Backbone;
mod socket;
pub use self::socket::*;
mod timer;
pub use self::timer::*;
mod str;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
/// Implemented by types that can hand out a reference to the `IoService`
/// they are associated with.
pub trait IoObject : Sized {
/// Returns a reference to the `IoService` associated with this object.
fn io_service(&self) -> &IoService;
}
/// Handle to a `Backbone` held behind an `Arc`.
///
/// The derived `Clone` clones the `Arc`, so every clone of an `IoService`
/// refers to the same `Backbone` instance.
#[derive(Clone)]
pub struct IoService(Arc<Backbone>);
impl IoService {
pub fn new() -> IoService {
IoService(Arc::new(Backbone::new().unwrap()))
}
pub fn stop(&self) {
self.0.stop()
}
pub fn stopped(&self) -> bool {
self.0.task.stopped()
}
pub fn reset(&self) {
self.0.task.reset()
}
pub fn post<F>(&self, callback: F)
where F: FnOnce(&IoService) + Send + 'static {
self.0.post(0, Box::new(move |io: *const IoService| callback(unsafe { &*io })));
}
pub fn post_strand<'a, F, T>(&self, callback: F, strand: &Strand<'a, T>)
where F: FnOnce(Strand<'a, T>) + Send + 'static,
T: 'static {
let arc = strand.arc.clone();
self.0.post(strand.id(), Box::new(move |io: *const IoService| callback(Strand::from_raw(io, arc))));
}
pub fn run(&self) {
Backbone::run(self);
}
pub fn work<F: FnOnce(&IoService)>(&self, callback: F) {
self.reset();
self.0.task.set_work(true);
callback(self);
self.run();
self.0.task.set_work(false);
}
}
/// An `IoService` is its own I/O object.
impl IoObject for IoService {
/// Returns `self`.
fn io_service(&self) -> &IoService {
self
}
}
/// Interior-mutability cell that is unconditionally `Send` and `Sync`.
///
/// The value lives in an `UnsafeCell`, which is the only sanctioned way to
/// obtain a mutable pointer to data reachable through a shared reference.
/// Casting `&T` to `*mut T` and writing through it is undefined behaviour.
struct UnsafeThreadableCell<T> {
    value: ::std::cell::UnsafeCell<T>,
}

impl<T> UnsafeThreadableCell<T> {
    /// Wraps `value` in a new cell.
    fn new(value: T) -> UnsafeThreadableCell<T> {
        UnsafeThreadableCell {
            value: ::std::cell::UnsafeCell::new(value),
        }
    }

    /// Returns a raw mutable pointer to the wrapped value.
    ///
    /// The pointer is the same for every call on the same cell.
    ///
    /// # Safety
    ///
    /// The cell performs no synchronisation and no borrow tracking. The
    /// caller must ensure that references created from the returned pointer
    /// never violate Rust's aliasing rules (no `&mut T` coexisting with any
    /// other reference) and that no data race occurs across threads.
    unsafe fn get(&self) -> *mut T {
        self.value.get()
    }
}

// NOTE(review): these impls hold for every `T`, including types that are not
// themselves `Send`/`Sync`; soundness therefore rests entirely on how callers
// use `get` - confirm that all users serialise access.
unsafe impl<T> Send for UnsafeThreadableCell<T> {}
unsafe impl<T> Sync for UnsafeThreadableCell<T> {}
/// Pairs a borrowed `IoService` with a reference-counted value of type `T`.
///
/// The address of the shared value serves as the strand's id, which
/// `IoService::post_strand` passes to the backbone when posting.
/// NOTE(review): presumably the backbone serialises tasks posted under the
/// same id - confirm against `Backbone::post`.
pub struct Strand<'a, T> {
// Service this strand is bound to.
io: &'a IoService,
// Shared value; clones of this `Arc` refer to the same `T`.
arc: Arc<UnsafeThreadableCell<T>>,
}
impl<'a, T> Strand<'a, T> {
    /// Creates a strand bound to `io` that takes ownership of `t`, placing it
    /// in a new reference-counted cell.
    pub fn new(io: &'a IoService, t: T) -> Strand<'a, T> {
        let cell = UnsafeThreadableCell::new(t);
        Strand {
            arc: Arc::new(cell),
            io: io,
        }
    }

    /// Rebuilds a strand from a raw service pointer and an existing shared
    /// cell.
    fn from_raw(io: *const IoService, arc: Arc<UnsafeThreadableCell<T>>) -> Strand<'a, T> {
        // NOTE(review): `io` is dereferenced without any check and the result
        // is given the caller-chosen lifetime `'a`; callers must pass a
        // pointer to an `IoService` that outlives the returned strand.
        let service: &'a IoService = unsafe { &*io };
        Strand {
            arc: arc,
            io: service,
        }
    }

    /// Returns the address of the shared value as an integer; strands built
    /// from clones of the same `Arc` report the same id.
    fn id(&self) -> usize {
        let ptr: *mut T = unsafe { self.arc.get() };
        ptr as usize
    }

    /// Returns a mutable reference to the shared value from a shared borrow
    /// of the strand.
    fn get_mut(&self) -> &mut T {
        // NOTE(review): nothing in this function prevents aliasing mutable
        // references; exclusivity has to be guaranteed by the caller.
        let ptr: *mut T = unsafe { self.arc.get() };
        unsafe { &mut *ptr }
    }
}
/// A strand reports the `IoService` it was created with or rebuilt from.
impl<'a, T> IoObject for Strand<'a, T> {
/// Returns the service reference stored in the strand.
fn io_service(&self) -> &IoService {
self.io
}
}
/// Gives read access to the value shared by the strand.
impl<'a, T> Deref for Strand<'a, T> {
    type Target = T;

    /// Borrows the shared value through the cell's raw pointer.
    fn deref(&self) -> &T {
        // NOTE(review): the cell does no borrow tracking; this relies on no
        // conflicting mutable reference existing at the same time.
        let inner: *const T = unsafe { self.arc.get() };
        unsafe { &*inner }
    }
}
/// Gives write access to the value shared by the strand.
impl<'a, T> DerefMut for Strand<'a, T> {
    /// Mutably borrows the shared value through the cell's raw pointer.
    fn deref_mut(&mut self) -> &mut T {
        // NOTE(review): `&mut self` is exclusive for this `Strand` only; other
        // strands sharing the same `Arc` can reach the same value, so
        // exclusivity must be ensured elsewhere.
        let inner: *mut T = unsafe { self.arc.get() };
        unsafe { &mut *inner }
    }
}
/// Cancellation hook for a type that can be reached from the value held by
/// a `Strand`.
pub trait Cancel {
/// `a` maps a shared reference to the strand's value (`&T`) to a reference
/// to the implementing type; `obj` is the strand holding that value.
/// NOTE(review): presumably cancels the operations pending on the object
/// selected by `a` - the behaviour is defined by the implementors, which
/// are not visible here.
fn cancel<A, T>(a: A, obj: &Strand<T>)
where A: Fn(&T) -> &Self;
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::*;
    use std::thread;

    /// `run` on a service that was stopped beforehand returns to the caller.
    #[test]
    fn test_io_service() {
        let io = IoService::new();
        io.stop();
        io.run();
    }

    /// Posted handlers do not execute until `run` is called, and `run`
    /// executes all of them.
    #[test]
    fn test_io_run() {
        static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
        let io = IoService::new();
        for _ in 0..10 {
            io.post(|_| { COUNT.fetch_add(1, Ordering::Relaxed); });
        }
        assert_eq!(COUNT.load(Ordering::Relaxed), 0);
        io.run();
        assert_eq!(COUNT.load(Ordering::Relaxed), 10);
    }

    /// Handlers posted before `stop` are still executed by a later `run`.
    #[test]
    fn test_io_stop() {
        static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
        let io = IoService::new();
        for _ in 0..10 {
            io.post(|_| { COUNT.fetch_add(1, Ordering::Relaxed); });
        }
        io.stop();
        io.run();
        assert_eq!(COUNT.load(Ordering::Relaxed), 10);
    }

    /// `reset` clears the stopped state left behind by `stop`.
    #[test]
    fn test_io_reset() {
        static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
        let io = IoService::new();
        for _ in 0..10 {
            io.post(|_| { COUNT.fetch_add(1, Ordering::Relaxed); });
        }
        io.stop();
        io.run();
        assert!(io.stopped());
        io.reset();
        assert!(!io.stopped());
    }

    /// Five threads run the same service while 1000 handlers are posted; the
    /// handler that observes the count 999 stops the service.
    #[test]
    fn test_io_multi_thread() {
        IoService::new().work(|io| {
            static COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
            let mut thrds = Vec::new();
            for _ in 0..5 {
                let io = io.clone();
                thrds.push(thread::spawn(move || io.run()));
            }
            for _ in 0..1000 {
                io.post(|io| if COUNT.fetch_add(1, Ordering::Relaxed) == 999 {
                    io.stop();
                });
            }
            for thrd in thrds {
                thrd.join().unwrap();
            }
            assert_eq!(COUNT.load(Ordering::Relaxed), 1000);
        });
    }

    /// A strand rebuilt from a clone of the same `Arc` reports the same id.
    #[test]
    fn test_strand_id() {
        let io = IoService::new();
        let strand = Strand::new(&io, 0);
        assert_eq!(strand.id(), Strand::from_raw(&io, strand.arc.clone()).id());
    }
}