Skip to main content

infraqueue_lib/
queue.rs

//! Redis-backed queue implementation.
//!
//! WARNING: This module should ONLY be used by `infraqueue-server`.
//! All other components (senders, consumers, CLI) MUST use `InfraQueueClient`
//! from `lib::client` to communicate via the HTTP API.
6
7use crate::config::env_var_or_file;
8use crate::model::InfraQueueMessage;
9use anyhow::Context;
10use deadpool_redis::redis::AsyncCommands;
11use deadpool_redis::{Config as RedisConfig, Pool, Runtime};
12use log::info;
13use std::collections::HashMap;
14use std::env;
15
// Namespace shared by every key this module writes to Redis.
const REDIS_PREFIX: &str = "infraqueue";

/// Returns the Redis key of the ready list for `topic` (`infraqueue:{topic}`).
#[inline]
pub(crate) fn make_key(topic: &str) -> String {
    [REDIS_PREFIX, topic].join(":")
}

/// Returns the Redis key of the in-flight set for `topic` (`infraqueue:{topic}:inflight`).
#[inline]
fn make_inflight_key(topic: &str) -> String {
    [REDIS_PREFIX, topic, "inflight"].join(":")
}

/// Returns the Redis key under which the body of message `id` is stored (`infraqueue:msg:{id}`).
#[inline]
fn make_msg_key(id: &str) -> String {
    [REDIS_PREFIX, "msg", id].join(":")
}
34
/// Retry/backoff policy configuration.
///
/// LLM-centric guidance:
/// - Useful when downstream model APIs (OpenAI, Anthropic, Ollama, etc.) return transient
///   failures like HTTP 429 (rate limit), 408 (timeout), or 5xx. Your worker can NACK the
///   message to reschedule it with a computed delay based on this policy.
/// - Combine with an appropriate visibility timeout so only one worker attempts a long-running
///   generation at a time.
/// - After `max_retries`, messages are dead-lettered to `infraqueue:{topic}:dlq` for manual
///   inspection.
///
/// Fields:
/// - `max_retries`: reschedules allowed before dead-letter.
/// - `base_delay_ms`: initial delay on first failure.
/// - `max_delay_ms`: upper bound for any backoff delay.
/// - `multiplier`: exponential growth factor (e.g., 2.0 doubles each attempt).
#[derive(Clone, Debug)]
pub struct RetryPolicy {
    pub max_retries: u32,
    pub base_delay_ms: u64,
    pub max_delay_ms: u64,
    pub multiplier: f64,
}

impl Default for RetryPolicy {
    /// Five retries, starting at one second and doubling up to a one-minute cap.
    fn default() -> Self {
        Self {
            max_retries: 5,
            base_delay_ms: 1000,
            max_delay_ms: 60_000,
            multiplier: 2.0,
        }
    }
}

