//! infraqueue-lib 0.1.0
//!
//! Core library for INFRAQUEUE
//! Documentation
use crate::config::env_var_or_file;
use crate::model::InfraQueueMessage;
use anyhow::Context;
use deadpool_redis::redis::AsyncCommands;
use deadpool_redis::{Config as RedisConfig, Pool, Runtime};
use log::info;
use std::env;

/// Redis key prefix to namespace all INFRAQUEUE data.
///
/// Every key built in this module starts with this prefix: the per-topic ready list
/// (`infraqueue:{topic}`), the in-flight set (`infraqueue:{topic}:inflight`), stored message
/// bodies (`infraqueue:msg:{id}`) and the dead-letter list (`infraqueue:{topic}:dlq`).
const REDIS_PREFIX: &str = "infraqueue";

/// Build a Redis list key for the provided topic.
#[inline]
pub(crate) fn make_key(topic: &str) -> String {
    format!("{}:{}", REDIS_PREFIX, topic)
}

/// Return the Redis key of the in-flight set for `topic`.
///
/// The key has the shape `{REDIS_PREFIX}:{topic}:inflight`.
#[inline]
fn make_inflight_key(topic: &str) -> String {
    [REDIS_PREFIX, topic, "inflight"].join(":")
}

/// Return the Redis key under which the body of message `id` is stored.
///
/// The key has the shape `{REDIS_PREFIX}:msg:{id}` and does not depend on the topic.
#[inline]
fn make_msg_key(id: &str) -> String {
    let mut key = String::from(REDIS_PREFIX);
    key.push_str(":msg:");
    key.push_str(id);
    key
}

/// Retry/backoff policy configuration.
///
/// LLM-centric guidance:
/// - Useful when downstream model APIs (OpenAI, Anthropic, Ollama, etc.) return transient
///   failures like HTTP 429 (rate limit), 408 (timeout), or 5xx. Your worker can NACK the
///   message to reschedule it with a computed delay based on this policy.
/// - Combine with an appropriate visibility timeout so only one worker attempts a long-running
///   generation at a time.
/// - After `max_retries`, messages are dead-lettered to `infraqueue:{topic}:dlq` for manual inspection.
///
/// Fields:
/// - `max_retries`: reschedules allowed before dead-letter.
/// - `base_delay_ms`: initial delay on first failure.
/// - `max_delay_ms`: upper bound for any backoff delay.
/// - `multiplier`: exponential growth factor (e.g., 2.0 doubles each attempt).
#[derive(Clone, Debug)]
pub struct RetryPolicy {
    /// Number of reschedules allowed before a message is dead-lettered.
    pub max_retries: u32,
    /// Delay, in milliseconds, applied on the first failure.
    pub base_delay_ms: u64,
    /// Upper bound, in milliseconds, for any computed backoff delay.
    pub max_delay_ms: u64,
    /// Exponential growth factor applied per additional attempt.
    pub multiplier: f64,
}

impl Default for RetryPolicy {
    /// Defaults: 5 retries, 1 s base delay, 60 s cap, doubling on each attempt.
    fn default() -> Self {
        Self {
            max_retries: 5,
            base_delay_ms: 1000,
            max_delay_ms: 60_000,
            multiplier: 2.0,
        }
    }
}

impl RetryPolicy {
    /// Compute the backoff delay, in milliseconds, for the given attempt number.
    ///
    /// The delay is `base_delay_ms * multiplier^(retry_count - 1)`, capped at `max_delay_ms`.
    /// Attempts `0` and `1` both yield the (capped) base delay. A `base_delay_ms` of zero
    /// always yields zero.
    pub fn backoff_delay_ms(&self, retry_count: u32) -> u64 {
        // Short-circuit a zero base so a huge exponent cannot produce `0 * inf = NaN`,
        // which `f64::min` would otherwise turn into `max_delay_ms`.
        if self.base_delay_ms == 0 {
            return 0;
        }
        let mut delay = self.base_delay_ms as f64;
        if retry_count > 1 {
            // Clamp before narrowing: a plain `as i32` wraps to a negative exponent for very
            // large counts, which would shrink the delay instead of growing it.
            let exponent = (retry_count - 1).min(i32::MAX as u32) as i32;
            delay *= self.multiplier.powi(exponent);
        }
        // Float-to-int `as` saturates, so an infinite or negative product stays in range.
        delay.min(self.max_delay_ms as f64) as u64
    }
}

