Skip to main content

infraqueue_lib/
client.rs

1use anyhow::Result;
2use reqwest::header::{HeaderMap, HeaderValue};
3use serde::{Deserialize, Serialize};
4use crate::model::InfraQueueMessage;
5use crate::queue::{NackOutcome, RetryPolicy};
6
7#[derive(Debug, Serialize, Deserialize)]
8pub struct EnqueueRequest {
9    pub sender: String,
10    pub topic: String,
11    pub payload: String,
12    pub priority: Option<u8>,
13}
14
15#[derive(Debug, Serialize, Deserialize)]
16pub struct DequeueRequest {
17    pub topic: String,
18    pub visibility_ms: Option<u64>,
19}
20
21#[derive(Debug, Serialize, Deserialize)]
22pub struct DequeueResponse {
23    pub receipt: String,
24    pub message: InfraQueueMessage,
25}
26
27#[derive(Debug, Serialize, Deserialize)]
28pub struct AckRequest {
29    pub topic: String,
30    pub receipt: String,
31}
32
33#[derive(Debug, Serialize, Deserialize)]
34pub struct NackRequest {
35    pub topic: String,
36    pub receipt: String,
37    pub max_retries: Option<u32>,
38    pub base_delay_ms: Option<u64>,
39    pub max_delay_ms: Option<u64>,
40    pub multiplier: Option<f64>,
41}
42
43#[derive(Debug, Serialize, Deserialize)]
44pub struct HeartbeatRequest {
45    pub topic: String,
46    pub consumer_id: String,
47    pub ttl_seconds: Option<u64>,
48}
49
50pub struct InfraQueueClient {
51    client: reqwest::Client,
52    base_url: String,
53}
54
55impl InfraQueueClient {
56    pub fn new(base_url: String, api_key: Option<String>) -> Result<Self> {
57        let mut headers = HeaderMap::new();
58        if let Some(key) = api_key {
59            headers.insert("X-API-Key", HeaderValue::from_str(&key)?);
60        }
61        
62        let client = reqwest::Client::builder()
63            .default_headers(headers)
64            .build()?;
65            
66        Ok(Self {
67            client,
68            base_url: base_url.trim_end_matches('/').to_string(),
69        })
70    }
71
72    pub fn from_env() -> Result<Self> {
73        let base_url = std::env::var("INFRAQUEUE_URL")
74            .or_else(|_| std::env::var("INFRAQUEUE_SERVER_URL"))
75            .unwrap_or_else(|_| "http://localhost:3000".to_string());
76        let api_key = std::env::var("INFRAQUEUE_API_KEY").ok();
77        Self::new(base_url, api_key)
78    }
79
80    pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<()> {
81        let url = format!("{}/enqueue", self.base_url);
82        let req = EnqueueRequest {
83            sender: msg.sender,
84            topic: msg.topic,
85            payload: msg.payload,
86            priority: msg.priority,
87        };
88        
89        let resp = self.client.post(url)
90            .json(&req)
91            .send()
92            .await?;
93            
94        let status = resp.status();
95        if !status.is_success() {
96            let err_text = resp.text().await.unwrap_or_default();
97            anyhow::bail!("enqueue failed: status={} body={}", status, err_text);
98        }
99        
100        Ok(())
101    }
102
103    pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>> {
104        self.dequeue_with_visibility(topic, None).await.map(|opt| opt.map(|r| r.message))
105    }
106
107    pub async fn dequeue_with_visibility(&self, topic: &str, visibility_ms: Option<u64>) -> Result<Option<DequeueResponse>> {
108        let url = format!("{}/dequeue", self.base_url);
109        let req = DequeueRequest {
110            topic: topic.to_string(),
111            visibility_ms,
112        };
113        
114        let resp = self.client.post(url)
115            .json(&req)
116            .send()
117            .await?;
118            
119        let status = resp.status();
120        if status == reqwest::StatusCode::NO_CONTENT {
121            return Ok(None);
122        }
123        
124        if !status.is_success() {
125            let err_text = resp.text().await.unwrap_or_default();
126            anyhow::bail!("dequeue failed: status={} body={}", status, err_text);
127        }
128        
129        let body: DequeueResponse = resp.json().await?;
130        Ok(Some(body))
131    }
132
133    pub async fn ack(&self, topic: &str, receipt: &str) -> Result<()> {
134        let url = format!("{}/ack", self.base_url);
135        let req = AckRequest {
136            topic: topic.to_string(),
137            receipt: receipt.to_string(),
138        };
139        
140        let resp = self.client.post(url)
141            .json(&req)
142            .send()
143            .await?;
144            
145        let status = resp.status();
146        if !status.is_success() {
147            let err_text = resp.text().await.unwrap_or_default();
148            anyhow::bail!("ack failed: status={} body={}", status, err_text);
149        }
150        
151        Ok(())
152    }
153
154    pub async fn nack(&self, topic: &str, receipt: &str, policy: &RetryPolicy) -> Result<NackOutcome> {
155        let url = format!("{}/nack", self.base_url);
156        let req = NackRequest {
157            topic: topic.to_string(),
158            receipt: receipt.to_string(),
159            max_retries: Some(policy.max_retries),
160            base_delay_ms: Some(policy.base_delay_ms),
161            max_delay_ms: Some(policy.max_delay_ms),
162            multiplier: Some(policy.multiplier),
163        };
164        
165        let resp = self.client.post(url)
166            .json(&req)
167            .send()
168            .await?;
169            
170        let status = resp.status();
171        if !status.is_success() {
172            let err_text = resp.text().await.unwrap_or_default();
173            anyhow::bail!("nack failed: status={} body={}", status, err_text);
174        }
175        
176        #[derive(Deserialize)]
177        struct NackResp {
178            status: String,
179            delay_ms: Option<u64>,
180            retry_count: Option<u32>,
181        }
182        
183        let body: NackResp = resp.json().await?;
184        if body.status == "requeued" {
185            Ok(NackOutcome::Requeued {
186                delay_ms: body.delay_ms.unwrap_or_default(),
187                retry_count: body.retry_count.unwrap_or_default(),
188            })
189        } else {
190            Ok(NackOutcome::DeadLettered)
191        }
192    }
193
194    pub async fn heartbeat(&self, topic: &str, consumer_id: &str, ttl_seconds: u64) -> Result<()> {
195        let url = format!("{}/heartbeat", self.base_url);
196        let req = HeartbeatRequest {
197            topic: topic.to_string(),
198            consumer_id: consumer_id.to_string(),
199            ttl_seconds: Some(ttl_seconds),
200        };
201        
202        let resp = self.client.post(url)
203            .json(&req)
204            .send()
205            .await?;
206            
207        let status = resp.status();
208        if !status.is_success() {
209            let err_text = resp.text().await.unwrap_or_default();
210            anyhow::bail!("heartbeat failed: status={} body={}", status, err_text);
211        }
212        
213        Ok(())
214    }
215}