1use crate::errors::{APIError, ErrType};
2use crate::retrier::{retry_async, fixed_retry_strategy};
3
4use reqwest::{header::HeaderValue, Client as HTTPClient, Method, Response, StatusCode};
5use serde::{Deserialize, Serialize};
6use std::path::Path;
// Exclusive upper bound for the `timeout` argument accepted by `do_consume`.
// NOTE(review): presumably expressed in seconds — confirm against the server API.
const MAX_READ_TIMEOUT: u32 = 600;
// Largest `count` accepted by `do_consume` for a single consume request.
const MAX_BATCH_CONSUME_SIZE: u32 = 100;
11
/// Result alias used throughout this module; the error type is always `APIError`.
type Result<T> = std::result::Result<T, APIError>;
13
/// HTTP client for a single namespace of the job-queue server.
pub struct Client {
    /// Namespace placed in the URL path after `/api/`.
    pub namespace: String,
    /// Value sent in the `X-Token` header of every request.
    pub token: String,
    /// Server host used to build the request URL.
    pub host: String,
    /// Server port used to build the request URL.
    pub port: u32,
    /// Passed (as `usize`) to `fixed_retry_strategy` when publishing.
    /// NOTE(review): presumably the maximum number of retries — confirm in `retrier`.
    pub retry: u32,
    /// Passed (as `u64`) to `fixed_retry_strategy` when publishing.
    /// NOTE(review): presumably the delay between retries; unit not visible here.
    pub backoff: u32,
    /// Underlying `reqwest` client used to send every request.
    pub http_client: HTTPClient,
}
31
32#[derive(Deserialize)]
34pub struct PublishResponse {
35 job_id: String,
37}
38
/// A job as decoded from the JSON body of a consume or peek response.
#[derive(Deserialize, Debug)]
pub struct Job {
    /// Namespace reported by the server for this job.
    pub namespace: String,
    /// Queue reported by the server for this job.
    pub queue: String,
    /// Job payload as delivered in the JSON body.
    /// NOTE(review): encoding of the payload is not visible here — confirm.
    pub data: String,
    /// Identifier of the job.
    pub job_id: String,
    /// NOTE(review): presumably the remaining time-to-live; unit not visible here.
    pub ttl: u64,
    /// NOTE(review): presumably milliseconds elapsed since publish — confirm.
    pub elapsed_ms: u64,
}
54
55#[derive(Deserialize)]
56pub struct QueueInfo {
57 pub namespace: String,
59 pub queue: String,
61 pub size: u32,
63}
64
65#[derive(Deserialize)]
66pub struct DeadLetterInfo {
67 pub namespace: String,
69 pub queue: String,
71 pub deadletter_size: u32,
73 pub deadletter_head: String,
75}
76
77#[derive(Deserialize)]
78pub struct RespawnResult {
79 pub count: u32,
81}
82
83#[derive(Deserialize)]
84pub struct ResponseError {
85 pub error: String,
87}
88
89impl Client {
91 pub fn new(
102 namespace: &str,
103 token: &str,
104 host: &str,
105 port: u32,
106 retry: u32,
107 backoff: u32,
108 ) -> Self {
109 Client {
110 namespace: namespace.to_string(),
111 token: token.to_string(),
112 host: host.to_string(),
113 port: port,
114 retry: retry,
115 backoff: backoff,
116 http_client: HTTPClient::new(),
117 }
118 }
119
120 async fn request<T: Serialize + ?Sized>(
129 &self,
130 method: Method,
131 relative_path: &str,
132 query: Option<&T>,
133 body: Option<Vec<u8>>,
134 ) -> std::result::Result<(String, reqwest::Response), reqwest::Error> {
135 let url = format!("http://{}:{}", self.host, self.port);
136 let url = Path::new(&url)
137 .join("api")
138 .join(&self.namespace)
139 .join(relative_path)
140 .to_str()
141 .unwrap()
142 .to_string();
143
144 println!("url = {}", url);
145
146 let mut builder = self.http_client.request(method, &url);
147
148 if query.is_some() {
149 builder = builder.query(query.unwrap())
150 }
151
152 if body.is_some() {
153 builder = builder.body(body.unwrap())
154 }
155
156 let response = builder.header("X-Token", &self.token).send().await?;
157
158 println!("got response");
159 let request_id = response
160 .headers()
161 .get("X-Request-ID")
162 .unwrap_or(&HeaderValue::from_str("").unwrap())
163 .to_str()
164 .unwrap()
165 .to_string();
166
167 Ok((request_id, response))
168 }
169
170 async fn do_publish(
181 &self,
182 queue: String,
183 ack_job_id: String,
184 data: Vec<u8>,
185 ttl: u32,
186 tries: u32,
187 delay: u32,
188 ) -> std::result::Result<(String, reqwest::Response), reqwest::Error> {
189 let mut relative_path = queue.clone();
190 if ack_job_id != "" {
191 relative_path = Path::new(&relative_path)
192 .join("job")
193 .join(ack_job_id)
194 .to_str()
195 .unwrap()
196 .to_string();
197 }
198
199 let query = [("ttl", ttl), ("tries", tries), ("delay", delay)];
200 retry_async(fixed_retry_strategy(self.backoff as u64, self.retry as usize), || {
201 Box::pin(self.request(
202 Method::PUT,
203 relative_path.as_str(),
204 Some(&query),
205 Some(data.clone()),
206 ))
207 }).await
208 }
209
210 async fn do_consume(
228 &self,
229 queues: Vec<String>,
230 ttr: u32,
231 timeout: u32,
232 count: u32,
233 ) -> Result<Vec<Job>> {
234 if ttr <= 0 {
235 return Err(APIError {
236 err_type: ErrType::RequestErr,
237 reason: "ttr should be > 0".to_string(),
238 ..APIError::default()
239 });
240 }
241
242 if timeout >= MAX_READ_TIMEOUT {
243 return Err(APIError {
244 err_type: ErrType::RequestErr,
245 reason: format!("timeout should be >= 0 && < {}", MAX_READ_TIMEOUT),
246 ..APIError::default()
247 });
248 }
249
250 if count <= 0 || count > MAX_BATCH_CONSUME_SIZE {
251 return Err(APIError {
252 err_type: ErrType::RequestErr,
253 reason: format!("count should be > 0 && < {}", MAX_BATCH_CONSUME_SIZE),
254 ..APIError::default()
255 });
256 }
257
258 let relative_path = queues.join(",");
259 let query = [("ttr", ttr), ("timeout", timeout), ("count", count)];
261
262 let ret = self
263 .request(Method::GET, relative_path.as_str(), Some(&query), None)
264 .await;
265
266 if ret.is_err() {
267 return Err(APIError {
268 err_type: ErrType::RequestErr,
269 reason: ret.unwrap_err().to_string(),
270 ..APIError::default()
271 });
272 }
273
274 let (request_id, response) = ret.unwrap();
275 let status_code = response.status();
276 if status_code == StatusCode::NOT_FOUND {
277 return Ok(Vec::new());
278 }
279
280 if status_code != StatusCode::OK {
281 return Err(APIError {
282 err_type: ErrType::ResponseErr,
283 reason: self.parse_response_error(response).await,
284 request_id: request_id,
285 ..APIError::default()
286 });
287 }
288
289 if count == 1 {
290 return response
291 .json::<Job>()
292 .await
293 .map(|job| vec![job])
294 .map_err(|e| APIError {
295 err_type: ErrType::ResponseErr,
296 reason: e.to_string(),
297 request_id: request_id,
298 ..APIError::default()
299 });
300 }
301 response.json::<Vec<Job>>().await.map_err(|e| APIError {
303 err_type: ErrType::ResponseErr,
304 reason: e.to_string(),
305 request_id: request_id,
306 ..APIError::default()
307 })
308 }
309
310 async fn do_peek_job(&self, queue: String, job_id: Option<String>) -> Result<Option<Job>> {
317 let path = Path::new(&queue);
318
319 let path_buf = if job_id.is_none() {
320 path.join("peek")
321 } else {
322 path.join("job").join(&job_id.unwrap())
323 };
324
325 let relative_path = path_buf.to_str().unwrap().to_string();
326
327 let ret = self
328 .request::<(String, u32)>(Method::GET, relative_path.as_str(), None, None)
329 .await;
330
331 if ret.is_err() {
332 return Err(APIError {
333 err_type: ErrType::RequestErr,
334 reason: ret.unwrap_err().to_string(),
335 ..APIError::default()
336 });
337 }
338
339 let (request_id, response) = ret.unwrap();
340 let status_code = response.status();
341 match status_code {
342 StatusCode::NOT_FOUND => Ok(None),
343 StatusCode::OK => response
344 .json::<Job>()
345 .await
346 .map(|job| Some(job))
347 .map_err(|e| APIError {
348 err_type: ErrType::ResponseErr,
349 reason: e.to_string(),
350 request_id: request_id,
351 ..APIError::default()
352 }),
353 _ => Err(APIError {
354 err_type: ErrType::ResponseErr,
355 reason: self.parse_response_error(response).await,
356 job_id: "".to_string(),
357 request_id: request_id,
358 }),
359 }
360 }
361
362 async fn parse_response_error(&self, response: Response) -> String {
368 let status_code = response.status();
369 match response.json::<ResponseError>().await {
370 Ok(re) => re.error,
371 Err(e) => format!("[{}]{}", status_code, e),
372 }
373 }
374}
375
376impl Client {
378 pub fn config_retry(
385 &mut self,
386 retry: u32,
387 backoff: u32,
388 ) -> Result<()> {
389 self.retry = retry;
390 self.backoff = backoff;
391 Ok(())
392 }
393
394 pub async fn publish(
405 &self,
406 queue: String,
407 ack_job_id: String,
408 data: Vec<u8>,
409 ttl: u32,
410 tries: u32,
411 delay: u32,
412 ) -> Result<(String, String)> {
413 let ret = self
414 .do_publish(queue, ack_job_id, data, ttl, tries, delay)
415 .await;
416
417 if ret.is_err() {
418 return Err(APIError {
419 err_type: ErrType::RequestErr,
420 reason: ret.unwrap_err().to_string(),
421 ..APIError::default()
422 });
423 }
424
425 let (request_id, response) = ret.unwrap();
426 let status_code = response.status();
427
428 match status_code {
429 StatusCode::CREATED => response
430 .json::<PublishResponse>()
431 .await
432 .map(|pr| (pr.job_id, request_id.clone()))
433 .map_err(|e| APIError {
434 err_type: ErrType::ResponseErr,
435 reason: e.to_string(),
436 request_id: request_id,
437 ..APIError::default()
438 }),
439 _ => Err(APIError {
440 err_type: ErrType::ResponseErr,
441 reason: self.parse_response_error(response).await,
442 request_id: request_id,
443 ..APIError::default()
444 }),
445 }
446 }
447
448 pub async fn consume(&self, queue: String, ttr: u32, timeout: u32) -> Result<Vec<Job>> {
462 self.do_consume(vec![queue], ttr, timeout, 1).await
463 }
464
465 pub async fn batch_consume(
481 &self,
482 queue: String,
483 ttr: u32,
484 timeout: u32,
485 count: u32,
486 ) -> Result<Vec<Job>> {
487 self.do_consume(vec![queue], ttr, timeout, count).await
488 }
489
490 pub async fn consume_from_queues(
508 &self,
509 queues: Vec<String>,
510 ttr: u32,
511 timeout: u32,
512 ) -> Result<Vec<Job>> {
513 self.do_consume(queues, ttr, timeout, 1).await
514 }
515
516 pub async fn ack(&self, queue: String, job_id: String) -> Result<()> {
523 let relative_path = Path::new(&queue)
524 .join("job")
525 .join(job_id.clone())
526 .to_str()
527 .unwrap()
528 .to_string();
529
530 let ret = self
531 .request::<(String, u32)>(Method::DELETE, relative_path.as_str(), None, None)
532 .await;
533
534 if ret.is_err() {
535 return Err(APIError {
536 err_type: ErrType::RequestErr,
537 reason: ret.unwrap_err().to_string(),
538 job_id: job_id,
539 request_id: "".to_string(),
540 });
541 }
542
543 let (request_id, response) = ret.unwrap();
544 let status_code = response.status();
545
546 match status_code {
547 StatusCode::NO_CONTENT => Ok(()),
548 _ => Err(APIError {
549 err_type: ErrType::ResponseErr,
550 reason: self.parse_response_error(response).await,
551 job_id: job_id,
552 request_id: request_id,
553 }),
554 }
555 }
556
557 pub async fn queue_size(&self, queue: String) -> Result<u32> {
563 let relative_path = Path::new(&queue)
564 .join("size")
565 .to_str()
566 .unwrap()
567 .to_string();
568
569 let ret = self
570 .request::<(String, u32)>(Method::GET, relative_path.as_str(), None, None)
571 .await;
572
573 if ret.is_err() {
574 return Err(APIError {
575 err_type: ErrType::RequestErr,
576 reason: ret.unwrap_err().to_string(),
577 ..APIError::default()
578 });
579 }
580
581 let (request_id, response) = ret.unwrap();
582 let status_code = response.status();
583
584 match status_code {
585 StatusCode::OK => response
586 .json::<QueueInfo>()
587 .await
588 .map(|info| info.size)
589 .map_err(|e| APIError {
590 err_type: ErrType::ResponseErr,
591 reason: e.to_string(),
592 request_id: request_id,
593 ..APIError::default()
594 }),
595 _ => Err(APIError {
596 err_type: ErrType::ResponseErr,
597 reason: self.parse_response_error(response).await,
598 job_id: "".to_string(),
599 request_id: request_id,
600 }),
601 }
602 }
603
604 pub async fn peek_queue(&self, queue: String) -> Result<Option<Job>> {
610 self.do_peek_job(queue, None).await
611 }
612
613 pub async fn peek_job(&self, queue: String, job_id: String) -> Result<Option<Job>> {
620 self.do_peek_job(queue, Some(job_id)).await
621 }
622
623 pub async fn peek_dead_letter(&self, queue: String) -> Result<(u32, String)> {
629 let relative_path = Path::new(&queue)
630 .join("deadletter")
631 .to_str()
632 .unwrap()
633 .to_string();
634
635 let ret = self
636 .request::<(String, u32)>(Method::GET, relative_path.as_str(), None, None)
637 .await;
638
639 if ret.is_err() {
640 return Err(APIError {
641 err_type: ErrType::RequestErr,
642 reason: ret.unwrap_err().to_string(),
643 ..APIError::default()
644 });
645 }
646
647 let (request_id, response) = ret.unwrap();
648 let status_code = response.status();
649 match status_code {
650 StatusCode::OK => response
651 .json::<DeadLetterInfo>()
652 .await
653 .map(|info| (info.deadletter_size, info.deadletter_head))
654 .map_err(|e| APIError {
655 err_type: ErrType::ResponseErr,
656 reason: e.to_string(),
657 request_id: request_id,
658 ..APIError::default()
659 }),
660 _ => Err(APIError {
661 err_type: ErrType::ResponseErr,
662 reason: self.parse_response_error(response).await,
663 job_id: "".to_string(),
664 request_id: request_id,
665 }),
666 }
667 }
668
669 pub async fn respawn_dead_letter(&self, queue: String, limit: i64, ttl: i64) -> Result<u32> {
676 if limit <= 0 {
677 return Err(APIError {
678 err_type: ErrType::RequestErr,
679 reason: "limit should be > 0".to_string(),
680 ..APIError::default()
681 });
682 }
683
684 if ttl <= 0 {
685 return Err(APIError {
686 err_type: ErrType::RequestErr,
687 reason: "ttl should be > 0".to_string(),
688 ..APIError::default()
689 });
690 }
691
692 let relative_path = Path::new(&queue)
693 .join("deadletter")
694 .to_str()
695 .unwrap()
696 .to_string();
697
698 let query = [("limit", limit), ("ttl", ttl)];
699 let ret = self
700 .request(Method::PUT, relative_path.as_str(), Some(&query), None)
701 .await;
702
703 if ret.is_err() {
704 return Err(APIError {
705 err_type: ErrType::RequestErr,
706 reason: ret.unwrap_err().to_string(),
707 ..APIError::default()
708 });
709 }
710
711 let (request_id, response) = ret.unwrap();
712 let status_code = response.status();
713 match status_code {
714 StatusCode::OK => response
715 .json::<RespawnResult>()
716 .await
717 .map(|rr| rr.count)
718 .map_err(|e| APIError {
719 err_type: ErrType::ResponseErr,
720 reason: e.to_string(),
721 request_id: request_id,
722 ..APIError::default()
723 }),
724 _ => Err(APIError {
725 err_type: ErrType::ResponseErr,
726 reason: self.parse_response_error(response).await,
727 job_id: "".to_string(),
728 request_id: request_id,
729 }),
730 }
731 }
732}