bios_basic/process/
task_processor.rs

1//! Async task processor
2//!
3//! 异步任务处理器
4#[cfg(feature = "with-mq")]
5use bios_sdk_invoke::clients::event_client::{
6    asteroid_mq::prelude::{EventAttribute, Subject, TopicCode},
7    get_topic,
8};
9use lazy_static::lazy_static;
10
11use std::{collections::HashMap, future::Future, sync::Arc};
12use tardis::{
13    basic::{dto::TardisContext, error::TardisError, result::TardisResult},
14    cache::cache_client::TardisCacheClient,
15    chrono::Local,
16    log,
17    serde_json::Value,
18    tokio::{sync::RwLock, task::JoinHandle},
19    TardisFuns,
20};
21
22lazy_static! {
23    static ref TASK_HANDLE: Arc<RwLock<HashMap<u64, JoinHandle<()>>>> = Arc::new(RwLock::new(HashMap::new()));
24}
/// Expiry passed to the cache when storing task processing data, in seconds (24 hours).
const TASK_PROCESSOR_DATA_EX_SEC: u64 = 24 * 60 * 60;
/// Key of the context extension entry that holds the comma-separated task ids.
const TASK_IN_CTX_FLAG: &str = "task_id";
/// Code of the message-queue topic looked up for task events.
#[cfg(feature = "with-mq")]
const TASK_TOPIC: TopicCode = TopicCode::const_new("task");
/// Event flag: set task status.
pub const EVENT_SET_TASK_STATUS_FLAG: &str = "task/set_status";
/// Event flag: set task processing data.
pub const EVENT_SET_TASK_PROCESS_DATA_FLAG: &str = "task/set_process";
/// Event flag: execute task.
pub const EVENT_EXECUTE_TASK_FLAG: &str = "task/execute";
38
/// Namespace type for the asynchronous task helpers.
///
/// The type carries no state; all functionality is exposed as associated functions.
/// The standard derives make the public type printable and trivially copyable.
#[derive(Debug, Clone, Copy, Default)]
pub struct TaskProcessor;
40
41impl TaskProcessor {
42    /// Initialize the asynchronous task status
43    ///
44    /// 初始化异步任务状态
45    pub async fn init_status(cache_key: &str, task_id: Option<u64>, cache_client: &TardisCacheClient) -> TardisResult<u64> {
46        let task_id = task_id.unwrap_or(Local::now().timestamp_nanos_opt().expect("maybe in 23rd century") as u64);
47        Self::set_status(cache_key, task_id, false, cache_client).await?;
48        Ok(task_id)
49    }
50
51    /// Set the status of the asynchronous task (whether it is completed)
52    ///
53    /// 设置异步任务状态(是否完成)
54    pub async fn set_status(cache_key: &str, task_id: u64, status: bool, cache_client: &TardisCacheClient) -> TardisResult<()> {
55        if task_id <= u32::MAX as u64 {
56            cache_client.setbit(&format!("{cache_key}:1"), task_id as usize, status).await?;
57        } else if task_id > 18446744069414584319 {
58            // u32::MAX * u32::MAX + u32::MAX - 1
59            cache_client.setbit(&format!("{cache_key}:2"), (u64::MAX - task_id) as usize, status).await?;
60        } else {
61            let _: String = cache_client
62                .script(
63                    r#"
64                       redis.call('SETBIT', KEYS[1]..':3', ARGV[1], ARGV[3])
65                       redis.call('SETBIT', KEYS[1]..':4', ARGV[2], ARGV[3])
66                       return 'OK'
67               "#,
68                )
69                .key(cache_key)
70                .arg(&[task_id / u32::MAX as u64, task_id % u32::MAX as u64, if status { 1 } else { 0 }])
71                .invoke()
72                .await?;
73        }
74        Ok(())
75    }
76
77    /// Check the status of the asynchronous task (whether it is completed)
78    ///
79    /// 检查异步任务状态(是否完成)
80    pub async fn check_status(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult<bool> {
81        if task_id <= u32::MAX as u64 {
82            Ok(cache_client.getbit(&format!("{cache_key}:1"), task_id as usize).await?)
83        } else if task_id > 18446744069414584319 {
84            // u32::MAX * u32::MAX + u32::MAX - 1
85            Ok(cache_client.getbit(&format!("{cache_key}:2"), (u64::MAX - task_id) as usize).await?)
86        } else {
87            let (r1, r2): (bool, bool) = cache_client
88                .script(r#"return {redis.call('GETBIT', KEYS[1]..':3', ARGV[1]),redis.call('GETBIT', KEYS[1]..':4', ARGV[2])}"#)
89                .key(cache_key)
90                .arg(&[task_id / u32::MAX as u64, task_id % u32::MAX as u64])
91                .invoke()
92                .await?;
93            Ok(r1 && r2)
94        }
95    }
96
97    /// Set the status of the asynchronous task (whether it is completed) and send an event
98    ///
99    /// 设置异步任务状态(是否完成)并发送事件
100    pub async fn set_status_with_event(
101        cache_key: &str,
102        task_id: u64,
103        status: bool,
104        cache_client: &TardisCacheClient,
105        _from_avatar: String,
106        _to_avatars: Option<Vec<String>>,
107    ) -> TardisResult<()> {
108        Self::set_status(cache_key, task_id, status, cache_client).await?;
109        #[cfg(feature = "with-mq")]
110        if let Some(_topic) = get_topic(&TASK_TOPIC) {
111            // todo: broadcast event to users
112            // topic
113            //     .send_event(
114            //         TaskSetStatusEventReq {
115            //             task_id,
116            //             data: status,
117            //             msg: format!("task status: {}", status),
118            //         }
119            //         .json(),
120            //     )
121            //     .await
122            //     .map_err(mq_error)?;
123        }
124        Ok(())
125    }
126
127    /// Set the processing data of the asynchronous task
128    ///
129    /// 设置异步任务处理数据
130    pub async fn set_process_data(cache_key: &str, task_id: u64, data: Value, cache_client: &TardisCacheClient) -> TardisResult<()> {
131        cache_client.set_ex(&format!("{cache_key}:{task_id}"), &TardisFuns::json.json_to_string(data)?, TASK_PROCESSOR_DATA_EX_SEC).await?;
132        Ok(())
133    }
134
135    /// Set the processing data of the asynchronous task and send an event
136    ///
137    /// 设置异步任务处理数据并发送事件
138    pub async fn set_process_data_with_event(
139        cache_key: &str,
140        task_id: u64,
141        data: Value,
142        cache_client: &TardisCacheClient,
143        _from_avatar: String,
144        _to_avatars: Option<Vec<String>>,
145    ) -> TardisResult<()> {
146        Self::set_process_data(cache_key, task_id, data.clone(), cache_client).await?;
147        #[cfg(feature = "with-mq")]
148        if let Some(_topic) = get_topic(&TASK_TOPIC) {
149            // todo: broadcast event to users
150        }
151        Ok(())
152    }
153
154    /// Fetch the processing data of the asynchronous task
155    ///
156    /// 获取异步任务处理数据
157    pub async fn get_process_data(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult<Value> {
158        if let Some(result) = cache_client.get(&format!("{cache_key}:{task_id}")).await? {
159            Ok(TardisFuns::json.str_to_obj(&result)?)
160        } else {
161            Ok(Value::Null)
162        }
163    }
164
165    /// Execute asynchronous task
166    ///
167    /// 执行异步任务
168    pub async fn execute_task<P, T>(cache_key: &str, process_fun: P, cache_client: &Arc<TardisCacheClient>) -> TardisResult<u64>
169    where
170        P: FnOnce(u64) -> T + Send + Sync + 'static,
171        T: Future<Output = TardisResult<()>> + Send + 'static,
172    {
173        Self::do_execute_task_with_ctx(cache_key, process_fun, cache_client, "".to_string(), None, None).await
174    }
175
176    /// Execute asynchronous task and send event
177    ///
178    /// 执行异步任务并发送事件
179    pub async fn execute_task_with_ctx<P, T>(
180        cache_key: &str,
181        process_fun: P,
182        cache_client: &Arc<TardisCacheClient>,
183        from_avatar: String,
184        to_avatars: Option<Vec<String>>,
185        ctx: &TardisContext,
186    ) -> TardisResult<u64>
187    where
188        P: FnOnce(u64) -> T + Send + Sync + 'static,
189        T: Future<Output = TardisResult<()>> + Send + 'static,
190    {
191        Self::do_execute_task_with_ctx(cache_key, process_fun, cache_client, from_avatar, to_avatars, Some(ctx)).await
192    }
193
194    async fn do_execute_task_with_ctx<P, T>(
195        cache_key: &str,
196        process_fun: P,
197        cache_client: &Arc<TardisCacheClient>,
198        from_avatar: String,
199        to_avatars: Option<Vec<String>>,
200        ctx: Option<&TardisContext>,
201    ) -> TardisResult<u64>
202    where
203        P: FnOnce(u64) -> T + Send + Sync + 'static,
204        T: Future<Output = TardisResult<()>> + Send + 'static,
205    {
206        let cache_client_clone = cache_client.clone();
207        let task_id = TaskProcessor::init_status(cache_key, None, cache_client).await?;
208        let cache_key = cache_key.to_string();
209        let _from_avatar_clone = from_avatar.clone();
210        let _to_avatars_clone = to_avatars.clone();
211        let handle = tardis::tokio::spawn(async move {
212            let result = process_fun(task_id).await;
213            match result {
214                Ok(_) => match TaskProcessor::set_status_with_event(&cache_key, task_id, true, &cache_client_clone, from_avatar, to_avatars).await {
215                    Ok(_) => {}
216                    Err(e) => log::error!("Asynchronous task [{}] process error:{:?}", task_id, e),
217                },
218                Err(e) => {
219                    log::error!("Asynchronous task [{}] process error:{:?}", task_id, e);
220                }
221            }
222        });
223        TASK_HANDLE.write().await.insert(task_id, handle);
224        #[cfg(feature = "with-mq")]
225        if let Some(_topic) = get_topic(&TASK_TOPIC) {
226            // todo: broadcast event to users
227        }
228        if let Some(ctx) = ctx {
229            if let Some(exist_task_ids) = ctx.get_ext(TASK_IN_CTX_FLAG).await? {
230                ctx.add_ext(TASK_IN_CTX_FLAG, &format!("{exist_task_ids},{task_id}")).await?;
231            } else {
232                ctx.add_ext(TASK_IN_CTX_FLAG, &task_id.to_string()).await?;
233            }
234        }
235        Ok(task_id)
236    }
237
238    /// Execute asynchronous task (without asynchronous function, only used to mark the start of the task)
239    ///
240    /// 执行异步任务(不带异步函数,仅用于标记任务开始执行)
241    pub async fn execute_task_without_fun(
242        cache_key: &str,
243        task_id: u64,
244        cache_client: &Arc<TardisCacheClient>,
245        _from_avatar: String,
246        _to_avatars: Option<Vec<String>>,
247    ) -> TardisResult<u64> {
248        let task_id = TaskProcessor::init_status(cache_key, Some(task_id), cache_client).await?;
249        #[cfg(feature = "with-mq")]
250        if let Some(_topic) = get_topic(&TASK_TOPIC) {
251            // todo: broadcast event to users
252        }
253        Ok(task_id)
254    }
255
256    /// Stop asynchronous task
257    ///
258    /// 停止异步任务
259    pub async fn stop_task(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult<()> {
260        Self::stop_task_with_event(cache_key, task_id, cache_client, "".to_string(), None).await
261    }
262
263    /// Stop asynchronous task and send event
264    ///
265    /// 停止异步任务并发送事件
266    pub async fn stop_task_with_event(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient, from_avatar: String, to_avatars: Option<Vec<String>>) -> TardisResult<()> {
267        if TaskProcessor::check_status(cache_key, task_id, cache_client).await? {
268            TASK_HANDLE.write().await.remove(&task_id);
269            return Ok(());
270        }
271        if TASK_HANDLE.read().await.contains_key(&task_id) {
272            match TASK_HANDLE.write().await.remove(&task_id) {
273                Some(handle) => {
274                    handle.abort();
275                }
276                None => return Err(TardisError::bad_request("task not found,may task is end", "400-task-stop-error")),
277            }
278        }
279        match TaskProcessor::set_status_with_event(cache_key, task_id, true, cache_client, from_avatar, to_avatars).await {
280            Ok(_) => {}
281            Err(e) => log::error!("Asynchronous task [{}] stop error:{:?}", task_id, e),
282        }
283        Ok(())
284    }
285
286    /// Fetch the asynchronous task id set in the context
287    ///
288    /// 获取异步任务id集合
289    ///
290    /// Use ``,`` to separate multiple tasks
291    ///
292    /// 多个任务使用``,``分隔
293    pub async fn get_task_id_with_ctx(ctx: &TardisContext) -> TardisResult<Option<String>> {
294        ctx.get_ext(TASK_IN_CTX_FLAG).await
295    }
296}
/// Event payload published under the subject [`EVENT_SET_TASK_STATUS_FLAG`].
///
/// Only compiled when the `with-mq` feature is enabled.
#[cfg(feature = "with-mq")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct TaskSetStatusEventReq {
    /// Id of the task the event refers to.
    pub task_id: u64,
    /// Status flag carried by the event (presumably the completion state — confirm
    /// against the sender once it is implemented).
    pub data: bool,
    /// Free-form message attached to the event.
    pub msg: String,
}
/// Event payload published under the subject [`EVENT_SET_TASK_PROCESS_DATA_FLAG`].
///
/// Only compiled when the `with-mq` feature is enabled.
#[cfg(feature = "with-mq")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct TaskSetProcessDataEventReq {
    /// Id of the task the event refers to.
    pub task_id: u64,
    /// JSON data carried by the event (presumably the task's processing data — confirm
    /// against the sender once it is implemented).
    pub data: Value,
    /// Free-form message attached to the event.
    pub msg: String,
}
/// Event payload published under the subject [`EVENT_EXECUTE_TASK_FLAG`].
///
/// Only compiled when the `with-mq` feature is enabled.
#[cfg(feature = "with-mq")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct TaskExecuteEventReq {
    /// Id of the task the event refers to.
    pub task_id: u64,
    /// Free-form message attached to the event.
    pub msg: String,
}
/// Binds [`TaskSetStatusEventReq`] to the subject [`EVENT_SET_TASK_STATUS_FLAG`].
#[cfg(feature = "with-mq")]
impl EventAttribute for TaskSetStatusEventReq {
    const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_STATUS_FLAG);
}
/// Binds [`TaskSetProcessDataEventReq`] to the subject [`EVENT_SET_TASK_PROCESS_DATA_FLAG`].
#[cfg(feature = "with-mq")]
impl EventAttribute for TaskSetProcessDataEventReq {
    const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_PROCESS_DATA_FLAG);
}
/// Binds [`TaskExecuteEventReq`] to the subject [`EVENT_EXECUTE_TASK_FLAG`].
#[cfg(feature = "with-mq")]
impl EventAttribute for TaskExecuteEventReq {
    const SUBJECT: Subject = Subject::const_new(EVENT_EXECUTE_TASK_FLAG);
}