use std::any::Any;
use std::mem;
use std::raw;
use std::rt::Runtime;
use std::rt::local::Local;
use std::rt::mutex::NativeMutex;
use std::rt::stack;
use std::rt::task::{Task, BlockedTask, TaskOpts};
use std::rt;
use context::Context;
use coroutine::Coroutine;
use sched::{Scheduler, SchedHandle, RunOnce};
use stack::StackPool;
pub use self::TaskType::{TypeGreen, TypeSched};
pub use self::Home::{AnySched, HomeSched};
/// Per-task state for a task that is run by a green `Scheduler`.
///
/// A `GreenTask` and its `Task` own each other alternately: `swap` moves the
/// `GreenTask` into the `Task` as its runtime, while `put_task` stores the
/// `Task` back inside the `GreenTask`.
pub struct GreenTask {
/// Stack segment and saved context for this task. `new_homed` fills this in;
/// `new_typed` stores whatever the caller passes (possibly `None`).
pub coroutine: Option<Coroutine>,
/// Handle back to a scheduler, created lazily in `deschedule` and used by
/// `reawaken_remotely` to send this task to that scheduler.
pub handle: Option<SchedHandle>,
/// Scheduler currently attached to this task. It is taken out whenever
/// control is handed to the scheduler and must be present when `put` runs.
pub sched: Option<Box<Scheduler>>,
/// The runtime `Task` paired with this green task. `swap` moves it out and
/// `put_task` stores it back.
pub task: Option<Box<Task>>,
/// Whether this is an ordinary green task (with an optional home) or a
/// scheduler task.
pub task_type: TaskType,
/// Pool id copied from the scheduler (`sched.pool_id`); `reawaken` compares
/// it against the running scheduler's pool id.
pub pool_id: uint,
/// Lock held by `reawaken_remotely` while the task is sent over `handle`.
pub nasty_deschedule_lock: NativeMutex,
}
/// Distinguishes ordinary green tasks from scheduler tasks.
pub enum TaskType {
/// An ordinary green task. The payload is its home; it is `None` only
/// while the home has been removed with `take_unwrap_home`.
TypeGreen(Option<Home>),
/// A scheduler task. The home-related methods on `GreenTask` abort when
/// they are called on a task of this type.
TypeSched,
}
/// The home of a green task, i.e. whether it is tied to one scheduler.
pub enum Home {
/// Not tied to a scheduler; `homed` reports `false` for this variant.
AnySched,
/// Tied to the scheduler identified by the handle's `sched_id`.
HomeSched(SchedHandle),
}
/// Spawns `f` as a new green task using the default `TaskOpts`.
///
/// This simply forwards to `spawn_opts`, so the same requirement applies:
/// the caller must itself be running as a green task.
pub fn spawn(f: proc(): Send) {
    let default_opts = TaskOpts::new();
    spawn_opts(default_opts, f)
}
/// Spawns `f` as a new green task, configured by `opts`, as a sibling of the
/// currently running green task.
///
/// The new task's stack comes from the current scheduler's stack pool, and
/// the scheduler is then asked to run the new task in place of the caller.
///
/// # Panics
///
/// Panics with "cannot spawn a green task from a non-green task" when the
/// task in local storage is not backed by a `GreenTask`. The task is put
/// back into local storage before panicking.
pub fn spawn_opts(opts: TaskOpts, f: proc(): Send) {
    // Pull the running task out of local storage and recover the green task
    // that is acting as its runtime.
    let mut current: Box<Task> = Local::take();
    let parent = match current.maybe_take_runtime::<GreenTask>() {
        Some(mut green) => {
            green.put_task(current);
            green
        }
        None => {
            Local::put(current);
            panic!("cannot spawn a green task from a non-green task")
        }
    };

    // Drop guard: if anything below unwinds while the guard still owns the
    // parent task, the parent is put back into local storage.
    struct Bomb { inner: Option<Box<GreenTask>> }
    impl Drop for Bomb {
        fn drop(&mut self) {
            match self.inner.take() {
                Some(task) => task.put(),
                None => {}
            }
        }
    }
    let mut bomb = Bomb { inner: Some(parent) };

    // Build the new task while the guard is armed.
    let sibling = {
        let owner = bomb.inner.as_mut().unwrap();
        let sched = owner.sched.as_mut().unwrap();
        GreenTask::configure(&mut sched.stack_pool, opts, f)
    };

    // Disarm the guard and hand both tasks to the scheduler.
    let mut me = bomb.inner.take().unwrap();
    let sched = me.sched.take().unwrap();
    sched.run_task(me, sibling)
}
/// Start routine for a newly created green task; never returns.
///
/// `GreenTask::new_homed` passes this function to `Context::new` together
/// with the task's address (`as_uint`) and the `start` proc.
///
/// * `task` - address of the `Box<GreenTask>` to take ownership of.
/// * `code`, `env` - the two halves of the `proc()` to run, recombined here
///   through `raw::Procedure`.
extern fn bootstrap_green_task(task: uint, code: *mut (), env: *mut ()) -> ! {
// Rebuild the owned `proc()` from its raw code/environment pointers.
let start: proc() = unsafe {
mem::transmute(raw::Procedure { code: code, env: env })
};
// Reclaim ownership of the boxed `GreenTask` from its integer address.
let mut task: Box<GreenTask> = unsafe { mem::transmute(task) };
// Run the scheduler's pending cleanup job, bump its task-state counter,
// and record which pool this task now belongs to.
task.pool_id = {
let sched = task.sched.as_mut().unwrap();
sched.run_cleanup_job();
sched.task_state.increment();
sched.pool_id
};
// `run` takes a closure while `start` is a call-once proc, so the proc is
// parked in an `Option` and moved out when the closure is invoked.
let mut start = Some(start);
// Swap to the `Task` representation, run the user code, then `destroy`.
let task = task.swap().run(|| start.take().unwrap()()).destroy();
// Recover the `GreenTask` from the finished `Task` and hand it to the
// scheduler for termination; `terminate` is declared `-> !`.
GreenTask::convert(task).terminate();
}
impl GreenTask {
    /// Creates a green task with no particular home (`AnySched`) that will
    /// run `start`.
    ///
    /// Equivalent to `new_homed` with a home of `AnySched`.
    pub fn new(stack_pool: &mut StackPool,
               stack_size: Option<uint>,
               start: proc():Send) -> Box<GreenTask> {
        let home = AnySched;
        GreenTask::new_homed(stack_pool, stack_size, home, start)
    }

