infraqueue-lib 0.1.1

Core library for INFRAQUEUE
Documentation
use anyhow::Result;
use reqwest::header::{HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize};
use crate::model::InfraQueueMessage;
use crate::queue::{NackOutcome, RetryPolicy};

/// JSON body posted to the `/enqueue` endpoint.
///
/// `InfraQueueClient::enqueue` fills every field from the corresponding field of the
/// `InfraQueueMessage` it is given.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnqueueRequest {
    /// Sender of the message, copied from the message's `sender` field.
    pub sender: String,
    /// Topic the message is enqueued on.
    pub topic: String,
    /// Message payload, carried as a string.
    pub payload: String,
    /// Optional priority, copied from the message.
    // NOTE(review): ordering semantics (higher vs. lower = more urgent) are decided by the
    // server and are not visible here — confirm before documenting further.
    pub priority: Option<u8>,
}

/// JSON body posted to the `/dequeue` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct DequeueRequest {
    /// Topic to take the next message from.
    pub topic: String,
    /// Optional visibility value in milliseconds; `InfraQueueClient::dequeue` sends `None`.
    // NOTE(review): presumably the time the message stays hidden from other consumers, with
    // `None` deferring to a server default — confirm against the server implementation.
    pub visibility_ms: Option<u64>,
}

/// JSON body parsed from a successful `/dequeue` response that carries a message.
///
/// A `204 No Content` response carries no body and is surfaced by the client as `None`
/// instead of this type.
#[derive(Debug, Serialize, Deserialize)]
pub struct DequeueResponse {
    /// Receipt string issued with this delivery.
    // NOTE(review): presumably the value to pass as `receipt` to `ack` / `nack` — confirm.
    pub receipt: String,
    /// The dequeued message itself.
    pub message: InfraQueueMessage,
}

/// JSON body posted to the `/ack` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct AckRequest {
    /// Topic the acknowledged message belongs to.
    pub topic: String,
    /// Receipt string identifying the delivery being acknowledged.
    pub receipt: String,
}

/// JSON body posted to the `/nack` endpoint.
///
/// The four optional fields mirror the fields of `RetryPolicy`; `InfraQueueClient::nack`
/// always sends all of them as `Some`.
#[derive(Debug, Serialize, Deserialize)]
pub struct NackRequest {
    /// Topic the rejected message belongs to.
    pub topic: String,
    /// Receipt string identifying the delivery being rejected.
    pub receipt: String,
    /// Retry limit, taken from `RetryPolicy::max_retries`.
    pub max_retries: Option<u32>,
    /// Base delay in milliseconds, taken from `RetryPolicy::base_delay_ms`.
    pub base_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds, taken from `RetryPolicy::max_delay_ms`.
    pub max_delay_ms: Option<u64>,
    /// Delay multiplier, taken from `RetryPolicy::multiplier`.
    // NOTE(review): how the server combines base delay, multiplier and cap (e.g. exponential
    // backoff) is not visible in this file — confirm against the queue implementation.
    pub multiplier: Option<f64>,
}

/// JSON body posted to the `/heartbeat` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartbeatRequest {
    /// Topic the heartbeat is reported for.
    pub topic: String,
    /// Identifier of the consumer sending the heartbeat.
    pub consumer_id: String,
    /// Optional TTL in seconds; `InfraQueueClient::heartbeat` always sends `Some`.
    // NOTE(review): presumably how long the server considers the consumer alive after this
    // heartbeat — confirm against the server implementation.
    pub ttl_seconds: Option<u64>,
}

/// HTTP client for an INFRAQUEUE server.
///
/// Every operation is a JSON `POST` to `{base_url}/{operation}`.
pub struct InfraQueueClient {
    /// Underlying HTTP client; carries the `X-API-Key` default header when a key was supplied.
    client: reqwest::Client,
    /// Server base URL, stored without trailing `/` characters.
    base_url: String,
}

impl InfraQueueClient {
    pub fn new(base_url: String, api_key: Option<String>) -> Result<Self> {
        let mut headers = HeaderMap::new();
        if let Some(key) = api_key {
            headers.insert("X-API-Key", HeaderValue::from_str(&key)?);
        }
        
        let client = reqwest::Client::builder()
            .default_headers(headers)
            .build()?;
            
        Ok(Self {
            client,
            base_url: base_url.trim_end_matches('/').to_string(),
        })
    }

