cyfs_task_manager/
task.rs

1use std::fmt::{Display, Formatter};
2use std::hash::{Hash, Hasher};
3use std::str::FromStr;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6use async_std::task::JoinHandle;
7use base58::{FromBase58, ToBase58};
8use futures::future::{AbortHandle, Aborted};
9use generic_array::GenericArray;
10use generic_array::typenum::{U32};
11use cyfs_base::*;
12use crate::{AsyncCondvar, AsyncCondvarRef, TaskStore};
13
14pub const PUBLISH_TASK_CATEGORY: TaskCategory = TaskCategory(1);
15pub const DOWNLOAD_TASK_CATEGORY: TaskCategory = TaskCategory(2);
16pub const BUILD_FILE_TASK_CATEGORY: TaskCategory = TaskCategory(3);
17
18pub const PUBLISH_LOCAL_FILE_TASK: TaskType = TaskType(101);
19pub const PUBLISH_LOCAL_DIR_TASK: TaskType = TaskType(102);
20pub const DOWNLOAD_CHUNK_TASK: TaskType = TaskType(111);
21pub const DOWNLOAD_FILE_TASK: TaskType = TaskType(112);
22pub const BUILD_FILE_TASK: TaskType = TaskType(121);
23pub const BUILD_DIR_TASK: TaskType = TaskType(122);
24
25#[derive(Copy, Clone, Eq, PartialEq, Debug, RawEncode, RawDecode)]
26pub enum TaskStatus {
27    Stopped,
28    Paused,
29    Running,
30    Finished,
31    Failed,
32}
33
34impl TaskStatus {
35    pub fn into(self) -> i32 {
36        match self {
37            Self::Stopped => 0,
38            Self::Paused => 1,
39            Self::Running => 2,
40            Self::Finished => 3,
41            Self::Failed => 4,
42        }
43    }
44
45    pub fn try_from(value: i32) -> BuckyResult<Self> {
46        match value {
47            0 => Ok(Self::Stopped),
48            1 => Ok(Self::Paused),
49            2 => Ok(Self::Running),
50            3 => Ok(Self::Finished),
51            4 => Ok(Self::Failed),
52            _ => {
53                let msg = format!("unsupport task type {}", value);
54                log::error!("{}", msg.as_str());
55                Err(BuckyError::new(BuckyErrorCode::NotSupport, msg))
56            }
57        }
58    }
59}
60
61#[repr(transparent)]
62#[derive(Copy, Clone, Eq, PartialEq, Hash)]
63pub struct TaskType(pub u16);
64
65impl TaskType {
66    pub fn into(self) -> i32 {
67        self.0 as i32
68    }
69
70    pub fn try_from(value: i32) -> BuckyResult<Self> {
71        Ok(Self(value as u16))
72    }
73}
74
75impl Display for TaskType {
76    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77        write!(f, "{}", self.0)
78    }
79}
80
81#[repr(transparent)]
82#[derive(Copy, Clone, Eq, PartialEq, Hash)]
83pub struct TaskCategory(pub u16);
84
85impl TaskCategory {
86    pub fn into(self) -> i32 {
87        self.0 as i32
88    }
89    pub fn try_from(value: i32) -> BuckyResult<Self> {
90        Ok(Self(value as u16))
91    }
92}
93
94impl Display for TaskCategory {
95    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96        write!(f, "{}", self.0)
97    }
98}
99
100#[derive(Copy, Clone, PartialOrd, PartialEq, Ord, Eq, Debug, Default)]
101pub struct TaskId(GenericArray<u8, U32>);
102
103impl From<&[u8]> for TaskId {
104    fn from(hash: &[u8]) -> Self {
105        Self(GenericArray::clone_from_slice(hash))
106    }
107}
108
109impl Display for TaskId {
110    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
111        write!(f, "{}", self.0.as_slice().to_base58())
112    }
113}
114
115impl FromStr for TaskId {
116    type Err = BuckyError;
117
118    fn from_str(s: &str) -> Result<Self, Self::Err> {
119        let buf = s.from_base58().map_err(|_e| {
120            log::error!("convert base58 str to TaskId failed, str:{}", s);
121            let msg = format!("convert base58 str to object id failed, str={}", s);
122            BuckyError::new(BuckyErrorCode::InvalidFormat, msg)
123        })?;
124
125        if buf.len() != 32 {
126            let msg = format!(
127                "convert base58 str to object id failed, len unmatch: str={}",
128                s
129            );
130            return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
131        }
132
133        let mut id = Self::default();
134        unsafe {
135            std::ptr::copy(buf.as_ptr(), id.0.as_mut_slice().as_mut_ptr(), buf.len());
136        }
137
138        Ok(id)
139    }
140}
141
142impl Hash for TaskId {
143    fn hash<H: Hasher>(&self, state: &mut H) {
144        state.write(self.0.as_slice());
145    }
146}
147
148impl TaskId {
149    pub fn as_slice(&self) -> &[u8] {
150        self.0.as_slice()
151    }
152}
153
154impl From<GenericArray<u8, U32>> for TaskId {
155    fn from(hash: GenericArray<u8, U32>) -> Self {
156        Self(hash)
157    }
158}
159
160#[async_trait::async_trait]
161pub trait Task: Send + Sync {
162    fn get_task_id(&self) -> TaskId;
163    fn get_task_type(&self) -> TaskType;
164    fn get_task_category(&self) -> TaskCategory;
165    fn need_persist(&self) -> bool {
166        true
167    }
168    async fn get_task_status(&self) -> TaskStatus;
169    async fn set_task_store(&mut self, task_store: Arc<dyn TaskStore>);
170    async fn start_task(&self) -> BuckyResult<()>;
171    async fn pause_task(&self) -> BuckyResult<()>;
172    async fn stop_task(&self) -> BuckyResult<()>;
173    async fn check_and_waiting_stop(&self) {
174        loop {
175            if TaskStatus::Running == self.get_task_status().await {
176                async_std::task::sleep(Duration::from_secs(1)).await;
177            } else {
178                break;
179            }
180        }
181    }
182    async fn get_task_detail_status(&self) -> BuckyResult<Vec<u8>>;
183}
184
185#[async_trait::async_trait]
186pub trait Runnable: Send + Sync {
187    fn get_task_id(&self) -> TaskId;
188    fn get_task_type(&self) -> TaskType;
189    fn get_task_category(&self) -> TaskCategory;
190    fn need_persist(&self) -> bool {
191        true
192    }
193    fn status_change(&self, _task_status: TaskStatus) {}
194    async fn set_task_store(&mut self, task_store: Arc<dyn TaskStore>);
195    async fn run(&self) -> BuckyResult<()>;
196    async fn get_task_detail_status(&self) -> BuckyResult<Vec<u8>>;
197}
198
199struct RunnableTaskData {
200    canceler: Option<AbortHandle>,
201    task_status: TaskStatus,
202    error: Option<BuckyError>,
203    task_store: Option<Arc<dyn TaskStore>>,
204    runnable_handle: Option<JoinHandle<()>>,
205}
206
207pub struct RunnableTask<R: Runnable> {
208    runnable: Arc<R>,
209    data: Arc<Mutex<RunnableTaskData>>,
210    waiting_list: Arc<Mutex<Vec<AsyncCondvarRef>>>,
211}
212
213impl<R: Runnable> RunnableTask<R> {
214    pub fn new(runnable: R) -> Self {
215        Self {
216            runnable: Arc::new(runnable),
217            data: Arc::new(Mutex::new(RunnableTaskData {
218                canceler: None,
219                task_status: TaskStatus::Stopped,
220                error: None,
221                task_store: None,
222                runnable_handle: None,
223            })),
224            waiting_list: Arc::new(Mutex::new(vec![]))
225        }
226    }
227
228    fn get_runnable(&self) -> &mut dyn Runnable {
229        unsafe {
230            let runnable = &mut *(self.runnable.as_ref() as *const dyn Runnable as *mut dyn Runnable);
231            runnable
232        }
233    }
234}
235
236#[async_trait::async_trait]
237impl<R: 'static + Runnable> Task for RunnableTask<R> {
238    fn get_task_id(&self) -> TaskId {
239        self.runnable.get_task_id()
240    }
241
242    fn get_task_type(&self) -> TaskType {
243        self.runnable.get_task_type()
244    }
245
246    fn get_task_category(&self) -> TaskCategory {
247        self.runnable.get_task_category()
248    }
249
250    fn need_persist(&self) -> bool {
251        self.runnable.need_persist()
252    }
253
254    async fn get_task_status(&self) -> TaskStatus {
255        let data = self.data.lock().unwrap();
256        data.task_status
257    }
258
259    async fn set_task_store(&mut self, task_store: Arc<dyn TaskStore>) {
260        {
261            let mut data = self.data.lock().unwrap();
262            data.task_store = Some(task_store.clone());
263        }
264
265        let runnable = self.get_runnable();
266        runnable.set_task_store(task_store).await;
267    }
268
269    async fn start_task(&self) -> BuckyResult<()> {
270        let runnable = self.runnable.clone();
271        let task_id = self.runnable.get_task_id();
272
273        {
274            let tmp_data = self.data.lock().unwrap();
275            if tmp_data.task_status == TaskStatus::Running {
276                return Ok(());
277            }
278        }
279
280        let (ft, handle) = futures::future::abortable(async move {
281            runnable.run().await
282        });
283
284        {
285            let mut data = self.data.lock().unwrap();
286            data.canceler = Some(handle);
287        }
288
289        let runnable = self.runnable.clone();
290        let data = self.data.clone();
291        let task_store = {
292            let data = data.lock().unwrap();
293            data.task_store.clone()
294        };
295        let runnable_handle = async_std::task::spawn(async move {
296            let _: BuckyResult<()> = async move {
297                {
298                    let mut tmp_data = data.lock().unwrap();
299                    tmp_data.task_status = TaskStatus::Running;
300                    runnable.status_change(tmp_data.task_status);
301                }
302                if task_store.is_some() {
303                    task_store.as_ref().unwrap().save_task_status(&task_id, TaskStatus::Running).await?;
304                }
305                match ft.await {
306                    Ok(ret) => {
307                        match ret {
308                            Ok(_) => {
309                                {
310                                    let mut tmp_data = data.lock().unwrap();
311                                    tmp_data.task_status = TaskStatus::Finished;
312                                    tmp_data.canceler = None;
313                                    tmp_data.runnable_handle = None;
314                                    runnable.status_change(tmp_data.task_status);
315                                }
316                                if task_store.is_some() {
317                                    task_store.as_ref().unwrap().save_task_status(&task_id, TaskStatus::Finished).await?;
318                                }
319                            }
320                            Err(err) => {
321                                log::error!("task {} err {}", task_id.to_string(), err);
322                                {
323                                    let mut tmp_data = data.lock().unwrap();
324                                    tmp_data.task_status = TaskStatus::Failed;
325                                    tmp_data.error = Some(err);
326                                    tmp_data.canceler = None;
327                                    tmp_data.runnable_handle = None;
328                                    runnable.status_change(tmp_data.task_status);
329                                }
330                                if task_store.is_some() {
331                                    task_store.as_ref().unwrap().save_task_status(&task_id, TaskStatus::Failed).await?;
332                                }
333                            }
334                        }
335                    }
336                    Err(Aborted) => {
337                        {
338                            let msg = format!("runnable task been aborted! task={}", runnable.get_task_id());
339                            warn!("{}", msg);
340                            let err = BuckyError::new(BuckyErrorCode::UserCanceled, msg);
341
342                            let mut tmp_data = data.lock().unwrap();
343                            tmp_data.task_status = TaskStatus::Failed;
344                            tmp_data.error = Some(err);
345                            runnable.status_change(tmp_data.task_status);
346                        }
347                        if task_store.is_some() {
348                            task_store.as_ref().unwrap().save_task_status(&task_id, TaskStatus::Failed).await?;
349                        }
350                    }
351                }
352                Ok(())
353            }.await;
354        });
355        {
356            let mut data = self.data.lock().unwrap();
357            data.runnable_handle = Some(runnable_handle);
358        }
359        Ok(())
360    }
361
362    async fn pause_task(&self) -> BuckyResult<()> {
363        self.stop_task().await
364    }
365
366    async fn stop_task(&self) -> BuckyResult<()>
367    {
368        let (canceler, runnable_handle) = {
369            let mut data = self.data.lock().unwrap();
370            (data.canceler.take(), data.runnable_handle.take())
371        };
372        if canceler.is_some() {
373            canceler.unwrap().abort();
374            if runnable_handle.is_some() {
375                runnable_handle.unwrap().await;
376            }
377            Ok(())
378        } else {
379            let err = format!("task [{}] is not running!", self.runnable.get_task_id());
380            log::error!("{}", err);
381            Err(BuckyError::from((BuckyErrorCode::ErrorState, err)))
382        }
383    }
384
385    async fn check_and_waiting_stop(&self) {
386        let (runnable_handle, waiting) = {
387            let mut waiting_list = self.waiting_list.lock().unwrap();
388            let mut data = self.data.lock().unwrap();
389            let handle = data.runnable_handle.take();
390            if handle.is_some() {
391                (handle, None)
392            } else {
393                if data.task_status != TaskStatus::Running {
394                    return;
395                }
396                let waiting = AsyncCondvar::new();
397                waiting_list.push(waiting.clone());
398                (None, Some(waiting))
399            }
400        };
401
402        if runnable_handle.is_some() {
403            runnable_handle.unwrap().await;
404            let mut waiting_list = self.waiting_list.lock().unwrap();
405            for waiting in waiting_list.iter() {
406                waiting.notify();
407            }
408            waiting_list.clear();
409        } else {
410            waiting.unwrap().wait().await;
411        }
412    }
413
414    async fn get_task_detail_status(&self) -> BuckyResult<Vec<u8>> {
415        self.runnable.get_task_detail_status().await
416    }
417}
418
419#[async_trait::async_trait]
420pub trait TaskFactory: 'static + Send + Sync {
421    fn get_task_type(&self) -> TaskType;
422    async fn create(&self, params: &[u8]) -> BuckyResult<Box<dyn Task>>;
423    async fn restore(&self, task_status: TaskStatus, params: &[u8], data: &[u8]) -> BuckyResult<Box<dyn Task>>;
424}
425
426#[cfg(test)]
427mod test_task {
428    use std::sync::Arc;
429    use std::time::Duration;
430    use cyfs_base::BuckyResult;
431    use crate::{Runnable, RunnableTask, Task, TaskCategory, TaskId, TaskStatus, TaskStore, TaskType};
432
433    struct TestRunnable {
434
435    }
436
437    #[async_trait::async_trait]
438    impl Runnable for TestRunnable {
439        fn get_task_id(&self) -> TaskId {
440            TaskId::default()
441        }
442
443        fn get_task_type(&self) -> TaskType {
444            todo!()
445        }
446
447        fn get_task_category(&self) -> TaskCategory {
448            todo!()
449        }
450
451        async fn set_task_store(&mut self, _task_store: Arc<dyn TaskStore>) {
452            todo!()
453        }
454
455        async fn run(&self) -> BuckyResult<()> {
456            async_std::task::sleep(Duration::from_secs(10)).await;
457            Ok(())
458        }
459
460        async fn get_task_detail_status(&self) -> BuckyResult<Vec<u8>> {
461            todo!()
462        }
463    }
464    #[test]
465    fn test_runnable() {
466        async_std::task::block_on(async {
467            let task = RunnableTask::new(TestRunnable {});
468            task.start_task().await.unwrap();
469            async_std::task::sleep(Duration::from_secs(2)).await;
470            assert_eq!(task.get_task_status().await, TaskStatus::Running);
471            task.stop_task().await.unwrap();
472            assert_eq!(task.get_task_status().await, TaskStatus::Stopped);
473        });
474    }
475}