1use crate::async_job::{AsyncJob, AsyncJobFns};
2use redis::{aio::ConnectionManager, AsyncCommands, Client, RedisResult};
3use std::sync::Arc;
4use yq::{
5 DequeueAction, DequeueSleep, DequeueStatus, FinishAction, Queue, SleepOnAction, YqError,
6 YqResult,
7};
8
9pub struct AsyncWorker<S> {
10 connection_manager: ConnectionManager,
11 queue: Queue,
12 dequeue_action: DequeueAction,
13 finish_action: FinishAction,
14 sleep_on_action: SleepOnAction,
15 async_job_fns: AsyncJobFns<S>,
16 state: S,
17}
18
19impl<S> AsyncWorker<S>
20where
21 S: Send + Sync + Clone + 'static,
22{
23 pub async fn new(redis_url: &str, queue: Queue, state: S) -> YqResult<Self> {
24 let client = Client::open(redis_url).map_err(YqError::CreateRedisClient)?;
25 let connection_manager = client
26 .get_tokio_connection_manager()
27 .await
28 .map_err(YqError::GetRedisConn)?;
29
30 Ok(Self {
31 connection_manager,
32 queue: queue.clone(),
33 dequeue_action: DequeueAction::new(queue.clone()),
34 finish_action: FinishAction::new(queue.clone()),
35 sleep_on_action: SleepOnAction::new(queue),
36 async_job_fns: AsyncJobFns::new(),
37 state,
38 })
39 }
40
41 pub fn reg_job<J: AsyncJob<State = S>>(mut self) -> YqResult<Self> {
42 let job_type = J::JOB_TYPE;
43
44 self.async_job_fns.reg_job(
45 job_type,
46 Arc::new(|mid, job_content, state| {
47 Box::pin(async move {
48 let job_data: J =
49 serde_json::from_str(&job_content).map_err(|err| err.to_string())?;
50 job_data.execute_async(mid, state).await
51 })
52 }),
53 )?;
54 Ok(self)
55 }
56
57 async fn sleep(&mut self, dequeue_sleep: DequeueSleep) {
58 let r: RedisResult<Option<String>> = self
59 .sleep_on_action
60 .prepare_invoke(dequeue_sleep)
61 .query_async(&mut self.connection_manager)
62 .await;
63
64 if let Err(err) = r {
65 tracing::error!("worker sleep ERROR: {}", err.to_string());
66 }
67 }
68
69 pub async fn run(mut self) -> YqResult<()> {
70 loop {
71 let now = time::OffsetDateTime::now_utc();
72
73 let dequeue_status: RedisResult<DequeueStatus> = self
74 .dequeue_action
75 .prepare_invoke(now.unix_timestamp())
76 .invoke_async(&mut self.connection_manager)
77 .await;
78
79 let dequeue_status = match dequeue_status {
80 Ok(dequeue_status) => dequeue_status,
81 Err(err) => {
82 tracing::error!("dequeue_job ERROR: {err:?}");
83 self.sleep(DequeueSleep::default()).await;
84 continue;
85 }
86 };
87
88 tracing::trace!("{:?}", &dequeue_status);
89
90 match dequeue_status {
91 DequeueStatus::Sleep(dequeue_sleep) => {
92 self.sleep(dequeue_sleep).await;
93 }
94 DequeueStatus::Handle(dequeue_handle) => {
95 match self
96 .async_job_fns
97 .handle(
98 dequeue_handle.mid,
99 dequeue_handle.mcontent,
100 self.state.clone(),
101 )
102 .await
103 {
104 Ok(_) => {
105 let r: RedisResult<i64> = self
106 .finish_action
107 .prepare_invoke(dequeue_handle.mid)
108 .query_async(&mut self.connection_manager)
109 .await;
110
111 if let Err(err) = r {
112 tracing::error!(
113 "error when finish_job: {} - {}, {:?}",
114 &self.queue.queue_name,
115 dequeue_handle.mid,
116 err
117 );
118 }
119 }
120 Err(err) => {
121 if let Err(err) = self.fail_job(dequeue_handle.mid, err).await {
122 tracing::error!("{:?}", err);
123 }
124 }
125 }
126 }
127 DequeueStatus::Skip(_dequeue_skip) => {
128 }
130 DequeueStatus::Unknown(s) => panic!("{}", s),
131 }
132 }
133 }
134
135 async fn fail_job(&mut self, job_id: i64, err: YqError) -> YqResult<()> {
136 match err {
137 YqError::RunJobError(run_job_error) => {
138 self.connection_manager
139 .hset(
140 self.queue.err_messages_key.as_str(),
141 job_id,
142 run_job_error.job_data,
143 )
144 .await
145 .map_err(YqError::FailJobError)?;
146 self.connection_manager
147 .hset(self.queue.err_key.as_str(), job_id, run_job_error.error)
148 .await
149 .map_err(YqError::FailJobError)?;
150 }
151 other => {
152 self.connection_manager
153 .hset(self.queue.err_key.as_str(), job_id, other.to_string())
154 .await
155 .map_err(YqError::FailJobError)?;
156 }
157 }
158 Ok(())
159 }
160}