use std::sync::mpsc;
use std::any::Any;
use std::cell::RefCell;
thread_local! {
static TASK_CHANNEL:TaskChannel = TaskChannel::new();
static RUNNING_STATUS: RefCell<RunningStatus> = RefCell::new(RunningStatus::Pending);
}
enum RunningStatus {
Pending,
Running,
Stopping,
Stopped,
Rerunning
}
pub struct TaskChannel {
sender: mpsc::Sender<Task>,
receiver: mpsc::Receiver<Task>
}
impl TaskChannel {
fn new() -> TaskChannel {
let (sender, receiver) = mpsc::channel();
TaskChannel { sender: sender, receiver: receiver }
}
}
pub trait TaskCallback {
fn call(&self);
}
pub struct Task {
parameter: Box<Any>,
callback: Box<Fn(Box<Any>)>
}
pub struct TaskCollector {
sender: mpsc::Sender<Task>
}
unsafe impl Send for TaskCollector {}
impl TaskCollector {
fn new() -> TaskCollector {
TASK_CHANNEL.with(|task_channel| {
TaskCollector {
sender: task_channel.sender.clone()
}
})
}
pub fn add_task(&self, task: Task) {
let _ = self.sender.send(task);
}
pub fn add_task_with_callback<T, R>(&self, callback: Box<Fn(Box<T>) -> R>, parameter: Box<T>) where T: 'static + Any + Send, R: 'static {
let task_callback = Box::new(move |parameter: Box<Any>| {
match parameter.downcast() {
Ok(params) => {
(callback)(params);
}
Err(_) => {}
}
});
let task = Task {
parameter: parameter,
callback: task_callback
};
let _ = self.sender.send(task);
}
pub fn add_task_with_trait(&self, callback: Box<TaskCallback>) {
let task_callback = Box::new(move |_| {
(callback).call();
});
let task = Task {
parameter: Box::new(0),
callback: task_callback
};
let _ = self.sender.send(task);
}
}
pub fn run() {
TASK_CHANNEL.with(|task_channel| {
RUNNING_STATUS.with(|running_status_ref| {
let receiver = &task_channel.receiver;
{
let mut running_status = running_status_ref.borrow_mut();
match *running_status {
RunningStatus::Pending => {
*running_status = RunningStatus::Running;
}
RunningStatus::Stopped => {
*running_status = RunningStatus::Rerunning;
}
_ => {}
}
};
loop {
{
let mut running_status = running_status_ref.borrow_mut();
match *running_status {
RunningStatus::Stopping => {
*running_status = RunningStatus::Stopped;
break;
}
_ => {}
}
};
let task = receiver.recv().expect("Task runner encounters a problem when receiving task");
(task.callback)(task.parameter);
{
let mut running_status = running_status_ref.borrow_mut();
match *running_status {
RunningStatus::Running => {
*running_status = RunningStatus::Stopping;
}
RunningStatus::Rerunning => {
*running_status = RunningStatus::Stopping;
}
_ => {}
}
};
}
});
});
}
pub fn stop() -> bool {
RUNNING_STATUS.with(|running_status_ref| {
let mut affected = true;
let mut running_status = running_status_ref.borrow_mut();
match *running_status {
RunningStatus::Running => {
*running_status = RunningStatus::Stopping;
}
RunningStatus::Rerunning => {
*running_status = RunningStatus::Stopping;
}
_ => { affected = false }
};
return affected;
})
}
pub fn new_task_collector() -> TaskCollector {
TaskCollector::new()
}