impl RetryPolicy {
    /// Computes the delay, in milliseconds, to wait before attempt number `retry_count`.
    ///
    /// The result is `base_delay_ms * multiplier^(retry_count - 1)`, capped at
    /// `max_delay_ms`. A `retry_count` of 0 or 1 yields `base_delay_ms` (still subject to
    /// the cap).
    pub fn backoff_delay_ms(&self, retry_count: u32) -> u64 {
        let mut delay = self.base_delay_ms as f64;
        if retry_count > 1 {
            // Clamp before narrowing: a plain `as i32` cast wraps to a negative exponent for
            // counts above `i32::MAX + 1`, which would shrink the delay instead of growing it.
            let exponent = (retry_count - 1).min(i32::MAX as u32) as i32;
            delay *= self.multiplier.powi(exponent);
        }
        // The float-to-integer cast saturates, so an infinite product still ends up at the cap.
        delay.min(self.max_delay_ms as f64) as u64
    }
}
78
79/// Result of dequeue with receipt token for ack/nack.
80/// In LLM workflows, treat `receipt` like a visibility token; your worker:
81/// 1) dequeues with a visibility window sized to the expected generation time,
82/// 2) runs the model call,
83/// 3) ACKs on success (deletes inflight and body), or NACKs on transient failure to reschedule.
84///
85/// Note: If the worker crashes or the visibility window elapses before ACK/NACK,
86/// the message becomes eligible for reclaim and may be delivered again (at-least-once).
87#[derive(Clone, Debug)]
88pub struct DequeueWithReceipt {
89    pub message: InfraQueueMessage,
90    pub receipt: String,
91}
92
/// Outcome of a NACK operation.
///
/// LLM guidance:
/// - `Requeued` means the message remains in-flight with a new scheduled visibility
///   deadline; workers should not attempt it until then. Useful after 429/timeout.
/// - `DeadLettered` indicates the message exceeded `max_retries` and was moved to
///   `infraqueue:{topic}:dlq` for inspection (e.g., bad prompt, invalid params).
///   `nack` also returns this variant when no stored body exists for the receipt, in
///   which case nothing is written to the DLQ.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NackOutcome {
    /// Rescheduled: `delay_ms` is the backoff applied and `retry_count` the new attempt number.
    Requeued { delay_ms: u64, retry_count: u32 },
    /// The message will not be retried.
    DeadLettered,
}
105
106/// A minimal queue facade backed by Redis per topic.
107pub struct InfraQueueQueue {
108    pub(crate) pool: Pool,
109}
110
111impl InfraQueueQueue {
112    /// Create a new queue facade over an existing connection pool.
113    pub fn new(pool: Pool) -> Self {
114        Self { pool }
115    }
116
117    /// Get the length of a specific topic list in Redis.
118    pub async fn get_topic_length(&self, topic: &str) -> Result<u64, anyhow::Error> {
119        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
120        let key = make_key(topic);
121        let len: u64 = conn.llen(&key).await.context("failed to get LLEN from redis")?;
122        Ok(len)
123    }
124
125    /// Get the number of in-flight messages for a specific topic.
126    pub async fn get_inflight_length(&self, topic: &str) -> Result<u64, anyhow::Error> {
127        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
128        let key = make_inflight_key(topic);
129        let len: u64 = conn.zcard(&key).await.context("failed to get ZCARD from redis")?;
130        Ok(len)
131    }
132
133    /// Register a heartbeat for a consumer.
134    pub async fn heartbeat(&self, topic: &str, consumer_name: &str, ttl_secs: u64) -> Result<(), anyhow::Error> {
135        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
136        let key = format!("{}:heartbeat:{}:{}", REDIS_PREFIX, topic, consumer_name);
137        let _: () = conn.set_ex(&key, 1, ttl_secs).await.context("failed to SETEX heartbeat in redis")?;
138        Ok(())
139    }
140
141    /// List all active consumers for all topics.
142    pub async fn list_active_consumers(&self) -> Result<HashMap<String, Vec<String>>, anyhow::Error> {
143        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
144        let pattern = format!("{}:heartbeat:*", REDIS_PREFIX);
145        let keys: Vec<String> = conn.keys(&pattern).await.context("failed to get heartbeat KEYS from redis")?;
146        
147        let mut result: HashMap<String, Vec<String>> = HashMap::new();
148        for key in keys {
149            // key is "infraqueue:heartbeat:topic:consumer"
150            let parts: Vec<&str> = key.split(':').collect();
151            if parts.len() >= 4 {
152                let topic = parts[2].to_string();
153                let consumer = parts[3].to_string();
154                result.entry(topic).or_default().push(consumer);
155            }
156        }
157        Ok(result)
158    }
159
160    /// List all topics currently active in Redis (those starting with the prefix).
161    /// WARNING: Uses KEYS which is O(N) where N is the total number of keys in the DB.
162    pub async fn list_topics(&self) -> Result<Vec<String>, anyhow::Error> {
163        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
164        let pattern = format!("{}:*", REDIS_PREFIX);
165        let keys: Vec<String> = conn.keys(&pattern).await.context("failed to get KEYS from redis")?;
166        
167        let mut topics = std::collections::HashSet::new();
168        for key in keys {
169            // key is "infraqueue:topic" or "infraqueue:topic:inflight" or "infraqueue:msg:id"
170            let parts: Vec<&str> = key.split(':').collect();
171            if parts.len() >= 2 && parts[0] == REDIS_PREFIX {
172                if parts[1] == "msg" {
173                    continue;
174                }
175                topics.insert(parts[1].to_string());
176            }
177        }
178        
179        Ok(topics.into_iter().collect())
180    }
181
182    /// Create a new queue from a Redis URL string.
183    pub fn from_url(url: &str) -> Result<Self, anyhow::Error> {
184        let mut cfg = RedisConfig::from_url(url.to_string());
185        cfg.pool = Some(deadpool_redis::PoolConfig::default());
186        let pool = cfg
187            .create_pool(Some(Runtime::Tokio1))
188            .context("failed to create Redis pool")?;
189        Ok(Self { pool })
190    }
191
192    /// Create a new queue using environment variables.
193    ///
194    /// Precedence:
195    /// - REDIS_URL if set.
196    /// - Otherwise REDIS_HOST/REDIS_PORT/REDIS_USER/REDIS_PASSWORD parts.
197    pub fn from_env() -> Result<Self, anyhow::Error> {
198        fn redact_redis_url(url: &str) -> String {
199            let s = url.to_string();
200            if let Some(proto) = s.find("://") {
201                let creds = &s[proto + 3..];
202                if let Some(at) = creds.find('@') {
203                    let after_at = &creds[at + 1..];
204                    return format!("{}://***@{}", &s[..proto], after_at);
205                }
206            }
207            s
208        }
209        let redis_url = match env::var("REDIS_URL") {
210            Ok(u) => {
211                info!("Using Redis via REDIS_URL={}", redact_redis_url(&u));
212                u
213            }
214            Err(_) => {
215                let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".into());
216                let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".into());
217                let user = env::var("REDIS_USER").unwrap_or_else(|_| "default".into());
218                let pass = env_var_or_file("REDIS_PASSWORD").unwrap_or_default();
219                info!(
220                    "Using Redis built from parts: host={} port={} user={} password_set={}",
221                    host,
222                    port,
223                    user,
224                    !pass.is_empty()
225                );
226                if pass.is_empty() {
227                    format!("redis://{}@{}:{}/0", user, host, port)
228                } else {
229                    format!("redis://{}:{}@{}:{}/0", user, pass, host, port)
230                }
231            }
232        };
233        Self::from_url(&redis_url)
234    }
235
236    fn now_ms() -> u64 {
237        use std::time::{SystemTime, UNIX_EPOCH};
238        SystemTime::now()
239            .duration_since(UNIX_EPOCH)
240            .unwrap_or_default()
241            .as_millis() as u64
242    }
243
244    /// Enqueue a message into Redis under the given topic.
245    pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<(), anyhow::Error> {
246        let mut conn = self
247            .pool
248            .get()
249            .await
250            .context("failed to get Redis connection from pool")?;
251        let key = make_key(&msg.topic);
252        let payload =
253            serde_json::to_string(&msg).context("failed to serialize InfraQueueMessage to JSON")?;
254        let _: usize = conn
255            .rpush(key, payload)
256            .await
257            .context("failed to RPUSH message to Redis list")?;
258        Ok(())
259    }
260
261    /// Basic dequeue (no visibility). Returns Ok(None) if empty.
262    pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>, anyhow::Error> {
263        let mut conn = self
264            .pool
265            .get()
266            .await
267            .context("failed to get Redis connection from pool")?;
268        let key = make_key(topic);
269        let result: Option<String> = conn
270            .lpop(key, None)
271            .await
272            .context("failed to LPOP from Redis list")?;
273        if let Some(payload) = result {
274            let msg: InfraQueueMessage = serde_json::from_str(&payload)
275                .context("failed to deserialize InfraQueueMessage from JSON")?;
276            Ok(Some(msg))
277        } else {
278            Ok(None)
279        }
280    }
281
282    /// Reclaim expired in-flight messages by moving them back to the ready queue.
283    ///
284    /// LLM guidance:
285    /// - If a worker times out or crashes during generation, its visibility window may lapse.
286    ///   Those messages become reclaimable and are re-queued for redelivery.
287    /// - The retry_count is incremented to preserve a coarse history of delivery attempts
288    ///   and to inform backoff policy decisions.
289    /// - Your LLM worker should be idempotent. Use `message.id` or business keys to avoid
290    ///   double-processing outputs when duplicates occur (at-least-once semantics).
291    pub async fn reclaim_inflight(&self, topic: &str) -> Result<u64, anyhow::Error> {
292        let mut conn = self
293            .pool
294            .get()
295            .await
296            .context("failed to get Redis connection from pool")?;
297        let inflight = make_inflight_key(topic);
298        let now = Self::now_ms() as i64;
299        let ids: Vec<String> = conn
300            .zrangebyscore(inflight.clone(), 0, now)
301            .await
302            .context("failed to ZRANGEBYSCORE inflight")?;
303        if ids.is_empty() {
304            return Ok(0);
305        }
306        let list_key = make_key(topic);
307        let mut reclaimed = 0u64;
308        for id in ids.iter() {
309            let msg_key = make_msg_key(id);
310            if let Ok(Some(raw)) = conn.get::<_, Option<String>>(msg_key.clone()).await {
311                if let Ok(mut msg) = serde_json::from_str::<InfraQueueMessage>(&raw) {
312                    let new_retry = (msg.retry_count as u32).saturating_add(1);
313                    msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
314                    let updated = serde_json::to_string(&msg).unwrap_or(raw);
315                    let _: () = conn.set(msg_key.clone(), updated).await?;
316                    let _: usize = conn
317                        .rpush(list_key.clone(), serde_json::to_string(&msg)?)
318                        .await?;
319                }
320            }
321            let _: i64 = conn.zrem(inflight.clone(), id).await?;
322            reclaimed += 1;
323        }
324        Ok(reclaimed)
325    }
326
327    /// Dequeue with a visibility timeout and return a receipt token (message id).
328    ///
329    /// LLM guidance:
330    /// - Set `visibility_timeout_ms` to cover the expected model latency (e.g., 30–120s for
331    ///   long generations). While visible=false, other workers won't receive this message.
332    /// - If the operation completes within the window, ACK to remove the inflight entry and
333    ///   body; if it fails transiently (429/timeout), NACK to reschedule with backoff.
334    /// - If the window expires first, the message is reclaimable and may be delivered again
335    ///   (at-least-once delivery). Ensure your worker is idempotent.
336    pub async fn dequeue_with_visibility(
337        &self,
338        topic: &str,
339        visibility_timeout_ms: u64,
340    ) -> Result<Option<DequeueWithReceipt>, anyhow::Error> {
341        let _ = self.reclaim_inflight(topic).await?;
342        let mut conn = self
343            .pool
344            .get()
345            .await
346            .context("failed to get Redis connection from pool")?;
347        let key = make_key(topic);
348        if let Some(raw) = conn.lpop::<_, Option<String>>(key, None).await? {
349            let msg: InfraQueueMessage = serde_json::from_str(&raw)
350                .context("failed to deserialize InfraQueueMessage from JSON")?;
351            let receipt = msg.id.clone();
352            let body_key = make_msg_key(&receipt);
353            let _: () = conn
354                .set(body_key, raw)
355                .await
356                .context("failed to SET message body")?;
357            let inflight = make_inflight_key(topic);
358            let deadline = Self::now_ms() + visibility_timeout_ms;
359            let _: i64 = conn
360                .zadd(inflight, &receipt, deadline as i64)
361                .await
362                .context("failed to ZADD inflight")?;
363            Ok(Some(DequeueWithReceipt {
364                message: msg,
365                receipt,
366            }))
367        } else {
368            Ok(None)
369        }
370    }
371
372    /// Acknowledge successful processing.
373    ///
374    /// LLM guidance:
375    /// - Only ACK after you have durably persisted the model output (DB/S3) and emitted any
376    ///   downstream events to avoid losing work if the worker crashes right after ACK.
377    /// - Keep idempotency in mind: a duplicate ACK on the same receipt is harmless, but
378    ///   your side-effects (writes) should also be idempotent when possible.
379    pub async fn ack(&self, topic: &str, receipt: &str) -> Result<bool, anyhow::Error> {
380        let mut conn = self
381            .pool
382            .get()
383            .await
384            .context("failed to get Redis connection from pool")?;
385        let inflight = make_inflight_key(topic);
386        let _: i64 = conn
387            .zrem(inflight, receipt)
388            .await
389            .context("failed to ZREM inflight")?;
390        let msg_key = make_msg_key(receipt);
391        let _: i64 = conn
392            .del(msg_key)
393            .await
394            .context("failed to DEL message body")?;
395        Ok(true)
396    }
397
398    /// Negative acknowledge: reschedule with backoff or dead-letter if retries exceeded.
399    ///
400    /// LLM guidance:
401    /// - Call NACK for transient provider errors (HTTP 429, timeouts) to delay and retry without
402    ///   blocking a worker. The delay is computed via `RetryPolicy` (exponential backoff capped
403    ///   by `max_delay_ms`).
404    /// - After `max_retries`, the message is moved to `infraqueue:{topic}:dlq` for manual triage.
405    /// - Choose conservative backoff to avoid cascading failures and control spend when providers
406    ///   are degraded. Pair with observability on DLQ size and retry counters.
407    pub async fn nack(
408        &self,
409        topic: &str,
410        receipt: &str,
411        policy: &RetryPolicy,
412    ) -> Result<NackOutcome, anyhow::Error> {
413        let mut conn = self
414            .pool
415            .get()
416            .await
417            .context("failed to get Redis connection from pool")?;
418        let msg_key = make_msg_key(receipt);
419        let raw: Option<String> = conn
420            .get(msg_key.clone())
421            .await
422            .context("failed to GET message body")?;
423        if raw.is_none() {
424            return Ok(NackOutcome::DeadLettered);
425        }
426        let raw = raw.unwrap();
427        let mut msg: InfraQueueMessage =
428            serde_json::from_str(&raw).context("failed to deserialize InfraQueueMessage from JSON")?;
429        let new_retry = (msg.retry_count as u32).saturating_add(1);
430        msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
431        if new_retry > policy.max_retries {
432            let dlq = format!("{}:{}:dlq", REDIS_PREFIX, topic);
433            let _: usize = conn
434                .rpush(dlq, serde_json::to_string(&msg)?)
435                .await
436                .context("failed to RPUSH to DLQ")?;
437            let inflight = make_inflight_key(topic);
438            let _: i64 = conn.zrem(inflight, receipt).await?;
439            let _: i64 = conn.del(msg_key).await?;
440            return Ok(NackOutcome::DeadLettered);
441        }
442        let delay = policy.backoff_delay_ms(new_retry);
443        let inflight = make_inflight_key(topic);
444        let new_deadline = (Self::now_ms() + delay) as i64;
445        let updated = serde_json::to_string(&msg).context("failed to serialize updated message")?;
446        let _: () = conn.set(msg_key, updated).await?;
447        let _: i64 = conn.zadd(inflight, receipt, new_deadline).await?;
448        Ok(NackOutcome::Requeued {
449            delay_ms: delay,
450            retry_count: new_retry,
451        })
452    }
453
454    /// Get Redis memory info.
455    pub async fn get_redis_memory_info(&self) -> Result<HashMap<String, String>, anyhow::Error> {
456        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
457        let info: String = deadpool_redis::redis::cmd("INFO")
458            .arg("memory")
459            .query_async(&mut conn)
460            .await
461            .context("failed to get INFO memory from redis")?;
462
463        let mut metrics = HashMap::new();
464        for line in info.lines() {
465            if line.contains(':') {
466                let parts: Vec<&str> = line.split(':').collect();
467                if parts.len() == 2 {
468                    metrics.insert(parts[0].to_string(), parts[1].to_string());
469                }
470            }
471        }
472        Ok(metrics)
473    }
474}
475
#[cfg(test)]
mod tests {
    use super::*;

    /// `make_key` namespaces the topic under the prefix, including the empty topic.
    #[test]
    fn test_make_key() {
        let cases = [
            ("topic", "infraqueue:topic"),
            ("orders", "infraqueue:orders"),
            ("", "infraqueue:"),
        ];
        for (topic, expected) in cases.iter() {
            assert_eq!(make_key(topic), *expected);
        }
    }
}
485}