1use crate::config::env_var_or_file;
8use crate::model::InfraQueueMessage;
9use anyhow::Context;
10use deadpool_redis::redis::AsyncCommands;
11use deadpool_redis::{Config as RedisConfig, Pool, Runtime};
12use log::info;
13use std::collections::HashMap;
14use std::env;
15
16const REDIS_PREFIX: &str = "infraqueue";
18
19#[inline]
21pub(crate) fn make_key(topic: &str) -> String {
22 format!("{}:{}", REDIS_PREFIX, topic)
23}
24
25#[inline]
26fn make_inflight_key(topic: &str) -> String {
27 format!("{}:{}:inflight", REDIS_PREFIX, topic)
28}
29
30#[inline]
31fn make_msg_key(id: &str) -> String {
32 format!("{}:msg:{}", REDIS_PREFIX, id)
33}
34
35#[derive(Clone, Debug)]
51pub struct RetryPolicy {
52 pub max_retries: u32,
53 pub base_delay_ms: u64,
54 pub max_delay_ms: u64,
55 pub multiplier: f64,
56}
57
58impl Default for RetryPolicy {
59 fn default() -> Self {
60 Self {
61 max_retries: 5,
62 base_delay_ms: 1000,
63 max_delay_ms: 60_000,
64 multiplier: 2.0,
65 }
66 }
67}
68
69impl RetryPolicy {
70 pub fn backoff_delay_ms(&self, retry_count: u32) -> u64 {
71 let mut delay = self.base_delay_ms as f64;
72 if retry_count > 1 {
73 delay *= self.multiplier.powi((retry_count - 1) as i32);
74 }
75 delay.min(self.max_delay_ms as f64) as u64
76 }
77}
78
79#[derive(Clone, Debug)]
88pub struct DequeueWithReceipt {
89 pub message: InfraQueueMessage,
90 pub receipt: String,
91}
92
93#[derive(Clone, Debug)]
101pub enum NackOutcome {
102 Requeued { delay_ms: u64, retry_count: u32 },
103 DeadLettered,
104}
105
106pub struct InfraQueueQueue {
108 pub(crate) pool: Pool,
109}
110
111impl InfraQueueQueue {
112 pub fn new(pool: Pool) -> Self {
114 Self { pool }
115 }
116
117 pub async fn get_topic_length(&self, topic: &str) -> Result<u64, anyhow::Error> {
119 let mut conn = self.pool.get().await.context("failed to get redis connection")?;
120 let key = make_key(topic);
121 let len: u64 = conn.llen(&key).await.context("failed to get LLEN from redis")?;
122 Ok(len)
123 }
124
125 pub async fn get_inflight_length(&self, topic: &str) -> Result<u64, anyhow::Error> {
127 let mut conn = self.pool.get().await.context("failed to get redis connection")?;
128 let key = make_inflight_key(topic);
129 let len: u64 = conn.zcard(&key).await.context("failed to get ZCARD from redis")?;
130 Ok(len)
131 }
132
133 pub async fn heartbeat(&self, topic: &str, consumer_name: &str, ttl_secs: u64) -> Result<(), anyhow::Error> {
135 let mut conn = self.pool.get().await.context("failed to get redis connection")?;
136 let key = format!("{}:heartbeat:{}:{}", REDIS_PREFIX, topic, consumer_name);
137 let _: () = conn.set_ex(&key, 1, ttl_secs).await.context("failed to SETEX heartbeat in redis")?;
138 Ok(())
139 }
140
141 pub async fn list_active_consumers(&self) -> Result<HashMap<String, Vec<String>>, anyhow::Error> {
143 let mut conn = self.pool.get().await.context("failed to get redis connection")?;
144 let pattern = format!("{}:heartbeat:*", REDIS_PREFIX);
145 let keys: Vec<String> = conn.keys(&pattern).await.context("failed to get heartbeat KEYS from redis")?;
146
147 let mut result: HashMap<String, Vec<String>> = HashMap::new();
148 for key in keys {
149 let parts: Vec<&str> = key.split(':').collect();
151 if parts.len() >= 4 {
152 let topic = parts[2].to_string();
153 let consumer = parts[3].to_string();
154 result.entry(topic).or_default().push(consumer);
155 }
156 }
157 Ok(result)
158 }
159
160 pub async fn list_topics(&self) -> Result<Vec<String>, anyhow::Error> {
163 let mut conn = self.pool.get().await.context("failed to get redis connection")?;
164 let pattern = format!("{}:*", REDIS_PREFIX);
165 let keys: Vec<String> = conn.keys(&pattern).await.context("failed to get KEYS from redis")?;
166
167 let mut topics = std::collections::HashSet::new();
168 for key in keys {
169 let parts: Vec<&str> = key.split(':').collect();
171 if parts.len() >= 2 && parts[0] == REDIS_PREFIX {
172 if parts[1] == "msg" {
173 continue;
174 }
175 topics.insert(parts[1].to_string());
176 }
177 }
178
179 Ok(topics.into_iter().collect())
180 }
181
182 pub fn from_url(url: &str) -> Result<Self, anyhow::Error> {
184 let mut cfg = RedisConfig::from_url(url.to_string());
185 cfg.pool = Some(deadpool_redis::PoolConfig::default());
186 let pool = cfg
187 .create_pool(Some(Runtime::Tokio1))
188 .context("failed to create Redis pool")?;
189 Ok(Self { pool })
190 }
191
192 pub fn from_env() -> Result<Self, anyhow::Error> {
198 fn redact_redis_url(url: &str) -> String {
199 let s = url.to_string();
200 if let Some(proto) = s.find("://") {
201 let creds = &s[proto + 3..];
202 if let Some(at) = creds.find('@') {
203 let after_at = &creds[at + 1..];
204 return format!("{}://***@{}", &s[..proto], after_at);
205 }
206 }
207 s
208 }
209 let redis_url = match env::var("REDIS_URL") {
210 Ok(u) => {
211 info!("Using Redis via REDIS_URL={}", redact_redis_url(&u));
212 u
213 }
214 Err(_) => {
215 let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".into());
216 let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".into());
217 let user = env::var("REDIS_USER").unwrap_or_else(|_| "default".into());
218 let pass = env_var_or_file("REDIS_PASSWORD").unwrap_or_default();
219 info!(
220 "Using Redis built from parts: host={} port={} user={} password_set={}",
221 host,
222 port,
223 user,
224 !pass.is_empty()
225 );
226 if pass.is_empty() {
227 format!("redis://{}@{}:{}/0", user, host, port)
228 } else {
229 format!("redis://{}:{}@{}:{}/0", user, pass, host, port)
230 }
231 }
232 };
233 Self::from_url(&redis_url)
234 }
235
236 fn now_ms() -> u64 {
237 use std::time::{SystemTime, UNIX_EPOCH};
238 SystemTime::now()
239 .duration_since(UNIX_EPOCH)
240 .unwrap_or_default()
241 .as_millis() as u64
242 }
243
244 pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<(), anyhow::Error> {
246 let mut conn = self
247 .pool
248 .get()
249 .await
250 .context("failed to get Redis connection from pool")?;
251 let key = make_key(&msg.topic);
252 let payload =
253 serde_json::to_string(&msg).context("failed to serialize InfraQueueMessage to JSON")?;
254 let _: usize = conn
255 .rpush(key, payload)
256 .await
257 .context("failed to RPUSH message to Redis list")?;
258 Ok(())
259 }
260
261 pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>, anyhow::Error> {
263 let mut conn = self
264 .pool
265 .get()
266 .await
267 .context("failed to get Redis connection from pool")?;
268 let key = make_key(topic);
269 let result: Option<String> = conn
270 .lpop(key, None)
271 .await
272 .context("failed to LPOP from Redis list")?;
273 if let Some(payload) = result {
274 let msg: InfraQueueMessage = serde_json::from_str(&payload)
275 .context("failed to deserialize InfraQueueMessage from JSON")?;
276 Ok(Some(msg))
277 } else {
278 Ok(None)
279 }
280 }
281
282 pub async fn reclaim_inflight(&self, topic: &str) -> Result<u64, anyhow::Error> {
292 let mut conn = self
293 .pool
294 .get()
295 .await
296 .context("failed to get Redis connection from pool")?;
297 let inflight = make_inflight_key(topic);
298 let now = Self::now_ms() as i64;
299 let ids: Vec<String> = conn
300 .zrangebyscore(inflight.clone(), 0, now)
301 .await
302 .context("failed to ZRANGEBYSCORE inflight")?;
303 if ids.is_empty() {
304 return Ok(0);
305 }
306 let list_key = make_key(topic);
307 let mut reclaimed = 0u64;
308 for id in ids.iter() {
309 let msg_key = make_msg_key(id);
310 if let Ok(Some(raw)) = conn.get::<_, Option<String>>(msg_key.clone()).await {
311 if let Ok(mut msg) = serde_json::from_str::<InfraQueueMessage>(&raw) {
312 let new_retry = (msg.retry_count as u32).saturating_add(1);
313 msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
314 let updated = serde_json::to_string(&msg).unwrap_or(raw);
315 let _: () = conn.set(msg_key.clone(), updated).await?;
316 let _: usize = conn
317 .rpush(list_key.clone(), serde_json::to_string(&msg)?)
318 .await?;
319 }
320 }
321 let _: i64 = conn.zrem(inflight.clone(), id).await?;
322 reclaimed += 1;
323 }
324 Ok(reclaimed)
325 }
326
327 pub async fn dequeue_with_visibility(
337 &self,
338 topic: &str,
339 visibility_timeout_ms: u64,
340 ) -> Result<Option<DequeueWithReceipt>, anyhow::Error> {
341 let _ = self.reclaim_inflight(topic).await?;
342 let mut conn = self
343 .pool
344 .get()
345 .await
346 .context("failed to get Redis connection from pool")?;
347 let key = make_key(topic);
348 if let Some(raw) = conn.lpop::<_, Option<String>>(key, None).await? {
349 let msg: InfraQueueMessage = serde_json::from_str(&raw)
350 .context("failed to deserialize InfraQueueMessage from JSON")?;
351 let receipt = msg.id.clone();
352 let body_key = make_msg_key(&receipt);
353 let _: () = conn
354 .set(body_key, raw)
355 .await
356 .context("failed to SET message body")?;
357 let inflight = make_inflight_key(topic);
358 let deadline = Self::now_ms() + visibility_timeout_ms;
359 let _: i64 = conn
360 .zadd(inflight, &receipt, deadline as i64)
361 .await
362 .context("failed to ZADD inflight")?;
363 Ok(Some(DequeueWithReceipt {
364 message: msg,
365 receipt,
366 }))
367 } else {
368 Ok(None)
369 }
370 }
371
372 pub async fn ack(&self, topic: &str, receipt: &str) -> Result<bool, anyhow::Error> {
380 let mut conn = self
381 .pool
382 .get()
383 .await
384 .context("failed to get Redis connection from pool")?;
385 let inflight = make_inflight_key(topic);
386 let _: i64 = conn
387 .zrem(inflight, receipt)
388 .await
389 .context("failed to ZREM inflight")?;
390 let msg_key = make_msg_key(receipt);
391 let _: i64 = conn
392 .del(msg_key)
393 .await
394 .context("failed to DEL message body")?;
395 Ok(true)
396 }
397
398 pub async fn nack(
408 &self,
409 topic: &str,
410 receipt: &str,
411 policy: &RetryPolicy,
412 ) -> Result<NackOutcome, anyhow::Error> {
413 let mut conn = self
414 .pool
415 .get()
416 .await
417 .context("failed to get Redis connection from pool")?;
418 let msg_key = make_msg_key(receipt);
419 let raw: Option<String> = conn
420 .get(msg_key.clone())
421 .await
422 .context("failed to GET message body")?;
423 if raw.is_none() {
424 return Ok(NackOutcome::DeadLettered);
425 }
426 let raw = raw.unwrap();
427 let mut msg: InfraQueueMessage =
428 serde_json::from_str(&raw).context("failed to deserialize InfraQueueMessage from JSON")?;
429 let new_retry = (msg.retry_count as u32).saturating_add(1);
430 msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
431 if new_retry > policy.max_retries {
432 let dlq = format!("{}:{}:dlq", REDIS_PREFIX, topic);
433 let _: usize = conn
434 .rpush(dlq, serde_json::to_string(&msg)?)
435 .await
436 .context("failed to RPUSH to DLQ")?;
437 let inflight = make_inflight_key(topic);
438 let _: i64 = conn.zrem(inflight, receipt).await?;
439 let _: i64 = conn.del(msg_key).await?;
440 return Ok(NackOutcome::DeadLettered);
441 }
442 let delay = policy.backoff_delay_ms(new_retry);
443 let inflight = make_inflight_key(topic);
444 let new_deadline = (Self::now_ms() + delay) as i64;
445 let updated = serde_json::to_string(&msg).context("failed to serialize updated message")?;
446 let _: () = conn.set(msg_key, updated).await?;
447 let _: i64 = conn.zadd(inflight, receipt, new_deadline).await?;
448 Ok(NackOutcome::Requeued {
449 delay_ms: delay,
450 retry_count: new_retry,
451 })
452 }
453
454 pub async fn get_redis_memory_info(&self) -> Result<HashMap<String, String>, anyhow::Error> {
456 let mut conn = self.pool.get().await.context("failed to get redis connection")?;
457 let info: String = deadpool_redis::redis::cmd("INFO")
458 .arg("memory")
459 .query_async(&mut conn)
460 .await
461 .context("failed to get INFO memory from redis")?;
462
463 let mut metrics = HashMap::new();
464 for line in info.lines() {
465 if line.contains(':') {
466 let parts: Vec<&str> = line.split(':').collect();
467 if parts.len() == 2 {
468 metrics.insert(parts[0].to_string(), parts[1].to_string());
469 }
470 }
471 }
472 Ok(metrics)
473 }
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479 #[test]
480 fn test_make_key() {
481 assert_eq!(make_key("topic"), "infraqueue:topic");
482 assert_eq!(make_key("orders"), "infraqueue:orders");
483 assert_eq!(make_key(""), "infraqueue:");
484 }
485}