1use percent_encoding::utf8_percent_encode;
2use percent_encoding::NON_ALPHANUMERIC;
3use reqwest::header::CONTENT_TYPE;
4use reqwest::Method;
5use serde::de::DeserializeOwned;
6use serde::Deserialize;
7use serde::Serialize;
8use serde_with::serde_as;
9use serde_with::DurationSeconds;
10use std::error::Error;
11use std::fmt::Display;
12use std::time::Duration;
13
14#[derive(Debug)]
15pub enum QueuedClientError {
16 Api {
17 status: u16,
18 error: String,
19 error_details: Option<String>,
20 },
21 Unauthorized,
22 Request(reqwest::Error),
23}
24
25impl Display for QueuedClientError {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 match self {
28 QueuedClientError::Api {
29 status,
30 error,
31 error_details,
32 } => write!(f, "API error ({status} - {error}): {error_details:?}"),
33 QueuedClientError::Unauthorized => write!(f, "unauthorized"),
34 QueuedClientError::Request(e) => write!(f, "request error: {e}"),
35 }
36 }
37}
38
39impl Error for QueuedClientError {}
40
41pub type QueuedClientResult<T> = Result<T, QueuedClientError>;
42
/// Connection settings for a [`QueuedClient`].
#[derive(Debug, Clone)]
pub struct QueuedClientCfg {
    /// When set, sent as-is in the `authorization` header of every request.
    pub api_key: Option<String>,
    /// URL prefix that each request path is appended to without any
    /// separator (paths built in this file start with `/`).
    pub endpoint: String,
}
48
49#[derive(Clone, Debug)]
50pub struct QueuedClient {
51 r: reqwest::Client,
52 cfg: QueuedClientCfg,
53}
54
55impl QueuedClient {
56 pub fn with_request_client(request_client: reqwest::Client, cfg: QueuedClientCfg) -> Self {
57 Self {
58 r: request_client,
59 cfg,
60 }
61 }
62
63 pub fn new(cfg: QueuedClientCfg) -> Self {
64 Self::with_request_client(reqwest::Client::new(), cfg)
65 }
66
67 async fn raw_request<I: Serialize, O: DeserializeOwned>(
68 &self,
69 method: Method,
70 path: impl AsRef<str>,
71 body: Option<&I>,
72 ) -> QueuedClientResult<O> {
73 let mut req = self
74 .r
75 .request(method, format!("{}{}", self.cfg.endpoint, path.as_ref()))
76 .header("accept", "application/msgpack");
77 if let Some(k) = &self.cfg.api_key {
78 req = req.header("authorization", k);
79 };
80 if let Some(b) = body {
81 let raw = rmp_serde::to_vec_named(b).unwrap();
82 req = req.header("content-type", "application/msgpack").body(raw);
83 };
84 let res = req
85 .send()
86 .await
87 .map_err(|err| QueuedClientError::Request(err))?;
88 let status = res.status().as_u16();
89 let res_type = res
90 .headers()
91 .get(CONTENT_TYPE)
92 .and_then(|v| v.to_str().ok().map(|v| v.to_string()))
93 .unwrap_or_default();
94 let res_body_raw = res
95 .bytes()
96 .await
97 .map_err(|err| QueuedClientError::Request(err))?;
98 if status == 401 {
99 return Err(QueuedClientError::Unauthorized);
100 };
101 #[derive(Deserialize)]
102 struct ApiError {
103 error: String,
104 error_details: Option<String>,
105 }
106 if status < 200 || status > 299 || !res_type.starts_with("application/msgpack") {
107 return Err(match rmp_serde::from_slice::<ApiError>(&res_body_raw) {
109 Ok(api_error) => QueuedClientError::Api {
110 status,
111 error: api_error.error,
112 error_details: api_error.error_details,
113 },
114 Err(_) => QueuedClientError::Api {
115 status,
116 error: String::from_utf8_lossy(&res_body_raw).into_owned(),
118 error_details: None,
119 },
120 });
121 };
122 Ok(rmp_serde::from_slice(&res_body_raw).unwrap())
123 }
124
125 pub fn queue(&self, queue_name: &str) -> QueuedQueueClient {
126 QueuedQueueClient {
127 c: self.clone(),
128 qpp: format!(
129 "/queue/{}",
130 utf8_percent_encode(&queue_name, NON_ALPHANUMERIC)
131 ),
132 }
133 }
134}
135
136#[derive(Clone, Debug)]
137pub struct QueuedQueueClient {
138 c: QueuedClient,
139 qpp: String,
140}
141
142#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
143pub struct Message {
144 pub id: u64,
145 pub poll_tag: u32,
146}
147
148impl From<PolledMessage> for Message {
149 fn from(value: PolledMessage) -> Self {
150 value.message()
151 }
152}
153
154#[derive(Deserialize, Clone, Debug)]
155pub struct PolledMessage {
156 #[serde(with = "serde_bytes")]
157 pub contents: Vec<u8>,
158 pub id: u64,
159 pub poll_tag: u32,
160}
161
162impl PolledMessage {
163 pub fn message(&self) -> Message {
164 Message {
165 id: self.id,
166 poll_tag: self.poll_tag,
167 }
168 }
169}
170
171#[derive(Deserialize)]
172pub struct PollMessagesOutput {
173 pub messages: Vec<PolledMessage>,
174}
175
176#[serde_as]
177#[derive(Serialize)]
178pub struct PushMessage {
179 #[serde(with = "serde_bytes")]
180 pub contents: Vec<u8>,
181 #[serde_as(as = "DurationSeconds<u64>")]
182 #[serde(rename = "visibility_timeout_secs")]
183 pub visibility_timeout: Duration,
184}
185
186#[derive(Deserialize)]
187pub struct PushMessagesOutput {
188 pub ids: Vec<u64>,
189}
190
191#[derive(Deserialize)]
192pub struct UpdateMessageOutput {
193 pub new_poll_tag: u32,
194}
195
196#[derive(Deserialize)]
197pub struct DeleteMessagesOutput {}
198
199impl QueuedQueueClient {
200 pub async fn poll_messages(
201 &self,
202 count: u64,
203 visibility_timeout: Duration,
204 ) -> QueuedClientResult<PollMessagesOutput> {
205 #[derive(Serialize)]
206 struct Input {
207 count: u64,
208 visibility_timeout_secs: u64,
209 }
210 self
211 .c
212 .raw_request(
213 Method::POST,
214 format!("{}/messages/poll", self.qpp),
215 Some(&Input {
216 count,
217 visibility_timeout_secs: visibility_timeout.as_secs(),
218 }),
219 )
220 .await
221 }
222
223 pub async fn push_messages(
224 &self,
225 msgs: impl AsRef<[PushMessage]>,
226 ) -> QueuedClientResult<PushMessagesOutput> {
227 #[derive(Serialize)]
228 struct Input<'a> {
229 messages: &'a [PushMessage],
230 }
231 self
232 .c
233 .raw_request(
234 Method::POST,
235 format!("{}/messages/push", self.qpp),
236 Some(&Input {
237 messages: msgs.as_ref(),
238 }),
239 )
240 .await
241 }
242
243 pub async fn update_message(
244 &self,
245 m: Message,
246 new_visibility_timeout: Duration,
247 ) -> QueuedClientResult<UpdateMessageOutput> {
248 #[derive(Serialize)]
249 struct Input {
250 id: u64,
251 poll_tag: u32,
252 visibility_timeout_secs: u64,
253 }
254 self
255 .c
256 .raw_request(
257 Method::POST,
258 format!("{}/messages/update", self.qpp),
259 Some(&Input {
260 id: m.id,
261 poll_tag: m.poll_tag,
262 visibility_timeout_secs: new_visibility_timeout.as_secs(),
263 }),
264 )
265 .await
266 }
267
268 pub async fn delete_messages(
269 &self,
270 msgs: impl IntoIterator<Item = Message>,
271 ) -> QueuedClientResult<DeleteMessagesOutput> {
272 #[derive(Serialize)]
273 struct Input {
274 messages: Vec<Message>,
275 }
276 self
277 .c
278 .raw_request(
279 Method::POST,
280 format!("{}/messages/delete", self.qpp),
281 Some(&Input {
282 messages: msgs.into_iter().collect(),
283 }),
284 )
285 .await
286 }
287}