1use redis::Client;
2use yq::{
3 EnqueueAction, EnqueueAtAction, EnqueueAtStatus, EnqueueStatus, Job, Queue, YqError, YqResult,
4};
5
6#[derive(Clone)]
7pub struct SyncClient {
8 client: Client,
9 enqueue_action: EnqueueAction,
10 enqueue_at_action: EnqueueAtAction,
11}
12
13impl SyncClient {
14 pub fn new(redis_ur: &str, queue: Queue) -> YqResult<SyncClient> {
15 let client = Client::open(redis_ur).map_err(YqError::CreateRedisClient)?;
16
17 Ok(Self {
18 client,
19 enqueue_action: EnqueueAction::new(queue.clone()),
20 enqueue_at_action: EnqueueAtAction::new(queue),
21 })
22 }
23
24 pub fn schedule<J: Job>(&self, job: &J) -> YqResult<i64> {
25 let mut redis_conn = self
26 .client
27 .get_connection()
28 .map_err(YqError::GetRedisConn)?;
29 let enqueue_status: EnqueueStatus = self
30 .enqueue_action
31 .prepare_invoke(job)?
32 .invoke(&mut redis_conn)
33 .map_err(YqError::Enqueue)?;
34
35 match enqueue_status {
36 EnqueueStatus::Added(added) => Ok(added.mid),
37 EnqueueStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
38 redis::ErrorKind::ResponseError,
39 "enqueue error",
40 err,
41 )))),
42 }
43 }
44
45 pub fn schedule_at<J: Job>(&self, job: &J, run_at: i64) -> YqResult<i64> {
46 let mut redis_conn = self
47 .client
48 .get_connection()
49 .map_err(YqError::GetRedisConn)?;
50
51 let enqueue_at_status: EnqueueAtStatus = self
52 .enqueue_at_action
53 .prepare_invoke(job, run_at)?
54 .invoke(&mut redis_conn)
55 .map_err(YqError::EnqueueAt)?;
56
57 match enqueue_at_status {
58 EnqueueAtStatus::Added(mid) => Ok(mid),
59 EnqueueAtStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
60 redis::ErrorKind::ResponseError,
61 "enqueue error",
62 err,
63 )))),
64 }
65 }
66}