use anyhow::Result;
use reqwest::header::{HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize};
use crate::model::InfraQueueMessage;
use crate::queue::{NackOutcome, RetryPolicy};
/// JSON body that [`InfraQueueClient::enqueue`] posts to the `/enqueue` endpoint.
///
/// Every field is copied from the same-named field of the
/// `InfraQueueMessage` being enqueued.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnqueueRequest {
    /// Sender of the message.
    pub sender: String,
    /// Topic the message is published to.
    pub topic: String,
    /// Message payload, carried as a string.
    pub payload: String,
    /// Optional priority.
    // NOTE(review): how the server orders priorities (higher-first vs. lower-first)
    // is not visible from this file — confirm against the server.
    pub priority: Option<u8>,
}
/// JSON body that [`InfraQueueClient::dequeue_with_visibility`] posts to the
/// `/dequeue` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct DequeueRequest {
    /// Topic to take a message from.
    pub topic: String,
    /// Optional per-request value in milliseconds; [`InfraQueueClient::dequeue`]
    /// always sends `None`.
    // NOTE(review): presumably the visibility timeout of the dequeued message,
    // with a server-side default when absent — confirm against the server.
    pub visibility_ms: Option<u64>,
}
/// Body of a successful `/dequeue` response that carried a message.
///
/// [`InfraQueueClient::dequeue_with_visibility`] deserializes this from any
/// successful response other than `204 No Content`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DequeueResponse {
    /// Receipt string issued by the server for this delivery.
    // NOTE(review): presumably the value to pass as `receipt` to `ack`/`nack` — confirm.
    pub receipt: String,
    /// The dequeued message itself.
    pub message: InfraQueueMessage,
}
/// JSON body that [`InfraQueueClient::ack`] posts to the `/ack` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct AckRequest {
    /// Topic the acknowledged message belongs to.
    pub topic: String,
    /// Receipt identifying the delivery being acknowledged.
    pub receipt: String,
}
/// JSON body that [`InfraQueueClient::nack`] posts to the `/nack` endpoint.
///
/// The four optional fields mirror the same-named fields of `RetryPolicy`;
/// `InfraQueueClient::nack` always fills them with `Some(..)`.
// NOTE(review): the server presumably falls back to its own defaults for any
// field left as `None` — confirm against the server.
#[derive(Debug, Serialize, Deserialize)]
pub struct NackRequest {
    /// Topic the rejected message belongs to.
    pub topic: String,
    /// Receipt identifying the delivery being rejected.
    pub receipt: String,
    /// Retry limit taken from `RetryPolicy::max_retries`.
    pub max_retries: Option<u32>,
    /// Base delay in milliseconds taken from `RetryPolicy::base_delay_ms`.
    pub base_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds taken from `RetryPolicy::max_delay_ms`.
    pub max_delay_ms: Option<u64>,
    /// Multiplier taken from `RetryPolicy::multiplier`.
    pub multiplier: Option<f64>,
}
/// JSON body that [`InfraQueueClient::heartbeat`] posts to the `/heartbeat` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct HeartbeatRequest {
    /// Topic the consumer is attached to.
    pub topic: String,
    /// Identifier of the consumer sending the heartbeat.
    pub consumer_id: String,
    /// Time-to-live in seconds; `InfraQueueClient::heartbeat` always sends `Some(..)`.
    pub ttl_seconds: Option<u64>,
}
/// HTTP client for the InfraQueue server's JSON endpoints
/// (`/enqueue`, `/dequeue`, `/ack`, `/nack`, `/heartbeat`).
pub struct InfraQueueClient {
    /// Underlying `reqwest` client; carries the `X-API-Key` default header when
    /// an API key was supplied at construction time.
    client: reqwest::Client,
    /// Server base URL with trailing `/` characters removed.
    base_url: String,
}
impl InfraQueueClient {
pub fn new(base_url: String, api_key: Option<String>) -> Result<Self> {
let mut headers = HeaderMap::new();
if let Some(key) = api_key {
headers.insert("X-API-Key", HeaderValue::from_str(&key)?);
}
let client = reqwest::Client::builder()
.default_headers(headers)
.build()?;
Ok(Self {
client,
base_url: base_url.trim_end_matches('/').to_string(),
})
}
pub fn from_env() -> Result<Self> {
let base_url = std::env::var("INFRAQUEUE_URL")
.or_else(|_| std::env::var("INFRAQUEUE_SERVER_URL"))
.unwrap_or_else(|_| "http://localhost:3000".to_string());
let api_key = std::env::var("INFRAQUEUE_API_KEY").ok();
Self::new(base_url, api_key)
}
pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<()> {
let url = format!("{}/enqueue", self.base_url);
let req = EnqueueRequest {
sender: msg.sender,
topic: msg.topic,
payload: msg.payload,
priority: msg.priority,
};
let resp = self.client.post(url)
.json(&req)
.send()
.await?;
let status = resp.status();
if !status.is_success() {
let err_text = resp.text().await.unwrap_or_default();
anyhow::bail!("enqueue failed: status={} body={}", status, err_text);
}
Ok(())
}
pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>> {
self.dequeue_with_visibility(topic, None).await.map(|opt| opt.map(|r| r.message))
}
pub async fn dequeue_with_visibility(&self, topic: &str, visibility_ms: Option<u64>) -> Result<Option<DequeueResponse>> {
let url = format!("{}/dequeue", self.base_url);
let req = DequeueRequest {
topic: topic.to_string(),
visibility_ms,
};
let resp = self.client.post(url)
.json(&req)
.send()
.await?;
let status = resp.status();
if status == reqwest::StatusCode::NO_CONTENT {
return Ok(None);
}
if !status.is_success() {
let err_text = resp.text().await.unwrap_or_default();
anyhow::bail!("dequeue failed: status={} body={}", status, err_text);
}
let body: DequeueResponse = resp.json().await?;
Ok(Some(body))
}
pub async fn ack(&self, topic: &str, receipt: &str) -> Result<()> {
let url = format!("{}/ack", self.base_url);
let req = AckRequest {
topic: topic.to_string(),
receipt: receipt.to_string(),
};
let resp = self.client.post(url)
.json(&req)
.send()
.await?;
let status = resp.status();
if !status.is_success() {
let err_text = resp.text().await.unwrap_or_default();
anyhow::bail!("ack failed: status={} body={}", status, err_text);
}
Ok(())
}
pub async fn nack(&self, topic: &str, receipt: &str, policy: &RetryPolicy) -> Result<NackOutcome> {
let url = format!("{}/nack", self.base_url);
let req = NackRequest {
topic: topic.to_string(),
receipt: receipt.to_string(),
max_retries: Some(policy.max_retries),
base_delay_ms: Some(policy.base_delay_ms),
max_delay_ms: Some(policy.max_delay_ms),
multiplier: Some(policy.multiplier),
};
let resp = self.client.post(url)
.json(&req)
.send()
.await?;
let status = resp.status();
if !status.is_success() {
let err_text = resp.text().await.unwrap_or_default();
anyhow::bail!("nack failed: status={} body={}", status, err_text);
}
#[derive(Deserialize)]
struct NackResp {
status: String,
delay_ms: Option<u64>,
retry_count: Option<u32>,
}
let body: NackResp = resp.json().await?;
if body.status == "requeued" {
Ok(NackOutcome::Requeued {
delay_ms: body.delay_ms.unwrap_or_default(),
retry_count: body.retry_count.unwrap_or_default(),
})
} else {
Ok(NackOutcome::DeadLettered)
}
}
pub async fn heartbeat(&self, topic: &str, consumer_id: &str, ttl_seconds: u64) -> Result<()> {
let url = format!("{}/heartbeat", self.base_url);
let req = HeartbeatRequest {
topic: topic.to_string(),
consumer_id: consumer_id.to_string(),
ttl_seconds: Some(ttl_seconds),
};
let resp = self.client.post(url)
.json(&req)
.send()
.await?;
let status = resp.status();
if !status.is_success() {
let err_text = resp.text().await.unwrap_or_default();
anyhow::bail!("heartbeat failed: status={} body={}", status, err_text);
}
Ok(())
}
}