#![allow(dead_code)]
use crossbeam::channel::{self, after, Receiver, Sender};
use may::coroutine::{self, JoinHandle};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crossbeam::select;
use serde_json::Value;
#[derive(Debug)]
pub enum TaskEvent<T, E> {
Data(T), Progress((u8,u32)), Done, Cancelled, Error(E), Panic(String), }
#[derive(Clone)]
pub struct JobTask<T: Send + 'static, E: Send + 'static,D: Send + 'static> {
is_cancelled: Arc<AtomicBool>,
handle: Option<Arc<JoinHandle<()>>>,
event_rx: Receiver<TaskEvent<T, E>>,
_event_tx: Sender<TaskEvent<T, E>>, sender: Sender<D>, }
impl<T: Send + 'static, E: Send + 'static, D: Send + 'static> JobTask<T, E, D> {
pub fn new<F>(params: Value,task: F) -> Self
where
F: FnOnce(Value,Sender<TaskEvent<T, E>>, Receiver<D>) + Send + 'static,
{
let is_cancelled = Arc::new(AtomicBool::new(false));
let (event_tx, event_rx) = channel::unbounded();
let (data_tx, data_rx) = channel::unbounded();
let flag = is_cancelled.clone();
let sender = event_tx.clone();
let handle = unsafe { coroutine::spawn(move || {
if flag.load(Ordering::Acquire) {
let _ = sender.send(TaskEvent::Cancelled);
return;
}
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
task(params,sender.clone(),data_rx.clone());
}));
match result {
Ok(_) => {
let _ = sender.send(TaskEvent::Done);
}
Err(_) => {
let _ = sender.send(TaskEvent::Panic(format!("panic")));
}
}
}) };
JobTask {
is_cancelled: is_cancelled,
handle: Some(Arc::new(handle)),
event_rx: event_rx,
_event_tx: event_tx,
sender: data_tx,
}
}
pub fn cancel(&mut self) {
self.is_cancelled.store(true, Ordering::Relaxed);
if let Some(handle) = self.handle.take() {
unsafe { handle.coroutine().cancel() };
}
}
pub fn try_recv(&self) -> Option<TaskEvent<T, E>> {
self.event_rx.try_recv().ok()
}
pub fn recv(&self) -> Option<TaskEvent<T, E>> {
self.event_rx.recv().ok()
}
pub fn recv_timeout(&self, timeout: Duration) -> Option<TaskEvent<T, E>> {
self.event_rx.recv_timeout(timeout).ok()
}
pub fn send(&self, data: D) {
let _ = self.sender.send(data);
}
}
impl <T, E, D> Drop for JobTask<T, E, D>
where
T: Send + 'static,
E: Send + 'static,
D: Send + 'static
{
fn drop(&mut self) {
self.cancel(); }
}
#[cfg(test)]
mod tests {
use std::thread;
use generator::{done, Gn};
use irgo::defer;
use serde_json::json;
use super::*;
#[test]
fn test_job_task() {
let params = json!({});
let mut job:JobTask<String,String,i32> = JobTask::new(params,|params,sender,receiver| {
println!("Hello, world!");
defer!(println!("Goodbye, world!"));
loop {
let n = receiver.try_recv();
if let Ok(n) = n {
println!("Received: {} Exit", n);
break;
}
sender.send(TaskEvent::Data("hi".to_string())).unwrap();
may::coroutine::yield_now();
may::coroutine::sleep(std::time::Duration::from_secs(1));
}
}
);
let cloned_job = job.clone();
thread::spawn(move||{
while let Some(event) = cloned_job.recv() {
match event {
TaskEvent::Data(v) => println!("{}", v),
TaskEvent::Done => println!("Task completed"),
TaskEvent::Cancelled => println!("Task cancelled"),
TaskEvent::Error(e) => println!("Error: {}", e),
TaskEvent::Panic(p) => println!("Panic: {}", p),
TaskEvent::Progress(p) => {
println!("Progress: {}", p.0);
}
}
}
});
std::thread::sleep(std::time::Duration::from_secs(5));
assert_eq!(job.is_cancelled.load(Ordering::Relaxed), false);
job.send(100);
std::thread::sleep(std::time::Duration::from_secs(1));
job.cancel();
assert_eq!(job.is_cancelled.load(Ordering::Relaxed), true);
println!("Job cancelled!");
std::thread::sleep(std::time::Duration::from_secs(3));
println!("Main thread finished.");
}
#[test]
fn test_generator() {
let g = Gn::new_scoped(|mut s| {
let (mut a, mut b) = (0, 1);
while b < 200 {
std::mem::swap(&mut a, &mut b);
b = a + b;
s.yield_(b);
}
done!();
});
for i in g {
println!("{}", i);
}
}
}