1#![allow(dead_code)]
2
3use crossbeam::channel::{self, after, Receiver, Sender};
4use may::coroutine::{self, JoinHandle};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8use crossbeam::select;
9use serde_json::Value;
10
/// Events published on a job's event channel.
///
/// `T` is the payload type of [`TaskEvent::Data`] and `E` the payload type of
/// [`TaskEvent::Error`].
#[derive(Debug)]
pub enum TaskEvent<T, E> {
    /// A value produced by the task.
    Data(T),
    /// A progress report.
    ///
    /// NOTE(review): the meaning of the `(u8, u32)` pair is not established
    /// anywhere in this file (presumably a percentage plus a counter) — confirm
    /// against the producers.
    Progress((u8, u32)),
    /// The task closure returned normally.
    Done,
    /// The job was cancelled.
    Cancelled,
    /// A task-defined error value.
    Error(E),
    /// The task closure panicked; the string describes the panic.
    Panic(String),
}
20
21#[derive(Clone)]
22pub struct JobTask<T: Send + 'static, E: Send + 'static,D: Send + 'static> {
23 is_cancelled: Arc<AtomicBool>,
24 handle: Option<Arc<JoinHandle<()>>>,
25 event_rx: Receiver<TaskEvent<T, E>>,
26 _event_tx: Sender<TaskEvent<T, E>>, sender: Sender<D>, }
29
30
31impl<T: Send + 'static, E: Send + 'static, D: Send + 'static> JobTask<T, E, D> {
32 pub fn new<F>(params: Value,task: F) -> Self
33 where
34 F: FnOnce(Value,Sender<TaskEvent<T, E>>, Receiver<D>) + Send + 'static,
35 {
36 let is_cancelled = Arc::new(AtomicBool::new(false));
37 let (event_tx, event_rx) = channel::unbounded();
38 let (data_tx, data_rx) = channel::unbounded();
39
40
41 let flag = is_cancelled.clone();
42 let sender = event_tx.clone();
43
44 let handle = unsafe { coroutine::spawn(move || {
46 if flag.load(Ordering::Acquire) {
48 let _ = sender.send(TaskEvent::Cancelled);
49 return;
50 }
51
52 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
54 task(params,sender.clone(),data_rx.clone());
55 }));
56
57 match result {
58 Ok(_) => {
59 let _ = sender.send(TaskEvent::Done);
61 }
62 Err(_) => {
63 let _ = sender.send(TaskEvent::Panic(format!("panic")));
65 }
66 }
67 }) };
68
69 JobTask {
70 is_cancelled: is_cancelled,
71 handle: Some(Arc::new(handle)),
72 event_rx: event_rx,
73 _event_tx: event_tx,
74 sender: data_tx,
75 }
76 }
77
78 pub fn cancel(&mut self) {
80 self.is_cancelled.store(true, Ordering::Relaxed);
81 if let Some(handle) = self.handle.take() {
82 unsafe { handle.coroutine().cancel() };
84 }
85 }
86
87 pub fn try_recv(&self) -> Option<TaskEvent<T, E>> {
88 self.event_rx.try_recv().ok()
89 }
90
91 pub fn recv(&self) -> Option<TaskEvent<T, E>> {
92 self.event_rx.recv().ok()
93 }
94
95 pub fn recv_timeout(&self, timeout: Duration) -> Option<TaskEvent<T, E>> {
96 self.event_rx.recv_timeout(timeout).ok()
97 }
98
99 pub fn send(&self, data: D) {
100 let _ = self.sender.send(data);
101 }
102}
103
104impl <T, E, D> Drop for JobTask<T, E, D>
105where
106 T: Send + 'static,
107 E: Send + 'static,
108 D: Send + 'static
109{
110 fn drop(&mut self) {
111 self.cancel(); }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use generator::{done, Gn};
    use irgo::defer;
    use serde_json::json;
    use std::thread;

    /// End-to-end run: the task publishes `"hi"` roughly once per second until
    /// it receives a value on its data channel; afterwards the job is
    /// cancelled from the test thread.
    #[test]
    fn test_job_task() {
        let params = json!({});
        let mut job: JobTask<String, String, i32> =
            JobTask::new(params, |_params, sender, receiver| {
                println!("Hello, world!");
                defer!(println!("Goodbye, world!"));

                loop {
                    match receiver.try_recv() {
                        Ok(n) => {
                            println!("Received: {} Exit", n);
                            break;
                        }
                        Err(_) => {
                            sender.send(TaskEvent::Data("hi".to_string())).unwrap();
                            may::coroutine::yield_now();
                            may::coroutine::sleep(Duration::from_secs(1));
                        }
                    }
                }
            });

        // Print every event from a separate thread through a clone of the handle.
        let listener = job.clone();
        thread::spawn(move || {
            while let Some(event) = listener.recv() {
                let line = match event {
                    TaskEvent::Data(v) => v,
                    TaskEvent::Progress(p) => format!("Progress: {}", p.0),
                    TaskEvent::Done => "Task completed".to_string(),
                    TaskEvent::Cancelled => "Task cancelled".to_string(),
                    TaskEvent::Error(e) => format!("Error: {}", e),
                    TaskEvent::Panic(p) => format!("Panic: {}", p),
                };
                println!("{}", line);
            }
        });

        thread::sleep(Duration::from_secs(5));
        assert_eq!(job.is_cancelled.load(Ordering::Relaxed), false);

        // Any value on the data channel makes the task leave its loop.
        job.send(100);
        thread::sleep(Duration::from_secs(1));

        job.cancel();
        assert_eq!(job.is_cancelled.load(Ordering::Relaxed), true);
        println!("Job cancelled!");

        thread::sleep(Duration::from_secs(3));
        println!("Main thread finished.");
    }

    /// Smoke test for the `generator` crate: prints Fibonacci numbers until
    /// the running value reaches 200.
    #[test]
    fn test_generator() {
        let fib = Gn::new_scoped(|mut scope| {
            let (mut prev, mut cur) = (0, 1);
            while cur < 200 {
                let next = prev + cur;
                prev = cur;
                cur = next;
                scope.yield_(cur);
            }
            done!();
        });

        for value in fib {
            println!("{}", value);
        }
    }
}