// yq_sync/sync_worker.rs

use crate::sync_job::{SyncJob, SyncJobFns};
use redis::{Client, Commands, RedisResult};
use std::{sync::Arc, thread, time::Duration};
use yq::{
    DequeueAction, DequeueSleep, DequeueStatus, FinishAction, Queue, SleepOnAction, YqError,
    YqResult,
};
8
9pub struct SyncWorker<S> {
10    client: Client,
11    queue: Queue,
12    dequeue_action: DequeueAction,
13    finish_action: FinishAction,
14    sleep_on_action: SleepOnAction,
15    sync_job_fns: SyncJobFns<S>,
16    state: S,
17}
18
19impl<S> SyncWorker<S>
20where
21    S: Send + Sync + Clone + 'static,
22{
23    pub fn new(redis_ur: &str, queue: Queue, state: S) -> YqResult<Self> {
24        let client = Client::open(redis_ur).map_err(YqError::CreateRedisClient)?;
25
26        Ok(Self {
27            client,
28            queue: queue.clone(),
29            dequeue_action: DequeueAction::new(queue.clone()),
30            finish_action: FinishAction::new(queue.clone()),
31            sleep_on_action: SleepOnAction::new(queue),
32            sync_job_fns: SyncJobFns::new(),
33            state,
34        })
35    }
36
37    pub fn reg_job<J: SyncJob<State = S>>(mut self) -> YqResult<Self> {
38        let job_type = J::JOB_TYPE;
39
40        self.sync_job_fns.reg_job(
41            job_type,
42            Arc::new(|mid, job_content, state| {
43                let job_data: J =
44                    serde_json::from_str(&job_content).map_err(|err| err.to_string())?;
45                job_data.execute(mid, state)
46            }),
47        )?;
48        Ok(self)
49    }
50
51    fn sleep(&mut self, dequeue_sleep: DequeueSleep) {
52        let r: RedisResult<Option<String>> = self
53            .sleep_on_action
54            .prepare_invoke(dequeue_sleep)
55            .query(&mut self.client);
56
57        if let Err(err) = r {
58            tracing::error!("worker sleep ERROR: {}", err.to_string());
59        }
60    }
61
62    pub fn run(mut self) -> YqResult<()> {
63        loop {
64            let now = time::OffsetDateTime::now_utc();
65
66            let dequeue_status: RedisResult<DequeueStatus> = self
67                .dequeue_action
68                .prepare_invoke(now.unix_timestamp())
69                .invoke(&mut self.client);
70
71            let dequeue_status = match dequeue_status {
72                Ok(dequeue_status) => dequeue_status,
73                Err(err) => {
74                    tracing::error!("dequeue_job ERROR: {err:?}");
75                    self.sleep(DequeueSleep::default());
76                    continue;
77                }
78            };
79            tracing::trace!("{:?}", &dequeue_status);
80
81            match dequeue_status {
82                DequeueStatus::Sleep(dequeue_sleep) => {
83                    self.sleep(dequeue_sleep);
84                }
85                DequeueStatus::Handle(dequeue_handle) => {
86                    match self.sync_job_fns.handle(
87                        dequeue_handle.mid,
88                        dequeue_handle.mcontent,
89                        self.state.clone(),
90                    ) {
91                        Ok(_) => {
92                            let r: RedisResult<i64> = self
93                                .finish_action
94                                .prepare_invoke(dequeue_handle.mid)
95                                .query(&mut self.client);
96
97                            if let Err(err) = r {
98                                tracing::error!(
99                                    "error when finish_job: {} - {}, {:?}",
100                                    &self.queue.queue_name,
101                                    dequeue_handle.mid,
102                                    err
103                                );
104                            }
105                        }
106                        Err(err) => {
107                            if let Err(err) = self.fail_job(dequeue_handle.mid, err) {
108                                tracing::error!("{:?}", err);
109                            }
110                        }
111                    }
112                }
113                DequeueStatus::Skip(_dequeue_skip) => {
114                    // skip and continue
115                }
116                DequeueStatus::Unknown(s) => panic!("{}", s),
117            }
118        }
119    }
120
121    fn fail_job(&mut self, job_id: i64, err: YqError) -> YqResult<()> {
122        match err {
123            YqError::RunJobError(run_job_error) => {
124                self.client
125                    .hset(
126                        self.queue.err_messages_key.as_str(),
127                        job_id,
128                        run_job_error.job_data,
129                    )
130                    .map_err(YqError::FailJobError)?;
131
132                self.client
133                    .hset(self.queue.err_key.as_str(), job_id, run_job_error.error)
134                    .map_err(YqError::FailJobError)?;
135            }
136            other => {
137                self.client
138                    .hset(self.queue.err_key.as_str(), job_id, other.to_string())
139                    .map_err(YqError::FailJobError)?;
140            }
141        }
142
143        Ok(())
144    }
145}