use std::default::Default;
use std::rt::util::min_stack;
use thunk::Thunk;
use std::mem::{transmute, transmute_copy};
use std::rt::unwind::try;
use std::any::Any;
use std::cell::UnsafeCell;
use std::ops::DerefMut;
use std::ptr;
use context::Context;
use stack::{StackPool, Stack};
/// Lifecycle state of a coroutine, as reported by `Coroutine::state` and
/// `Handle::state`.
///
/// `Clone` is derived alongside `Copy` because `Clone` is a supertrait of
/// `Copy`; `PartialEq`/`Eq` are derived so states can be compared directly
/// instead of only being matched on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    /// Not currently executing; `Handle::join` keeps resuming a coroutine
    /// for as long as it reports this state.
    Suspended,
    /// Currently executing. Coroutines built by `Coroutine::empty` start here.
    Running,
    /// NOTE(review): presumably "ran to completion" — confirm. Resuming a
    /// coroutine in this state returns the handle untouched.
    Finished,
    /// NOTE(review): presumably "body panicked" — confirm. Resuming a
    /// coroutine in this state panics.
    Panicked,
}
/// Result type returned by `Handle::resume` and `Handle::join`.
///
/// `Ok` carries the value of type `T`; `Err` carries a boxed `Any + Send`
/// payload.
// NOTE(review): the error payload is presumably the panic value of the
// coroutine body — confirm once resume actually runs user code.
pub type ResumeResult<T> = Result<T, Box<Any + Send>>;
/// Spawn-time configuration accepted by `Coroutine::spawn_opts`.
#[derive(Debug)]
pub struct Options {
/// Requested stack size; the `Default` impl fills this from `min_stack()`.
// NOTE(review): presumably measured in bytes — confirm against `Stack`.
pub stack_size: usize,
/// Optional name for the coroutine; `None` by default.
pub name: Option<String>,
}
impl Default for Options {
    /// Builds the default spawn options: the stack size reported by
    /// `min_stack()` and no name.
    fn default() -> Options {
        let stack_size = min_stack();
        Options {
            name: None,
            stack_size: stack_size,
        }
    }
}
/// Owning handle to a heap-allocated `Coroutine`.
///
/// `resume` and `join` take the handle by value and give it back inside
/// their `Ok` result.
#[derive(Debug)]
pub struct Handle(Box<Coroutine>);
impl Handle {
    /// Returns the current `State` of the wrapped coroutine.
    #[inline]
    pub fn state(&self) -> State {
        self.0.state()
    }

    /// Consumes the handle, logs the resume attempt, and hands the handle
    /// back.
    ///
    /// # Panics
    ///
    /// Panics if the coroutine is in `State::Panicked`.
    // NOTE(review): no control transfer happens here yet; every
    // non-panicked state simply yields `Ok(self)` with the state unchanged.
    pub fn resume(mut self) -> ResumeResult<Handle> {
        error!("Resuming {:?}, state: {:?}", self.0.name, self.state());
        if let State::Panicked = self.state() {
            panic!("Tried to resume a panicked coroutine");
        }
        // Finished, Running and Suspended all return the handle untouched.
        Ok(self)
    }

