1use crate::config::env_var_or_file;
2use crate::model::InfraQueueMessage;
3use anyhow::Context;
4use deadpool_redis::redis::AsyncCommands;
5use deadpool_redis::{Config as RedisConfig, Pool, Runtime};
6use log::info;
7use std::env;
8
9const REDIS_PREFIX: &str = "infraqueue";
11
12#[inline]
14pub(crate) fn make_key(topic: &str) -> String {
15 format!("{}:{}", REDIS_PREFIX, topic)
16}
17
18#[inline]
19fn make_inflight_key(topic: &str) -> String {
20 format!("{}:{}:inflight", REDIS_PREFIX, topic)
21}
22
23#[inline]
24fn make_msg_key(id: &str) -> String {
25 format!("{}:msg:{}", REDIS_PREFIX, id)
26}
27
28#[derive(Clone, Debug)]
44pub struct RetryPolicy {
45 pub max_retries: u32,
46 pub base_delay_ms: u64,
47 pub max_delay_ms: u64,
48 pub multiplier: f64,
49}
50
51impl Default for RetryPolicy {
52 fn default() -> Self {
53 Self {
54 max_retries: 5,
55 base_delay_ms: 1000,
56 max_delay_ms: 60_000,
57 multiplier: 2.0,
58 }
59 }
60}
61
62impl RetryPolicy {
63 pub fn backoff_delay_ms(&self, retry_count: u32) -> u64 {
64 let mut delay = self.base_delay_ms as f64;
65 if retry_count > 1 {
66 delay *= self.multiplier.powi((retry_count - 1) as i32);
67 }
68 delay.min(self.max_delay_ms as f64) as u64
69 }
70}
71
72#[derive(Clone, Debug)]
81pub struct DequeueWithReceipt {
82 pub message: InfraQueueMessage,
83 pub receipt: String,
84}
85
86#[derive(Clone, Debug)]
94pub enum NackOutcome {
95 Requeued { delay_ms: u64, retry_count: u32 },
96 DeadLettered,
97}
98
99pub struct InfraQueueQueue {
101 pub(crate) pool: Pool,
102}
103
104impl InfraQueueQueue {
105 pub fn new(pool: Pool) -> Self {
107 Self { pool }
108 }
109
110 pub fn from_url(url: &str) -> Result<Self, anyhow::Error> {
112 let mut cfg = RedisConfig::from_url(url.to_string());
113 cfg.pool = Some(deadpool_redis::PoolConfig::default());
114 let pool = cfg
115 .create_pool(Some(Runtime::Tokio1))
116 .context("failed to create Redis pool")?;
117 Ok(Self { pool })
118 }
119
120 pub fn from_env() -> Result<Self, anyhow::Error> {
126 fn redact_redis_url(url: &str) -> String {
127 let s = url.to_string();
128 if let Some(proto) = s.find("://") {
129 let creds = &s[proto + 3..];
130 if let Some(at) = creds.find('@') {
131 let after_at = &creds[at + 1..];
132 return format!("{}://***@{}", &s[..proto], after_at);
133 }
134 }
135 s
136 }
137 let redis_url = match env::var("REDIS_URL") {
138 Ok(u) => {
139 info!("Using Redis via REDIS_URL={}", redact_redis_url(&u));
140 u
141 }
142 Err(_) => {
143 let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".into());
144 let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".into());
145 let user = env::var("REDIS_USER").unwrap_or_else(|_| "default".into());
146 let pass = env_var_or_file("REDIS_PASSWORD").unwrap_or_default();
147 info!(
148 "Using Redis built from parts: host={} port={} user={} password_set={}",
149 host,
150 port,
151 user,
152 !pass.is_empty()
153 );
154 if pass.is_empty() {
155 format!("redis://{}@{}:{}/0", user, host, port)
156 } else {
157 format!("redis://{}:{}@{}:{}/0", user, pass, host, port)
158 }
159 }
160 };
161 Self::from_url(&redis_url)
162 }
163
164 fn now_ms() -> u64 {
165 use std::time::{SystemTime, UNIX_EPOCH};
166 SystemTime::now()
167 .duration_since(UNIX_EPOCH)
168 .unwrap_or_default()
169 .as_millis() as u64
170 }
171
172 pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<(), anyhow::Error> {
174 let mut conn = self
175 .pool
176 .get()
177 .await
178 .context("failed to get Redis connection from pool")?;
179 let key = make_key(&msg.topic);
180 let payload =
181 serde_json::to_string(&msg).context("failed to serialize InfraQueueMessage to JSON")?;
182 let _: usize = conn
183 .rpush(key, payload)
184 .await
185 .context("failed to RPUSH message to Redis list")?;
186 Ok(())
187 }
188
189 pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>, anyhow::Error> {
191 let mut conn = self
192 .pool
193 .get()
194 .await
195 .context("failed to get Redis connection from pool")?;
196 let key = make_key(topic);
197 let result: Option<String> = conn
198 .lpop(key, None)
199 .await
200 .context("failed to LPOP from Redis list")?;
201 if let Some(payload) = result {
202 let msg: InfraQueueMessage = serde_json::from_str(&payload)
203 .context("failed to deserialize InfraQueueMessage from JSON")?;
204 Ok(Some(msg))
205 } else {
206 Ok(None)
207 }
208 }
209
210 pub async fn reclaim_inflight(&self, topic: &str) -> Result<u64, anyhow::Error> {
220 let mut conn = self
221 .pool
222 .get()
223 .await
224 .context("failed to get Redis connection from pool")?;
225 let inflight = make_inflight_key(topic);
226 let now = Self::now_ms() as i64;
227 let ids: Vec<String> = conn
228 .zrangebyscore(inflight.clone(), 0, now)
229 .await
230 .context("failed to ZRANGEBYSCORE inflight")?;
231 if ids.is_empty() {
232 return Ok(0);
233 }
234 let list_key = make_key(topic);
235 let mut reclaimed = 0u64;
236 for id in ids.iter() {
237 let msg_key = make_msg_key(id);
238 if let Ok(Some(raw)) = conn.get::<_, Option<String>>(msg_key.clone()).await {
239 if let Ok(mut msg) = serde_json::from_str::<InfraQueueMessage>(&raw) {
240 let new_retry = (msg.retry_count as u32).saturating_add(1);
241 msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
242 let updated = serde_json::to_string(&msg).unwrap_or(raw);
243 let _: () = conn.set(msg_key.clone(), updated).await?;
244 let _: usize = conn
245 .rpush(list_key.clone(), serde_json::to_string(&msg)?)
246 .await?;
247 }
248 }
249 let _: i64 = conn.zrem(inflight.clone(), id).await?;
250 reclaimed += 1;
251 }
252 Ok(reclaimed)
253 }
254
255 pub async fn dequeue_with_visibility(
265 &self,
266 topic: &str,
267 visibility_timeout_ms: u64,
268 ) -> Result<Option<DequeueWithReceipt>, anyhow::Error> {
269 let _ = self.reclaim_inflight(topic).await?;
270 let mut conn = self
271 .pool
272 .get()
273 .await
274 .context("failed to get Redis connection from pool")?;
275 let key = make_key(topic);
276 if let Some(raw) = conn.lpop::<_, Option<String>>(key, None).await? {
277 let msg: InfraQueueMessage = serde_json::from_str(&raw)
278 .context("failed to deserialize InfraQueueMessage from JSON")?;
279 let receipt = msg.id.clone();
280 let body_key = make_msg_key(&receipt);
281 let _: () = conn
282 .set(body_key, raw)
283 .await
284 .context("failed to SET message body")?;
285 let inflight = make_inflight_key(topic);
286 let deadline = Self::now_ms() + visibility_timeout_ms;
287 let _: i64 = conn
288 .zadd(inflight, &receipt, deadline as i64)
289 .await
290 .context("failed to ZADD inflight")?;
291 Ok(Some(DequeueWithReceipt {
292 message: msg,
293 receipt,
294 }))
295 } else {
296 Ok(None)
297 }
298 }
299
300 pub async fn ack(&self, topic: &str, receipt: &str) -> Result<bool, anyhow::Error> {
308 let mut conn = self
309 .pool
310 .get()
311 .await
312 .context("failed to get Redis connection from pool")?;
313 let inflight = make_inflight_key(topic);
314 let _: i64 = conn
315 .zrem(inflight, receipt)
316 .await
317 .context("failed to ZREM inflight")?;
318 let msg_key = make_msg_key(receipt);
319 let _: i64 = conn
320 .del(msg_key)
321 .await
322 .context("failed to DEL message body")?;
323 Ok(true)
324 }
325
326 pub async fn nack(
336 &self,
337 topic: &str,
338 receipt: &str,
339 policy: &RetryPolicy,
340 ) -> Result<NackOutcome, anyhow::Error> {
341 let mut conn = self
342 .pool
343 .get()
344 .await
345 .context("failed to get Redis connection from pool")?;
346 let msg_key = make_msg_key(receipt);
347 let raw: Option<String> = conn
348 .get(msg_key.clone())
349 .await
350 .context("failed to GET message body")?;
351 if raw.is_none() {
352 return Ok(NackOutcome::DeadLettered);
353 }
354 let raw = raw.unwrap();
355 let mut msg: InfraQueueMessage =
356 serde_json::from_str(&raw).context("failed to deserialize InfraQueueMessage from JSON")?;
357 let new_retry = (msg.retry_count as u32).saturating_add(1);
358 msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
359 if new_retry > policy.max_retries {
360 let dlq = format!("{}:{}:dlq", REDIS_PREFIX, topic);
361 let _: usize = conn
362 .rpush(dlq, serde_json::to_string(&msg)?)
363 .await
364 .context("failed to RPUSH to DLQ")?;
365 let inflight = make_inflight_key(topic);
366 let _: i64 = conn.zrem(inflight, receipt).await?;
367 let _: i64 = conn.del(msg_key).await?;
368 return Ok(NackOutcome::DeadLettered);
369 }
370 let delay = policy.backoff_delay_ms(new_retry);
371 let inflight = make_inflight_key(topic);
372 let new_deadline = (Self::now_ms() + delay) as i64;
373 let updated = serde_json::to_string(&msg).context("failed to serialize updated message")?;
374 let _: () = conn.set(msg_key, updated).await?;
375 let _: i64 = conn.zadd(inflight, receipt, new_deadline).await?;
376 Ok(NackOutcome::Requeued {
377 delay_ms: delay,
378 retry_count: new_retry,
379 })
380 }
381}
382
383#[cfg(test)]
384mod tests {
385 use super::*;
386 #[test]
387 fn test_make_key() {
388 assert_eq!(make_key("topic"), "infraqueue:topic");
389 assert_eq!(make_key("orders"), "infraqueue:orders");
390 assert_eq!(make_key(""), "infraqueue:");
391 }
392}