cyfs_task_manager/
task_manager.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4use cyfs_base::*;
5use crate::{Locker, Task, TaskCategory, TaskFactory, TaskId, TaskManagerStore, TaskStatus, TaskStore, TaskType};
6
7struct TaskInfo {
8    pub task: Arc<Box<dyn Task>>,
9    pub complete_time: Option<u64>,
10    pub dec_list: Vec<DecInfo>,
11}
12
13impl TaskInfo {
14    pub fn new(task: Arc<Box<dyn Task>>, dec_list: Vec<DecInfo>) -> Self {
15        Self {
16            task,
17            complete_time: None,
18            dec_list
19        }
20    }
21}
22
23#[derive(Debug, Clone, RawEncode, RawDecode)]
24pub struct DecInfoV1 {
25    dec_id: ObjectId,
26    source: DeviceId,
27}
28
29#[derive(Debug, Clone, RawEncode, RawDecode)]
30pub enum DecInfo {
31    V1(DecInfoV1)
32}
33
34impl DecInfo {
35    pub fn new(dec_id: ObjectId, source: DeviceId) -> Self {
36        Self::V1(DecInfoV1 { dec_id, source })
37    }
38
39    pub fn dec_id(&self) -> &ObjectId {
40        match self {
41            Self::V1(info) => &info.dec_id
42        }
43    }
44
45    pub fn source(&self) -> &DeviceId {
46        match self {
47            Self::V1(info) => &info.source
48        }
49    }
50}
51
52pub struct TaskManager {
53    task_factory_map: Mutex<HashMap<TaskType, Arc<dyn TaskFactory>>>,
54    task_manager_store: Arc<dyn TaskManagerStore>,
55    task_store: Arc<dyn TaskStore>,
56    task_map: async_std::sync::Mutex<HashMap<TaskId, TaskInfo>>,
57}
58
59impl TaskManager {
60    pub async fn new(task_manager_store: Arc<dyn TaskManagerStore>, task_store: Arc<dyn TaskStore>) -> BuckyResult<Arc<Self>> {
61        let task_manager = Arc::new(Self {
62            task_factory_map: Mutex::new(Default::default()),
63            task_store,
64            task_manager_store,
65            task_map: async_std::sync::Mutex::new(Default::default())
66        });
67
68        // task_manager.task_manager_store.clear_can_delete_task().await?;
69        let tmp_task_manager = Arc::downgrade(&task_manager);
70        async_std::task::spawn(async move {
71            loop {
72                match tmp_task_manager.upgrade() {
73                    Some(task_manager) => {
74                        if let Err(e) = task_manager.clear_task().await {
75                            log::error!("task manager clear task err {}", e);
76                        }
77                        async_std::task::sleep(Duration::from_secs(600)).await;
78                    },
79                    None => {
80                        break;
81                    }
82                }
83            }
84        });
85        Ok(task_manager)
86    }
87
88    async fn clear_task(&self) -> BuckyResult<()> {
89        let mut task_map = self.task_map.lock().await;
90        let mut clear_task = Vec::new();
91        for (task_id, task_info) in task_map.iter_mut() {
92            let task_status = task_info.task.get_task_status().await;
93            if task_status == TaskStatus::Stopped || task_status == TaskStatus::Finished || task_status == TaskStatus::Failed {
94                if task_info.complete_time.is_none() {
95                    task_info.complete_time = Some(bucky_time_now());
96                } else {
97                    if bucky_time_now() - task_info.complete_time.unwrap() > 600000000 {
98                        clear_task.push(task_id.clone());
99                    }
100                }
101            } else {
102                task_info.complete_time = None;
103            }
104        }
105
106        for clear_id in clear_task.iter() {
107            task_map.remove(clear_id);
108        }
109
110        Ok(())
111    }
112
113    pub fn register_task_factory(&self, factory: impl TaskFactory) -> BuckyResult<()> {
114        let mut task_factory_map = self.task_factory_map.lock().unwrap();
115        let task_type = factory.get_task_type();
116        let ret = task_factory_map.insert(factory.get_task_type(), Arc::new(factory));
117        if ret.is_none() {
118            Ok(())
119        } else {
120            Err(BuckyError::new(BuckyErrorCode::AlreadyExists, format!("task factory {} has exist", task_type.into())))
121        }
122    }
123
124    fn get_task_factory(&self, task_type: &TaskType) -> Option<Arc<dyn TaskFactory>> {
125        let task_factory_map = self.task_factory_map.lock().unwrap();
126        match task_factory_map.get(task_type) {
127            Some(factory) => Some(factory.clone()),
128            None => None
129        }
130    }
131
132    pub async fn resume_task(&self) -> BuckyResult<()> {
133        let task_data_list = self.task_manager_store.get_tasks_by_status(TaskStatus::Running).await?;
134        for (task_id, task_type, params, data) in task_data_list {
135            match self.get_task_factory(&task_type) {
136                Some(factory) => {
137                    let dec_list = self.task_manager_store.get_dec_list(&task_id).await?;
138                    let mut task = match factory.restore(TaskStatus::Stopped, params.as_slice(), data.as_slice()).await {
139                        Ok(task) => task,
140                        Err(e) => {
141                            let msg = format!("restore task {} failed.{}", task_id.to_string(), e);
142                            log::error!("{}", msg.as_str());
143                            continue;
144                        }
145                    };
146                    task.set_task_store(self.task_store.clone()).await;
147                    task.start_task().await?;
148                    let mut task_map = self.task_map.lock().await;
149                    task_map.insert(task_id.clone(), TaskInfo::new(Arc::new(task), dec_list));
150                },
151                None => {
152                    continue;
153                }
154            }
155        }
156
157        Ok(())
158    }
159
160    pub async fn create_task<P: RawEncode>(&self, dec_id: ObjectId, source: DeviceId, task_type: TaskType, task_param: P) -> BuckyResult<TaskId> {
161        log::info!("create_task dec_id {} task_type {}", dec_id.to_string(), task_type.into());
162        match self.get_task_factory(&task_type) {
163            Some(factory) => {
164                let param = task_param.to_vec()?;
165                let mut task = factory.create(param.as_slice()).await.map_err(|e| {
166                    let msg = format!("create task failed! dec={}, source={}, task_type={}, {}", dec_id, source, task_type, e);
167                    error!("{}", msg);
168                    BuckyError::new(e.code(), msg)
169                })?;
170
171                if task.get_task_type() != task_type {
172                    let msg = format!("create task but task_type mismatch! dec={}, source={}, create task_type={}, got={}", dec_id, source, task_type, task.get_task_type());
173                    error!("{}", msg);
174                    return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
175                }
176
177                if task.need_persist() {
178                    task.set_task_store(self.task_store.clone()).await;
179                }
180
181                let task_id = task.get_task_id();
182                let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
183                {
184                    let ret = {
185                        let mut task_map = self.task_map.lock().await;
186                        if task_map.contains_key(&task_id) {
187                            let task = task_map.get_mut(&task_id).unwrap();
188                            if Self::add_dec(&mut task.dec_list, dec_id, source.clone()) && task.task.need_persist() {
189                                self.task_manager_store.add_dec_info(&task_id,
190                                                                     task.task.get_task_category(),
191                                                                     task.task.get_task_status().await, task.dec_list.last().unwrap()).await?;
192                            }
193                            Some(task.task.get_task_id())
194                        } else {
195                            None
196                        }
197                    };
198                    if let Some(task_id) = ret {
199                        return Ok(task_id);
200                    }
201                }
202
203                if task.need_persist() {
204                    match self.task_manager_store.get_task(&task_id).await {
205                        Ok((task_category, _task_type, task_status, task_param, task_data)) => {
206                            let mut task = factory.restore(task_status, task_param.as_slice(), task_data.as_slice()).await?;
207                            task.set_task_store(self.task_store.clone()).await;
208
209                            let dec_list = self.task_manager_store.get_dec_list(&task_id).await?;
210                            let mut task_map = self.task_map.lock().await;
211                            let mut task_info = TaskInfo::new(Arc::new(task), dec_list);
212                            if Self::add_dec(&mut task_info.dec_list, dec_id, source) {
213                                self.task_manager_store.add_dec_info(&task_id, task_category, task_info.task.get_task_status().await, task_info.dec_list.last().unwrap()).await?;
214                            }
215                            task_map.insert(task_id.clone(), task_info);
216
217                            Ok(task_id)
218                        },
219                        Err(e) => {
220                            if e.code() == BuckyErrorCode::NotFound {
221                                let dec_list = vec![DecInfo::new(dec_id, source)];
222                                let task_info = TaskInfo::new(Arc::new(task), dec_list.clone());
223                                self.task_manager_store.add_task(&task_id,
224                                                                 task_info.task.get_task_category(),
225                                                                 task_info.task.get_task_type(),
226                                                                 task_info.task.get_task_status().await,
227                                                                 dec_list,
228                                                                 param).await?;
229                                let mut task_map = self.task_map.lock().await;
230                                task_map.insert(task_id.clone(), task_info);
231                                Ok(task_id)
232                            } else {
233                                Err(e)
234                            }
235                        }
236                    }
237                } else {
238                    let dec_list = vec![DecInfo::new(dec_id, source)];
239                    let task_info = TaskInfo::new(Arc::new(task), dec_list.clone());
240                    let mut task_map = self.task_map.lock().await;
241                    task_map.insert(task_id.clone(), task_info);
242                    Ok(task_id)
243                }
244            },
245            None => {
246                let msg = format!("not support task type {}", task_type);
247                log::error!("{}", msg.as_str());
248                Err(BuckyError::new(BuckyErrorCode::NotSupport, msg))
249            }
250        }
251    }
252
253    pub async fn start_task(&self, task_id: &TaskId) -> BuckyResult<()> {
254        log::info!("start_task {}", task_id.to_string());
255        let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
256        let task = {
257            let task_map = self.task_map.lock().await;
258            match task_map.get(task_id) {
259                Some(task_info) => Some(task_info.task.clone()),
260                None => None,
261            }
262        };
263        match task {
264            Some(task) => {
265                task.start_task().await
266            }
267            None => {
268                let (_task_category, task_type, task_status, task_param, task_data) = self.task_manager_store.get_task(task_id).await?;
269                match self.get_task_factory(&task_type) {
270                    Some(factory) => {
271                        let dec_list = self.task_manager_store.get_dec_list(task_id).await?;
272                        let mut task = factory.restore(task_status, task_param.as_slice(), task_data.as_slice()).await?;
273                        task.set_task_store(self.task_store.clone()).await;
274                        let task = {
275                            let mut task_map = self.task_map.lock().await;
276                            if !task_map.contains_key(task_id) {
277                                task_map.insert(task_id.clone(), TaskInfo::new(Arc::new(task), dec_list));
278                            }
279                            task_map.get(task_id).unwrap().task.clone()
280                        };
281                        task.start_task().await
282                    }
283                    None => {
284                        let msg = format!("task not found: task={}", task_id.to_string());
285                        log::error!("{}", msg);
286                        Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
287                    }
288                }
289            }
290        }
291    }
292
293    pub async fn check_and_waiting_stop(&self, task_id: &TaskId) {
294        log::info!("check_and_waiting_stop {}", task_id.to_string());
295        let task = {
296            let task_map = self.task_map.lock().await;
297            match task_map.get(task_id) {
298                Some(task_info) => Some(task_info.task.clone()),
299                None => None,
300            }
301        };
302        match task {
303            Some(task) => {
304                task.check_and_waiting_stop().await;
305            },
306            None => {}
307        }
308    }
309
310    pub async fn get_task_detail_status(&self, task_id: &TaskId) -> BuckyResult<Vec<u8>> {
311        log::debug!("will get_task_detail_status {}", task_id);
312        let task = {
313            let task_map = self.task_map.lock().await;
314            match task_map.get(task_id) {
315                Some(task_info) => Some(task_info.task.clone()),
316                None => None,
317            }
318        };
319        match task {
320            Some(task) => {
321                task.get_task_detail_status().await
322            },
323            None => {
324                let (_task_category, task_type, task_status, task_param, task_data) = self.task_manager_store.get_task(task_id).await?;
325                match self.get_task_factory(&task_type) {
326                    Some(factory) => {
327                        let dec_list = self.task_manager_store.get_dec_list(task_id).await?;
328                        let mut task = factory.restore(task_status, task_param.as_slice(), task_data.as_slice()).await?;
329                        task.set_task_store(self.task_store.clone()).await;
330                        let task = {
331                            let mut task_map = self.task_map.lock().await;
332                            if !task_map.contains_key(task_id) {
333                                task_map.insert(task_id.clone(), TaskInfo::new(Arc::new(task), dec_list));
334                            }
335                            task_map.get(task_id).unwrap().task.clone()
336                        };
337                        task.get_task_detail_status().await
338                    }
339                    None => {
340                        let msg = format!("task not found! task={}", task_id);
341                        log::error!("{}", msg);
342                        Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
343                    }
344                }
345            }
346        }
347    }
348
349    pub async fn pause_task(&self, task_id: &TaskId) -> BuckyResult<()> {
350        log::info!("will pause_task {}", task_id);
351        let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
352        let task = {
353            let task_map = self.task_map.lock().await;
354            match task_map.get(task_id) {
355                Some(task_info) => task_info.task.clone(),
356                None => {
357                    return Ok(());
358                }
359            }
360        };
361        task.pause_task().await
362    }
363
364    pub async fn stop_task(&self, task_id: &TaskId) -> BuckyResult<()> {
365        log::info!("will stop_task {}", task_id);
366        let _locker = Locker::get_locker(format!("task_manager_{}", task_id)).await;
367        let task = {
368            let mut task_map = self.task_map.lock().await;
369            task_map.remove(task_id)
370        };
371        match task {
372            Some(task) => {
373                task.task.stop_task().await
374            },
375            None => {
376                warn!("stop task but not found! task={}", task_id);
377                Ok(())
378            }
379        }
380    }
381
382    pub async fn remove_task(&self, dec_id: &ObjectId, source: &DeviceId, task_id: &TaskId) -> BuckyResult<()> {
383        log::info!("remove_task dec_id {} task_id {}", dec_id.to_string(), task_id.to_string());
384        let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
385
386        let mut task_map = self.task_map.lock().await;
387        match task_map.get_mut(task_id) {
388            None => {
389                let mut dec_list = self.task_manager_store.get_dec_list(&task_id).await?;
390                if Self::remove_dec(&mut dec_list, dec_id, source) {
391                    self.task_manager_store.delete_dec_info(task_id, dec_id, source).await?;
392                }
393
394                if dec_list.len() == 0 {
395                    self.task_manager_store.delete_task(task_id).await?;
396                    task_map.remove(task_id);
397                }
398            }
399            Some(info) => {
400                if Self::remove_dec(&mut info.dec_list, dec_id, source) {
401                    if info.task.need_persist() {
402                        self.task_manager_store.delete_dec_info(task_id, dec_id, source).await?;
403                    }
404                }
405                if info.dec_list.len() == 0 {
406                    if info.task.need_persist() {
407                        self.task_manager_store.delete_task(task_id).await?;
408                    }
409                    task_map.remove(task_id);
410                }
411            }
412        }
413
414        Ok(())
415    }
416
417    pub async fn remove_task_by_task_id(&self, task_id: &TaskId) -> BuckyResult<()> {
418        log::info!("remove_task task_id {}", task_id.to_string());
419        let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
420
421        let mut task_map = self.task_map.lock().await;
422        match task_map.get_mut(task_id) {
423            None => {
424                self.task_manager_store.delete_task(task_id).await?;
425            }
426            Some(task) => {
427                if task.task.need_persist() {
428                    self.task_manager_store.delete_task(task_id).await?;
429                }
430                task_map.remove(task_id);
431            }
432        }
433
434        Ok(())
435    }
436
437    pub async fn get_tasks_by_task_id(&self, task_id_list: &[TaskId]) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
438        self.task_manager_store.get_tasks_by_task_id(task_id_list).await
439    }
440
441    pub async fn get_tasks_by_category(&self, category: TaskCategory) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
442        self.task_manager_store.get_tasks_by_category(category).await
443    }
444
445    fn add_dec(dec_list: &mut Vec<DecInfo>, new_dec: ObjectId, source: DeviceId) -> bool {
446        let mut find = false;
447        for dec in dec_list.iter_mut() {
448            if dec.dec_id() == &new_dec && dec.source() == &source {
449                find = true;
450                break;
451            }
452        }
453
454        if !find {
455            dec_list.push(DecInfo::new(new_dec, source));
456        }
457
458        !find
459    }
460
461    fn exist_dec(dec_list: & Vec<DecInfo>, new_dec: &ObjectId, source: &DeviceId) -> bool {
462        for dec in dec_list.iter() {
463            if dec.dec_id() == new_dec && dec.source() == source {
464                return true;
465            }
466        }
467        false
468    }
469
470    fn remove_dec(dec_list: &mut Vec<DecInfo>, dest_dec: &ObjectId, source: &DeviceId) -> bool {
471        let mut find = false;
472        for (index, dec) in dec_list.iter().enumerate() {
473            if dec.dec_id() == dest_dec && dec.source() ==  source {
474                dec_list.remove(index);
475                find = true;
476                break;
477            }
478        }
479        find
480    }
481}
482
483pub mod test_task_manager {
484    use std::sync::Arc;
485    use cyfs_base::BuckyResult;
486    use crate::{SQLiteTaskStore, TaskManager};
487
488    pub async fn create_test_task_manager() -> BuckyResult<Arc<TaskManager>> {
489        let store = Arc::new(SQLiteTaskStore::new(":memory:").await?);
490        store.init().await?;
491        let task_manager = TaskManager::new(store.clone(), store).await?;
492        Ok(task_manager)
493    }
494}