    /// Creates a green task with the given `home` that will run `start`.
    ///
    /// The stack is taken from `stack_pool`; when `stack_size` is `None`
    /// the size defaults to `rt::min_stack()`.
    pub fn new_homed(stack_pool: &mut StackPool,
                     stack_size: Option<uint>,
                     home: Home,
                     start: proc():Send) -> Box<GreenTask> {
        // Allocate the task structure first: its address is handed to the
        // new context below.
        let mut green = GreenTask::new_typed(None, TypeGreen(Some(home)));

        // Pick the stack size and grab a segment from the pool.
        let size = match stack_size {
            Some(size) => size,
            None => rt::min_stack(),
        };
        let mut segment = stack_pool.take_stack(size);
        let saved = Context::new(bootstrap_green_task, green.as_uint(), start,
                                 &mut segment);

        // Bundle the stack and context into the task's coroutine.
        green.coroutine = Some(Coroutine {
            current_stack_segment: segment,
            saved_context: saved,
        });
        green
    }

    /// Creates a bare task of the given type with a fresh `Task`, no
    /// scheduler, no handle and a pool id of 0.
    pub fn new_typed(coroutine: Option<Coroutine>,
                     task_type: TaskType) -> Box<GreenTask> {
        let lock = unsafe { NativeMutex::new() };
        let inner_task = box Task::new();
        box GreenTask {
            coroutine: coroutine,
            handle: None,
            sched: None,
            task: Some(inner_task),
            task_type: task_type,
            pool_id: 0,
            nasty_deschedule_lock: lock,
        }
    }