    /// Resumes the coroutine repeatedly until it stops reporting
    /// `State::Suspended`, propagating the first `Err` from `resume`.
    // NOTE(review): `resume` currently never changes the state, so a handle
    // that is `Suspended` on entry would keep this loop spinning forever.
    #[inline]
    pub fn join(self) -> ResumeResult<Handle> {
        let mut current = self;
        while let State::Suspended = current.state() {
            current = try!(current.resume());
        }
        Ok(current)
    }
}
/// Bookkeeping record for a single coroutine.
///
/// Instances are created through `Coroutine::empty` / `Coroutine::spawn*`
/// and are handed out boxed inside a `Handle`.
#[allow(raw_pointer_derive)] // `Debug` is derived on a struct holding the raw `parent` pointer
#[derive(Debug)]
pub struct Coroutine {
/// Stack attached to this coroutine, if any; `Drop` takes it out of the
/// struct. `empty()` leaves it as `None`.
current_stack_segment: Option<Stack>,
/// `Context` value stored for this coroutine.
// NOTE(review): presumably the saved execution context used when switching
// in and out — confirm against `context::Context`.
saved_context: Context,
/// Raw pointer to another `Coroutine`; null for coroutines built by `empty()`.
// NOTE(review): presumably the coroutine that resumed this one — confirm.
parent: *mut Coroutine,
/// Lifecycle state returned by `state()`.
state: State,
/// Optional name; within this file it is only read for log output.
name: Option<String>,
}
// `Coroutine` holds a raw `*mut Coroutine` (`parent`), so it is neither `Send`
// nor `Sync` automatically; these impls assert both by hand.
// NOTE(review): SAFETY is not established by anything visible in this file —
// soundness depends on how `parent` is dereferenced and on `Stack`/`Context`
// being safe to move/share across threads. Confirm before relying on it.
unsafe impl Send for Coroutine {}
unsafe impl Sync for Coroutine {}
impl Drop for Coroutine {
    /// Logs the drop and releases the coroutine's stack, if it has one.
    fn drop(&mut self) {
        error!("Dropping!! {:?}", self.name);
        // Detach the stack (leaving `None` behind) and drop it right away.
        // NOTE(review): the stack is simply destroyed here; nothing hands it
        // back to the `StackPool` held by `Environment` — confirm that this
        // is intended.
        drop(self.current_stack_segment.take());
    }
}
impl Coroutine {
pub fn empty() -> Handle {
Handle(box Coroutine {
current_stack_segment: None,
saved_context: Context::empty(),
parent: ptr::null_mut(),
state: State::Running,
name: None,
})
}
pub fn spawn_opts<F>(f: F, opts: Options) -> Handle
where F: FnOnce() + Send + 'static {
COROUTINE_ENVIRONMENT.with(|_| {});
Coroutine::empty()
}
pub fn spawn<F>(f: F) -> Handle
where F: FnOnce() + Send + 'static {
Coroutine::spawn_opts(f, Default::default())
}
pub fn sched() {
}
pub fn state(&self) -> State {
self.state
}
}
// Per-thread coroutine `Environment`, created lazily by `Environment::new()`
// on first access and wrapped in an `UnsafeCell` so it can be mutated through
// the shared reference that `with` hands out.
// NOTE(review): nothing in this file mutates it yet; any future access
// through the cell must uphold aliasing rules manually.
thread_local!(static COROUTINE_ENVIRONMENT: UnsafeCell<Environment> = UnsafeCell::new(Environment::new()));
/// Per-thread coroutine bookkeeping stored in `COROUTINE_ENVIRONMENT`.
#[allow(raw_pointer_derive)]
#[derive(Debug)]
struct Environment {
/// Pool created with `StackPool::new()`.
// NOTE(review): presumably used to recycle coroutine stacks — confirm
// against `stack::StackPool`; nothing in this file draws from it yet.
stack_pool: StackPool,
/// Initialised to `Some(Coroutine::empty())`.
// NOTE(review): presumably the handle of the coroutine currently running
// on this thread — confirm.
current_running: Option<Handle>,
/// Boxed `Any + Send` payload; starts out as `None`.
// NOTE(review): presumably carries a panic payload between coroutines — confirm.
running_state: Option<Box<Any + Send>>,
}
impl Environment {
    /// Builds a fresh environment: a new `StackPool`, an empty coroutine as
    /// the currently running one, and no running state.
    fn new() -> Environment {
        error!("New Environment");
        // Constructed in the same order as the struct's fields.
        let pool = StackPool::new();
        let root = Coroutine::empty();
        Environment {
            stack_pool: pool,
            current_running: Some(root),
            running_state: None,
        }
    }
}
/// Unit tests and benchmarks for the coroutine API.
// NOTE(review): several tests observe values sent from inside the spawned
// closure; they can only pass once `Coroutine::spawn_opts` actually runs the
// closure it is given.
#[cfg(test)]
mod test {
    use std::sync::mpsc::channel;
    use test::Bencher;
    use coroutine::Coroutine;

