rioc/
task.rs

1#![allow(dead_code)]
2
3use crossbeam::channel::{self, after, Receiver, Sender};
4use may::coroutine::{self, JoinHandle};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8use crossbeam::select;
9use serde_json::Value;
10
11#[derive(Debug)]
12pub enum TaskEvent<T, E> {
13    Data(T),         // 任务发送的数据项
14    Progress((u8,u32)),    // 任务进度更新
15    Done,            // 任务正常完成
16    Cancelled,       // 任务被取消
17    Error(E),        // 任务返回错误
18    Panic(String),   // 任务 panic
19}
20
21#[derive(Clone)]
22pub struct JobTask<T: Send + 'static, E: Send + 'static,D: Send + 'static>  {
23    is_cancelled: Arc<AtomicBool>,
24    handle: Option<Arc<JoinHandle<()>>>,
25    event_rx:  Receiver<TaskEvent<T, E>>,
26    _event_tx: Sender<TaskEvent<T, E>>, // 保持 channel 开启
27    sender: Sender<D>, // 用于向任务发送数据
28}
29
30
31impl<T: Send + 'static, E: Send + 'static, D: Send + 'static> JobTask<T, E, D>  {
32    pub fn new<F>(params: Value,task: F) -> Self  
33    where
34        F: FnOnce(Value,Sender<TaskEvent<T, E>>, Receiver<D>) + Send + 'static,
35    {
36        let is_cancelled = Arc::new(AtomicBool::new(false));
37        let (event_tx, event_rx) = channel::unbounded();
38        let (data_tx, data_rx) = channel::unbounded();
39
40
41        let flag = is_cancelled.clone();
42        let sender = event_tx.clone();
43
44        // 在协程中运行任务
45        let handle = unsafe { coroutine::spawn(move || {
46            // 检查是否已被取消
47            if flag.load(Ordering::Acquire) {
48                let _ = sender.send(TaskEvent::Cancelled);
49                return;
50            }
51
52            // 执行任务并捕获 panic
53            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
54                task(params,sender.clone(),data_rx.clone());
55            }));
56
57            match result {
58                Ok(_) => {
59                    // 任务正常完成
60                    let _ = sender.send(TaskEvent::Done);
61                }
62                Err(_) => {
63                    // 任务 panic
64                    let _ = sender.send(TaskEvent::Panic(format!("panic")));
65                }
66            }
67        }) };
68
69        JobTask {
70            is_cancelled: is_cancelled,
71            handle: Some(Arc::new(handle)),
72            event_rx: event_rx,
73            _event_tx: event_tx,
74            sender: data_tx,
75        }
76    }
77
78    // 中断任务
79    pub fn cancel(&mut self) {
80        self.is_cancelled.store(true, Ordering::Relaxed);
81        if let Some(handle) = self.handle.take() {
82            // 强制取消协程(如果标志位未被及时检查)
83            unsafe { handle.coroutine().cancel() };
84        }
85    }
86
87    pub fn try_recv(&self) -> Option<TaskEvent<T, E>> {
88        self.event_rx.try_recv().ok()
89    }
90
91    pub fn recv(&self) -> Option<TaskEvent<T, E>> {
92        self.event_rx.recv().ok()
93    }
94
95    pub fn recv_timeout(&self, timeout: Duration) -> Option<TaskEvent<T, E>> {
96        self.event_rx.recv_timeout(timeout).ok()
97    }
98
99    pub fn send(&self, data: D) {
100        let _ = self.sender.send(data);
101    }
102}
103
104impl <T, E, D>  Drop for JobTask<T, E, D>
105where
106    T: Send  + 'static,
107    E: Send  + 'static,
108    D: Send  + 'static
109{
110    fn drop(&mut self) {
111        self.cancel(); // 确保任务被清理
112    }
113}
114
115#[cfg(test)]
116mod tests {
117    use std::thread;
118    use generator::{done, Gn};
119    use irgo::defer;
120    use serde_json::json;
121    use super::*;
122    #[test]
123    fn test_job_task() {
124        let params = json!({});
125        let mut job:JobTask<String,String,i32> = JobTask::new(params,|params,sender,receiver| {
126            println!("Hello, world!");
127            defer!(println!("Goodbye, world!"));
128
129            loop {
130                let n = receiver.try_recv();
131                if let Ok(n) = n {
132                   println!("Received: {} Exit", n);
133                   break;
134                }
135                // 模拟长时间运行的任务
136                sender.send(TaskEvent::Data("hi".to_string())).unwrap();
137                may::coroutine::yield_now();
138                may::coroutine::sleep(std::time::Duration::from_secs(1));
139            }
140        }
141        );
142
143        let cloned_job = job.clone();
144        thread::spawn(move||{
145            while let Some(event) = cloned_job.recv() {
146                match event {
147                    TaskEvent::Data(v) => println!("{}", v),
148                    TaskEvent::Done => println!("Task completed"),
149                    TaskEvent::Cancelled => println!("Task cancelled"),
150                    TaskEvent::Error(e) => println!("Error: {}", e),
151                    TaskEvent::Panic(p) => println!("Panic: {}", p),
152                    TaskEvent::Progress(p) => {
153                        println!("Progress: {}", p.0);
154                    }
155                }
156            }
157        });
158
159        std::thread::sleep(std::time::Duration::from_secs(5)); 
160        assert_eq!(job.is_cancelled.load(Ordering::Relaxed), false);
161        job.send(100);
162        std::thread::sleep(std::time::Duration::from_secs(1));
163        job.cancel();
164        assert_eq!(job.is_cancelled.load(Ordering::Relaxed), true);
165        println!("Job cancelled!");
166        std::thread::sleep(std::time::Duration::from_secs(3)); 
167        println!("Main thread finished.");
168    }
169
170    #[test]
171    fn test_generator() {
172        let g = Gn::new_scoped(|mut s| {
173            let (mut a, mut b) = (0, 1);
174            while b < 200 {
175                std::mem::swap(&mut a, &mut b);
176                b = a + b;
177                s.yield_(b);
178            }
179            done!();
180        });
181        for i in g {
182            println!("{}", i);
183        }
184    }
185}