yq_sync/
sync_client.rs

1use redis::Client;
2use yq::{
3    EnqueueAction, EnqueueAtAction, EnqueueAtStatus, EnqueueStatus, Job, Queue, YqError, YqResult,
4};
5
6#[derive(Clone)]
7pub struct SyncClient {
8    client: Client,
9    enqueue_action: EnqueueAction,
10    enqueue_at_action: EnqueueAtAction,
11}
12
13impl SyncClient {
14    pub fn new(redis_ur: &str, queue: Queue) -> YqResult<SyncClient> {
15        let client = Client::open(redis_ur).map_err(YqError::CreateRedisClient)?;
16
17        Ok(Self {
18            client,
19            enqueue_action: EnqueueAction::new(queue.clone()),
20            enqueue_at_action: EnqueueAtAction::new(queue),
21        })
22    }
23
24    pub fn schedule<J: Job>(&self, job: &J) -> YqResult<i64> {
25        let mut redis_conn = self
26            .client
27            .get_connection()
28            .map_err(YqError::GetRedisConn)?;
29        let enqueue_status: EnqueueStatus = self
30            .enqueue_action
31            .prepare_invoke(job)?
32            .invoke(&mut redis_conn)
33            .map_err(YqError::Enqueue)?;
34
35        match enqueue_status {
36            EnqueueStatus::Added(added) => Ok(added.mid),
37            EnqueueStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
38                redis::ErrorKind::ResponseError,
39                "enqueue error",
40                err,
41            )))),
42        }
43    }
44
45    pub fn schedule_at<J: Job>(&self, job: &J, run_at: i64) -> YqResult<i64> {
46        let mut redis_conn = self
47            .client
48            .get_connection()
49            .map_err(YqError::GetRedisConn)?;
50
51        let enqueue_at_status: EnqueueAtStatus = self
52            .enqueue_at_action
53            .prepare_invoke(job, run_at)?
54            .invoke(&mut redis_conn)
55            .map_err(YqError::EnqueueAt)?;
56
57        match enqueue_at_status {
58            EnqueueAtStatus::Added(mid) => Ok(mid),
59            EnqueueAtStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
60                redis::ErrorKind::ResponseError,
61                "enqueue error",
62                err,
63            )))),
64        }
65    }
66}