    pub fn from_env() -> Result<Self> {
        let base_url = std::env::var("INFRAQUEUE_URL")
            .or_else(|_| std::env::var("INFRAQUEUE_SERVER_URL"))
            .unwrap_or_else(|_| "http://localhost:3000".to_string());
        let api_key = std::env::var("INFRAQUEUE_API_KEY").ok();
        Self::new(base_url, api_key)
    }

    pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<()> {
        let url = format!("{}/enqueue", self.base_url);
        let req = EnqueueRequest {
            sender: msg.sender,
            topic: msg.topic,
            payload: msg.payload,
            priority: msg.priority,
        };
        
        let resp = self.client.post(url)
            .json(&req)
            .send()
            .await?;
            
        let status = resp.status();
        if !status.is_success() {
            let err_text = resp.text().await.unwrap_or_default();
            anyhow::bail!("enqueue failed: status={} body={}", status, err_text);
        }
        
        Ok(())
    }

    pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>> {
        self.dequeue_with_visibility(topic, None).await.map(|opt| opt.map(|r| r.message))
    }

    pub async fn dequeue_with_visibility(&self, topic: &str, visibility_ms: Option<u64>) -> Result<Option<DequeueResponse>> {
        let url = format!("{}/dequeue", self.base_url);
        let req = DequeueRequest {
            topic: topic.to_string(),
            visibility_ms,
        };
        
        let resp = self.client.post(url)
            .json(&req)
            .send()
            .await?;
            
        let status = resp.status();
        if status == reqwest::StatusCode::NO_CONTENT {
            return Ok(None);
        }
        
        if !status.is_success() {
            let err_text = resp.text().await.unwrap_or_default();
            anyhow::bail!("dequeue failed: status={} body={}", status, err_text);
        }
        
        let body: DequeueResponse = resp.json().await?;
        Ok(Some(body))
    }

    pub async fn ack(&self, topic: &str, receipt: &str) -> Result<()> {
        let url = format!("{}/ack", self.base_url);
        let req = AckRequest {
            topic: topic.to_string(),
            receipt: receipt.to_string(),
        };
        
        let resp = self.client.post(url)
            .json(&req)
            .send()
            .await?;
            
        let status = resp.status();
        if !status.is_success() {
            let err_text = resp.text().await.unwrap_or_default();
            anyhow::bail!("ack failed: status={} body={}", status, err_text);
        }
        
        Ok(())
    }

    pub async fn nack(&self, topic: &str, receipt: &str, policy: &RetryPolicy) -> Result<NackOutcome> {
        let url = format!("{}/nack", self.base_url);
        let req = NackRequest {
            topic: topic.to_string(),
            receipt: receipt.to_string(),
            max_retries: Some(policy.max_retries),
            base_delay_ms: Some(policy.base_delay_ms),
            max_delay_ms: Some(policy.max_delay_ms),
            multiplier: Some(policy.multiplier),
        };
        
        let resp = self.client.post(url)
            .json(&req)
            .send()
            .await?;
            
        let status = resp.status();
        if !status.is_success() {
            let err_text = resp.text().await.unwrap_or_default();
            anyhow::bail!("nack failed: status={} body={}", status, err_text);
        }
        
        #[derive(Deserialize)]
        struct NackResp {
            status: String,
            delay_ms: Option<u64>,
            retry_count: Option<u32>,
        }
        
        let body: NackResp = resp.json().await?;
        if body.status == "requeued" {
            Ok(NackOutcome::Requeued {
                delay_ms: body.delay_ms.unwrap_or_default(),
                retry_count: body.retry_count.unwrap_or_default(),
            })
        } else {
            Ok(NackOutcome::DeadLettered)
        }
    }

    pub async fn heartbeat(&self, topic: &str, consumer_id: &str, ttl_seconds: u64) -> Result<()> {
        let url = format!("{}/heartbeat", self.base_url);
        let req = HeartbeatRequest {
            topic: topic.to_string(),
            consumer_id: consumer_id.to_string(),
            ttl_seconds: Some(ttl_seconds),
        };
        
        let resp = self.client.post(url)
            .json(&req)
            .send()
            .await?;
            
        let status = resp.status();
        if !status.is_success() {
            let err_text = resp.text().await.unwrap_or_default();
            anyhow::bail!("heartbeat failed: status={} body={}", status, err_text);
        }
        
        Ok(())
    }
}