1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
use redis::Client;
use yq::{
    EnqueueAction, EnqueueAtAction, EnqueueAtStatus, EnqueueStatus, Job, Queue, YqError, YqResult,
};

/// Blocking (non-async) client that schedules jobs by invoking the `yq`
/// enqueue actions against a Redis [`Client`].
///
/// Both actions are built from the same [`Queue`] in [`SyncClient::new`].
pub struct SyncClient {
    /// Redis client handed to every action invocation.
    client: Client,
    /// Action used by `schedule` to enqueue a job.
    enqueue_action: EnqueueAction,
    /// Action used by `schedule_at` to enqueue a job together with a `run_at` value.
    enqueue_at_action: EnqueueAtAction,
}

impl SyncClient {
    /// Builds a client for `queue` backed by the Redis instance at `redis_url`.
    ///
    /// The queue is cloned once so that the enqueue and enqueue-at actions each
    /// own their copy.
    ///
    /// # Errors
    ///
    /// Returns [`YqError::CreateRedisClient`] when [`Client::open`] rejects
    /// `redis_url`.
    pub fn new(redis_url: &str, queue: Queue) -> YqResult<SyncClient> {
        let client = Client::open(redis_url).map_err(YqError::CreateRedisClient)?;

        Ok(Self {
            client,
            enqueue_action: EnqueueAction::new(queue.clone()),
            enqueue_at_action: EnqueueAtAction::new(queue),
        })
    }

    /// Enqueues `job` and returns the `mid` reported for the added entry
    /// (presumably the message id — confirm against `yq`).
    ///
    /// # Errors
    ///
    /// * Any error produced while preparing the invocation is propagated as-is.
    /// * [`YqError::Enqueue`] when the invocation against Redis fails, or when
    ///   the action replies with [`EnqueueStatus::Unknown`].
    pub fn schedule<J: Job>(&mut self, job: &J) -> YqResult<i64> {
        let enqueue_status: EnqueueStatus = self
            .enqueue_action
            .prepare_invoke(job)?
            .invoke(&mut self.client)
            .map_err(YqError::Enqueue)?;

        match enqueue_status {
            EnqueueStatus::Added(added) => Ok(added.mid),
            // An unrecognised reply is surfaced as a Redis response error so
            // callers handle it through the same variant as transport failures.
            EnqueueStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
                redis::ErrorKind::ResponseError,
                "enqueue error",
                err,
            )))),
        }
    }

    /// Enqueues `job` together with `run_at` and returns the value carried by
    /// [`EnqueueAtStatus::Added`].
    ///
    /// `run_at` is forwarded unchanged to the enqueue-at action; its unit
    /// (presumably a Unix timestamp) is defined by `yq` — confirm there.
    ///
    /// # Errors
    ///
    /// * Any error produced while preparing the invocation is propagated as-is.
    /// * [`YqError::EnqueueAt`] when the invocation against Redis fails, or when
    ///   the action replies with [`EnqueueAtStatus::Unknown`].
    pub fn schedule_at<J: Job>(&mut self, job: &J, run_at: i64) -> YqResult<i64> {
        let enqueue_at_status: EnqueueAtStatus = self
            .enqueue_at_action
            .prepare_invoke(job, run_at)?
            .invoke(&mut self.client)
            .map_err(YqError::EnqueueAt)?;

        match enqueue_at_status {
            EnqueueAtStatus::Added(mid) => Ok(mid),
            // Report through `EnqueueAt` (not `Enqueue`) so every failure of
            // this operation reaches the caller under a single variant.
            EnqueueAtStatus::Unknown(err) => Err(YqError::EnqueueAt(redis::RedisError::from((
                redis::ErrorKind::ResponseError,
                "enqueue at error",
                err,
            )))),
        }
    }
}