1use crate::sync_job::{SyncJob, SyncJobFns};
2use redis::{Client, Commands, RedisResult};
3use std::sync::Arc;
4use yq::{
5 DequeueAction, DequeueSleep, DequeueStatus, FinishAction, Queue, SleepOnAction, YqError,
6 YqResult,
7};
8
9pub struct SyncWorker<S> {
10 client: Client,
11 queue: Queue,
12 dequeue_action: DequeueAction,
13 finish_action: FinishAction,
14 sleep_on_action: SleepOnAction,
15 sync_job_fns: SyncJobFns<S>,
16 state: S,
17}
18
19impl<S> SyncWorker<S>
20where
21 S: Send + Sync + Clone + 'static,
22{
23 pub fn new(redis_ur: &str, queue: Queue, state: S) -> YqResult<Self> {
24 let client = Client::open(redis_ur).map_err(YqError::CreateRedisClient)?;
25
26 Ok(Self {
27 client,
28 queue: queue.clone(),
29 dequeue_action: DequeueAction::new(queue.clone()),
30 finish_action: FinishAction::new(queue.clone()),
31 sleep_on_action: SleepOnAction::new(queue),
32 sync_job_fns: SyncJobFns::new(),
33 state,
34 })
35 }
36
37 pub fn reg_job<J: SyncJob<State = S>>(mut self) -> YqResult<Self> {
38 let job_type = J::JOB_TYPE;
39
40 self.sync_job_fns.reg_job(
41 job_type,
42 Arc::new(|mid, job_content, state| {
43 let job_data: J =
44 serde_json::from_str(&job_content).map_err(|err| err.to_string())?;
45 job_data.execute(mid, state)
46 }),
47 )?;
48 Ok(self)
49 }
50
51 fn sleep(&mut self, dequeue_sleep: DequeueSleep) {
52 let r: RedisResult<Option<String>> = self
53 .sleep_on_action
54 .prepare_invoke(dequeue_sleep)
55 .query(&mut self.client);
56
57 if let Err(err) = r {
58 tracing::error!("worker sleep ERROR: {}", err.to_string());
59 }
60 }
61
62 pub fn run(mut self) -> YqResult<()> {
63 loop {
64 let now = time::OffsetDateTime::now_utc();
65
66 let dequeue_status: RedisResult<DequeueStatus> = self
67 .dequeue_action
68 .prepare_invoke(now.unix_timestamp())
69 .invoke(&mut self.client);
70
71 let dequeue_status = match dequeue_status {
72 Ok(dequeue_status) => dequeue_status,
73 Err(err) => {
74 tracing::error!("dequeue_job ERROR: {err:?}");
75 self.sleep(DequeueSleep::default());
76 continue;
77 }
78 };
79 tracing::trace!("{:?}", &dequeue_status);
80
81 match dequeue_status {
82 DequeueStatus::Sleep(dequeue_sleep) => {
83 self.sleep(dequeue_sleep);
84 }
85 DequeueStatus::Handle(dequeue_handle) => {
86 match self.sync_job_fns.handle(
87 dequeue_handle.mid,
88 dequeue_handle.mcontent,
89 self.state.clone(),
90 ) {
91 Ok(_) => {
92 let r: RedisResult<i64> = self
93 .finish_action
94 .prepare_invoke(dequeue_handle.mid)
95 .query(&mut self.client);
96
97 if let Err(err) = r {
98 tracing::error!(
99 "error when finish_job: {} - {}, {:?}",
100 &self.queue.queue_name,
101 dequeue_handle.mid,
102 err
103 );
104 }
105 }
106 Err(err) => {
107 if let Err(err) = self.fail_job(dequeue_handle.mid, err) {
108 tracing::error!("{:?}", err);
109 }
110 }
111 }
112 }
113 DequeueStatus::Skip(_dequeue_skip) => {
114 }
116 DequeueStatus::Unknown(s) => panic!("{}", s),
117 }
118 }
119 }
120
121 fn fail_job(&mut self, job_id: i64, err: YqError) -> YqResult<()> {
122 match err {
123 YqError::RunJobError(run_job_error) => {
124 self.client
125 .hset(
126 self.queue.err_messages_key.as_str(),
127 job_id,
128 run_job_error.job_data,
129 )
130 .map_err(YqError::FailJobError)?;
131
132 self.client
133 .hset(self.queue.err_key.as_str(), job_id, run_job_error.error)
134 .map_err(YqError::FailJobError)?;
135 }
136 other => {
137 self.client
138 .hset(self.queue.err_key.as_str(), job_id, other.to_string())
139 .map_err(YqError::FailJobError)?;
140 }
141 }
142
143 Ok(())
144 }
145}