    /// Creates a new green task from `opts`, copying the name and exit
    /// callback onto the task's inner `Task`.
    pub fn configure(pool: &mut StackPool,
                     opts: TaskOpts,
                     f: proc():Send) -> Box<GreenTask> {
        let TaskOpts { name: task_name, stack_size: size, on_exit: exit_hook } = opts;

        let mut green = GreenTask::new(pool, size, f);
        {
            let inner = green.task.as_mut().unwrap();
            inner.name = task_name;
            inner.death.on_exit = exit_hook;
        }
        green
    }

    /// Extracts the `GreenTask` acting as `task`'s runtime and stores `task`
    /// back inside it.
    ///
    /// Aborts the runtime if `task` is not backed by a `GreenTask`.
    pub fn convert(mut task: Box<Task>) -> Box<GreenTask> {
        let mut green = match task.maybe_take_runtime::<GreenTask>() {
            Some(green) => green,
            None => rtabort!("not a green task any more?"),
        };
        green.put_task(task);
        green
    }

    /// Sets this task's home. Aborts if called on a scheduler task.
    pub fn give_home(&mut self, new_home: Home) {
        match self.task_type {
            TypeGreen(ref mut slot) => { *slot = Some(new_home); }
            TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
        }
    }

    /// Removes and returns this task's home, leaving it without one.
    ///
    /// Panics if there is no home to take; aborts if called on a scheduler
    /// task.
    pub fn take_unwrap_home(&mut self) -> Home {
        match self.task_type {
            TypeGreen(ref mut slot) => slot.take().unwrap(),
            TypeSched => rtabort!("type error: used SchedTask as GreenTask"),
        }
    }

    /// Returns `true` when this task is homed on exactly `sched`.
    ///
    /// A task homed on `AnySched` yields `false`. Aborts for a green task
    /// without a home and for scheduler tasks.
    pub fn is_home_no_tls(&self, sched: &Scheduler) -> bool {
        match self.task_type {
            TypeGreen(Some(AnySched)) => false,
            TypeGreen(Some(HomeSched(ref handle))) => {
                handle.sched_id == sched.sched_id()
            }
            TypeGreen(None) => { rtabort!("task without home"); }
            TypeSched => {
                rtabort!("type error: expected: TypeGreen, found: TaskSched");
            }
        }
    }

    /// Returns `true` when this task is tied to a specific scheduler.
    ///
    /// Aborts for a green task without a home and for scheduler tasks.
    pub fn homed(&self) -> bool {
        match self.task_type {
            TypeGreen(Some(AnySched)) => false,
            TypeGreen(Some(HomeSched(..))) => true,
            TypeGreen(None) => {
                rtabort!("task without home");
            }
            TypeSched => {
                rtabort!("type error: expected: TypeGreen, found: TaskSched");
            }
        }
    }

    /// Returns `true` when this is a scheduler task.
    pub fn is_sched(&self) -> bool {
        match self.task_type {
            TypeSched => true,
            TypeGreen(..) => false,
        }
    }

    /// Returns the address of this `GreenTask` as an integer.
    pub fn as_uint(&self) -> uint {
        let raw = self as *const GreenTask;
        raw as uint
    }

    /// Reinterprets `val` as an owned `Box<GreenTask>`.
    ///
    /// Unsafe: the value is transmuted without any check, so the caller must
    /// guarantee it is the address of a boxed `GreenTask` whose ownership may
    /// be assumed here.
    pub unsafe fn from_uint(val: uint) -> Box<GreenTask> {
        let task: Box<GreenTask> = mem::transmute(val);
        task
    }

    /// Attaches `sched` to this task and then puts the task into local
    /// storage.
    ///
    /// Asserts that no scheduler is currently attached.
    pub fn put_with_sched(mut self: Box<GreenTask>, sched: Box<Scheduler>) {
        assert!(self.sched.is_none());
        self.sched = Some(sched);
        self.put();
    }

    /// Stores `task` inside this green task.
    ///
    /// Asserts that no `Task` is currently stored.
    pub fn put_task(&mut self, task: Box<Task>) {
        assert!(self.task.is_none());
        self.task = Some(task);
    }

    /// Flips ownership: takes the inner `Task` out, installs this green task
    /// as that task's runtime and returns the `Task`.
    ///
    /// Panics if no `Task` is stored.
    pub fn swap(mut self: Box<GreenTask>) -> Box<Task> {
        let mut inner = self.task.take().unwrap();
        inner.put_runtime(self);
        inner
    }

