1use std::num::NonZeroUsize;
2
3use redis::{aio::ConnectionLike, AsyncCommands, FromRedisValue, RedisWrite, ToRedisArgs};
4use tracing::warn;
5
6use crate::{jobs::JobDefinition, QueueName, Result};
7
8#[derive(Debug, Clone)]
9pub struct Backend {
10 redis_client: redis::Client,
11}
12
13impl FromRedisValue for JobDefinition {
14 fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
15 let bytes = <Vec<u8> as FromRedisValue>::from_redis_value(v)?;
16 let def = bincode::deserialize::<Self>(&bytes)
17 .map_err(|_e| (redis::ErrorKind::TypeError, "bincode failed"))?;
19 Ok(def)
20 }
21}
22
23impl Backend {
24 pub fn new(redis_url: &str) -> Result<Self> {
25 let redis_client = redis::Client::open(redis_url)?;
26 Ok(Self { redis_client })
27 }
28}
29
30fn job_definition_to_redis_args(def: &JobDefinition) -> Result<impl ToRedisArgs> {
31 let bytes = bincode::serialize(def)?;
32 Ok(bytes)
33}
34
35#[async_trait::async_trait]
36impl super::Backend for Backend {
37 async fn pull(&self, queue: &QueueName, count: NonZeroUsize) -> Result<Vec<JobDefinition>> {
38 let mut connection = self.redis_client.get_async_connection().await?;
39 let mut job_defs = Vec::new();
43 for _ in 0..count.get() {
44 match connection
45 .rpop::<_, Option<JobDefinition>>(queue, None)
46 .await
47 {
48 Ok(Some(job_def)) => job_defs.push(job_def),
49 Ok(None) => {
50 break;
51 }
52 Err(e) => {
53 warn!("failed to rpop: {}", e);
54 break;
55 }
56 }
57 }
58 Ok(job_defs)
59 }
60
61 async fn enqueue(&self, job_def: &JobDefinition) -> Result<()> {
62 let mut connection = self.redis_client.get_async_connection().await?;
63 let () = connection
64 .lpush(&job_def.queue, job_definition_to_redis_args(job_def)?)
65 .await?;
66 Ok(())
67 }
68}
69
70impl ToRedisArgs for QueueName {
71 fn write_redis_args<W>(&self, out: &mut W)
72 where
73 W: ?Sized + RedisWrite,
74 {
75 let key = format!("steady_queue:{}", self.as_str());
76 out.write_arg(key.as_bytes());
77 }
78}
79
80async fn lock(connection: &mut redis::aio::Connection, key: &str) -> Result<()> {
81 const value: &'static str = "lock";
82 let result = redis::cmd("set")
83 .arg(key)
84 .arg(value)
85 .arg("ex")
86 .arg(100)
87 .arg("nx")
88 .query_async::<_, Option<String>>(connection)
89 .await?;
90 panic!("well {:?}", result);
91 Ok(())
92}