/// Result of dequeue with receipt token for ack/nack.
/// In LLM workflows, treat `receipt` like a visibility token; your worker:
/// 1) dequeues with a visibility window sized to the expected generation time,
/// 2) runs the model call,
/// 3) ACKs on success (deletes inflight and body), or NACKs on transient failure to reschedule.
///
/// Note: If the worker crashes or the visibility window elapses before ACK/NACK,
/// the message becomes eligible for reclaim and may be delivered again (at-least-once).
#[derive(Clone, Debug)]
pub struct DequeueWithReceipt {
    /// The message as deserialized from the ready list at dequeue time.
    pub message: InfraQueueMessage,
    /// Token to pass to `ack`/`nack`; `dequeue_with_visibility` sets it to the message's `id`.
    pub receipt: String,
}

/// Outcome of a NACK operation.
///
/// LLM guidance:
/// - `Requeued` means the message remains in-flight with a new scheduled visibility
///   deadline; workers should not attempt it until then. Useful after 429/timeout.
/// - `DeadLettered` indicates the message exceeded `max_retries` and was moved to
///   `infraqueue:{topic}:dlq` for inspection (e.g., bad prompt, invalid params).
#[derive(Clone, Debug)]
pub enum NackOutcome {
    /// The message stays in-flight. `delay_ms` is the backoff added to the current time to
    /// form the new deadline; `retry_count` is the attempt count after this NACK's increment.
    Requeued { delay_ms: u64, retry_count: u32 },
    /// The message was pushed to the dead-letter list. `nack` also returns this variant when
    /// no stored body exists for the receipt, in which case nothing is written to the DLQ.
    DeadLettered,
}

/// A minimal queue facade backed by Redis per topic.
///
/// Keys used per topic: a ready list (`infraqueue:{topic}`), an in-flight sorted set scored by
/// visibility deadline in milliseconds (`infraqueue:{topic}:inflight`), and a dead-letter list
/// (`infraqueue:{topic}:dlq`). Bodies of in-flight messages are stored under
/// `infraqueue:msg:{id}`.
pub struct InfraQueueQueue {
    // Connection pool from which every operation takes a connection.
    pub(crate) pool: Pool,
}

impl InfraQueueQueue {
    /// Create a new queue facade over an existing connection pool.
    ///
    /// The pool is stored as-is; no connection is taken from it here.
    pub fn new(pool: Pool) -> Self {
        Self { pool }
    }

    /// Build a queue whose connection pool targets the given Redis URL.
    ///
    /// The pool is configured with `PoolConfig::default()` and created for the Tokio 1 runtime.
    ///
    /// # Errors
    /// Returns an error if the pool cannot be created from the configuration.
    pub fn from_url(url: &str) -> Result<Self, anyhow::Error> {
        let mut redis_cfg = RedisConfig::from_url(url.to_string());
        redis_cfg.pool = Some(deadpool_redis::PoolConfig::default());
        redis_cfg
            .create_pool(Some(Runtime::Tokio1))
            .map(|pool| Self { pool })
            .context("failed to create Redis pool")
    }

    /// Create a new queue using environment variables.
    ///
    /// Precedence:
    /// - `REDIS_URL` if set.
    /// - Otherwise a URL assembled from `REDIS_HOST` (default `127.0.0.1`), `REDIS_PORT`
    ///   (default `6379`), `REDIS_USER` (default `default`) and `REDIS_PASSWORD` (resolved
    ///   through `env_var_or_file`; treated as empty when unavailable). Database `0` is used.
    ///
    /// The chosen source is logged at `info` level. Credentials in `REDIS_URL` are replaced by
    /// `***`, and for the parts form only whether a password is set is logged.
    ///
    /// # Errors
    /// Returns whatever `from_url` returns for the resulting URL.
    pub fn from_env() -> Result<Self, anyhow::Error> {
        /// Replace the userinfo portion of `url` with `***` so it can be logged.
        ///
        /// Splits on the LAST `@`: a credential containing a raw `@` would otherwise leak
        /// everything after its first `@` into the log. URLs without a scheme separator or
        /// without `@` are returned unchanged.
        fn redact_redis_url(url: &str) -> String {
            if let Some((scheme, rest)) = url.split_once("://") {
                if let Some((_userinfo, host_and_path)) = rest.rsplit_once('@') {
                    return format!("{}://***@{}", scheme, host_and_path);
                }
            }
            url.to_string()
        }

        let redis_url = match env::var("REDIS_URL") {
            Ok(u) => {
                info!("Using Redis via REDIS_URL={}", redact_redis_url(&u));
                u
            }
            Err(_) => {
                let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".into());
                let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".into());
                let user = env::var("REDIS_USER").unwrap_or_else(|_| "default".into());
                let pass = env_var_or_file("REDIS_PASSWORD").unwrap_or_default();
                info!(
                    "Using Redis built from parts: host={} port={} user={} password_set={}",
                    host,
                    port,
                    user,
                    !pass.is_empty()
                );
                // NOTE(review): user and password are interpolated verbatim. Characters such
                // as `/`, `?`, `#` or `%` in a credential will likely not survive URL parsing;
                // consider percent-encoding them once the client's decoding rules are confirmed.
                if pass.is_empty() {
                    format!("redis://{}@{}:{}/0", user, host, port)
                } else {
                    format!("redis://{}:{}@{}:{}/0", user, pass, host, port)
                }
            }
        };
        Self::from_url(&redis_url)
    }

