// yq_async/async_worker.rs

use crate::async_job::{AsyncJob, AsyncJobFns};
use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisResult};
use std::sync::Arc;
use yq::{
    DequeueAction, DequeueSleep, DequeueStatus, FinishAction, Queue, SleepOnAction, YqError,
    YqResult,
};

9pub struct AsyncWorker<S> {
10    connection_manager: ConnectionManager,
11    queue: Queue,
12    dequeue_action: DequeueAction,
13    finish_action: FinishAction,
14    sleep_on_action: SleepOnAction,
15    async_job_fns: AsyncJobFns<S>,
16    state: S,
17}
18
19impl<S> AsyncWorker<S>
20where
21    S: Send + Sync + Clone + 'static,
22{
23    pub async fn new(redis_url: &str, queue: Queue, state: S) -> YqResult<Self> {
24        let client = Client::open(redis_url).map_err(YqError::CreateRedisClient)?;
25        let connection_manager = client
26            .get_tokio_connection_manager()
27            .await
28            .map_err(YqError::GetRedisConn)?;
29
30        Ok(Self {
31            connection_manager,
32            queue: queue.clone(),
33            dequeue_action: DequeueAction::new(queue.clone()),
34            finish_action: FinishAction::new(queue.clone()),
35            sleep_on_action: SleepOnAction::new(queue),
36            async_job_fns: AsyncJobFns::new(),
37            state,
38        })
39    }
40
41    pub fn reg_job<J: AsyncJob<State = S>>(mut self) -> YqResult<Self> {
42        let job_type = J::JOB_TYPE;
43
44        self.async_job_fns.reg_job(
45            job_type,
46            Arc::new(|mid, job_content, state| {
47                Box::pin(async move {
48                    let job_data: J =
49                        serde_json::from_str(&job_content).map_err(|err| err.to_string())?;
50                    job_data.execute_async(mid, state).await
51                })
52            }),
53        )?;
54        Ok(self)
55    }
56
57    async fn sleep(&mut self, dequeue_sleep: DequeueSleep) {
58        let r: RedisResult<Option<String>> = self
59            .sleep_on_action
60            .prepare_invoke(dequeue_sleep)
61            .query_async(&mut self.connection_manager)
62            .await;
63
64        if let Err(err) = r {
65            tracing::error!("worker sleep ERROR: {}", err.to_string());
66        }
67    }
68
69    pub async fn run(mut self) -> YqResult<()> {
70        loop {
71            let now = time::OffsetDateTime::now_utc();
72
73            let dequeue_status: RedisResult<DequeueStatus> = self
74                .dequeue_action
75                .prepare_invoke(now.unix_timestamp())
76                .invoke_async(&mut self.connection_manager)
77                .await;
78
79            let dequeue_status = match dequeue_status {
80                Ok(dequeue_status) => dequeue_status,
81                Err(err) => {
82                    tracing::error!("dequeue_job ERROR: {err:?}");
83                    self.sleep(DequeueSleep::default()).await;
84                    continue;
85                }
86            };
87
88            tracing::trace!("{:?}", &dequeue_status);
89
90            match dequeue_status {
91                DequeueStatus::Sleep(dequeue_sleep) => {
92                    self.sleep(dequeue_sleep).await;
93                }
94                DequeueStatus::Handle(dequeue_handle) => {
95                    match self
96                        .async_job_fns
97                        .handle(
98                            dequeue_handle.mid,
99                            dequeue_handle.mcontent,
100                            self.state.clone(),
101                        )
102                        .await
103                    {
104                        Ok(_) => {
105                            let r: RedisResult<i64> = self
106                                .finish_action
107                                .prepare_invoke(dequeue_handle.mid)
108                                .query_async(&mut self.connection_manager)
109                                .await;
110
111                            if let Err(err) = r {
112                                tracing::error!(
113                                    "error when finish_job: {} - {}, {:?}",
114                                    &self.queue.queue_name,
115                                    dequeue_handle.mid,
116                                    err
117                                );
118                            }
119                        }
120                        Err(err) => {
121                            if let Err(err) = self.fail_job(dequeue_handle.mid, err).await {
122                                tracing::error!("{:?}", err);
123                            }
124                        }
125                    }
126                }
127                DequeueStatus::Skip(_dequeue_skip) => {
128                    // skip and continue
129                }
130                DequeueStatus::Unknown(s) => panic!("{}", s),
131            }
132        }
133    }
134
135    async fn fail_job(&mut self, job_id: i64, err: YqError) -> YqResult<()> {
136        match err {
137            YqError::RunJobError(run_job_error) => {
138                self.connection_manager
139                    .hset(
140                        self.queue.err_messages_key.as_str(),
141                        job_id,
142                        run_job_error.job_data,
143                    )
144                    .await
145                    .map_err(YqError::FailJobError)?;
146                self.connection_manager
147                    .hset(self.queue.err_key.as_str(), job_id, run_job_error.error)
148                    .await
149                    .map_err(YqError::FailJobError)?;
150            }
151            other => {
152                self.connection_manager
153                    .hset(self.queue.err_key.as_str(), job_id, other.to_string())
154                    .await
155                    .map_err(YqError::FailJobError)?;
156            }
157        }
158        Ok(())
159    }
160}