use anyhow::{Context, Result};
use async_trait::async_trait;
use bytes::Bytes;
use redis::AsyncCommands;
use redis::aio::MultiplexedConnection;
use crate::driver::Driver;
pub struct RedisDriver {
conn: MultiplexedConnection,
}
impl RedisDriver {
pub async fn new(url: &str) -> Result<std::sync::Arc<Self>> {
let client = redis::Client::open(url).context("redis connect")?;
let mut conn = client
.get_multiplexed_async_connection()
.await
.context("redis connection")?;
redis::cmd("PING")
.query_async::<_, String>(&mut conn)
.await
.context("redis PING")?;
Ok(std::sync::Arc::new(Self { conn }))
}
}
#[async_trait]
impl Driver for RedisDriver {
async fn pop(&self, queue: &str) -> Result<Option<Bytes>> {
let mut conn = self.conn.clone();
let result: Option<(String, Vec<u8>)> = conn.blpop(queue, 0.0).await.context("BLPOP")?;
Ok(result.map(|(_, v)| Bytes::from(v)))
}
async fn push(&self, queue: &str, payload: Bytes) -> Result<()> {
let mut conn = self.conn.clone();
conn.rpush::<_, _, ()>(queue, payload.to_vec()).await?;
Ok(())
}
async fn push_delayed(&self, queue: &str, payload: Bytes, delay_secs: u64) -> Result<()> {
if delay_secs == 0 {
return self.push(queue, payload).await;
}
let mut conn = self.conn.clone();
let score = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
+ delay_secs;
let delayed_key = format!("{queue}:delayed");
conn.zadd::<_, _, _, ()>(&delayed_key, payload.to_vec(), score as f64)
.await?;
Ok(())
}
async fn promote_delayed(&self, queue: &str) -> Result<u64> {
let mut conn = self.conn.clone();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as f64;
let delayed_key = format!("{queue}:delayed");
let count: u64 = redis::Script::new(
r"local items = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1])
for _, v in ipairs(items) do
redis.call('RPUSH', KEYS[2], v)
redis.call('ZREM', KEYS[1], v)
end
return #items",
)
.key(&delayed_key)
.key(queue)
.arg(now)
.invoke_async(&mut conn)
.await
.context("promote_delayed lua script")?;
Ok(count)
}
async fn depth(&self, queue: &str) -> Result<u64> {
let mut conn = self.conn.clone();
Ok(conn.llen::<_, u64>(queue).await.unwrap_or(0))
}
}