1#[cfg(feature = "with-mq")]
5use bios_sdk_invoke::clients::event_client::{
6 asteroid_mq::prelude::{EventAttribute, Subject, TopicCode},
7 get_topic,
8};
9use lazy_static::lazy_static;
10
11use std::{collections::HashMap, future::Future, sync::Arc};
12use tardis::{
13 basic::{dto::TardisContext, error::TardisError, result::TardisResult},
14 cache::cache_client::TardisCacheClient,
15 chrono::Local,
16 log,
17 serde_json::Value,
18 tokio::{sync::RwLock, task::JoinHandle},
19 TardisFuns,
20};
21
22lazy_static! {
23 static ref TASK_HANDLE: Arc<RwLock<HashMap<u64, JoinHandle<()>>>> = Arc::new(RwLock::new(HashMap::new()));
24}
25const TASK_PROCESSOR_DATA_EX_SEC: u64 = 60 * 60 * 24;
26const TASK_IN_CTX_FLAG: &str = "task_id";
27#[cfg(feature = "with-mq")]
28const TASK_TOPIC: TopicCode = TopicCode::const_new("task");
29pub const EVENT_SET_TASK_STATUS_FLAG: &str = "task/set_status";
32pub const EVENT_SET_TASK_PROCESS_DATA_FLAG: &str = "task/set_process";
35pub const EVENT_EXECUTE_TASK_FLAG: &str = "task/execute";
38
39pub struct TaskProcessor;
40
41impl TaskProcessor {
42 pub async fn init_status(cache_key: &str, task_id: Option<u64>, cache_client: &TardisCacheClient) -> TardisResult<u64> {
46 let task_id = task_id.unwrap_or(Local::now().timestamp_nanos_opt().expect("maybe in 23rd century") as u64);
47 Self::set_status(cache_key, task_id, false, cache_client).await?;
48 Ok(task_id)
49 }
50
51 pub async fn set_status(cache_key: &str, task_id: u64, status: bool, cache_client: &TardisCacheClient) -> TardisResult<()> {
55 if task_id <= u32::MAX as u64 {
56 cache_client.setbit(&format!("{cache_key}:1"), task_id as usize, status).await?;
57 } else if task_id > 18446744069414584319 {
58 cache_client.setbit(&format!("{cache_key}:2"), (u64::MAX - task_id) as usize, status).await?;
60 } else {
61 let _: String = cache_client
62 .script(
63 r#"
64 redis.call('SETBIT', KEYS[1]..':3', ARGV[1], ARGV[3])
65 redis.call('SETBIT', KEYS[1]..':4', ARGV[2], ARGV[3])
66 return 'OK'
67 "#,
68 )
69 .key(cache_key)
70 .arg(&[task_id / u32::MAX as u64, task_id % u32::MAX as u64, if status { 1 } else { 0 }])
71 .invoke()
72 .await?;
73 }
74 Ok(())
75 }
76
77 pub async fn check_status(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult<bool> {
81 if task_id <= u32::MAX as u64 {
82 Ok(cache_client.getbit(&format!("{cache_key}:1"), task_id as usize).await?)
83 } else if task_id > 18446744069414584319 {
84 Ok(cache_client.getbit(&format!("{cache_key}:2"), (u64::MAX - task_id) as usize).await?)
86 } else {
87 let (r1, r2): (bool, bool) = cache_client
88 .script(r#"return {redis.call('GETBIT', KEYS[1]..':3', ARGV[1]),redis.call('GETBIT', KEYS[1]..':4', ARGV[2])}"#)
89 .key(cache_key)
90 .arg(&[task_id / u32::MAX as u64, task_id % u32::MAX as u64])
91 .invoke()
92 .await?;
93 Ok(r1 && r2)
94 }
95 }
96
97 pub async fn set_status_with_event(
101 cache_key: &str,
102 task_id: u64,
103 status: bool,
104 cache_client: &TardisCacheClient,
105 _from_avatar: String,
106 _to_avatars: Option<Vec<String>>,
107 ) -> TardisResult<()> {
108 Self::set_status(cache_key, task_id, status, cache_client).await?;
109 #[cfg(feature = "with-mq")]
110 if let Some(_topic) = get_topic(&TASK_TOPIC) {
111 }
124 Ok(())
125 }
126
127 pub async fn set_process_data(cache_key: &str, task_id: u64, data: Value, cache_client: &TardisCacheClient) -> TardisResult<()> {
131 cache_client.set_ex(&format!("{cache_key}:{task_id}"), &TardisFuns::json.json_to_string(data)?, TASK_PROCESSOR_DATA_EX_SEC).await?;
132 Ok(())
133 }
134
135 pub async fn set_process_data_with_event(
139 cache_key: &str,
140 task_id: u64,
141 data: Value,
142 cache_client: &TardisCacheClient,
143 _from_avatar: String,
144 _to_avatars: Option<Vec<String>>,
145 ) -> TardisResult<()> {
146 Self::set_process_data(cache_key, task_id, data.clone(), cache_client).await?;
147 #[cfg(feature = "with-mq")]
148 if let Some(_topic) = get_topic(&TASK_TOPIC) {
149 }
151 Ok(())
152 }
153
154 pub async fn get_process_data(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult<Value> {
158 if let Some(result) = cache_client.get(&format!("{cache_key}:{task_id}")).await? {
159 Ok(TardisFuns::json.str_to_obj(&result)?)
160 } else {
161 Ok(Value::Null)
162 }
163 }
164
165 pub async fn execute_task<P, T>(cache_key: &str, process_fun: P, cache_client: &Arc<TardisCacheClient>) -> TardisResult<u64>
169 where
170 P: FnOnce(u64) -> T + Send + Sync + 'static,
171 T: Future<Output = TardisResult<()>> + Send + 'static,
172 {
173 Self::do_execute_task_with_ctx(cache_key, process_fun, cache_client, "".to_string(), None, None).await
174 }
175
176 pub async fn execute_task_with_ctx<P, T>(
180 cache_key: &str,
181 process_fun: P,
182 cache_client: &Arc<TardisCacheClient>,
183 from_avatar: String,
184 to_avatars: Option<Vec<String>>,
185 ctx: &TardisContext,
186 ) -> TardisResult<u64>
187 where
188 P: FnOnce(u64) -> T + Send + Sync + 'static,
189 T: Future<Output = TardisResult<()>> + Send + 'static,
190 {
191 Self::do_execute_task_with_ctx(cache_key, process_fun, cache_client, from_avatar, to_avatars, Some(ctx)).await
192 }
193
194 async fn do_execute_task_with_ctx<P, T>(
195 cache_key: &str,
196 process_fun: P,
197 cache_client: &Arc<TardisCacheClient>,
198 from_avatar: String,
199 to_avatars: Option<Vec<String>>,
200 ctx: Option<&TardisContext>,
201 ) -> TardisResult<u64>
202 where
203 P: FnOnce(u64) -> T + Send + Sync + 'static,
204 T: Future<Output = TardisResult<()>> + Send + 'static,
205 {
206 let cache_client_clone = cache_client.clone();
207 let task_id = TaskProcessor::init_status(cache_key, None, cache_client).await?;
208 let cache_key = cache_key.to_string();
209 let _from_avatar_clone = from_avatar.clone();
210 let _to_avatars_clone = to_avatars.clone();
211 let handle = tardis::tokio::spawn(async move {
212 let result = process_fun(task_id).await;
213 match result {
214 Ok(_) => match TaskProcessor::set_status_with_event(&cache_key, task_id, true, &cache_client_clone, from_avatar, to_avatars).await {
215 Ok(_) => {}
216 Err(e) => log::error!("Asynchronous task [{}] process error:{:?}", task_id, e),
217 },
218 Err(e) => {
219 log::error!("Asynchronous task [{}] process error:{:?}", task_id, e);
220 }
221 }
222 });
223 TASK_HANDLE.write().await.insert(task_id, handle);
224 #[cfg(feature = "with-mq")]
225 if let Some(_topic) = get_topic(&TASK_TOPIC) {
226 }
228 if let Some(ctx) = ctx {
229 if let Some(exist_task_ids) = ctx.get_ext(TASK_IN_CTX_FLAG).await? {
230 ctx.add_ext(TASK_IN_CTX_FLAG, &format!("{exist_task_ids},{task_id}")).await?;
231 } else {
232 ctx.add_ext(TASK_IN_CTX_FLAG, &task_id.to_string()).await?;
233 }
234 }
235 Ok(task_id)
236 }
237
238 pub async fn execute_task_without_fun(
242 cache_key: &str,
243 task_id: u64,
244 cache_client: &Arc<TardisCacheClient>,
245 _from_avatar: String,
246 _to_avatars: Option<Vec<String>>,
247 ) -> TardisResult<u64> {
248 let task_id = TaskProcessor::init_status(cache_key, Some(task_id), cache_client).await?;
249 #[cfg(feature = "with-mq")]
250 if let Some(_topic) = get_topic(&TASK_TOPIC) {
251 }
253 Ok(task_id)
254 }
255
256 pub async fn stop_task(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient) -> TardisResult<()> {
260 Self::stop_task_with_event(cache_key, task_id, cache_client, "".to_string(), None).await
261 }
262
263 pub async fn stop_task_with_event(cache_key: &str, task_id: u64, cache_client: &TardisCacheClient, from_avatar: String, to_avatars: Option<Vec<String>>) -> TardisResult<()> {
267 if TaskProcessor::check_status(cache_key, task_id, cache_client).await? {
268 TASK_HANDLE.write().await.remove(&task_id);
269 return Ok(());
270 }
271 if TASK_HANDLE.read().await.contains_key(&task_id) {
272 match TASK_HANDLE.write().await.remove(&task_id) {
273 Some(handle) => {
274 handle.abort();
275 }
276 None => return Err(TardisError::bad_request("task not found,may task is end", "400-task-stop-error")),
277 }
278 }
279 match TaskProcessor::set_status_with_event(cache_key, task_id, true, cache_client, from_avatar, to_avatars).await {
280 Ok(_) => {}
281 Err(e) => log::error!("Asynchronous task [{}] stop error:{:?}", task_id, e),
282 }
283 Ok(())
284 }
285
286 pub async fn get_task_id_with_ctx(ctx: &TardisContext) -> TardisResult<Option<String>> {
294 ctx.get_ext(TASK_IN_CTX_FLAG).await
295 }
296}
297#[cfg(feature = "with-mq")]
298#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
299struct TaskSetStatusEventReq {
300 pub task_id: u64,
301 pub data: bool,
302 pub msg: String,
303}
304#[cfg(feature = "with-mq")]
305#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
306struct TaskSetProcessDataEventReq {
307 pub task_id: u64,
308 pub data: Value,
309 pub msg: String,
310}
311#[cfg(feature = "with-mq")]
312#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
313struct TaskExecuteEventReq {
314 pub task_id: u64,
315 pub msg: String,
316}
317#[cfg(feature = "with-mq")]
318
319impl EventAttribute for TaskSetStatusEventReq {
320 const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_STATUS_FLAG);
321}
322#[cfg(feature = "with-mq")]
323
324impl EventAttribute for TaskSetProcessDataEventReq {
325 const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_PROCESS_DATA_FLAG);
326}
327#[cfg(feature = "with-mq")]
328
329impl EventAttribute for TaskExecuteEventReq {
330 const SUBJECT: Subject = Subject::const_new(EVENT_EXECUTE_TASK_FLAG);
331}