infraqueue-lib 0.1.1

Core library for INFRAQUEUE
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
//! Redis-backed queue implementation.
//! 
//! WARNING: This module should ONLY be used by `infraqueue-server`.
//! All other components (senders, consumers, CLI) MUST use `InfraQueueClient` 
//! from `lib::client` to communicate via the HTTP API.

use crate::config::env_var_or_file;
use crate::model::InfraQueueMessage;
use anyhow::Context;
use deadpool_redis::redis::AsyncCommands;
use deadpool_redis::{Config as RedisConfig, Pool, Runtime};
use log::info;
use std::collections::HashMap;
use std::env;

/// Namespace prepended to every Redis key owned by INFRAQUEUE.
const REDIS_PREFIX: &str = "infraqueue";

/// Ready-list key for `topic`, i.e. `infraqueue:{topic}`.
#[inline]
pub(crate) fn make_key(topic: &str) -> String {
    [REDIS_PREFIX, topic].join(":")
}

/// Sorted-set key tracking in-flight receipts for `topic`, i.e. `infraqueue:{topic}:inflight`.
#[inline]
fn make_inflight_key(topic: &str) -> String {
    [REDIS_PREFIX, topic, "inflight"].join(":")
}

/// Key holding the stored body of message `id`, i.e. `infraqueue:msg:{id}`.
#[inline]
fn make_msg_key(id: &str) -> String {
    [REDIS_PREFIX, "msg", id].join(":")
}

/// Retry/backoff policy configuration.
///
/// LLM-centric guidance:
/// - Useful when downstream model APIs (OpenAI, Anthropic, Ollama, etc.) return transient
///   failures like HTTP 429 (rate limit), 408 (timeout), or 5xx. Your worker can NACK the
///   message to reschedule it with a computed delay based on this policy.
/// - Combine with an appropriate visibility timeout so only one worker attempts a long-running
///   generation at a time.
/// - After `max_retries`, messages are dead-lettered to `infraqueue:{topic}:dlq` for manual inspection.
#[derive(Clone, Debug)]
pub struct RetryPolicy {
    /// Reschedules allowed before the message is dead-lettered.
    pub max_retries: u32,
    /// Delay applied on the first failure, in milliseconds.
    pub base_delay_ms: u64,
    /// Upper bound for any computed backoff delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Exponential growth factor per attempt (e.g. `2.0` doubles each attempt).
    pub multiplier: f64,
}

impl Default for RetryPolicy {
    /// 5 retries, starting at 1 s, doubling each attempt, capped at 60 s.
    fn default() -> Self {
        Self {
            max_retries: 5,
            base_delay_ms: 1000,
            max_delay_ms: 60_000,
            multiplier: 2.0,
        }
    }
}

impl RetryPolicy {
    /// Compute the backoff delay (ms) for the `retry_count`-th retry.
    ///
    /// Retry counts `0` and `1` both use `base_delay_ms`. For `n > 1` the delay is
    /// `base_delay_ms * multiplier^(n - 1)`. The result is always capped at `max_delay_ms`.
    ///
    /// The exponent is clamped to `i32::MAX`. Previously `(n - 1) as i32` wrapped to a
    /// negative exponent for `n > 2^31`, which *shrank* the delay (e.g. `u32::MAX` gave a
    /// quarter of the base delay) instead of saturating at the cap.
    pub fn backoff_delay_ms(&self, retry_count: u32) -> u64 {
        let mut delay = self.base_delay_ms as f64;
        if retry_count > 1 {
            let exponent = i32::try_from(retry_count - 1).unwrap_or(i32::MAX);
            delay *= self.multiplier.powi(exponent);
        }
        // `as u64` saturates: infinities clamp, negatives (odd power of a negative
        // multiplier) become 0.
        delay.min(self.max_delay_ms as f64) as u64
    }
}

/// A dequeued message paired with the receipt required to settle it.
///
/// The receipt is the token handed back to ACK/NACK; in this implementation it is the
/// message's `id`. A typical LLM worker:
/// 1. dequeues with a visibility window sized to the expected generation time,
/// 2. runs the model call,
/// 3. ACKs on success (removing the in-flight entry and stored body), or NACKs on a
///    transient failure so the message is rescheduled.
///
/// If the worker crashes, or the window lapses before ACK/NACK, the message becomes
/// eligible for reclaim and may be delivered again (at-least-once delivery).
#[derive(Clone, Debug)]
pub struct DequeueWithReceipt {
    /// The decoded message.
    pub message: InfraQueueMessage,
    /// Token identifying this delivery for ACK/NACK (currently the message id).
    pub receipt: String,
}

