queued_client_rs/
lib.rs

1use percent_encoding::utf8_percent_encode;
2use percent_encoding::NON_ALPHANUMERIC;
3use reqwest::header::CONTENT_TYPE;
4use reqwest::Method;
5use serde::de::DeserializeOwned;
6use serde::Deserialize;
7use serde::Serialize;
8use serde_with::serde_as;
9use serde_with::DurationSeconds;
10use std::error::Error;
11use std::fmt::Display;
12use std::time::Duration;
13
14#[derive(Debug)]
15pub enum QueuedClientError {
16  Api {
17    status: u16,
18    error: String,
19    error_details: Option<String>,
20  },
21  Unauthorized,
22  Request(reqwest::Error),
23}
24
25impl Display for QueuedClientError {
26  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27    match self {
28      QueuedClientError::Api {
29        status,
30        error,
31        error_details,
32      } => write!(f, "API error ({status} - {error}): {error_details:?}"),
33      QueuedClientError::Unauthorized => write!(f, "unauthorized"),
34      QueuedClientError::Request(e) => write!(f, "request error: {e}"),
35    }
36  }
37}
38
39impl Error for QueuedClientError {}
40
41pub type QueuedClientResult<T> = Result<T, QueuedClientError>;
42
43#[derive(Clone, Debug)]
44pub struct QueuedClientCfg {
45  pub api_key: Option<String>,
46  pub endpoint: String,
47}
48
49#[derive(Clone, Debug)]
50pub struct QueuedClient {
51  r: reqwest::Client,
52  cfg: QueuedClientCfg,
53}
54
55impl QueuedClient {
56  pub fn with_request_client(request_client: reqwest::Client, cfg: QueuedClientCfg) -> Self {
57    Self {
58      r: request_client,
59      cfg,
60    }
61  }
62
63  pub fn new(cfg: QueuedClientCfg) -> Self {
64    Self::with_request_client(reqwest::Client::new(), cfg)
65  }
66
67  async fn raw_request<I: Serialize, O: DeserializeOwned>(
68    &self,
69    method: Method,
70    path: impl AsRef<str>,
71    body: Option<&I>,
72  ) -> QueuedClientResult<O> {
73    let mut req = self
74      .r
75      .request(method, format!("{}{}", self.cfg.endpoint, path.as_ref()))
76      .header("accept", "application/msgpack");
77    if let Some(k) = &self.cfg.api_key {
78      req = req.header("authorization", k);
79    };
80    if let Some(b) = body {
81      let raw = rmp_serde::to_vec_named(b).unwrap();
82      req = req.header("content-type", "application/msgpack").body(raw);
83    };
84    let res = req
85      .send()
86      .await
87      .map_err(|err| QueuedClientError::Request(err))?;
88    let status = res.status().as_u16();
89    let res_type = res
90      .headers()
91      .get(CONTENT_TYPE)
92      .and_then(|v| v.to_str().ok().map(|v| v.to_string()))
93      .unwrap_or_default();
94    let res_body_raw = res
95      .bytes()
96      .await
97      .map_err(|err| QueuedClientError::Request(err))?;
98    if status == 401 {
99      return Err(QueuedClientError::Unauthorized);
100    };
101    #[derive(Deserialize)]
102    struct ApiError {
103      error: String,
104      error_details: Option<String>,
105    }
106    if status < 200 || status > 299 || !res_type.starts_with("application/msgpack") {
107      // The server may be behind some proxy, LB, etc., so we don't know what the body looks like for sure.
108      return Err(match rmp_serde::from_slice::<ApiError>(&res_body_raw) {
109        Ok(api_error) => QueuedClientError::Api {
110          status,
111          error: api_error.error,
112          error_details: api_error.error_details,
113        },
114        Err(_) => QueuedClientError::Api {
115          status,
116          // We don't know if the response contains valid UTF-8 text or not.
117          error: String::from_utf8_lossy(&res_body_raw).into_owned(),
118          error_details: None,
119        },
120      });
121    };
122    Ok(rmp_serde::from_slice(&res_body_raw).unwrap())
123  }
124
125  pub fn queue(&self, queue_name: &str) -> QueuedQueueClient {
126    QueuedQueueClient {
127      c: self.clone(),
128      qpp: format!(
129        "/queue/{}",
130        utf8_percent_encode(&queue_name, NON_ALPHANUMERIC)
131      ),
132    }
133  }
134}
135
136#[derive(Clone, Debug)]
137pub struct QueuedQueueClient {
138  c: QueuedClient,
139  qpp: String,
140}
141
142#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
143pub struct Message {
144  pub id: u64,
145  pub poll_tag: u32,
146}
147
148impl From<PolledMessage> for Message {
149  fn from(value: PolledMessage) -> Self {
150    value.message()
151  }
152}
153
154#[derive(Deserialize, Clone, Debug)]
155pub struct PolledMessage {
156  #[serde(with = "serde_bytes")]
157  pub contents: Vec<u8>,
158  pub id: u64,
159  pub poll_tag: u32,
160}
161
162impl PolledMessage {
163  pub fn message(&self) -> Message {
164    Message {
165      id: self.id,
166      poll_tag: self.poll_tag,
167    }
168  }
169}
170
171#[derive(Deserialize)]
172pub struct PollMessagesOutput {
173  pub messages: Vec<PolledMessage>,
174}
175
176#[serde_as]
177#[derive(Serialize)]
178pub struct PushMessage {
179  #[serde(with = "serde_bytes")]
180  pub contents: Vec<u8>,
181  #[serde_as(as = "DurationSeconds<u64>")]
182  #[serde(rename = "visibility_timeout_secs")]
183  pub visibility_timeout: Duration,
184}
185
186#[derive(Deserialize)]
187pub struct PushMessagesOutput {
188  pub ids: Vec<u64>,
189}
190
191#[derive(Deserialize)]
192pub struct UpdateMessageOutput {
193  pub new_poll_tag: u32,
194}
195
196#[derive(Deserialize)]
197pub struct DeleteMessagesOutput {}
198
199impl QueuedQueueClient {
200  pub async fn poll_messages(
201    &self,
202    count: u64,
203    visibility_timeout: Duration,
204  ) -> QueuedClientResult<PollMessagesOutput> {
205    #[derive(Serialize)]
206    struct Input {
207      count: u64,
208      visibility_timeout_secs: u64,
209    }
210    self
211      .c
212      .raw_request(
213        Method::POST,
214        format!("{}/messages/poll", self.qpp),
215        Some(&Input {
216          count,
217          visibility_timeout_secs: visibility_timeout.as_secs(),
218        }),
219      )
220      .await
221  }
222
223  pub async fn push_messages(
224    &self,
225    msgs: impl AsRef<[PushMessage]>,
226  ) -> QueuedClientResult<PushMessagesOutput> {
227    #[derive(Serialize)]
228    struct Input<'a> {
229      messages: &'a [PushMessage],
230    }
231    self
232      .c
233      .raw_request(
234        Method::POST,
235        format!("{}/messages/push", self.qpp),
236        Some(&Input {
237          messages: msgs.as_ref(),
238        }),
239      )
240      .await
241  }
242
243  pub async fn update_message(
244    &self,
245    m: Message,
246    new_visibility_timeout: Duration,
247  ) -> QueuedClientResult<UpdateMessageOutput> {
248    #[derive(Serialize)]
249    struct Input {
250      id: u64,
251      poll_tag: u32,
252      visibility_timeout_secs: u64,
253    }
254    self
255      .c
256      .raw_request(
257        Method::POST,
258        format!("{}/messages/update", self.qpp),
259        Some(&Input {
260          id: m.id,
261          poll_tag: m.poll_tag,
262          visibility_timeout_secs: new_visibility_timeout.as_secs(),
263        }),
264      )
265      .await
266  }
267
268  pub async fn delete_messages(
269    &self,
270    msgs: impl IntoIterator<Item = Message>,
271  ) -> QueuedClientResult<DeleteMessagesOutput> {
272    #[derive(Serialize)]
273    struct Input {
274      messages: Vec<Message>,
275    }
276    self
277      .c
278      .raw_request(
279        Method::POST,
280        format!("{}/messages/delete", self.qpp),
281        Some(&Input {
282          messages: msgs.into_iter().collect(),
283        }),
284      )
285      .await
286  }
287}