    /// Current wall-clock time as milliseconds since the Unix epoch.
    ///
    /// Yields `0` if the system clock reports a time before the epoch.
    fn now_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_millis() as u64)
            .unwrap_or(0)
    }

    /// Append `msg` to the tail of the ready list of its own topic.
    ///
    /// The message is serialized to JSON and pushed with RPUSH onto the list keyed by
    /// `msg.topic`.
    ///
    /// # Errors
    /// Fails if no pooled connection is available, if serialization fails, or if the RPUSH fails.
    pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<(), anyhow::Error> {
        let mut redis = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;

        let list_key = make_key(&msg.topic);
        let body =
            serde_json::to_string(&msg).context("failed to serialize InfraQueueMessage to JSON")?;

        // RPUSH replies with the new list length, which is not needed here.
        let _list_len: usize = redis
            .rpush(list_key, body)
            .await
            .context("failed to RPUSH message to Redis list")?;
        Ok(())
    }

    /// Pop the head of the topic's ready list without any visibility tracking.
    ///
    /// Returns `Ok(None)` when the list is empty. The popped message is not recorded anywhere
    /// else, so there is nothing to ACK or NACK afterwards.
    ///
    /// # Errors
    /// Fails if no pooled connection is available, if the LPOP fails, or if the popped payload
    /// is not valid `InfraQueueMessage` JSON.
    pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>, anyhow::Error> {
        let mut redis = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;

        let popped: Option<String> = redis
            .lpop(make_key(topic), None)
            .await
            .context("failed to LPOP from Redis list")?;

        // Option<Result<..>> -> Result<Option<..>>: an empty list stays Ok(None).
        popped
            .map(|payload| {
                serde_json::from_str::<InfraQueueMessage>(&payload)
                    .context("failed to deserialize InfraQueueMessage from JSON")
            })
            .transpose()
    }

    /// Reclaim expired in-flight messages by moving them back to the ready queue.
    ///
    /// LLM guidance:
    /// - If a worker times out or crashes during generation, its visibility window may lapse.
    ///   Those messages become reclaimable and are re-queued for redelivery.
    /// - The retry_count is incremented to preserve a coarse history of delivery attempts
    ///   and to inform backoff policy decisions.
    /// - Your LLM worker should be idempotent. Use `message.id` or business keys to avoid
    ///   double-processing outputs when duplicates occur (at-least-once semantics).
    ///
    /// Returns the number of expired in-flight entries that were removed. Entries whose stored
    /// body is missing or cannot be parsed are removed (and counted) without being re-queued.
    ///
    /// # Errors
    /// Any Redis or serialization failure aborts the pass. A failed body lookup leaves the
    /// in-flight entry in place so a later pass can retry it, rather than dropping the message.
    /// The individual Redis commands are not executed atomically.
    pub async fn reclaim_inflight(&self, topic: &str) -> Result<u64, anyhow::Error> {
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let inflight = make_inflight_key(topic);
        let now = Self::now_ms() as i64;
        // Every entry whose deadline (score) is at or before `now` has expired.
        let ids: Vec<String> = conn
            .zrangebyscore(&inflight, 0, now)
            .await
            .context("failed to ZRANGEBYSCORE inflight")?;
        if ids.is_empty() {
            return Ok(0);
        }
        let list_key = make_key(topic);
        let mut reclaimed = 0u64;
        for id in &ids {
            let msg_key = make_msg_key(id);
            // Propagate lookup errors: swallowing them would fall through to the ZREM below
            // and silently lose a message whose body is still stored.
            let raw: Option<String> = conn
                .get(&msg_key)
                .await
                .context("failed to GET message body")?;
            // A missing or unparsable body cannot be redelivered; only its entry is dropped.
            let parsed = raw.and_then(|r| serde_json::from_str::<InfraQueueMessage>(&r).ok());
            if let Some(mut msg) = parsed {
                let new_retry = (msg.retry_count as u32).saturating_add(1);
                msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
                // Serialize once so the stored body and the re-queued payload are identical.
                let updated = serde_json::to_string(&msg)
                    .context("failed to serialize updated message")?;
                let _: () = conn
                    .set(&msg_key, &updated)
                    .await
                    .context("failed to SET message body")?;
                let _: usize = conn
                    .rpush(&list_key, &updated)
                    .await
                    .context("failed to RPUSH message to Redis list")?;
            }
            let _: i64 = conn
                .zrem(&inflight, id)
                .await
                .context("failed to ZREM inflight")?;
            reclaimed += 1;
        }
        Ok(reclaimed)
    }

    /// Dequeue with a visibility timeout and return a receipt token (message id).
    ///
    /// LLM guidance:
    /// - Set `visibility_timeout_ms` to cover the expected model latency (e.g., 30–120s for
    ///   long generations). While visible=false, other workers won't receive this message.
    /// - If the operation completes within the window, ACK to remove the inflight entry and
    ///   body; if it fails transiently (429/timeout), NACK to reschedule with backoff.
    /// - If the window expires first, the message is reclaimable and may be delivered again
    ///   (at-least-once delivery). Ensure your worker is idempotent.
    ///
    /// Expired in-flight messages of the topic are reclaimed first. Then the head of the ready
    /// list is popped, its raw body is stored under the message key, and its id is added to
    /// the in-flight set with a deadline of `now + visibility_timeout_ms`. Returns `Ok(None)`
    /// when the ready list is empty.
    ///
    /// # Errors
    /// Fails on any Redis error or if the popped payload is not valid JSON. The steps are
    /// separate commands, so a failure after the pop leaves the message out of the ready list.
    pub async fn dequeue_with_visibility(
        &self,
        topic: &str,
        visibility_timeout_ms: u64,
    ) -> Result<Option<DequeueWithReceipt>, anyhow::Error> {
        let _ = self.reclaim_inflight(topic).await?;
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let popped: Option<String> = conn
            .lpop(make_key(topic), None)
            .await
            .context("failed to LPOP from Redis list")?;
        let raw = match popped {
            Some(raw) => raw,
            None => return Ok(None),
        };
        let msg: InfraQueueMessage = serde_json::from_str(&raw)
            .context("failed to deserialize InfraQueueMessage from JSON")?;
        let receipt = msg.id.clone();
        let _: () = conn
            .set(make_msg_key(&receipt), raw)
            .await
            .context("failed to SET message body")?;
        // Saturate instead of overflowing or wrapping: a wrapped (negative) score would make
        // the message look expired, and therefore reclaimable, immediately.
        let deadline = Self::now_ms()
            .saturating_add(visibility_timeout_ms)
            .min(i64::MAX as u64) as i64;
        let _: i64 = conn
            .zadd(make_inflight_key(topic), &receipt, deadline)
            .await
            .context("failed to ZADD inflight")?;
        Ok(Some(DequeueWithReceipt {
            message: msg,
            receipt,
        }))
    }

    /// Confirm that the message identified by `receipt` was processed successfully.
    ///
    /// Removes the receipt from the topic's in-flight set and then deletes the stored message
    /// body. The reply counts of both commands are ignored, so the call returns `Ok(true)`
    /// whenever both commands succeed, including when the receipt was not present.
    ///
    /// LLM guidance:
    /// - ACK only once the model output has been durably persisted (DB/S3) and downstream
    ///   events have been emitted; otherwise a crash right after ACK loses the work.
    /// - Repeating an ACK for the same receipt is harmless, but your own side-effects (writes)
    ///   should be idempotent as well where possible.
    ///
    /// # Errors
    /// Fails if no pooled connection is available or if either Redis command fails.
    pub async fn ack(&self, topic: &str, receipt: &str) -> Result<bool, anyhow::Error> {
        let mut redis = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;

        // Drop the visibility entry first, then the stored body.
        let _removed: i64 = redis
            .zrem(make_inflight_key(topic), receipt)
            .await
            .context("failed to ZREM inflight")?;
        let _deleted: i64 = redis
            .del(make_msg_key(receipt))
            .await
            .context("failed to DEL message body")?;

        Ok(true)
    }

    /// Negative acknowledge: reschedule with backoff or dead-letter if retries exceeded.
    ///
    /// LLM guidance:
    /// - Call NACK for transient provider errors (HTTP 429, timeouts) to delay and retry without
    ///   blocking a worker. The delay is computed via `RetryPolicy` (exponential backoff capped
    ///   by `max_delay_ms`).
    /// - After `max_retries`, the message is moved to `infraqueue:{topic}:dlq` for manual triage.
    /// - Choose conservative backoff to avoid cascading failures and control spend when providers
    ///   are degraded. Pair with observability on DLQ size and retry counters.
    ///
    /// Behavior:
    /// - If no body is stored for `receipt`, returns `NackOutcome::DeadLettered` without
    ///   writing anything.
    /// - Otherwise the stored retry count is incremented. If the new count exceeds
    ///   `policy.max_retries`, the message is pushed to the DLQ and its in-flight entry and
    ///   body are removed. If not, the body is rewritten and the in-flight deadline is set to
    ///   `now + backoff`.
    /// - NOTE(review): the stored count is clamped to `u8::MAX`, so a policy with
    ///   `max_retries >= 256` would presumably never dead-letter — confirm against the
    ///   `InfraQueueMessage` definition.
    ///
    /// # Errors
    /// Fails on any Redis error or if the stored body cannot be (de)serialized.
    pub async fn nack(
        &self,
        topic: &str,
        receipt: &str,
        policy: &RetryPolicy,
    ) -> Result<NackOutcome, anyhow::Error> {
        let mut conn = self
            .pool
            .get()
            .await
            .context("failed to get Redis connection from pool")?;
        let msg_key = make_msg_key(receipt);
        let inflight = make_inflight_key(topic);
        let stored: Option<String> = conn
            .get(&msg_key)
            .await
            .context("failed to GET message body")?;
        let raw = match stored {
            Some(raw) => raw,
            // Nothing left to reschedule for this receipt.
            None => return Ok(NackOutcome::DeadLettered),
        };
        let mut msg: InfraQueueMessage =
            serde_json::from_str(&raw).context("failed to deserialize InfraQueueMessage from JSON")?;
        let new_retry = (msg.retry_count as u32).saturating_add(1);
        msg.retry_count = new_retry.min(u8::MAX as u32) as u8;
        let updated = serde_json::to_string(&msg).context("failed to serialize updated message")?;

        if new_retry > policy.max_retries {
            let dlq = format!("{}:{}:dlq", REDIS_PREFIX, topic);
            // Push to the DLQ before removing anything so a failure cannot lose the message.
            let _: usize = conn
                .rpush(dlq, &updated)
                .await
                .context("failed to RPUSH to DLQ")?;
            let _: i64 = conn
                .zrem(&inflight, receipt)
                .await
                .context("failed to ZREM inflight")?;
            let _: i64 = conn
                .del(&msg_key)
                .await
                .context("failed to DEL message body")?;
            return Ok(NackOutcome::DeadLettered);
        }

        let delay = policy.backoff_delay_ms(new_retry);
        // Saturate instead of overflowing or wrapping into a negative (already expired) score.
        let new_deadline = Self::now_ms().saturating_add(delay).min(i64::MAX as u64) as i64;
        let _: () = conn
            .set(&msg_key, updated)
            .await
            .context("failed to SET message body")?;
        let _: i64 = conn
            .zadd(&inflight, receipt, new_deadline)
            .await
            .context("failed to ZADD inflight")?;
        Ok(NackOutcome::Requeued {
            delay_ms: delay,
            retry_count: new_retry,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `make_key` joins the global prefix and the topic with a single colon, even when the
    /// topic is empty.
    #[test]
    fn test_make_key() {
        let cases = [
            ("topic", "infraqueue:topic"),
            ("orders", "infraqueue:orders"),
            ("", "infraqueue:"),
        ];
        for &(topic, expected) in cases.iter() {
            assert_eq!(make_key(topic), expected);
        }
    }
}