/// What a NACK did to the message.
///
/// LLM guidance:
/// - `Requeued`: the message stays in flight with a later visibility deadline. Workers
///   should not expect it again until that deadline passes, after which it can be
///   reclaimed back onto the ready list. Useful after a 429 or timeout.
/// - `DeadLettered`: the message exceeded `max_retries` and was moved to
///   `infraqueue:{topic}:dlq` for inspection (e.g. bad prompt, invalid params).
#[derive(Clone, Debug)]
pub enum NackOutcome {
    /// Rescheduled with backoff.
    Requeued {
        /// Backoff applied before the message becomes reclaimable, in milliseconds.
        delay_ms: u64,
        /// Retry count after this NACK.
        retry_count: u32,
    },
    /// Retries exhausted and the message was moved to the dead-letter list. The
    /// queue's NACK also reports this when no stored body exists for the receipt.
    DeadLettered,
}

/// A minimal queue facade backed by Redis per topic.
pub struct InfraQueueQueue {
    pub(crate) pool: Pool,
}

impl InfraQueueQueue {
    /// Create a new queue facade over an existing connection pool.
    pub fn new(pool: Pool) -> Self {
        Self { pool }
    }

    /// Get the length of a specific topic list in Redis.
    pub async fn get_topic_length(&self, topic: &str) -> Result<u64, anyhow::Error> {
        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
        let key = make_key(topic);
        let len: u64 = conn.llen(&key).await.context("failed to get LLEN from redis")?;
        Ok(len)
    }

    /// Get the number of in-flight messages for a specific topic.
    pub async fn get_inflight_length(&self, topic: &str) -> Result<u64, anyhow::Error> {
        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
        let key = make_inflight_key(topic);
        let len: u64 = conn.zcard(&key).await.context("failed to get ZCARD from redis")?;
        Ok(len)
    }

    /// Register a heartbeat for a consumer.
    pub async fn heartbeat(&self, topic: &str, consumer_name: &str, ttl_secs: u64) -> Result<(), anyhow::Error> {
        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
        let key = format!("{}:heartbeat:{}:{}", REDIS_PREFIX, topic, consumer_name);
        let _: () = conn.set_ex(&key, 1, ttl_secs).await.context("failed to SETEX heartbeat in redis")?;
        Ok(())
    }

    /// List all active consumers for all topics.
    pub async fn list_active_consumers(&self) -> Result<HashMap<String, Vec<String>>, anyhow::Error> {
        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
        let pattern = format!("{}:heartbeat:*", REDIS_PREFIX);
        let keys: Vec<String> = conn.keys(&pattern).await.context("failed to get heartbeat KEYS from redis")?;
        
        let mut result: HashMap<String, Vec<String>> = HashMap::new();
        for key in keys {
            // key is "infraqueue:heartbeat:topic:consumer"
            let parts: Vec<&str> = key.split(':').collect();
            if parts.len() >= 4 {
                let topic = parts[2].to_string();
                let consumer = parts[3].to_string();
                result.entry(topic).or_default().push(consumer);
            }
        }
        Ok(result)
    }

    /// List all topics currently active in Redis (those starting with the prefix).
    /// WARNING: Uses KEYS which is O(N) where N is the total number of keys in the DB.
    pub async fn list_topics(&self) -> Result<Vec<String>, anyhow::Error> {
        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
        let pattern = format!("{}:*", REDIS_PREFIX);
        let keys: Vec<String> = conn.keys(&pattern).await.context("failed to get KEYS from redis")?;
        
        let mut topics = std::collections::HashSet::new();
        for key in keys {
            // key is "infraqueue:topic" or "infraqueue:topic:inflight" or "infraqueue:msg:id"
            let parts: Vec<&str> = key.split(':').collect();
            if parts.len() >= 2 && parts[0] == REDIS_PREFIX {
                if parts[1] == "msg" {
                    continue;
                }
                topics.insert(parts[1].to_string());
            }
        }
        
        Ok(topics.into_iter().collect())
    }

    /// Create a new queue from a Redis URL string.
    pub fn from_url(url: &str) -> Result<Self, anyhow::Error> {
        let mut cfg = RedisConfig::from_url(url.to_string());
        cfg.pool = Some(deadpool_redis::PoolConfig::default());
        let pool = cfg
            .create_pool(Some(Runtime::Tokio1))
            .context("failed to create Redis pool")?;
        Ok(Self { pool })
    }

