redis_mq/
redis_client.rs

1use redis::{aio::ConnectionManager, RedisError};
2
3use crate::job::Job;
4use anyhow::{Context, Result};
5
6#[derive(Clone)]
7pub struct RedisClient {
8    connection: ConnectionManager,
9    queue_name: String,
10    unacked_queue_name: String,
11}
12
13impl RedisClient {
14    pub async fn new() -> Result<RedisClient> {
15        let client =
16            redis::Client::open("redis://127.0.0.1/").context("failed to connect to redis")?;
17        let manager = client
18            .get_tokio_connection_manager()
19            .await
20            .context("failed to connect to redis manager")?;
21
22        Ok(RedisClient {
23            connection: manager,
24            queue_name: "queue".to_owned(),
25            unacked_queue_name: "unacked_queue".to_owned(),
26        })
27    }
28
29    pub async fn rpop(&mut self, count: i32) -> Result<Vec<Job>> {
30        let resp: Result<Vec<String>, RedisError> = redis::cmd("RPOP")
31            .arg(&self.queue_name)
32            .arg(count)
33            .query_async(&mut self.connection)
34            .await;
35
36        match resp {
37            Ok(job_strs) => {
38                let result: Result<Vec<_>, _> = job_strs
39                    .into_iter()
40                    .map(|job_str| serde_json::from_str(&job_str).context("failed to decode job"))
41                    .collect();
42
43                result
44            }
45            Err(err) => Err(err.into()),
46        }
47    }
48
49    pub async fn lpush(&mut self, job: &Job) -> Result<()> {
50        let job_str =
51            serde_json::to_string(&job).with_context(|| format!("failed to serialize job json"))?;
52
53        Ok(redis::cmd("LPUSH")
54            .arg(&self.queue_name)
55            .arg(&job_str)
56            .query_async(&mut self.connection)
57            .await?)
58    }
59
60    pub async fn rpoplpush(&mut self) -> Result<Option<Job>> {
61        let resp: Result<Option<String>, RedisError> = redis::cmd("RPOPLPUSH")
62            .arg(&self.queue_name)
63            .arg(&self.unacked_queue_name)
64            .query_async(&mut self.connection)
65            .await;
66
67        match resp {
68            Ok(None) => Ok(None),
69            Ok(Some(job_str)) => serde_json::from_str(&job_str).context("failed to decode job"),
70            Err(err) => Err(err.into()),
71        }
72    }
73}