1use redis::aio::ConnectionManager;
2use yq::{
3 EnqueueAction, EnqueueAtAction, EnqueueAtStatus, EnqueueStatus, Job, Queue, YqError, YqResult,
4};
5
6#[derive(Clone)]
7pub struct AsyncClient {
8 connection_manager: ConnectionManager,
9 enqueue_action: EnqueueAction,
10 enqueue_at_action: EnqueueAtAction,
11}
12
13impl AsyncClient {
14 pub async fn new(redis_ur: &str, queue: Queue) -> YqResult<AsyncClient> {
15 let client = redis::Client::open(redis_ur).map_err(YqError::CreateRedisClient)?;
16 let connection_manager = client
17 .get_tokio_connection_manager()
18 .await
19 .map_err(YqError::GetRedisConn)?;
20
21 Ok(Self {
22 connection_manager,
23 enqueue_action: EnqueueAction::new(queue.clone()),
24 enqueue_at_action: EnqueueAtAction::new(queue),
25 })
26 }
27
28 pub async fn schedule<J: Job>(&self, job: &J) -> YqResult<i64> {
29 let mut redis_conn = self.connection_manager.clone();
30 let enqueue_status: EnqueueStatus = self
31 .enqueue_action
32 .prepare_invoke(job)?
33 .invoke_async(&mut redis_conn)
34 .await
35 .map_err(YqError::Enqueue)?;
36
37 match enqueue_status {
38 EnqueueStatus::Added(added) => Ok(added.mid),
39 EnqueueStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
40 redis::ErrorKind::ResponseError,
41 "enqueue error",
42 err,
43 )))),
44 }
45 }
46
47 pub async fn schedule_at<J: Job>(&self, job: &J, run_at: i64) -> YqResult<i64> {
48 let mut redis_conn = self.connection_manager.clone();
49 let enqueue_at_status: EnqueueAtStatus = self
50 .enqueue_at_action
51 .prepare_invoke(job, run_at)?
52 .invoke_async(&mut redis_conn)
53 .await
54 .map_err(YqError::EnqueueAt)?;
55
56 match enqueue_at_status {
57 EnqueueAtStatus::Added(mid) => Ok(mid),
58 EnqueueAtStatus::Unknown(err) => Err(YqError::Enqueue(redis::RedisError::from((
59 redis::ErrorKind::ResponseError,
60 "enqueue error",
61 err,
62 )))),
63 }
64 }
65}