    /// Puts this task into local storage in its `Task` form.
    ///
    /// Asserts that a scheduler is attached.
    pub fn put(self: Box<GreenTask>) {
        assert!(self.sched.is_some());
        let as_task = self.swap();
        Local::put(as_task);
    }

    /// Hands this task to its scheduler for termination; never returns.
    fn terminate(mut self: Box<GreenTask>) -> ! {
        let sched = self.sched.take().unwrap();
        sched.terminate_current_task(self)
    }

    /// Sends this task to the scheduler behind `self.handle` as a `RunOnce`
    /// message, holding `nasty_deschedule_lock` for the duration of the send.
    ///
    /// Panics if no handle has been created yet.
    fn reawaken_remotely(mut self: Box<GreenTask>) {
        unsafe {
            // `self` is moved into the message below, so the lock and the
            // handle are reached through raw pointers taken beforehand.
            let lock = &mut self.nasty_deschedule_lock as *mut NativeMutex;
            let remote = self.handle.as_mut().unwrap() as *mut SchedHandle;
            let _guard = (*lock).lock();
            (*remote).send(RunOnce(self));
        }
    }
}
/// `Runtime` implementation that routes the standard task operations through
/// the green scheduler attached to the task.
impl Runtime for GreenTask {
    /// Stores `cur_task` back in this green task and asks the scheduler to
    /// yield.
    fn yield_now(mut self: Box<GreenTask>, cur_task: Box<Task>) {
        self.put_task(cur_task);
        let sched = self.sched.take().unwrap();
        sched.yield_now(self);
    }

    /// Stores `cur_task` back in this green task and forwards to the
    /// scheduler's `maybe_yield`.
    fn maybe_yield(mut self: Box<GreenTask>, cur_task: Box<Task>) {
        self.put_task(cur_task);
        let sched = self.sched.take().unwrap();
        sched.maybe_yield(self);
    }

    /// Deschedules the running task and passes its blocked form to `f`.
    ///
    /// When `times == 1`, `f` is called once with the blocked task.
    /// Otherwise `f` is called for each item of `make_selectable(times)`
    /// until it returns `Err`. Whenever `f` returns `Err`, the returned
    /// blocked task is woken and, if `wake` yields a task, that task is put
    /// back on the scheduler's queue.
    fn deschedule(mut self: Box<GreenTask>,
                  times: uint,
                  cur_task: Box<Task>,
                  f: |BlockedTask| -> Result<(), BlockedTask>) {
        self.put_task(cur_task);
        let mut sched = self.sched.take().unwrap();

        // Create the scheduler handle on first use and remember the pool;
        // `reawaken` and `reawaken_remotely` rely on both.
        if self.handle.is_none() {
            self.handle = Some(sched.make_handle());
            self.pool_id = sched.pool_id;
        }

        if times == 1 {
            sched.deschedule_running_task_and_then(self, |sched, task| {
                match f(task) {
                    Ok(()) => {}
                    Err(t) => {
                        t.wake().map(|t| {
                            sched.enqueue_task(GreenTask::convert(t))
                        });
                    }
                }
            });
        } else {
            sched.deschedule_running_task_and_then(self, |sched, task| {
                for task in task.make_selectable(times) {
                    match f(task) {
                        Ok(()) => {},
                        Err(task) => {
                            task.wake().map(|t| {
                                sched.enqueue_task(GreenTask::convert(t))
                            });
                            break
                        }
                    }
                }
            });
        }
    }

    /// Wakes this (descheduled) task up again.
    ///
    /// If the caller is a green task in the same pool, the local scheduler
    /// switches straight to this task. In every other case (no local task,
    /// a non-green local task, or a different pool) the task is sent back
    /// through its scheduler handle.
    fn reawaken(mut self: Box<GreenTask>, to_wake: Box<Task>) {
        self.put_task(to_wake);
        assert!(self.sched.is_none());

        // No task in local storage at all: only the remote path is possible.
        let mut running_task: Box<Task> = match Local::try_take() {
            Some(task) => task,
            None => return self.reawaken_remotely()
        };

        match running_task.maybe_take_runtime::<GreenTask>() {
            Some(mut running_green_task) => {
                running_green_task.put_task(running_task);
                let sched = running_green_task.sched.take().unwrap();

                if sched.pool_id == self.pool_id {
                    sched.run_task(running_green_task, self);
                } else {
                    self.reawaken_remotely();

                    // Restore the caller exactly as it was found.
                    running_green_task.put_with_sched(sched);
                }
            }
            None => {
                self.reawaken_remotely();
                Local::put(running_task);
            }
        }
    }

