1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4use cyfs_base::*;
5use crate::{Locker, Task, TaskCategory, TaskFactory, TaskId, TaskManagerStore, TaskStatus, TaskStore, TaskType};
6
7struct TaskInfo {
8 pub task: Arc<Box<dyn Task>>,
9 pub complete_time: Option<u64>,
10 pub dec_list: Vec<DecInfo>,
11}
12
13impl TaskInfo {
14 pub fn new(task: Arc<Box<dyn Task>>, dec_list: Vec<DecInfo>) -> Self {
15 Self {
16 task,
17 complete_time: None,
18 dec_list
19 }
20 }
21}
22
23#[derive(Debug, Clone, RawEncode, RawDecode)]
24pub struct DecInfoV1 {
25 dec_id: ObjectId,
26 source: DeviceId,
27}
28
29#[derive(Debug, Clone, RawEncode, RawDecode)]
30pub enum DecInfo {
31 V1(DecInfoV1)
32}
33
34impl DecInfo {
35 pub fn new(dec_id: ObjectId, source: DeviceId) -> Self {
36 Self::V1(DecInfoV1 { dec_id, source })
37 }
38
39 pub fn dec_id(&self) -> &ObjectId {
40 match self {
41 Self::V1(info) => &info.dec_id
42 }
43 }
44
45 pub fn source(&self) -> &DeviceId {
46 match self {
47 Self::V1(info) => &info.source
48 }
49 }
50}
51
52pub struct TaskManager {
53 task_factory_map: Mutex<HashMap<TaskType, Arc<dyn TaskFactory>>>,
54 task_manager_store: Arc<dyn TaskManagerStore>,
55 task_store: Arc<dyn TaskStore>,
56 task_map: async_std::sync::Mutex<HashMap<TaskId, TaskInfo>>,
57}
58
59impl TaskManager {
60 pub async fn new(task_manager_store: Arc<dyn TaskManagerStore>, task_store: Arc<dyn TaskStore>) -> BuckyResult<Arc<Self>> {
61 let task_manager = Arc::new(Self {
62 task_factory_map: Mutex::new(Default::default()),
63 task_store,
64 task_manager_store,
65 task_map: async_std::sync::Mutex::new(Default::default())
66 });
67
68 let tmp_task_manager = Arc::downgrade(&task_manager);
70 async_std::task::spawn(async move {
71 loop {
72 match tmp_task_manager.upgrade() {
73 Some(task_manager) => {
74 if let Err(e) = task_manager.clear_task().await {
75 log::error!("task manager clear task err {}", e);
76 }
77 async_std::task::sleep(Duration::from_secs(600)).await;
78 },
79 None => {
80 break;
81 }
82 }
83 }
84 });
85 Ok(task_manager)
86 }
87
88 async fn clear_task(&self) -> BuckyResult<()> {
89 let mut task_map = self.task_map.lock().await;
90 let mut clear_task = Vec::new();
91 for (task_id, task_info) in task_map.iter_mut() {
92 let task_status = task_info.task.get_task_status().await;
93 if task_status == TaskStatus::Stopped || task_status == TaskStatus::Finished || task_status == TaskStatus::Failed {
94 if task_info.complete_time.is_none() {
95 task_info.complete_time = Some(bucky_time_now());
96 } else {
97 if bucky_time_now() - task_info.complete_time.unwrap() > 600000000 {
98 clear_task.push(task_id.clone());
99 }
100 }
101 } else {
102 task_info.complete_time = None;
103 }
104 }
105
106 for clear_id in clear_task.iter() {
107 task_map.remove(clear_id);
108 }
109
110 Ok(())
111 }
112
113 pub fn register_task_factory(&self, factory: impl TaskFactory) -> BuckyResult<()> {
114 let mut task_factory_map = self.task_factory_map.lock().unwrap();
115 let task_type = factory.get_task_type();
116 let ret = task_factory_map.insert(factory.get_task_type(), Arc::new(factory));
117 if ret.is_none() {
118 Ok(())
119 } else {
120 Err(BuckyError::new(BuckyErrorCode::AlreadyExists, format!("task factory {} has exist", task_type.into())))
121 }
122 }
123
124 fn get_task_factory(&self, task_type: &TaskType) -> Option<Arc<dyn TaskFactory>> {
125 let task_factory_map = self.task_factory_map.lock().unwrap();
126 match task_factory_map.get(task_type) {
127 Some(factory) => Some(factory.clone()),
128 None => None
129 }
130 }
131
132 pub async fn resume_task(&self) -> BuckyResult<()> {
133 let task_data_list = self.task_manager_store.get_tasks_by_status(TaskStatus::Running).await?;
134 for (task_id, task_type, params, data) in task_data_list {
135 match self.get_task_factory(&task_type) {
136 Some(factory) => {
137 let dec_list = self.task_manager_store.get_dec_list(&task_id).await?;
138 let mut task = match factory.restore(TaskStatus::Stopped, params.as_slice(), data.as_slice()).await {
139 Ok(task) => task,
140 Err(e) => {
141 let msg = format!("restore task {} failed.{}", task_id.to_string(), e);
142 log::error!("{}", msg.as_str());
143 continue;
144 }
145 };
146 task.set_task_store(self.task_store.clone()).await;
147 task.start_task().await?;
148 let mut task_map = self.task_map.lock().await;
149 task_map.insert(task_id.clone(), TaskInfo::new(Arc::new(task), dec_list));
150 },
151 None => {
152 continue;
153 }
154 }
155 }
156
157 Ok(())
158 }
159
160 pub async fn create_task<P: RawEncode>(&self, dec_id: ObjectId, source: DeviceId, task_type: TaskType, task_param: P) -> BuckyResult<TaskId> {
161 log::info!("create_task dec_id {} task_type {}", dec_id.to_string(), task_type.into());
162 match self.get_task_factory(&task_type) {
163 Some(factory) => {
164 let param = task_param.to_vec()?;
165 let mut task = factory.create(param.as_slice()).await.map_err(|e| {
166 let msg = format!("create task failed! dec={}, source={}, task_type={}, {}", dec_id, source, task_type, e);
167 error!("{}", msg);
168 BuckyError::new(e.code(), msg)
169 })?;
170
171 if task.get_task_type() != task_type {
172 let msg = format!("create task but task_type mismatch! dec={}, source={}, create task_type={}, got={}", dec_id, source, task_type, task.get_task_type());
173 error!("{}", msg);
174 return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg));
175 }
176
177 if task.need_persist() {
178 task.set_task_store(self.task_store.clone()).await;
179 }
180
181 let task_id = task.get_task_id();
182 let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
183 {
184 let ret = {
185 let mut task_map = self.task_map.lock().await;
186 if task_map.contains_key(&task_id) {
187 let task = task_map.get_mut(&task_id).unwrap();
188 if Self::add_dec(&mut task.dec_list, dec_id, source.clone()) && task.task.need_persist() {
189 self.task_manager_store.add_dec_info(&task_id,
190 task.task.get_task_category(),
191 task.task.get_task_status().await, task.dec_list.last().unwrap()).await?;
192 }
193 Some(task.task.get_task_id())
194 } else {
195 None
196 }
197 };
198 if let Some(task_id) = ret {
199 return Ok(task_id);
200 }
201 }
202
203 if task.need_persist() {
204 match self.task_manager_store.get_task(&task_id).await {
205 Ok((task_category, _task_type, task_status, task_param, task_data)) => {
206 let mut task = factory.restore(task_status, task_param.as_slice(), task_data.as_slice()).await?;
207 task.set_task_store(self.task_store.clone()).await;
208
209 let dec_list = self.task_manager_store.get_dec_list(&task_id).await?;
210 let mut task_map = self.task_map.lock().await;
211 let mut task_info = TaskInfo::new(Arc::new(task), dec_list);
212 if Self::add_dec(&mut task_info.dec_list, dec_id, source) {
213 self.task_manager_store.add_dec_info(&task_id, task_category, task_info.task.get_task_status().await, task_info.dec_list.last().unwrap()).await?;
214 }
215 task_map.insert(task_id.clone(), task_info);
216
217 Ok(task_id)
218 },
219 Err(e) => {
220 if e.code() == BuckyErrorCode::NotFound {
221 let dec_list = vec![DecInfo::new(dec_id, source)];
222 let task_info = TaskInfo::new(Arc::new(task), dec_list.clone());
223 self.task_manager_store.add_task(&task_id,
224 task_info.task.get_task_category(),
225 task_info.task.get_task_type(),
226 task_info.task.get_task_status().await,
227 dec_list,
228 param).await?;
229 let mut task_map = self.task_map.lock().await;
230 task_map.insert(task_id.clone(), task_info);
231 Ok(task_id)
232 } else {
233 Err(e)
234 }
235 }
236 }
237 } else {
238 let dec_list = vec![DecInfo::new(dec_id, source)];
239 let task_info = TaskInfo::new(Arc::new(task), dec_list.clone());
240 let mut task_map = self.task_map.lock().await;
241 task_map.insert(task_id.clone(), task_info);
242 Ok(task_id)
243 }
244 },
245 None => {
246 let msg = format!("not support task type {}", task_type);
247 log::error!("{}", msg.as_str());
248 Err(BuckyError::new(BuckyErrorCode::NotSupport, msg))
249 }
250 }
251 }
252
253 pub async fn start_task(&self, task_id: &TaskId) -> BuckyResult<()> {
254 log::info!("start_task {}", task_id.to_string());
255 let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
256 let task = {
257 let task_map = self.task_map.lock().await;
258 match task_map.get(task_id) {
259 Some(task_info) => Some(task_info.task.clone()),
260 None => None,
261 }
262 };
263 match task {
264 Some(task) => {
265 task.start_task().await
266 }
267 None => {
268 let (_task_category, task_type, task_status, task_param, task_data) = self.task_manager_store.get_task(task_id).await?;
269 match self.get_task_factory(&task_type) {
270 Some(factory) => {
271 let dec_list = self.task_manager_store.get_dec_list(task_id).await?;
272 let mut task = factory.restore(task_status, task_param.as_slice(), task_data.as_slice()).await?;
273 task.set_task_store(self.task_store.clone()).await;
274 let task = {
275 let mut task_map = self.task_map.lock().await;
276 if !task_map.contains_key(task_id) {
277 task_map.insert(task_id.clone(), TaskInfo::new(Arc::new(task), dec_list));
278 }
279 task_map.get(task_id).unwrap().task.clone()
280 };
281 task.start_task().await
282 }
283 None => {
284 let msg = format!("task not found: task={}", task_id.to_string());
285 log::error!("{}", msg);
286 Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
287 }
288 }
289 }
290 }
291 }
292
293 pub async fn check_and_waiting_stop(&self, task_id: &TaskId) {
294 log::info!("check_and_waiting_stop {}", task_id.to_string());
295 let task = {
296 let task_map = self.task_map.lock().await;
297 match task_map.get(task_id) {
298 Some(task_info) => Some(task_info.task.clone()),
299 None => None,
300 }
301 };
302 match task {
303 Some(task) => {
304 task.check_and_waiting_stop().await;
305 },
306 None => {}
307 }
308 }
309
310 pub async fn get_task_detail_status(&self, task_id: &TaskId) -> BuckyResult<Vec<u8>> {
311 log::debug!("will get_task_detail_status {}", task_id);
312 let task = {
313 let task_map = self.task_map.lock().await;
314 match task_map.get(task_id) {
315 Some(task_info) => Some(task_info.task.clone()),
316 None => None,
317 }
318 };
319 match task {
320 Some(task) => {
321 task.get_task_detail_status().await
322 },
323 None => {
324 let (_task_category, task_type, task_status, task_param, task_data) = self.task_manager_store.get_task(task_id).await?;
325 match self.get_task_factory(&task_type) {
326 Some(factory) => {
327 let dec_list = self.task_manager_store.get_dec_list(task_id).await?;
328 let mut task = factory.restore(task_status, task_param.as_slice(), task_data.as_slice()).await?;
329 task.set_task_store(self.task_store.clone()).await;
330 let task = {
331 let mut task_map = self.task_map.lock().await;
332 if !task_map.contains_key(task_id) {
333 task_map.insert(task_id.clone(), TaskInfo::new(Arc::new(task), dec_list));
334 }
335 task_map.get(task_id).unwrap().task.clone()
336 };
337 task.get_task_detail_status().await
338 }
339 None => {
340 let msg = format!("task not found! task={}", task_id);
341 log::error!("{}", msg);
342 Err(BuckyError::new(BuckyErrorCode::NotFound, msg))
343 }
344 }
345 }
346 }
347 }
348
349 pub async fn pause_task(&self, task_id: &TaskId) -> BuckyResult<()> {
350 log::info!("will pause_task {}", task_id);
351 let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
352 let task = {
353 let task_map = self.task_map.lock().await;
354 match task_map.get(task_id) {
355 Some(task_info) => task_info.task.clone(),
356 None => {
357 return Ok(());
358 }
359 }
360 };
361 task.pause_task().await
362 }
363
364 pub async fn stop_task(&self, task_id: &TaskId) -> BuckyResult<()> {
365 log::info!("will stop_task {}", task_id);
366 let _locker = Locker::get_locker(format!("task_manager_{}", task_id)).await;
367 let task = {
368 let mut task_map = self.task_map.lock().await;
369 task_map.remove(task_id)
370 };
371 match task {
372 Some(task) => {
373 task.task.stop_task().await
374 },
375 None => {
376 warn!("stop task but not found! task={}", task_id);
377 Ok(())
378 }
379 }
380 }
381
382 pub async fn remove_task(&self, dec_id: &ObjectId, source: &DeviceId, task_id: &TaskId) -> BuckyResult<()> {
383 log::info!("remove_task dec_id {} task_id {}", dec_id.to_string(), task_id.to_string());
384 let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
385
386 let mut task_map = self.task_map.lock().await;
387 match task_map.get_mut(task_id) {
388 None => {
389 let mut dec_list = self.task_manager_store.get_dec_list(&task_id).await?;
390 if Self::remove_dec(&mut dec_list, dec_id, source) {
391 self.task_manager_store.delete_dec_info(task_id, dec_id, source).await?;
392 }
393
394 if dec_list.len() == 0 {
395 self.task_manager_store.delete_task(task_id).await?;
396 task_map.remove(task_id);
397 }
398 }
399 Some(info) => {
400 if Self::remove_dec(&mut info.dec_list, dec_id, source) {
401 if info.task.need_persist() {
402 self.task_manager_store.delete_dec_info(task_id, dec_id, source).await?;
403 }
404 }
405 if info.dec_list.len() == 0 {
406 if info.task.need_persist() {
407 self.task_manager_store.delete_task(task_id).await?;
408 }
409 task_map.remove(task_id);
410 }
411 }
412 }
413
414 Ok(())
415 }
416
417 pub async fn remove_task_by_task_id(&self, task_id: &TaskId) -> BuckyResult<()> {
418 log::info!("remove_task task_id {}", task_id.to_string());
419 let _locker = Locker::get_locker(format!("task_manager_{}", task_id.to_string())).await;
420
421 let mut task_map = self.task_map.lock().await;
422 match task_map.get_mut(task_id) {
423 None => {
424 self.task_manager_store.delete_task(task_id).await?;
425 }
426 Some(task) => {
427 if task.task.need_persist() {
428 self.task_manager_store.delete_task(task_id).await?;
429 }
430 task_map.remove(task_id);
431 }
432 }
433
434 Ok(())
435 }
436
437 pub async fn get_tasks_by_task_id(&self, task_id_list: &[TaskId]) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
438 self.task_manager_store.get_tasks_by_task_id(task_id_list).await
439 }
440
441 pub async fn get_tasks_by_category(&self, category: TaskCategory) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
442 self.task_manager_store.get_tasks_by_category(category).await
443 }
444
445 fn add_dec(dec_list: &mut Vec<DecInfo>, new_dec: ObjectId, source: DeviceId) -> bool {
446 let mut find = false;
447 for dec in dec_list.iter_mut() {
448 if dec.dec_id() == &new_dec && dec.source() == &source {
449 find = true;
450 break;
451 }
452 }
453
454 if !find {
455 dec_list.push(DecInfo::new(new_dec, source));
456 }
457
458 !find
459 }
460
461 fn exist_dec(dec_list: & Vec<DecInfo>, new_dec: &ObjectId, source: &DeviceId) -> bool {
462 for dec in dec_list.iter() {
463 if dec.dec_id() == new_dec && dec.source() == source {
464 return true;
465 }
466 }
467 false
468 }
469
470 fn remove_dec(dec_list: &mut Vec<DecInfo>, dest_dec: &ObjectId, source: &DeviceId) -> bool {
471 let mut find = false;
472 for (index, dec) in dec_list.iter().enumerate() {
473 if dec.dec_id() == dest_dec && dec.source() == source {
474 dec_list.remove(index);
475 find = true;
476 break;
477 }
478 }
479 find
480 }
481}
482
483pub mod test_task_manager {
484 use std::sync::Arc;
485 use cyfs_base::BuckyResult;
486 use crate::{SQLiteTaskStore, TaskManager};
487
488 pub async fn create_test_task_manager() -> BuckyResult<Arc<TaskManager>> {
489 let store = Arc::new(SQLiteTaskStore::new(":memory:").await?);
490 store.init().await?;
491 let task_manager = TaskManager::new(store.clone(), store).await?;
492 Ok(task_manager)
493 }
494}