yq_async/
async_client.rs

1use redis::aio::ConnectionManager;
2use yq::{
3    EnqueueAction, EnqueueAtAction, EnqueueAtStatus, EnqueueStatus, Job, Queue, YqError, YqResult,
4};
5
6#[derive(Clone)]
7pub struct AsyncClient {
8    connection_manager: ConnectionManager,
9    enqueue_action: EnqueueAction,
10    enqueue_at_action: EnqueueAtAction,
11}
12
13impl AsyncClient {
14    pub async fn new(redis_ur: &str, queue: Queue) -> YqResult<AsyncClient> {
15        let client = redis::Client::open(redis_ur).map_err(YqError::CreateRedisClient)?;
16        let connection_manager = client
17            .get_tokio_connection_manager()
18            .await
19            .map_err(YqError::GetRedisConn)?;
20
21        Ok(Self {
22            connection_manager,
23            enqueue_action: EnqueueAction::new(queue.clone()),
24            enqueue_at_action: EnqueueAtAction::new(queue),
25        })
26    }
27
28    pub async fn schedule<J: Job>(&self, job: &J) -> YqResult<i64> {
29        let mut redis_conn = self.connection_manager.clone();
30        let enqueue_status: EnqueueStatus = self
31            .enqueue_action
32            .prepare_invoke(job)?
33            .invoke_async(&mut redis_conn)
34            .await
35            .map_err(YqError::Enqueue)?;
36
37        match enqueue_status {
38            EnqueueStatus::Added(added) => Ok(added.mid),
39            EnqueueStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
40                redis::ErrorKind::ResponseError,
41                "enqueue error",
42                err,
43            )))),
44        }
45    }
46
47    pub async fn schedule_at<J: Job>(&self, job: &J, run_at: i64) -> YqResult<i64> {
48        let mut redis_conn = self.connection_manager.clone();
49        let enqueue_at_status: EnqueueAtStatus = self
50            .enqueue_at_action
51            .prepare_invoke(job, run_at)?
52            .invoke_async(&mut redis_conn)
53            .await
54            .map_err(YqError::EnqueueAt)?;
55
56        match enqueue_at_status {
57            EnqueueAtStatus::Added(mid) => Ok(mid),
58            EnqueueAtStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
59                redis::ErrorKind::ResponseError,
60                "enqueue error",
61                err,
62            )))),
63        }
64    }
65}