steady/backends/
redis.rs

1use std::num::NonZeroUsize;
2
3use redis::{aio::ConnectionLike, AsyncCommands, FromRedisValue, RedisWrite, ToRedisArgs};
4use tracing::warn;
5
6use crate::{jobs::JobDefinition, QueueName, Result};
7
8#[derive(Debug, Clone)]
9pub struct Backend {
10    redis_client: redis::Client,
11}
12
13impl FromRedisValue for JobDefinition {
14    fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
15        let bytes = <Vec<u8> as FromRedisValue>::from_redis_value(v)?;
16        let def = bincode::deserialize::<Self>(&bytes)
17            // todo better error message
18            .map_err(|_e| (redis::ErrorKind::TypeError, "bincode failed"))?;
19        Ok(def)
20    }
21}
22
23impl Backend {
24    pub fn new(redis_url: &str) -> Result<Self> {
25        let redis_client = redis::Client::open(redis_url)?;
26        Ok(Self { redis_client })
27    }
28}
29
30fn job_definition_to_redis_args(def: &JobDefinition) -> Result<impl ToRedisArgs> {
31    let bytes = bincode::serialize(def)?;
32    Ok(bytes)
33}
34
35#[async_trait::async_trait]
36impl super::Backend for Backend {
37    async fn pull(&self, queue: &QueueName, count: NonZeroUsize) -> Result<Vec<JobDefinition>> {
38        let mut connection = self.redis_client.get_async_connection().await?;
39        // let job_defs = connection
40        //     .rpop::<_, Vec<JobDefinition>>(queue, Some(count))
41        //     .await?;
42        let mut job_defs = Vec::new();
43        for _ in 0..count.get() {
44            match connection
45                .rpop::<_, Option<JobDefinition>>(queue, None)
46                .await
47            {
48                Ok(Some(job_def)) => job_defs.push(job_def),
49                Ok(None) => {
50                    break;
51                }
52                Err(e) => {
53                    warn!("failed to rpop: {}", e);
54                    break;
55                }
56            }
57        }
58        Ok(job_defs)
59    }
60
61    async fn enqueue(&self, job_def: &JobDefinition) -> Result<()> {
62        let mut connection = self.redis_client.get_async_connection().await?;
63        let () = connection
64            .lpush(&job_def.queue, job_definition_to_redis_args(job_def)?)
65            .await?;
66        Ok(())
67    }
68}
69
70impl ToRedisArgs for QueueName {
71    fn write_redis_args<W>(&self, out: &mut W)
72    where
73        W: ?Sized + RedisWrite,
74    {
75        let key = format!("steady_queue:{}", self.as_str());
76        out.write_arg(key.as_bytes());
77    }
78}
79
80async fn lock(connection: &mut redis::aio::Connection, key: &str) -> Result<()> {
81    const value: &'static str = "lock";
82    let result = redis::cmd("set")
83        .arg(key)
84        .arg(value)
85        .arg("ex")
86        .arg(100)
87        .arg("nx")
88        .query_async::<_, Option<String>>(connection)
89        .await?;
90    panic!("well {:?}", result);
91    Ok(())
92}