1use redis::{aio::ConnectionManager, RedisError};
2
3use crate::job::Job;
4use anyhow::{Context, Result};
5
6#[derive(Clone)]
7pub struct RedisClient {
8 connection: ConnectionManager,
9 queue_name: String,
10 unacked_queue_name: String,
11}
12
13impl RedisClient {
14 pub async fn new() -> Result<RedisClient> {
15 let client =
16 redis::Client::open("redis://127.0.0.1/").context("failed to connect to redis")?;
17 let manager = client
18 .get_tokio_connection_manager()
19 .await
20 .context("failed to connect to redis manager")?;
21
22 Ok(RedisClient {
23 connection: manager,
24 queue_name: "queue".to_owned(),
25 unacked_queue_name: "unacked_queue".to_owned(),
26 })
27 }
28
29 pub async fn rpop(&mut self, count: i32) -> Result<Vec<Job>> {
30 let resp: Result<Vec<String>, RedisError> = redis::cmd("RPOP")
31 .arg(&self.queue_name)
32 .arg(count)
33 .query_async(&mut self.connection)
34 .await;
35
36 match resp {
37 Ok(job_strs) => {
38 let result: Result<Vec<_>, _> = job_strs
39 .into_iter()
40 .map(|job_str| serde_json::from_str(&job_str).context("failed to decode job"))
41 .collect();
42
43 result
44 }
45 Err(err) => Err(err.into()),
46 }
47 }
48
49 pub async fn lpush(&mut self, job: &Job) -> Result<()> {
50 let job_str =
51 serde_json::to_string(&job).with_context(|| format!("failed to serialize job json"))?;
52
53 Ok(redis::cmd("LPUSH")
54 .arg(&self.queue_name)
55 .arg(&job_str)
56 .query_async(&mut self.connection)
57 .await?)
58 }
59
60 pub async fn rpoplpush(&mut self) -> Result<Option<Job>> {
61 let resp: Result<Option<String>, RedisError> = redis::cmd("RPOPLPUSH")
62 .arg(&self.queue_name)
63 .arg(&self.unacked_queue_name)
64 .query_async(&mut self.connection)
65 .await;
66
67 match resp {
68 Ok(None) => Ok(None),
69 Ok(Some(job_str)) => serde_json::from_str(&job_str).context("failed to decode job"),
70 Err(err) => Err(err.into()),
71 }
72 }
73}