1use std::fmt::{Display, Formatter};
2use std::hash::{Hash, Hasher};
3use std::str::FromStr;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6use async_std::task::JoinHandle;
7use base58::{FromBase58, ToBase58};
8use futures::future::{AbortHandle, Aborted};
9use generic_array::GenericArray;
10use generic_array::typenum::{U32};
11use cyfs_base::*;
12use crate::{AsyncCondvar, AsyncCondvarRef, TaskStore};
13
/// Category id `1`; named for publish tasks.
pub const PUBLISH_TASK_CATEGORY: TaskCategory = TaskCategory(1);
/// Category id `2`; named for download tasks.
pub const DOWNLOAD_TASK_CATEGORY: TaskCategory = TaskCategory(2);
/// Category id `3`; named for file/dir build tasks.
pub const BUILD_FILE_TASK_CATEGORY: TaskCategory = TaskCategory(3);

/// Task type id `101`; named for publishing a local file.
pub const PUBLISH_LOCAL_FILE_TASK: TaskType = TaskType(101);
/// Task type id `102`; named for publishing a local directory.
pub const PUBLISH_LOCAL_DIR_TASK: TaskType = TaskType(102);
/// Task type id `111`; named for downloading a chunk.
pub const DOWNLOAD_CHUNK_TASK: TaskType = TaskType(111);
/// Task type id `112`; named for downloading a file.
pub const DOWNLOAD_FILE_TASK: TaskType = TaskType(112);
/// Task type id `121`; named for building a file.
pub const BUILD_FILE_TASK: TaskType = TaskType(121);
/// Task type id `122`; named for building a directory.
pub const BUILD_DIR_TASK: TaskType = TaskType(122);
24
/// Lifecycle state of a task.
///
/// The notes on each variant describe how `RunnableTask` in this file uses it;
/// other `Task` implementations may assign the states differently.
#[derive(Copy, Clone, Eq, PartialEq, Debug, RawEncode, RawDecode)]
pub enum TaskStatus {
    /// Initial state of a freshly constructed `RunnableTask`.
    Stopped,
    /// Not assigned by any code in this file.
    Paused,
    /// Set by the spawned driver just before the runnable is awaited.
    Running,
    /// Set when the runnable's `run` returned `Ok`.
    Finished,
    /// Set when the runnable's `run` returned `Err` or the run was aborted.
    Failed,
}
33
34impl TaskStatus {
35 pub fn into(self) -> i32 {
36 match self {
37 Self::Stopped => 0,
38 Self::Paused => 1,
39 Self::Running => 2,
40 Self::Finished => 3,
41 Self::Failed => 4,
42 }
43 }
44
45 pub fn try_from(value: i32) -> BuckyResult<Self> {
46 match value {
47 0 => Ok(Self::Stopped),
48 1 => Ok(Self::Paused),
49 2 => Ok(Self::Running),
50 3 => Ok(Self::Finished),
51 4 => Ok(Self::Failed),
52 _ => {
53 let msg = format!("unsupport task type {}", value);
54 log::error!("{}", msg.as_str());
55 Err(BuckyError::new(BuckyErrorCode::NotSupport, msg))
56 }
57 }
58 }
59}
60
61#[repr(transparent)]
62#[derive(Copy, Clone, Eq, PartialEq, Hash)]
63pub struct TaskType(pub u16);
64
65impl TaskType {
66 pub fn into(self) -> i32 {
67 self.0 as i32
68 }
69
70 pub fn try_from(value: i32) -> BuckyResult<Self> {
71 Ok(Self(value as u16))
72 }
73}
74
75impl Display for TaskType {
76 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77 write!(f, "{}", self.0)
78 }
79}
80
81#[repr(transparent)]
82#[derive(Copy, Clone, Eq, PartialEq, Hash)]
83pub struct TaskCategory(pub u16);
84
85impl TaskCategory {
86 pub fn into(self) -> i32 {
87 self.0 as i32
88 }
89 pub fn try_from(value: i32) -> BuckyResult<Self> {
90 Ok(Self(value as u16))
91 }
92}
93
94impl Display for TaskCategory {
95 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96 write!(f, "{}", self.0)
97 }
98}
99
/// Identifier of a task: a fixed 32-byte value.
///
/// The `Display` and `FromStr` impls in this file render and parse it as a
/// base58 string. `Default` yields the all-zero id.
#[derive(Copy, Clone, PartialOrd, PartialEq, Ord, Eq, Debug, Default)]
pub struct TaskId(GenericArray<u8, U32>);
102
103impl From<&[u8]> for TaskId {
104 fn from(hash: &[u8]) -> Self {
105 Self(GenericArray::clone_from_slice(hash))
106 }
107}
108
109impl Display for TaskId {
110 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
111 write!(f, "{}", self.0.as_slice().to_base58())
112 }
113}
114
115impl FromStr for TaskId {
116 type Err = BuckyError;
117
118 fn from_str(s: &str) -> Result<Self, Self::Err> {
119 let buf = s.from_base58().map_err(|_e| {
120 log::error!("convert base58 str to TaskId failed, str:{}", s);
121 let msg = format!("convert base58 str to object id failed, str={}", s);
122 BuckyError::new(BuckyErrorCode::InvalidFormat, msg)
123 })?;
124
125 if buf.len() != 32 {
126 let msg = format!(
127 "convert base58 str to object id failed, len unmatch: str={}",
128 s
129 );
130 return Err(BuckyError::new(BuckyErrorCode::InvalidFormat, msg));
131 }
132
133 let mut id = Self::default();
134 unsafe {
135 std::ptr::copy(buf.as_ptr(), id.0.as_mut_slice().as_mut_ptr(), buf.len());
136 }
137
138 Ok(id)
139 }
140}
141
142impl Hash for TaskId {
143 fn hash<H: Hasher>(&self, state: &mut H) {
144 state.write(self.0.as_slice());
145 }
146}
147
148impl TaskId {
149 pub fn as_slice(&self) -> &[u8] {
150 self.0.as_slice()
151 }
152}
153
154impl From<GenericArray<u8, U32>> for TaskId {
155 fn from(hash: GenericArray<u8, U32>) -> Self {
156 Self(hash)
157 }
158}
159
160#[async_trait::async_trait]
161pub trait Task: Send + Sync {
162 fn get_task_id(&self) -> TaskId;
163 fn get_task_type(&self) -> TaskType;
164 fn get_task_category(&self) -> TaskCategory;
165 fn need_persist(&self) -> bool {
166 true
167 }
168 async fn get_task_status(&self) -> TaskStatus;
169 async fn set_task_store(&mut self, task_store: Arc<dyn TaskStore>);
170 async fn start_task(&self) -> BuckyResult<()>;
171 async fn pause_task(&self) -> BuckyResult<()>;
172 async fn stop_task(&self) -> BuckyResult<()>;
173 async fn check_and_waiting_stop(&self) {
174 loop {
175 if TaskStatus::Running == self.get_task_status().await {
176 async_std::task::sleep(Duration::from_secs(1)).await;
177 } else {
178 break;
179 }
180 }
181 }
182 async fn get_task_detail_status(&self) -> BuckyResult<Vec<u8>>;
183}
184
/// The work body wrapped by `RunnableTask`: a single async `run` plus identity
/// and persistence hooks.
#[async_trait::async_trait]
pub trait Runnable: Send + Sync {
    /// Identifier of the task this runnable implements.
    fn get_task_id(&self) -> TaskId;
    /// Concrete kind of the task.
    fn get_task_type(&self) -> TaskType;
    /// Category of the task.
    fn get_task_category(&self) -> TaskCategory;
    /// Whether the task wants to be persisted; defaults to `true`.
    fn need_persist(&self) -> bool {
        true
    }
    /// Notification hook called with each new status; the default does nothing.
    ///
    /// `RunnableTask` invokes this synchronously while holding its internal
    /// state lock, so implementations should return quickly.
    fn status_change(&self, _task_status: TaskStatus) {}
    /// Hands the runnable the store it may use for persistence.
    async fn set_task_store(&mut self, task_store: Arc<dyn TaskStore>);
    /// Executes the work; `Ok` maps to `Finished` and `Err` to `Failed` in
    /// `RunnableTask`.
    async fn run(&self) -> BuckyResult<()>;
    /// Implementation-defined, serialized detail status.
    async fn get_task_detail_status(&self) -> BuckyResult<Vec<u8>>;
}
198
/// Mutable state of a `RunnableTask`, guarded by a mutex in the owning task.
struct RunnableTaskData {
    /// Abort handle of the in-flight run; set by `start_task`, taken by `stop_task`.
    canceler: Option<AbortHandle>,
    /// Current lifecycle status; starts as `Stopped`.
    task_status: TaskStatus,
    /// Error recorded when the run failed or was aborted. Not read in this file.
    error: Option<BuckyError>,
    /// Store used to persist status changes, if one was provided.
    task_store: Option<Arc<dyn TaskStore>>,
    /// Join handle of the spawned driver task, if it has not been taken yet.
    runnable_handle: Option<JoinHandle<()>>,
}
206
/// Adapter that turns a `Runnable` into a `Task` by driving it on a spawned
/// async task that can be aborted.
pub struct RunnableTask<R: Runnable> {
    /// The wrapped work body; cloned into the spawned futures.
    runnable: Arc<R>,
    /// Shared mutable state (status, abort handle, join handle, store, error).
    data: Arc<Mutex<RunnableTaskData>>,
    /// Condvars of `check_and_waiting_stop` callers that could not obtain the
    /// join handle and are waiting to be notified.
    waiting_list: Arc<Mutex<Vec<AsyncCondvarRef>>>,
}
212
213impl<R: Runnable> RunnableTask<R> {
214 pub fn new(runnable: R) -> Self {
215 Self {
216 runnable: Arc::new(runnable),
217 data: Arc::new(Mutex::new(RunnableTaskData {
218 canceler: None,
219 task_status: TaskStatus::Stopped,
220 error: None,
221 task_store: None,
222 runnable_handle: None,
223 })),
224 waiting_list: Arc::new(Mutex::new(vec![]))
225 }
226 }
227
228 fn get_runnable(&self) -> &mut dyn Runnable {
229 unsafe {
230 let runnable = &mut *(self.runnable.as_ref() as *const dyn Runnable as *mut dyn Runnable);
231 runnable
232 }
233 }
234}
235
236#[async_trait::async_trait]
237impl<R: 'static + Runnable> Task for RunnableTask<R> {
238 fn get_task_id(&self) -> TaskId {
239 self.runnable.get_task_id()
240 }
241
242 fn get_task_type(&self) -> TaskType {
243 self.runnable.get_task_type()
244 }
245
246 fn get_task_category(&self) -> TaskCategory {
247 self.runnable.get_task_category()
248 }
249
250 fn need_persist(&self) -> bool {
251 self.runnable.need_persist()
252 }
253
254 async fn get_task_status(&self) -> TaskStatus {
255 let data = self.data.lock().unwrap();
256 data.task_status
257 }
258
259 async fn set_task_store(&mut self, task_store: Arc<dyn TaskStore>) {
260 {
261 let mut data = self.data.lock().unwrap();
262 data.task_store = Some(task_store.clone());
263 }
264
265 let runnable = self.get_runnable();
266 runnable.set_task_store(task_store).await;
267 }
268
269 async fn start_task(&self) -> BuckyResult<()> {
270 let runnable = self.runnable.clone();
271 let task_id = self.runnable.get_task_id();
272
273 {
274 let tmp_data = self.data.lock().unwrap();
275 if tmp_data.task_status == TaskStatus::Running {
276 return Ok(());
277 }
278 }
279
280 let (ft, handle) = futures::future::abortable(async move {
281 runnable.run().await
282 });
283
284 {
285 let mut data = self.data.lock().unwrap();
286 data.canceler = Some(handle);
287 }
288
289 let runnable = self.runnable.clone();
290 let data = self.data.clone();
291 let task_store = {
292 let data = data.lock().unwrap();
293 data.task_store.clone()
294 };
295 let runnable_handle = async_std::task::spawn(async move {
296 let _: BuckyResult<()> = async move {
297 {
298 let mut tmp_data = data.lock().unwrap();
299 tmp_data.task_status = TaskStatus::Running;
300 runnable.status_change(tmp_data.task_status);
301 }
302 if task_store.is_some() {
303 task_store.as_ref().unwrap().save_task_status(&task_id, TaskStatus::Running).await?;
304 }
305 match ft.await {
306 Ok(ret) => {
307 match ret {
308 Ok(_) => {
309 {
310 let mut tmp_data = data.lock().unwrap();
311 tmp_data.task_status = TaskStatus::Finished;
312 tmp_data.canceler = None;
313 tmp_data.runnable_handle = None;
314 runnable.status_change(tmp_data.task_status);
315 }
316 if task_store.is_some() {
317 task_store.as_ref().unwrap().save_task_status(&task_id, TaskStatus::Finished).await?;
318 }
319 }
320 Err(err) => {
321 log::error!("task {} err {}", task_id.to_string(), err);
322 {
323 let mut tmp_data = data.lock().unwrap();
324 tmp_data.task_status = TaskStatus::Failed;
325 tmp_data.error = Some(err);
326 tmp_data.canceler = None;
327 tmp_data.runnable_handle = None;
328 runnable.status_change(tmp_data.task_status);
329 }
330 if task_store.is_some() {
331 task_store.as_ref().unwrap().save_task_status(&task_id, TaskStatus::Failed).await?;
332 }
333 }
334 }
335 }
336 Err(Aborted) => {
337 {
338 let msg = format!("runnable task been aborted! task={}", runnable.get_task_id());
339 warn!("{}", msg);
340 let err = BuckyError::new(BuckyErrorCode::UserCanceled, msg);
341
342 let mut tmp_data = data.lock().unwrap();
343 tmp_data.task_status = TaskStatus::Failed;
344 tmp_data.error = Some(err);
345 runnable.status_change(tmp_data.task_status);
346 }
347 if task_store.is_some() {
348 task_store.as_ref().unwrap().save_task_status(&task_id, TaskStatus::Failed).await?;
349 }
350 }
351 }
352 Ok(())
353 }.await;
354 });
355 {
356 let mut data = self.data.lock().unwrap();
357 data.runnable_handle = Some(runnable_handle);
358 }
359 Ok(())
360 }
361
362 async fn pause_task(&self) -> BuckyResult<()> {
363 self.stop_task().await
364 }
365
366 async fn stop_task(&self) -> BuckyResult<()>
367 {
368 let (canceler, runnable_handle) = {
369 let mut data = self.data.lock().unwrap();
370 (data.canceler.take(), data.runnable_handle.take())
371 };
372 if canceler.is_some() {
373 canceler.unwrap().abort();
374 if runnable_handle.is_some() {
375 runnable_handle.unwrap().await;
376 }
377 Ok(())
378 } else {
379 let err = format!("task [{}] is not running!", self.runnable.get_task_id());
380 log::error!("{}", err);
381 Err(BuckyError::from((BuckyErrorCode::ErrorState, err)))
382 }
383 }
384
385 async fn check_and_waiting_stop(&self) {
386 let (runnable_handle, waiting) = {
387 let mut waiting_list = self.waiting_list.lock().unwrap();
388 let mut data = self.data.lock().unwrap();
389 let handle = data.runnable_handle.take();
390 if handle.is_some() {
391 (handle, None)
392 } else {
393 if data.task_status != TaskStatus::Running {
394 return;
395 }
396 let waiting = AsyncCondvar::new();
397 waiting_list.push(waiting.clone());
398 (None, Some(waiting))
399 }
400 };
401
402 if runnable_handle.is_some() {
403 runnable_handle.unwrap().await;
404 let mut waiting_list = self.waiting_list.lock().unwrap();
405 for waiting in waiting_list.iter() {
406 waiting.notify();
407 }
408 waiting_list.clear();
409 } else {
410 waiting.unwrap().wait().await;
411 }
412 }
413
414 async fn get_task_detail_status(&self) -> BuckyResult<Vec<u8>> {
415 self.runnable.get_task_detail_status().await
416 }
417}
418
/// Factory that produces `Task` objects of one task type.
#[async_trait::async_trait]
pub trait TaskFactory: 'static + Send + Sync {
    /// The task type this factory produces.
    fn get_task_type(&self) -> TaskType;
    /// Creates a new task from serialized `params`.
    /// NOTE(review): the encoding of `params` is defined by each implementation.
    async fn create(&self, params: &[u8]) -> BuckyResult<Box<dyn Task>>;
    /// Rebuilds a task from its last known `task_status`, its `params` and
    /// additional serialized `data`.
    /// NOTE(review): presumably `data` is previously persisted task state — confirm.
    async fn restore(&self, task_status: TaskStatus, params: &[u8], data: &[u8]) -> BuckyResult<Box<dyn Task>>;
}
425
#[cfg(test)]
mod test_task {
    use std::sync::Arc;
    use std::time::Duration;
    use cyfs_base::BuckyResult;
    use crate::{Runnable, RunnableTask, Task, TaskCategory, TaskId, TaskStatus, TaskStore, TaskType};