    /// Puts the current task back into local storage and spawns `f` through
    /// `NativeSpawner` rather than on this task's scheduler.
    fn spawn_sibling(mut self: Box<GreenTask>,
                     cur_task: Box<Task>,
                     opts: TaskOpts,
                     f: proc():Send) {
        self.put_task(cur_task);
        self.put();
        {
            use std::task::Spawner;
            use native::task::NativeSpawner;
            NativeSpawner.spawn(opts, f)
        }
    }

    /// Returns the `(low, high)` bounds of this task's stack, with the low
    /// bound raised by `stack::RED_ZONE`.
    ///
    /// Panics if the task has no coroutine.
    fn stack_bounds(&self) -> (uint, uint) {
        let c = self.coroutine.as_ref()
            .expect("GreenTask.stack_bounds called without a coroutine");
        (c.current_stack_segment.start() as uint + stack::RED_ZONE,
         c.current_stack_segment.end() as uint)
    }

    /// Returns the guard address of this task's stack segment.
    ///
    /// Panics if the task has no coroutine; the message mirrors the one used
    /// by `stack_bounds` so the failing call can be identified.
    fn stack_guard(&self) -> Option<uint> {
        let c = self.coroutine.as_ref()
            .expect("GreenTask.stack_guard called without a coroutine");
        Some(c.current_stack_segment.guard() as uint)
    }

    /// Always `false` for green tasks.
    fn can_block(&self) -> bool { false }

    /// Upcasts this task to `Box<Any + Send>`.
    fn wrap(self: Box<GreenTask>) -> Box<Any + Send> { self as Box<Any + Send> }
}
#[cfg(test)]
mod tests {
use std::task;
use std::rt::task::TaskOpts;
use super::super::{PoolConfig, SchedPool};
fn spawn_opts(opts: TaskOpts, f: proc():Send) {
let mut pool = SchedPool::new(PoolConfig {
threads: 1,
event_loop_factory: ::basic::event_loop,
});
pool.spawn(opts, f);
pool.shutdown();
}
#[test]
fn smoke() {
let (tx, rx) = channel();
spawn_opts(TaskOpts::new(), proc() {
tx.send(());
});
rx.recv();
}
#[test]
fn smoke_fail() {
let (tx, rx) = channel::<int>();
spawn_opts(TaskOpts::new(), proc() {
let _tx = tx;
panic!()
});
assert_eq!(rx.recv_opt(), Err(()));
}
#[test]
fn smoke_opts() {
let mut opts = TaskOpts::new();
opts.name = Some("test".into_maybe_owned());
opts.stack_size = Some(20 * 4096);
let (tx, rx) = channel();
opts.on_exit = Some(proc(r) tx.send(r));
spawn_opts(opts, proc() {});
assert!(rx.recv().is_ok());
}
#[test]
fn smoke_opts_fail() {
let mut opts = TaskOpts::new();
let (tx, rx) = channel();
opts.on_exit = Some(proc(r) tx.send(r));
spawn_opts(opts, proc() { panic!() });
assert!(rx.recv().is_err());
}
#[test]
fn yield_test() {
let (tx, rx) = channel();
spawn_opts(TaskOpts::new(), proc() {
for _ in range(0u, 10) { task::deschedule(); }
tx.send(());
});
rx.recv();
}
#[test]
fn spawn_children() {
let (tx1, rx) = channel();
spawn_opts(TaskOpts::new(), proc() {
let (tx2, rx) = channel();
spawn(proc() {
let (tx3, rx) = channel();
spawn(proc() {
tx3.send(());
});
rx.recv();
tx2.send(());
});
rx.recv();
tx1.send(());
});
rx.recv();
}
}