1use anyhow::Result;
2use reqwest::header::{HeaderMap, HeaderValue};
3use serde::{Deserialize, Serialize};
4use crate::model::InfraQueueMessage;
5use crate::queue::{NackOutcome, RetryPolicy};
6
7#[derive(Debug, Serialize, Deserialize)]
8pub struct EnqueueRequest {
9 pub sender: String,
10 pub topic: String,
11 pub payload: String,
12 pub priority: Option<u8>,
13}
14
15#[derive(Debug, Serialize, Deserialize)]
16pub struct DequeueRequest {
17 pub topic: String,
18 pub visibility_ms: Option<u64>,
19}
20
21#[derive(Debug, Serialize, Deserialize)]
22pub struct DequeueResponse {
23 pub receipt: String,
24 pub message: InfraQueueMessage,
25}
26
27#[derive(Debug, Serialize, Deserialize)]
28pub struct AckRequest {
29 pub topic: String,
30 pub receipt: String,
31}
32
33#[derive(Debug, Serialize, Deserialize)]
34pub struct NackRequest {
35 pub topic: String,
36 pub receipt: String,
37 pub max_retries: Option<u32>,
38 pub base_delay_ms: Option<u64>,
39 pub max_delay_ms: Option<u64>,
40 pub multiplier: Option<f64>,
41}
42
43#[derive(Debug, Serialize, Deserialize)]
44pub struct HeartbeatRequest {
45 pub topic: String,
46 pub consumer_id: String,
47 pub ttl_seconds: Option<u64>,
48}
49
50pub struct InfraQueueClient {
51 client: reqwest::Client,
52 base_url: String,
53}
54
55impl InfraQueueClient {
56 pub fn new(base_url: String, api_key: Option<String>) -> Result<Self> {
57 let mut headers = HeaderMap::new();
58 if let Some(key) = api_key {
59 headers.insert("X-API-Key", HeaderValue::from_str(&key)?);
60 }
61
62 let client = reqwest::Client::builder()
63 .default_headers(headers)
64 .build()?;
65
66 Ok(Self {
67 client,
68 base_url: base_url.trim_end_matches('/').to_string(),
69 })
70 }
71
72 pub fn from_env() -> Result<Self> {
73 let base_url = std::env::var("INFRAQUEUE_URL")
74 .or_else(|_| std::env::var("INFRAQUEUE_SERVER_URL"))
75 .unwrap_or_else(|_| "http://localhost:3000".to_string());
76 let api_key = std::env::var("INFRAQUEUE_API_KEY").ok();
77 Self::new(base_url, api_key)
78 }
79
80 pub async fn enqueue(&self, msg: InfraQueueMessage) -> Result<()> {
81 let url = format!("{}/enqueue", self.base_url);
82 let req = EnqueueRequest {
83 sender: msg.sender,
84 topic: msg.topic,
85 payload: msg.payload,
86 priority: msg.priority,
87 };
88
89 let resp = self.client.post(url)
90 .json(&req)
91 .send()
92 .await?;
93
94 let status = resp.status();
95 if !status.is_success() {
96 let err_text = resp.text().await.unwrap_or_default();
97 anyhow::bail!("enqueue failed: status={} body={}", status, err_text);
98 }
99
100 Ok(())
101 }
102
103 pub async fn dequeue(&self, topic: &str) -> Result<Option<InfraQueueMessage>> {
104 self.dequeue_with_visibility(topic, None).await.map(|opt| opt.map(|r| r.message))
105 }
106
107 pub async fn dequeue_with_visibility(&self, topic: &str, visibility_ms: Option<u64>) -> Result<Option<DequeueResponse>> {
108 let url = format!("{}/dequeue", self.base_url);
109 let req = DequeueRequest {
110 topic: topic.to_string(),
111 visibility_ms,
112 };
113
114 let resp = self.client.post(url)
115 .json(&req)
116 .send()
117 .await?;
118
119 let status = resp.status();
120 if status == reqwest::StatusCode::NO_CONTENT {
121 return Ok(None);
122 }
123
124 if !status.is_success() {
125 let err_text = resp.text().await.unwrap_or_default();
126 anyhow::bail!("dequeue failed: status={} body={}", status, err_text);
127 }
128
129 let body: DequeueResponse = resp.json().await?;
130 Ok(Some(body))
131 }
132
133 pub async fn ack(&self, topic: &str, receipt: &str) -> Result<()> {
134 let url = format!("{}/ack", self.base_url);
135 let req = AckRequest {
136 topic: topic.to_string(),
137 receipt: receipt.to_string(),
138 };
139
140 let resp = self.client.post(url)
141 .json(&req)
142 .send()
143 .await?;
144
145 let status = resp.status();
146 if !status.is_success() {
147 let err_text = resp.text().await.unwrap_or_default();
148 anyhow::bail!("ack failed: status={} body={}", status, err_text);
149 }
150
151 Ok(())
152 }
153
154 pub async fn nack(&self, topic: &str, receipt: &str, policy: &RetryPolicy) -> Result<NackOutcome> {
155 let url = format!("{}/nack", self.base_url);
156 let req = NackRequest {
157 topic: topic.to_string(),
158 receipt: receipt.to_string(),
159 max_retries: Some(policy.max_retries),
160 base_delay_ms: Some(policy.base_delay_ms),
161 max_delay_ms: Some(policy.max_delay_ms),
162 multiplier: Some(policy.multiplier),
163 };
164
165 let resp = self.client.post(url)
166 .json(&req)
167 .send()
168 .await?;
169
170 let status = resp.status();
171 if !status.is_success() {
172 let err_text = resp.text().await.unwrap_or_default();
173 anyhow::bail!("nack failed: status={} body={}", status, err_text);
174 }
175
176 #[derive(Deserialize)]
177 struct NackResp {
178 status: String,
179 delay_ms: Option<u64>,
180 retry_count: Option<u32>,
181 }
182
183 let body: NackResp = resp.json().await?;
184 if body.status == "requeued" {
185 Ok(NackOutcome::Requeued {
186 delay_ms: body.delay_ms.unwrap_or_default(),
187 retry_count: body.retry_count.unwrap_or_default(),
188 })
189 } else {
190 Ok(NackOutcome::DeadLettered)
191 }
192 }
193
194 pub async fn heartbeat(&self, topic: &str, consumer_id: &str, ttl_seconds: u64) -> Result<()> {
195 let url = format!("{}/heartbeat", self.base_url);
196 let req = HeartbeatRequest {
197 topic: topic.to_string(),
198 consumer_id: consumer_id.to_string(),
199 ttl_seconds: Some(ttl_seconds),
200 };
201
202 let resp = self.client.post(url)
203 .json(&req)
204 .send()
205 .await?;
206
207 let status = resp.status();
208 if !status.is_success() {
209 let err_text = resp.text().await.unwrap_or_default();
210 anyhow::bail!("heartbeat failed: status={} body={}", status, err_text);
211 }
212
213 Ok(())
214 }
215}