// infraqueue_lib/queue.rs

1use crate::config::env_var_or_file;
2use crate::model::InfraQueueMessage;
3use anyhow::Context;
4use deadpool_redis::redis::AsyncCommands;
5use deadpool_redis::{Config as RedisConfig, Pool, Runtime};
6use log::info;
7use std::env;
8
// Namespace prefix shared by every Redis key that INFRAQUEUE creates.
const REDIS_PREFIX: &str = "infraqueue";

/// Return the Redis list key that holds the ready messages of `topic`.
///
/// The key has the shape `infraqueue:{topic}`; the topic is inserted verbatim,
/// so an empty topic yields `infraqueue:`.
#[inline]
pub(crate) fn make_key(topic: &str) -> String {
    [REDIS_PREFIX, topic].join(":")
}
17
18#[inline]
19fn make_inflight_key(topic: &str) -> String {
20    format!("{}:{}:inflight", REDIS_PREFIX, topic)
21}
22
23#[inline]
24fn make_msg_key(id: &str) -> String {
25    format!("{}:msg:{}", REDIS_PREFIX, id)
26}
27
/// Retry/backoff policy configuration.
///
/// LLM-centric guidance:
/// - Useful when downstream model APIs (OpenAI, Anthropic, Ollama, etc.) return transient
///   failures like HTTP 429 (rate limit), 408 (timeout), or 5xx. Your worker can NACK the
///   message to reschedule it with a delay computed from this policy.
/// - Combine with an appropriate visibility timeout so only one worker attempts a
///   long-running generation at a time.
/// - After `max_retries`, messages are dead-lettered to `infraqueue:{topic}:dlq` for
///   manual inspection.
#[derive(Clone, Debug)]
pub struct RetryPolicy {
    /// Number of reschedules allowed before a message is dead-lettered.
    pub max_retries: u32,
    /// Delay, in milliseconds, applied on the first failure.
    pub base_delay_ms: u64,
    /// Upper bound, in milliseconds, for any computed backoff delay.
    pub max_delay_ms: u64,
    /// Exponential growth factor (e.g. `2.0` doubles the delay on each attempt).
    pub multiplier: f64,
}

impl Default for RetryPolicy {
    /// 5 retries, starting at 1 s, doubling each attempt, capped at 60 s.
    fn default() -> Self {
        Self {
            max_retries: 5,
            base_delay_ms: 1000,
            max_delay_ms: 60_000,
            multiplier: 2.0,
        }
    }
}