    /// A spawned coroutine runs its body when resumed.
    #[test]
    fn test_coroutine_basic() {
        let (tx, rx) = channel();
        Coroutine::spawn(move|| {
            tx.send(1).unwrap();
        }).resume().unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }

    /// `Coroutine::sched` suspends the body until the next `resume`.
    #[test]
    fn test_coroutine_yield() {
        let (tx, rx) = channel();
        let coro = Coroutine::spawn(move|| {
            error!("HERE?");
            tx.send(1).unwrap();
            error!("HERE?");
            Coroutine::sched();
            error!("HERE?");
            tx.send(2).unwrap();
        }).resume().unwrap();
        error!("HERE1");
        assert_eq!(rx.recv().unwrap(), 1);
        // The second value must not arrive before the second resume.
        assert!(rx.try_recv().is_err());
        coro.resume().unwrap();
        assert_eq!(rx.recv().unwrap(), 2);
    }

    /// A coroutine may spawn and join another coroutine from its body.
    #[test]
    fn test_coroutine_spawn_inside() {
        let (tx, rx) = channel();
        Coroutine::spawn(move|| {
            tx.send(1).unwrap();
            Coroutine::spawn(move|| {
                tx.send(2).unwrap();
            }).join().unwrap();
        }).join().unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
        assert_eq!(rx.recv().unwrap(), 2);
    }

    /// A panic in the body surfaces as `Err` from `join`.
    #[test]
    fn test_coroutine_panic() {
        let coro = Coroutine::spawn(move|| {
            panic!("Panic inside a coroutine!!");
        });
        assert!(coro.join().is_err());
    }

    /// A panicking child does not take down a parent that ignores the error.
    #[test]
    fn test_coroutine_child_panic() {
        Coroutine::spawn(move|| {
            let _ = Coroutine::spawn(move|| {
                panic!("Panic inside a coroutine's child!!");
            }).join();
        }).join().unwrap();
    }

    /// Resuming a coroutine a second time, after a first successful resume,
    /// still returns `Ok`.
    #[test]
    fn test_coroutine_resume_after_finished() {
        let mut coro = Coroutine::spawn(move|| {});
        coro = coro.resume().unwrap();
        assert!(coro.resume().is_ok());
    }

    /// Calling `sched` outside of any spawned coroutine must not fail.
    #[test]
    fn test_coroutine_yield_in_main() {
        Coroutine::sched();
    }

    /// Cost of spawning and resuming a trivial coroutine.
    #[bench]
    fn bench_coroutine_spawning_with_recycle(b: &mut Bencher) {
        b.iter(|| {
            let _ = Coroutine::spawn(move|| {}).resume();
        });
    }

    /// Baseline: channel round-trips without any coroutine involved.
    #[bench]
    fn bench_normal_counting(b: &mut Bencher) {
        b.iter(|| {
            const MAX_NUMBER: usize = 100;
            let (tx, rx) = channel();
            let mut result = 0;
            for _ in 0..MAX_NUMBER {
                tx.send(1).unwrap();
                result += rx.recv().unwrap();
            }
            assert_eq!(result, MAX_NUMBER);
        });
    }

    /// Same counting workload, with the sender living inside a coroutine
    /// that yields after every send.
    #[bench]
    fn bench_coroutine_counting(b: &mut Bencher) {
        b.iter(|| {
            const MAX_NUMBER: usize = 100;
            let (tx, rx) = channel();
            let mut coro = Coroutine::spawn(move|| {
                for _ in 0..MAX_NUMBER {
                    tx.send(1).unwrap();
                    Coroutine::sched();
                }
            }).resume().unwrap();
            let mut result = 0;
            for n in rx.iter() {
                coro = coro.resume().unwrap();
                result += n;
            }
            assert_eq!(result, MAX_NUMBER);
        });
    }
}