    /// Create a new queue using environment variables.
    ///
    /// Precedence:
    /// - REDIS_URL if set.
    /// - Otherwise REDIS_HOST/REDIS_PORT/REDIS_USER/REDIS_PASSWORD parts.
    pub fn from_env() -> Result<Self, anyhow::Error> {
        fn redact_redis_url(url: &str) -> String {
            let s = url.to_string();
            if let Some(proto) = s.find("://") {
                let creds = &s[proto + 3..];
                if let Some(at) = creds.find('@') {
                    let after_at = &creds[at + 1..];
                    return format!("{}://***@{}", &s[..proto], after_at);
                }
            }
            s
        }
        let redis_url = match env::var("REDIS_URL") {
            Ok(u) => {
                info!("Using Redis via REDIS_URL={}", redact_redis_url(&u));
                u
            }
            Err(_) => {
                let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".into());
                let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".into());
                let user = env::var("REDIS_USER").unwrap_or_else(|_| "default".into());
                let pass = env_var_or_file("REDIS_PASSWORD").unwrap_or_default();
                info!(
                    "Using Redis built from parts: host={} port={} user={} password_set={}",
                    host,
                    port,
                    user,
                    !pass.is_empty()
                );
                if pass.is_empty() {
                    format!("redis://{}@{}:{}/0", user, host, port)
                } else {
                    format!("redis://{}:{}@{}:{}/0", user, pass, host, port)
                }
            }
        };
        Self::from_url(&redis_url)
    }

    fn now_ms() -> u64 {
        use std::time::{SystemTime, UNIX_EPOCH};
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64
    }

    /// Enqueue a message into Redis under the given topic.
    pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<(), anyhow::Error> {
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let key = make_key(&msg.topic);
        let payload =
            serde_json::to_string(&msg).context("failed to serialize InfraQueueMessage to JSON")?;
        let _: usize = conn
            .rpush(key, payload)
            .await
            .context("failed to RPUSH message to Redis list")?;
        Ok(())
    }

    /// Basic dequeue (no visibility). Returns Ok(None) if empty.
    pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>, anyhow::Error> {
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let key = make_key(topic);
        let result: Option<String> = conn
            .lpop(key, None)
            .await
            .context("failed to LPOP from Redis list")?;
        if let Some(payload) = result {
            let msg: InfraQueueMessage = serde_json::from_str(&payload)
                .context("failed to deserialize InfraQueueMessage from JSON")?;
            Ok(Some(msg))
        } else {
            Ok(None)
        }
    }

    /// Reclaim expired in-flight messages by moving them back to the ready queue.
    ///
    /// LLM guidance:
    /// - If a worker times out or crashes during generation, its visibility window may lapse.
    ///   Those messages become reclaimable and are re-queued for redelivery.
    /// - The retry_count is incremented to preserve a coarse history of delivery attempts
    ///   and to inform backoff policy decisions.
    /// - Your LLM worker should be idempotent. Use `message.id` or business keys to avoid
    ///   double-processing outputs when duplicates occur (at-least-once semantics).
    pub async fn reclaim_inflight(&self, topic: &str) -> Result<u64, anyhow::Error> {
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let inflight = make_inflight_key(topic);
        let now = Self::now_ms() as i64;
        let ids: Vec<String> = conn
            .zrangebyscore(inflight.clone(), 0, now)
            .await
            .context("failed to ZRANGEBYSCORE inflight")?;
        if ids.is_empty() {
            return Ok(0);
        }
        let list_key = make_key(topic);
        let mut reclaimed = 0u64;
        for id in ids.iter() {
            let msg_key = make_msg_key(id);
            if let Ok(Some(raw)) = conn.get::<_, Option<String>>(msg_key.clone()).await {
                if let Ok(mut msg) = serde_json::from_str::<InfraQueueMessage>(&raw) {
                    let new_retry = (msg.retry_count as u32).saturating_add(1);
                    msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
                    let updated = serde_json::to_string(&msg).unwrap_or(raw);
                    let _: () = conn.set(msg_key.clone(), updated).await?;
                    let _: usize = conn
                        .rpush(list_key.clone(), serde_json::to_string(&msg)?)
                        .await?;
                }
            }
            let _: i64 = conn.zrem(inflight.clone(), id).await?;
            reclaimed += 1;
        }
        Ok(reclaimed)
    }

    /// Dequeue with a visibility timeout and return a receipt token (message id).
    ///
    /// LLM guidance:
    /// - Set `visibility_timeout_ms` to cover the expected model latency (e.g., 30–120s for
    ///   long generations). While visible=false, other workers won't receive this message.
    /// - If the operation completes within the window, ACK to remove the inflight entry and
    ///   body; if it fails transiently (429/timeout), NACK to reschedule with backoff.
    /// - If the window expires first, the message is reclaimable and may be delivered again
    ///   (at-least-once delivery). Ensure your worker is idempotent.
    pub async fn dequeue_with_visibility(
        &self,
        topic: &str,
        visibility_timeout_ms: u64,
    ) -> Result<Option<DequeueWithReceipt>, anyhow::Error> {
        let _ = self.reclaim_inflight(topic).await?;
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let key = make_key(topic);
        if let Some(raw) = conn.lpop::<_, Option<String>>(key, None).await? {
            let msg: InfraQueueMessage = serde_json::from_str(&raw)
                .context("failed to deserialize InfraQueueMessage from JSON")?;
            let receipt = msg.id.clone();
            let body_key = make_msg_key(&receipt);
            let _: () = conn
                .set(body_key, raw)
                .await
                .context("failed to SET message body")?;
            let inflight = make_inflight_key(topic);
            let deadline = Self::now_ms() + visibility_timeout_ms;
            let _: i64 = conn
                .zadd(inflight, &receipt, deadline as i64)
                .await
                .context("failed to ZADD inflight")?;
            Ok(Some(DequeueWithReceipt {
                message: msg,
                receipt,
            }))
        } else {
            Ok(None)
        }
    }