impl RetryPolicy {
    /// Compute the backoff delay in milliseconds for the given attempt.
    ///
    /// The delay is `base_delay_ms * multiplier^(retry_count - 1)`, capped at
    /// `max_delay_ms`. A `retry_count` of 0 or 1 yields the (capped) base delay.
    pub fn backoff_delay_ms(&self, retry_count: u32) -> u64 {
        let mut delay = self.base_delay_ms as f64;
        if retry_count > 1 {
            // `powi` takes an `i32`. A plain `as i32` cast wraps to a negative
            // exponent for very large retry counts, which would *shrink* the
            // delay; clamp so the exponent stays non-negative.
            let exponent = (retry_count - 1).min(i32::MAX as u32) as i32;
            delay *= self.multiplier.powi(exponent);
        }
        // Float-to-int `as` saturates, so an infinite product still lands on the cap.
        delay.min(self.max_delay_ms as f64) as u64
    }
}
71
72/// Result of dequeue with receipt token for ack/nack.
73/// In LLM workflows, treat `receipt` like a visibility token; your worker:
74/// 1) dequeues with a visibility window sized to the expected generation time,
75/// 2) runs the model call,
76/// 3) ACKs on success (deletes inflight and body), or NACKs on transient failure to reschedule.
77///
78/// Note: If the worker crashes or the visibility window elapses before ACK/NACK,
79/// the message becomes eligible for reclaim and may be delivered again (at-least-once).
80#[derive(Clone, Debug)]
81pub struct DequeueWithReceipt {
82    pub message: InfraQueueMessage,
83    pub receipt: String,
84}
85
/// Outcome of a NACK operation.
///
/// LLM guidance:
/// - `Requeued` means the message remains in-flight with a new scheduled visibility
///   deadline; workers should not attempt it until then. Useful after 429/timeout.
/// - `DeadLettered` indicates the message exceeded `max_retries` and was moved to
///   `infraqueue:{topic}:dlq` for inspection (e.g., bad prompt, invalid params).
///
/// Implements `PartialEq`/`Eq` so callers and tests can compare outcomes directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NackOutcome {
    /// The message was rescheduled: `delay_ms` is the backoff that was applied and
    /// `retry_count` the attempt number that produced it.
    Requeued { delay_ms: u64, retry_count: u32 },
    /// The message will not be retried.
    DeadLettered,
}
98
99/// A minimal queue facade backed by Redis per topic.
100pub struct InfraQueueQueue {
101    pub(crate) pool: Pool,
102}
103
104impl InfraQueueQueue {
105    /// Create a new queue facade over an existing connection pool.
106    pub fn new(pool: Pool) -> Self {
107        Self { pool }
108    }
109
110    /// Create a new queue from a Redis URL string.
111    pub fn from_url(url: &str) -> Result<Self, anyhow::Error> {
112        let mut cfg = RedisConfig::from_url(url.to_string());
113        cfg.pool = Some(deadpool_redis::PoolConfig::default());
114        let pool = cfg
115            .create_pool(Some(Runtime::Tokio1))
116            .context("failed to create Redis pool")?;
117        Ok(Self { pool })
118    }
119
120    /// Create a new queue using environment variables.
121    ///
122    /// Precedence:
123    /// - REDIS_URL if set.
124    /// - Otherwise REDIS_HOST/REDIS_PORT/REDIS_USER/REDIS_PASSWORD parts.
125    pub fn from_env() -> Result<Self, anyhow::Error> {
126        fn redact_redis_url(url: &str) -> String {
127            let s = url.to_string();
128            if let Some(proto) = s.find("://") {
129                let creds = &s[proto + 3..];
130                if let Some(at) = creds.find('@') {
131                    let after_at = &creds[at + 1..];
132                    return format!("{}://***@{}", &s[..proto], after_at);
133                }
134            }
135            s
136        }
137        let redis_url = match env::var("REDIS_URL") {
138            Ok(u) => {
139                info!("Using Redis via REDIS_URL={}", redact_redis_url(&u));
140                u
141            }
142            Err(_) => {
143                let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".into());
144                let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".into());
145                let user = env::var("REDIS_USER").unwrap_or_else(|_| "default".into());
146                let pass = env_var_or_file("REDIS_PASSWORD").unwrap_or_default();
147                info!(
148                    "Using Redis built from parts: host={} port={} user={} password_set={}",
149                    host,
150                    port,
151                    user,
152                    !pass.is_empty()
153                );
154                if pass.is_empty() {
155                    format!("redis://{}@{}:{}/0", user, host, port)
156                } else {
157                    format!("redis://{}:{}@{}:{}/0", user, pass, host, port)
158                }
159            }
160        };
161        Self::from_url(&redis_url)
162    }
163
164    fn now_ms() -> u64 {
165        use std::time::{SystemTime, UNIX_EPOCH};
166        SystemTime::now()
167            .duration_since(UNIX_EPOCH)
168            .unwrap_or_default()
169            .as_millis() as u64
170    }
171
172    /// Enqueue a message into Redis under the given topic.
173    pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<(), anyhow::Error> {
174        let mut conn = self
175            .pool
176            .get()
177            .await
178            .context("failed to get Redis connection from pool")?;
179        let key = make_key(&msg.topic);
180        let payload =
181            serde_json::to_string(&msg).context("failed to serialize InfraQueueMessage to JSON")?;
182        let _: usize = conn
183            .rpush(key, payload)
184            .await
185            .context("failed to RPUSH message to Redis list")?;
186        Ok(())
187    }
188
189    /// Basic dequeue (no visibility). Returns Ok(None) if empty.
190    pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>, anyhow::Error> {
191        let mut conn = self
192            .pool
193            .get()
194            .await
195            .context("failed to get Redis connection from pool")?;
196        let key = make_key(topic);
197        let result: Option<String> = conn
198            .lpop(key, None)
199            .await
200            .context("failed to LPOP from Redis list")?;
201        if let Some(payload) = result {
202            let msg: InfraQueueMessage = serde_json::from_str(&payload)
203                .context("failed to deserialize InfraQueueMessage from JSON")?;
204            Ok(Some(msg))
205        } else {
206            Ok(None)
207        }
208    }
209
210    /// Reclaim expired in-flight messages by moving them back to the ready queue.
211    ///
212    /// LLM guidance:
213    /// - If a worker times out or crashes during generation, its visibility window may lapse.
214    ///   Those messages become reclaimable and are re-queued for redelivery.
215    /// - The retry_count is incremented to preserve a coarse history of delivery attempts
216    ///   and to inform backoff policy decisions.
217    /// - Your LLM worker should be idempotent. Use `message.id` or business keys to avoid
218    ///   double-processing outputs when duplicates occur (at-least-once semantics).
219    pub async fn reclaim_inflight(&self, topic: &str) -> Result<u64, anyhow::Error> {
220        let mut conn = self
221            .pool
222            .get()
223            .await
224            .context("failed to get Redis connection from pool")?;
225        let inflight = make_inflight_key(topic);
226        let now = Self::now_ms() as i64;
227        let ids: Vec<String> = conn
228            .zrangebyscore(inflight.clone(), 0, now)
229            .await
230            .context("failed to ZRANGEBYSCORE inflight")?;
231        if ids.is_empty() {
232            return Ok(0);
233        }
234        let list_key = make_key(topic);
235        let mut reclaimed = 0u64;
236        for id in ids.iter() {
237            let msg_key = make_msg_key(id);
238            if let Ok(Some(raw)) = conn.get::<_, Option<String>>(msg_key.clone()).await {
239                if let Ok(mut msg) = serde_json::from_str::<InfraQueueMessage>(&raw) {
240                    let new_retry = (msg.retry_count as u32).saturating_add(1);
241                    msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
242                    let updated = serde_json::to_string(&msg).unwrap_or(raw);
243                    let _: () = conn.set(msg_key.clone(), updated).await?;
244                    let _: usize = conn
245                        .rpush(list_key.clone(), serde_json::to_string(&msg)?)
246                        .await?;
247                }
248            }
249            let _: i64 = conn.zrem(inflight.clone(), id).await?;
250            reclaimed += 1;
251        }
252        Ok(reclaimed)
253    }
254
255    /// Dequeue with a visibility timeout and return a receipt token (message id).
256    ///
257    /// LLM guidance:
258    /// - Set `visibility_timeout_ms` to cover the expected model latency (e.g., 30–120s for
259    ///   long generations). While visible=false, other workers won't receive this message.
260    /// - If the operation completes within the window, ACK to remove the inflight entry and
261    ///   body; if it fails transiently (429/timeout), NACK to reschedule with backoff.
262    /// - If the window expires first, the message is reclaimable and may be delivered again
263    ///   (at-least-once delivery). Ensure your worker is idempotent.
264    pub async fn dequeue_with_visibility(
265        &self,
266        topic: &str,
267        visibility_timeout_ms: u64,
268    ) -> Result<Option<DequeueWithReceipt>, anyhow::Error> {
269        let _ = self.reclaim_inflight(topic).await?;
270        let mut conn = self
271            .pool
272            .get()
273            .await
274            .context("failed to get Redis connection from pool")?;
275        let key = make_key(topic);
276        if let Some(raw) = conn.lpop::<_, Option<String>>(key, None).await? {
277            let msg: InfraQueueMessage = serde_json::from_str(&raw)
278                .context("failed to deserialize InfraQueueMessage from JSON")?;
279            let receipt = msg.id.clone();
280            let body_key = make_msg_key(&receipt);
281            let _: () = conn
282                .set(body_key, raw)
283                .await
284                .context("failed to SET message body")?;
285            let inflight = make_inflight_key(topic);
286            let deadline = Self::now_ms() + visibility_timeout_ms;
287            let _: i64 = conn
288                .zadd(inflight, &receipt, deadline as i64)
289                .await
290                .context("failed to ZADD inflight")?;
291            Ok(Some(DequeueWithReceipt {
292                message: msg,
293                receipt,
294            }))
295        } else {
296            Ok(None)
297        }
298    }
299
300    /// Acknowledge successful processing.
301    ///
302    /// LLM guidance:
303    /// - Only ACK after you have durably persisted the model output (DB/S3) and emitted any
304    ///   downstream events to avoid losing work if the worker crashes right after ACK.
305    /// - Keep idempotency in mind: a duplicate ACK on the same receipt is harmless, but
306    ///   your side-effects (writes) should also be idempotent when possible.
307    pub async fn ack(&self, topic: &str, receipt: &str) -> Result<bool, anyhow::Error> {
308        let mut conn = self
309            .pool
310            .get()
311            .await
312            .context("failed to get Redis connection from pool")?;
313        let inflight = make_inflight_key(topic);
314        let _: i64 = conn
315            .zrem(inflight, receipt)
316            .await
317            .context("failed to ZREM inflight")?;
318        let msg_key = make_msg_key(receipt);
319        let _: i64 = conn
320            .del(msg_key)
321            .await
322            .context("failed to DEL message body")?;
323        Ok(true)
324    }
325
326    /// Negative acknowledge: reschedule with backoff or dead-letter if retries exceeded.
327    ///
328    /// LLM guidance:
329    /// - Call NACK for transient provider errors (HTTP 429, timeouts) to delay and retry without
330    ///   blocking a worker. The delay is computed via `RetryPolicy` (exponential backoff capped
331    ///   by `max_delay_ms`).
332    /// - After `max_retries`, the message is moved to `infraqueue:{topic}:dlq` for manual triage.
333    /// - Choose conservative backoff to avoid cascading failures and control spend when providers
334    ///   are degraded. Pair with observability on DLQ size and retry counters.
335    pub async fn nack(
336        &self,
337        topic: &str,
338        receipt: &str,
339        policy: &RetryPolicy,
340    ) -> Result<NackOutcome, anyhow::Error> {
341        let mut conn = self
342            .pool
343            .get()
344            .await
345            .context("failed to get Redis connection from pool")?;
346        let msg_key = make_msg_key(receipt);
347        let raw: Option<String> = conn
348            .get(msg_key.clone())
349            .await
350            .context("failed to GET message body")?;
351        if raw.is_none() {
352            return Ok(NackOutcome::DeadLettered);
353        }
354        let raw = raw.unwrap();
355        let mut msg: InfraQueueMessage =
356            serde_json::from_str(&raw).context("failed to deserialize InfraQueueMessage from JSON")?;
357        let new_retry = (msg.retry_count as u32).saturating_add(1);
358        msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
359        if new_retry > policy.max_retries {
360            let dlq = format!("{}:{}:dlq", REDIS_PREFIX, topic);
361            let _: usize = conn
362                .rpush(dlq, serde_json::to_string(&msg)?)
363                .await
364                .context("failed to RPUSH to DLQ")?;
365            let inflight = make_inflight_key(topic);
366            let _: i64 = conn.zrem(inflight, receipt).await?;
367            let _: i64 = conn.del(msg_key).await?;
368            return Ok(NackOutcome::DeadLettered);
369        }
370        let delay = policy.backoff_delay_ms(new_retry);
371        let inflight = make_inflight_key(topic);
372        let new_deadline = (Self::now_ms() + delay) as i64;
373        let updated = serde_json::to_string(&msg).context("failed to serialize updated message")?;
374        let _: () = conn.set(msg_key, updated).await?;
375        let _: i64 = conn.zadd(inflight, receipt, new_deadline).await?;
376        Ok(NackOutcome::Requeued {
377            delay_ms: delay,
378            retry_count: new_retry,
379        })
380    }
381}
382
#[cfg(test)]
mod tests {
    use super::*;

    /// `make_key` prefixes the topic with `infraqueue:`, even for an empty topic.
    #[test]
    fn test_make_key() {
        let cases = [
            ("topic", "infraqueue:topic"),
            ("orders", "infraqueue:orders"),
            ("", "infraqueue:"),
        ];
        for (topic, expected) in cases.iter() {
            assert_eq!(make_key(topic), *expected);
        }
    }
}