use crate::config::env_var_or_file;
use crate::model::InfraQueueMessage;
use anyhow::Context;
use deadpool_redis::redis::AsyncCommands;
use deadpool_redis::{Config as RedisConfig, Pool, Runtime};
use log::info;
use std::collections::HashMap;
use std::env;
const REDIS_PREFIX: &str = "infraqueue";
#[inline]
pub(crate) fn make_key(topic: &str) -> String {
format!("{}:{}", REDIS_PREFIX, topic)
}
#[inline]
fn make_inflight_key(topic: &str) -> String {
format!("{}:{}:inflight", REDIS_PREFIX, topic)
}
#[inline]
fn make_msg_key(id: &str) -> String {
format!("{}:msg:{}", REDIS_PREFIX, id)
}
#[derive(Clone, Debug)]
pub struct RetryPolicy {
pub max_retries: u32,
pub base_delay_ms: u64,
pub max_delay_ms: u64,
pub multiplier: f64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_retries: 5,
base_delay_ms: 1000,
max_delay_ms: 60_000,
multiplier: 2.0,
}
}
}
impl RetryPolicy {
pub fn backoff_delay_ms(&self, retry_count: u32) -> u64 {
let mut delay = self.base_delay_ms as f64;
if retry_count > 1 {
delay *= self.multiplier.powi((retry_count - 1) as i32);
}
delay.min(self.max_delay_ms as f64) as u64
}
}
#[derive(Clone, Debug)]
pub struct DequeueWithReceipt {
pub message: InfraQueueMessage,
pub receipt: String,
}
#[derive(Clone, Debug)]
pub enum NackOutcome {
Requeued { delay_ms: u64, retry_count: u32 },
DeadLettered,
}
pub struct InfraQueueQueue {
pub(crate) pool: Pool,
}
impl InfraQueueQueue {
pub fn new(pool: Pool) -> Self {
Self { pool }
}
pub async fn get_topic_length(&self, topic: &str) -> Result<u64, anyhow::Error> {
let mut conn = self.pool.get().await.context("failed to get redis connection")?;
let key = make_key(topic);
let len: u64 = conn.llen(&key).await.context("failed to get LLEN from redis")?;
Ok(len)
}
pub async fn get_inflight_length(&self, topic: &str) -> Result<u64, anyhow::Error> {
let mut conn = self.pool.get().await.context("failed to get redis connection")?;
let key = make_inflight_key(topic);
let len: u64 = conn.zcard(&key).await.context("failed to get ZCARD from redis")?;
Ok(len)
}
pub async fn heartbeat(&self, topic: &str, consumer_name: &str, ttl_secs: u64) -> Result<(), anyhow::Error> {
let mut conn = self.pool.get().await.context("failed to get redis connection")?;
let key = format!("{}:heartbeat:{}:{}", REDIS_PREFIX, topic, consumer_name);
let _: () = conn.set_ex(&key, 1, ttl_secs).await.context("failed to SETEX heartbeat in redis")?;
Ok(())
}
pub async fn list_active_consumers(&self) -> Result<HashMap<String, Vec<String>>, anyhow::Error> {
let mut conn = self.pool.get().await.context("failed to get redis connection")?;
let pattern = format!("{}:heartbeat:*", REDIS_PREFIX);
let keys: Vec<String> = conn.keys(&pattern).await.context("failed to get heartbeat KEYS from redis")?;
let mut result: HashMap<String, Vec<String>> = HashMap::new();
for key in keys {
let parts: Vec<&str> = key.split(':').collect();
if parts.len() >= 4 {
let topic = parts[2].to_string();
let consumer = parts[3].to_string();
result.entry(topic).or_default().push(consumer);
}
}
Ok(result)
}
pub async fn list_topics(&self) -> Result<Vec<String>, anyhow::Error> {
let mut conn = self.pool.get().await.context("failed to get redis connection")?;
let pattern = format!("{}:*", REDIS_PREFIX);
let keys: Vec<String> = conn.keys(&pattern).await.context("failed to get KEYS from redis")?;
let mut topics = std::collections::HashSet::new();
for key in keys {
let parts: Vec<&str> = key.split(':').collect();
if parts.len() >= 2 && parts[0] == REDIS_PREFIX {
if parts[1] == "msg" {
continue;
}
topics.insert(parts[1].to_string());
}
}
Ok(topics.into_iter().collect())
}
pub fn from_url(url: &str) -> Result<Self, anyhow::Error> {
let mut cfg = RedisConfig::from_url(url.to_string());
cfg.pool = Some(deadpool_redis::PoolConfig::default());
let pool = cfg
.create_pool(Some(Runtime::Tokio1))
.context("failed to create Redis pool")?;
Ok(Self { pool })
}
pub fn from_env() -> Result<Self, anyhow::Error> {
fn redact_redis_url(url: &str) -> String {
let s = url.to_string();
if let Some(proto) = s.find("://") {
let creds = &s[proto + 3..];
if let Some(at) = creds.find('@') {
let after_at = &creds[at + 1..];
return format!("{}://***@{}", &s[..proto], after_at);
}
}
s
}
let redis_url = match env::var("REDIS_URL") {
Ok(u) => {
info!("Using Redis via REDIS_URL={}", redact_redis_url(&u));
u
}
Err(_) => {
let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".into());
let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".into());
let user = env::var("REDIS_USER").unwrap_or_else(|_| "default".into());
let pass = env_var_or_file("REDIS_PASSWORD").unwrap_or_default();
info!(
"Using Redis built from parts: host={} port={} user={} password_set={}",
host,
port,
user,
!pass.is_empty()
);
if pass.is_empty() {
format!("redis://{}@{}:{}/0", user, host, port)
} else {
format!("redis://{}:{}@{}:{}/0", user, pass, host, port)
}
}
};
Self::from_url(&redis_url)
}
fn now_ms() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<(), anyhow::Error> {
let mut conn = self
.pool
.get()
.await
.context("failed to get Redis connection from pool")?;
let key = make_key(&msg.topic);
let payload =
serde_json::to_string(&msg).context("failed to serialize InfraQueueMessage to JSON")?;
let _: usize = conn
.rpush(key, payload)
.await
.context("failed to RPUSH message to Redis list")?;
Ok(())
}
pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>, anyhow::Error> {
let mut conn = self
.pool
.get()
.await
.context("failed to get Redis connection from pool")?;
let key = make_key(topic);
let result: Option<String> = conn
.lpop(key, None)
.await
.context("failed to LPOP from Redis list")?;
if let Some(payload) = result {
let msg: InfraQueueMessage = serde_json::from_str(&payload)
.context("failed to deserialize InfraQueueMessage from JSON")?;
Ok(Some(msg))
} else {
Ok(None)
}
}
pub async fn reclaim_inflight(&self, topic: &str) -> Result<u64, anyhow::Error> {
let mut conn = self
.pool
.get()
.await
.context("failed to get Redis connection from pool")?;
let inflight = make_inflight_key(topic);
let now = Self::now_ms() as i64;
let ids: Vec<String> = conn
.zrangebyscore(inflight.clone(), 0, now)
.await
.context("failed to ZRANGEBYSCORE inflight")?;
if ids.is_empty() {
return Ok(0);
}
let list_key = make_key(topic);
let mut reclaimed = 0u64;
for id in ids.iter() {
let msg_key = make_msg_key(id);
if let Ok(Some(raw)) = conn.get::<_, Option<String>>(msg_key.clone()).await {
if let Ok(mut msg) = serde_json::from_str::<InfraQueueMessage>(&raw) {
let new_retry = (msg.retry_count as u32).saturating_add(1);
msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
let updated = serde_json::to_string(&msg).unwrap_or(raw);
let _: () = conn.set(msg_key.clone(), updated).await?;
let _: usize = conn
.rpush(list_key.clone(), serde_json::to_string(&msg)?)
.await?;
}
}
let _: i64 = conn.zrem(inflight.clone(), id).await?;
reclaimed += 1;
}
Ok(reclaimed)
}
pub async fn dequeue_with_visibility(
&self,
topic: &str,
visibility_timeout_ms: u64,
) -> Result<Option<DequeueWithReceipt>, anyhow::Error> {
let _ = self.reclaim_inflight(topic).await?;
let mut conn = self
.pool
.get()
.await
.context("failed to get Redis connection from pool")?;
let key = make_key(topic);
if let Some(raw) = conn.lpop::<_, Option<String>>(key, None).await? {
let msg: InfraQueueMessage = serde_json::from_str(&raw)
.context("failed to deserialize InfraQueueMessage from JSON")?;
let receipt = msg.id.clone();
let body_key = make_msg_key(&receipt);
let _: () = conn
.set(body_key, raw)
.await
.context("failed to SET message body")?;
let inflight = make_inflight_key(topic);
let deadline = Self::now_ms() + visibility_timeout_ms;
let _: i64 = conn
.zadd(inflight, &receipt, deadline as i64)
.await
.context("failed to ZADD inflight")?;
Ok(Some(DequeueWithReceipt {
message: msg,
receipt,
}))
} else {
Ok(None)
}
}
pub async fn ack(&self, topic: &str, receipt: &str) -> Result<bool, anyhow::Error> {
let mut conn = self
.pool
.get()
.await
.context("failed to get Redis connection from pool")?;
let inflight = make_inflight_key(topic);
let _: i64 = conn
.zrem(inflight, receipt)
.await
.context("failed to ZREM inflight")?;
let msg_key = make_msg_key(receipt);
let _: i64 = conn
.del(msg_key)
.await
.context("failed to DEL message body")?;
Ok(true)
}
pub async fn nack(
&self,
topic: &str,
receipt: &str,
policy: &RetryPolicy,
) -> Result<NackOutcome, anyhow::Error> {
let mut conn = self
.pool
.get()
.await
.context("failed to get Redis connection from pool")?;
let msg_key = make_msg_key(receipt);
let raw: Option<String> = conn
.get(msg_key.clone())
.await
.context("failed to GET message body")?;
if raw.is_none() {
return Ok(NackOutcome::DeadLettered);
}
let raw = raw.unwrap();
let mut msg: InfraQueueMessage =
serde_json::from_str(&raw).context("failed to deserialize InfraQueueMessage from JSON")?;
let new_retry = (msg.retry_count as u32).saturating_add(1);
msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
if new_retry > policy.max_retries {
let dlq = format!("{}:{}:dlq", REDIS_PREFIX, topic);
let _: usize = conn
.rpush(dlq, serde_json::to_string(&msg)?)
.await
.context("failed to RPUSH to DLQ")?;
let inflight = make_inflight_key(topic);
let _: i64 = conn.zrem(inflight, receipt).await?;
let _: i64 = conn.del(msg_key).await?;
return Ok(NackOutcome::DeadLettered);
}
let delay = policy.backoff_delay_ms(new_retry);
let inflight = make_inflight_key(topic);
let new_deadline = (Self::now_ms() + delay) as i64;
let updated = serde_json::to_string(&msg).context("failed to serialize updated message")?;
let _: () = conn.set(msg_key, updated).await?;
let _: i64 = conn.zadd(inflight, receipt, new_deadline).await?;
Ok(NackOutcome::Requeued {
delay_ms: delay,
retry_count: new_retry,
})
}
pub async fn get_redis_memory_info(&self) -> Result<HashMap<String, String>, anyhow::Error> {
let mut conn = self.pool.get().await.context("failed to get redis connection")?;
let info: String = deadpool_redis::redis::cmd("INFO")
.arg("memory")
.query_async(&mut conn)
.await
.context("failed to get INFO memory from redis")?;
let mut metrics = HashMap::new();
for line in info.lines() {
if line.contains(':') {
let parts: Vec<&str> = line.split(':').collect();
if parts.len() == 2 {
metrics.insert(parts[0].to_string(), parts[1].to_string());
}
}
}
Ok(metrics)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_make_key() {
assert_eq!(make_key("topic"), "infraqueue:topic");
assert_eq!(make_key("orders"), "infraqueue:orders");
assert_eq!(make_key(""), "infraqueue:");
}
}