    /// Acknowledge successful processing.
    ///
    /// LLM guidance:
    /// - Only ACK after you have durably persisted the model output (DB/S3) and emitted any
    ///   downstream events to avoid losing work if the worker crashes right after ACK.
    /// - Keep idempotency in mind: a duplicate ACK on the same receipt is harmless, but
    ///   your side-effects (writes) should also be idempotent when possible.
    pub async fn ack(&self, topic: &str, receipt: &str) -> Result<bool, anyhow::Error> {
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let inflight = make_inflight_key(topic);
        let _: i64 = conn
            .zrem(inflight, receipt)
            .await
            .context("failed to ZREM inflight")?;
        let msg_key = make_msg_key(receipt);
        let _: i64 = conn
            .del(msg_key)
            .await
            .context("failed to DEL message body")?;
        Ok(true)
    }

    /// Negative acknowledge: reschedule with backoff or dead-letter if retries exceeded.
    ///
    /// LLM guidance:
    /// - Call NACK for transient provider errors (HTTP 429, timeouts) to delay and retry without
    ///   blocking a worker. The delay is computed via `RetryPolicy` (exponential backoff capped
    ///   by `max_delay_ms`).
    /// - After `max_retries`, the message is moved to `infraqueue:{topic}:dlq` for manual triage.
    /// - Choose conservative backoff to avoid cascading failures and control spend when providers
    ///   are degraded. Pair with observability on DLQ size and retry counters.
    pub async fn nack(
        &self,
        topic: &str,
        receipt: &str,
        policy: &RetryPolicy,
    ) -> Result<NackOutcome, anyhow::Error> {
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let msg_key = make_msg_key(receipt);
        let raw: Option<String> = conn
            .get(msg_key.clone())
            .await
            .context("failed to GET message body")?;
        if raw.is_none() {
            return Ok(NackOutcome::DeadLettered);
        }
        let raw = raw.unwrap();
        let mut msg: InfraQueueMessage =
            serde_json::from_str(&raw).context("failed to deserialize InfraQueueMessage from JSON")?;
        let new_retry = (msg.retry_count as u32).saturating_add(1);
        msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
        if new_retry > policy.max_retries {
            let dlq = format!("{}:{}:dlq", REDIS_PREFIX, topic);
            let _: usize = conn
                .rpush(dlq, serde_json::to_string(&msg)?)
                .await
                .context("failed to RPUSH to DLQ")?;
            let inflight = make_inflight_key(topic);
            let _: i64 = conn.zrem(inflight, receipt).await?;
            let _: i64 = conn.del(msg_key).await?;
            return Ok(NackOutcome::DeadLettered);
        }
        let delay = policy.backoff_delay_ms(new_retry);
        let inflight = make_inflight_key(topic);
        let new_deadline = (Self::now_ms() + delay) as i64;
        let updated = serde_json::to_string(&msg).context("failed to serialize updated message")?;
        let _: () = conn.set(msg_key, updated).await?;
        let _: i64 = conn.zadd(inflight, receipt, new_deadline).await?;
        Ok(NackOutcome::Requeued {
            delay_ms: delay,
            retry_count: new_retry,
        })
    }

    /// Get Redis memory info.
    pub async fn get_redis_memory_info(&self) -> Result<HashMap<String, String>, anyhow::Error> {
        let mut conn = self.pool.get().await.context("failed to get redis connection")?;
        let info: String = deadpool_redis::redis::cmd("INFO")
            .arg("memory")
            .query_async(&mut conn)
            .await
            .context("failed to get INFO memory from redis")?;

        let mut metrics = HashMap::new();
        for line in info.lines() {
            if line.contains(':') {
                let parts: Vec<&str> = line.split(':').collect();
                if parts.len() == 2 {
                    metrics.insert(parts[0].to_string(), parts[1].to_string());
                }
            }
        }
        Ok(metrics)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `make_key` joins the prefix and topic with a single `:`, even for an empty topic.
    #[test]
    fn test_make_key() {
        let cases = [
            ("topic", "infraqueue:topic"),
            ("orders", "infraqueue:orders"),
            ("", "infraqueue:"),
        ];
        for &(topic, expected) in cases.iter() {
            assert_eq!(make_key(topic), expected);
        }
    }
}