    /// Runnable that just sleeps for ten seconds so the test can stop it mid-run.
    struct TestRunnable {

    }

    #[async_trait::async_trait]
    impl Runnable for TestRunnable {
        fn get_task_id(&self) -> TaskId {
            TaskId::default()
        }

        fn get_task_type(&self) -> TaskType {
            todo!()
        }

        fn get_task_category(&self) -> TaskCategory {
            todo!()
        }

        async fn set_task_store(&mut self, _task_store: Arc<dyn TaskStore>) {
            todo!()
        }

        async fn run(&self) -> BuckyResult<()> {
            async_std::task::sleep(Duration::from_secs(10)).await;
            Ok(())
        }

        async fn get_task_detail_status(&self) -> BuckyResult<Vec<u8>> {
            todo!()
        }
    }

    /// Starts a long-running task, checks it reports `Running`, then stops it.
    #[test]
    fn test_runnable() {
        async_std::task::block_on(async {
            let task = RunnableTask::new(TestRunnable {});
            task.start_task().await.unwrap();
            async_std::task::sleep(Duration::from_secs(2)).await;
            assert_eq!(task.get_task_status().await, TaskStatus::Running);
            task.stop_task().await.unwrap();
            // `stop_task` aborts the run and waits for the driver, whose abort
            // branch records `Failed` (with a `UserCanceled` error).
            // NOTE(review): if a stopped task is meant to report `Stopped`,
            // the abort branch in `start_task` is what needs to change.
            assert_eq!(task.get_task_status().await, TaskStatus::Failed);
        });
    }
}