lmstfy_client/
api.rs

1use crate::errors::{APIError, ErrType};
2use crate::retrier::{retry_async, fixed_retry_strategy};
3
4use reqwest::{header::HeaderValue, Client as HTTPClient, Method, Response, StatusCode};
5use serde::{Deserialize, Serialize};
6use std::path::Path;
/// Exclusive upper bound, in seconds, for the long-polling `timeout` of a consume request
const MAX_READ_TIMEOUT: u32 = 600;
/// Largest number of jobs that a single consume request may ask for
const MAX_BATCH_CONSUME_SIZE: u32 = 100;
11
12type Result<T> = std::result::Result<T, APIError>;
13
14/// The client for lmstfy
15pub struct Client {
16    /// The namespace for the client
17    pub namespace: String,
18    /// The access token of api
19    pub token: String,
20    /// The target host for the server
21    pub host: String,
22    /// The target port for the server
23    pub port: u32,
24    /// Retry when `publish` failed
25    pub retry: u32,
26    /// Backoff milliseconds when retrying
27    pub backoff: u32,
28    /// http client which is used to communicate with server
29    pub http_client: HTTPClient,
30}
31
32/// The normal response for publishing tasks
33#[derive(Deserialize)]
34pub struct PublishResponse {
35    /// The job id corresponding to the published task
36    job_id: String,
37}
38
39#[derive(Deserialize, Debug)]
40pub struct Job {
41    /// The namespace of the job
42    pub namespace: String,
43    /// The queue which holds the job
44    pub queue: String,
45    /// The data holded by the job, note that data is base64 encoded
46    pub data: String,
47    /// The id of the job
48    pub job_id: String,
49    /// The time-to-live of the job
50    pub ttl: u64,
51    /// The elapsed value in ms
52    pub elapsed_ms: u64,
53}
54
55#[derive(Deserialize)]
56pub struct QueueInfo {
57    /// The namespace of the queue
58    pub namespace: String,
59    /// The queue name
60    pub queue: String,
61    /// The size of the queue
62    pub size: u32,
63}
64
65#[derive(Deserialize)]
66pub struct DeadLetterInfo {
67    /// The namespace of the queue
68    pub namespace: String,
69    /// The queue name
70    pub queue: String,
71    /// The size of the dead letters
72    pub deadletter_size: u32,
73    /// The header of the letters
74    pub deadletter_head: String,
75}
76
77#[derive(Deserialize)]
78pub struct RespawnResult {
79    /// The count of respawned dead letters
80    pub count: u32,
81}
82
83#[derive(Deserialize)]
84pub struct ResponseError {
85    /// The error info returned from server
86    pub error: String,
87}
88
89/// The client implementation
90impl Client {
91    /// Returns a client with the given parameters
92    ///
93    /// # Arguments
94    ///
95    /// * `namespace` - A str that holds the namespace of the client
96    /// * `token` - A str that holds the token for api request
97    /// * `host` - A str that holds the target server
98    /// * `port` - A u32 value that holds the target port
99    /// * `retry` - A u32 value that holds the retry count
100    /// * `backoff` - A u32 value that holds the backoff value
101    pub fn new(
102        namespace: &str,
103        token: &str,
104        host: &str,
105        port: u32,
106        retry: u32,
107        backoff: u32,
108    ) -> Self {
109        Client {
110            namespace: namespace.to_string(),
111            token: token.to_string(),
112            host: host.to_string(),
113            port: port,
114            retry: retry,
115            backoff: backoff,
116            http_client: HTTPClient::new(),
117        }
118    }
119
120    /// Request for a response
121    ///
122    /// # Arguments
123    ///
124    /// * `method` - A http method, such as GET/PUT/POST etc
125    /// * `relative_path` - A str that holds the relative url path
126    /// * `query` - A option that holds query pairs if any
127    /// * `body` - A vector that holds body data if any
128    async fn request<T: Serialize + ?Sized>(
129        &self,
130        method: Method,
131        relative_path: &str,
132        query: Option<&T>,
133        body: Option<Vec<u8>>,
134    ) -> std::result::Result<(String, reqwest::Response), reqwest::Error> {
135        let url = format!("http://{}:{}", self.host, self.port);
136        let url = Path::new(&url)
137            .join("api")
138            .join(&self.namespace)
139            .join(relative_path)
140            .to_str()
141            .unwrap()
142            .to_string();
143
144        println!("url = {}", url);
145
146        let mut builder = self.http_client.request(method, &url);
147
148        if query.is_some() {
149            builder = builder.query(query.unwrap())
150        }
151
152        if body.is_some() {
153            builder = builder.body(body.unwrap())
154        }
155
156        let response = builder.header("X-Token", &self.token).send().await?;
157
158        println!("got response");
159        let request_id = response
160            .headers()
161            .get("X-Request-ID")
162            .unwrap_or(&HeaderValue::from_str("").unwrap())
163            .to_str()
164            .unwrap()
165            .to_string();
166
167        Ok((request_id, response))
168    }
169
170    /// Innner function to publish task
171    ///
172    /// # Arguments
173    ///
174    /// * `queue` - A string that holds the queue for the task
175    /// * `ack_job_id` - A string that holds the job id about to acknowledged
176    /// * `data` - A vector of byte that holds the content of task
177    /// * `ttl` - A u32 value that holds the time-to-live value
178    /// * `tries` - A u32 value that holds the maximize retry count
179    /// * `delay` - A u32 value that holds the delay value in second
180    async fn do_publish(
181        &self,
182        queue: String,
183        ack_job_id: String,
184        data: Vec<u8>,
185        ttl: u32,
186        tries: u32,
187        delay: u32,
188    ) -> std::result::Result<(String, reqwest::Response), reqwest::Error> {
189        let mut relative_path = queue.clone();
190        if ack_job_id != "" {
191            relative_path = Path::new(&relative_path)
192                .join("job")
193                .join(ack_job_id)
194                .to_str()
195                .unwrap()
196                .to_string();
197        }
198
199        let query = [("ttl", ttl), ("tries", tries), ("delay", delay)];
200        retry_async(fixed_retry_strategy(self.backoff as u64, self.retry as usize), || {
201            Box::pin(self.request(
202                Method::PUT,
203                relative_path.as_str(),
204                Some(&query),
205                Some(data.clone()),
206            ))
207        }).await
208    }
209
210    /// Inner function to consume a job
211    ///
212    /// # Arguments
213    ///
214    /// * `queues` - A string vector that holds the queues for consuming
215    ///
216    /// * `ttr` - A u32 value that holds the time-to-run value in second.
217    /// If the job is not finished before the `ttr` expires, the job will be
218    /// released for consuming again if the `(tries-1) > 0`
219    ///
220    /// * `timeout` - A u32 value that holds the max waiting time for long polling
221    /// If it's zero, this method will return immediately with or without a job;
222    /// if it's positive, this method would polling for new job until timeout.
223    ///
224    /// * `count` - A u32 value that holds the job count of this consume.
225    /// If it's zero or over 100, this method will return an error.
226    /// If it's positive, this method would return some jobs, and it's count is between 0 and count.
227    async fn do_consume(
228        &self,
229        queues: Vec<String>,
230        ttr: u32,
231        timeout: u32,
232        count: u32,
233    ) -> Result<Vec<Job>> {
234        if ttr <= 0 {
235            return Err(APIError {
236                err_type: ErrType::RequestErr,
237                reason: "ttr should be > 0".to_string(),
238                ..APIError::default()
239            });
240        }
241
242        if timeout >= MAX_READ_TIMEOUT {
243            return Err(APIError {
244                err_type: ErrType::RequestErr,
245                reason: format!("timeout should be >= 0 && < {}", MAX_READ_TIMEOUT),
246                ..APIError::default()
247            });
248        }
249
250        if count <= 0 || count > MAX_BATCH_CONSUME_SIZE {
251            return Err(APIError {
252                err_type: ErrType::RequestErr,
253                reason: format!("count should be > 0 && < {}", MAX_BATCH_CONSUME_SIZE),
254                ..APIError::default()
255            });
256        }
257
258        let relative_path = queues.join(",");
259        // FIXME: count can be omitted when consuming a single job
260        let query = [("ttr", ttr), ("timeout", timeout), ("count", count)];
261
262        let ret = self
263            .request(Method::GET, relative_path.as_str(), Some(&query), None)
264            .await;
265
266        if ret.is_err() {
267            return Err(APIError {
268                err_type: ErrType::RequestErr,
269                reason: ret.unwrap_err().to_string(),
270                ..APIError::default()
271            });
272        }
273
274        let (request_id, response) = ret.unwrap();
275        let status_code = response.status();
276        if status_code == StatusCode::NOT_FOUND {
277            return Ok(Vec::new());
278        }
279
280        if status_code != StatusCode::OK {
281            return Err(APIError {
282                err_type: ErrType::ResponseErr,
283                reason: self.parse_response_error(response).await,
284                request_id: request_id,
285                ..APIError::default()
286            });
287        }
288
289        if count == 1 {
290            return response
291                .json::<Job>()
292                .await
293                .map(|job| vec![job])
294                .map_err(|e| APIError {
295                    err_type: ErrType::ResponseErr,
296                    reason: e.to_string(),
297                    request_id: request_id,
298                    ..APIError::default()
299                });
300        }
301        // FIXME: deserialize vector of string
302        response.json::<Vec<Job>>().await.map_err(|e| APIError {
303            err_type: ErrType::ResponseErr,
304            reason: e.to_string(),
305            request_id: request_id,
306            ..APIError::default()
307        })
308    }
309
310    /// Inner function to peek a job
311    ///
312    /// # Arguments
313    ///
314    /// * queue - A string that holds the queue name
315    /// * job_id - An option of string that holds the job id
316    async fn do_peek_job(&self, queue: String, job_id: Option<String>) -> Result<Option<Job>> {
317        let path = Path::new(&queue);
318
319        let path_buf = if job_id.is_none() {
320            path.join("peek")
321        } else {
322            path.join("job").join(&job_id.unwrap())
323        };
324
325        let relative_path = path_buf.to_str().unwrap().to_string();
326
327        let ret = self
328            .request::<(String, u32)>(Method::GET, relative_path.as_str(), None, None)
329            .await;
330
331        if ret.is_err() {
332            return Err(APIError {
333                err_type: ErrType::RequestErr,
334                reason: ret.unwrap_err().to_string(),
335                ..APIError::default()
336            });
337        }
338
339        let (request_id, response) = ret.unwrap();
340        let status_code = response.status();
341        match status_code {
342            StatusCode::NOT_FOUND => Ok(None),
343            StatusCode::OK => response
344                .json::<Job>()
345                .await
346                .map(|job| Some(job))
347                .map_err(|e| APIError {
348                    err_type: ErrType::ResponseErr,
349                    reason: e.to_string(),
350                    request_id: request_id,
351                    ..APIError::default()
352                }),
353            _ => Err(APIError {
354                err_type: ErrType::ResponseErr,
355                reason: self.parse_response_error(response).await,
356                job_id: "".to_string(),
357                request_id: request_id,
358            }),
359        }
360    }
361
362    /// Inner function to parse error info from response
363    ///
364    /// # Arguments
365    ///
366    /// * `response` - A response that holds the request result
367    async fn parse_response_error(&self, response: Response) -> String {
368        let status_code = response.status();
369        match response.json::<ResponseError>().await {
370            Ok(re) => re.error,
371            Err(e) => format!("[{}]{}", status_code, e),
372        }
373    }
374}
375
376/// The API implementation for lmstfy
377impl Client {
378    /// Set parameters for retrying
379    /// 
380    /// # Arguments
381    /// 
382    /// * `retry` - A u32 value that holds the retry count
383    /// * `backoff` - A u32 value that holds the dalay milliseconds between retries
384    pub fn config_retry(
385        &mut self,
386        retry: u32,
387        backoff: u32,
388    ) -> Result<()> {
389        self.retry = retry;
390        self.backoff = backoff;
391        Ok(())
392    }
393    
394    /// Publish task to the server
395    ///
396    /// # Arguments
397    ///
398    /// * `queue` - A string that holds the queue for the task
399    /// * `ack_job_id` - A string that holds the job id about to acknowledged
400    /// * `data` - A vector of byte that holds the content of task
401    /// * `ttl` - A u32 value that holds the time-to-live value
402    /// * `tries` - A u32 value that holds the maximize retry count
403    /// * `delay` - A u32 value that holds the delay value in second
404    pub async fn publish(
405        &self,
406        queue: String,
407        ack_job_id: String,
408        data: Vec<u8>,
409        ttl: u32,
410        tries: u32,
411        delay: u32,
412    ) -> Result<(String, String)> {
413        let ret = self
414            .do_publish(queue, ack_job_id, data, ttl, tries, delay)
415            .await;
416
417        if ret.is_err() {
418            return Err(APIError {
419                err_type: ErrType::RequestErr,
420                reason: ret.unwrap_err().to_string(),
421                ..APIError::default()
422            });
423        }
424
425        let (request_id, response) = ret.unwrap();
426        let status_code = response.status();
427
428        match status_code {
429            StatusCode::CREATED => response
430                .json::<PublishResponse>()
431                .await
432                .map(|pr| (pr.job_id, request_id.clone()))
433                .map_err(|e| APIError {
434                    err_type: ErrType::ResponseErr,
435                    reason: e.to_string(),
436                    request_id: request_id,
437                    ..APIError::default()
438                }),
439            _ => Err(APIError {
440                err_type: ErrType::ResponseErr,
441                reason: self.parse_response_error(response).await,
442                request_id: request_id,
443                ..APIError::default()
444            }),
445        }
446    }
447
448    /// Consume a job, consuming will decrease the job's retry count by 1 firstly
449    ///
450    /// # Arguments
451    ///
452    /// * `queue` - A string that holds the queue for consuming
453    ///
454    /// * `ttr` - A u32 value that holds the time-to-run value in second.
455    /// If the job is not finished before the `ttr` expires, the job will be
456    /// released for consuming again if the `(tries-1) > 0`
457    ///
458    /// * `timeout` - A u32 value that holds the max waiting time for long polling
459    /// If it's zero, this method will return immediately with or without a job;
460    /// if it's positive, this method would polling for new job until timeout.
461    pub async fn consume(&self, queue: String, ttr: u32, timeout: u32) -> Result<Vec<Job>> {
462        self.do_consume(vec![queue], ttr, timeout, 1).await
463    }
464
465    /// Consume a batch of jobs
466    ///
467    /// # Arguments
468    ///
469    /// * `queue` - A string that holds the queue for consuming
470    ///
471    /// * `ttr` - A u32 value that holds the time-to-run value in second.
472    /// If the job is not finished before the `ttr` expires, the job will be
473    /// released for consuming again if the `(tries-1) > 0`
474    ///
475    /// * `timeout` - A u32 value that holds the max waiting time for long polling
476    /// If it's zero, this method will return immediately with or without a job;
477    /// if it's positive, this method would polling for new job until timeout.
478    ///
479    /// * `count` - A u32 value that holds the count of wanted jobs
480    pub async fn batch_consume(
481        &self,
482        queue: String,
483        ttr: u32,
484        timeout: u32,
485        count: u32,
486    ) -> Result<Vec<Job>> {
487        self.do_consume(vec![queue], ttr, timeout, count).await
488    }
489
490    /// Consume from multiple queues. Note that the order of the queues in the params
491    /// implies the priority. eg. consume_from_queues(120, 5, vec!("queue-a", "queue-b", "queue-c"))
492    /// if all the queues have jobs to be fetched, the job in `queue-a` will be return.
493    ///
494    /// # Arguments
495    ///
496    /// * `queues` - A string vector that holds the queues for consuming
497    ///
498    /// * `ttr` - A u32 value that holds the time-to-run value in second.
499    /// If the job is not finished before the `ttr` expires, the job will be
500    /// released for consuming again if the `(tries-1) > 0`
501    ///
502    /// * `timeout` - A u32 value that holds the max waiting time for long polling
503    /// If it's zero, this method will return immediately with or without a job;
504    /// if it's positive, this method would polling for new job until timeout.
505    ///
506    /// * `count` - A u32 value that holds the count of wanted jobs
507    pub async fn consume_from_queues(
508        &self,
509        queues: Vec<String>,
510        ttr: u32,
511        timeout: u32,
512    ) -> Result<Vec<Job>> {
513        self.do_consume(queues, ttr, timeout, 1).await
514    }
515
516    /// Mark a job as finished, so it won't be retried by others
517    ///
518    /// # Arguments
519    ///
520    /// * `queue` - A string that holds the queue for the job
521    /// * `job_id` - A string that holds the job id
522    pub async fn ack(&self, queue: String, job_id: String) -> Result<()> {
523        let relative_path = Path::new(&queue)
524            .join("job")
525            .join(job_id.clone())
526            .to_str()
527            .unwrap()
528            .to_string();
529
530        let ret = self
531            .request::<(String, u32)>(Method::DELETE, relative_path.as_str(), None, None)
532            .await;
533
534        if ret.is_err() {
535            return Err(APIError {
536                err_type: ErrType::RequestErr,
537                reason: ret.unwrap_err().to_string(),
538                job_id: job_id,
539                request_id: "".to_string(),
540            });
541        }
542
543        let (request_id, response) = ret.unwrap();
544        let status_code = response.status();
545
546        match status_code {
547            StatusCode::NO_CONTENT => Ok(()),
548            _ => Err(APIError {
549                err_type: ErrType::ResponseErr,
550                reason: self.parse_response_error(response).await,
551                job_id: job_id,
552                request_id: request_id,
553            }),
554        }
555    }
556
557    /// Get the queue size.
558    /// How many pending jobs are ready for consuming.
559    ///
560    /// # Arguments
561    /// * `queue` - A string that holds the queue name
562    pub async fn queue_size(&self, queue: String) -> Result<u32> {
563        let relative_path = Path::new(&queue)
564            .join("size")
565            .to_str()
566            .unwrap()
567            .to_string();
568
569        let ret = self
570            .request::<(String, u32)>(Method::GET, relative_path.as_str(), None, None)
571            .await;
572
573        if ret.is_err() {
574            return Err(APIError {
575                err_type: ErrType::RequestErr,
576                reason: ret.unwrap_err().to_string(),
577                ..APIError::default()
578            });
579        }
580
581        let (request_id, response) = ret.unwrap();
582        let status_code = response.status();
583
584        match status_code {
585            StatusCode::OK => response
586                .json::<QueueInfo>()
587                .await
588                .map(|info| info.size)
589                .map_err(|e| APIError {
590                    err_type: ErrType::ResponseErr,
591                    reason: e.to_string(),
592                    request_id: request_id,
593                    ..APIError::default()
594                }),
595            _ => Err(APIError {
596                err_type: ErrType::ResponseErr,
597                reason: self.parse_response_error(response).await,
598                job_id: "".to_string(),
599                request_id: request_id,
600            }),
601        }
602    }
603
604    /// Peek job from queue without consuming
605    ///
606    /// # Arguments
607    ///
608    /// * `queue` - A string that holds the queue name
609    pub async fn peek_queue(&self, queue: String) -> Result<Option<Job>> {
610        self.do_peek_job(queue, None).await
611    }
612
613    /// Peek a specified job
614    ///
615    /// # Arguments
616    ///
617    /// * queue - A string that holds the queue name
618    /// * job_id - A string that holds the job id
619    pub async fn peek_job(&self, queue: String, job_id: String) -> Result<Option<Job>> {
620        self.do_peek_job(queue, Some(job_id)).await
621    }
622
623    /// Peek the deadletter of the queue
624    ///
625    /// # Arguments
626    ///
627    /// * `queue` - A string that holds the queue name
628    pub async fn peek_dead_letter(&self, queue: String) -> Result<(u32, String)> {
629        let relative_path = Path::new(&queue)
630            .join("deadletter")
631            .to_str()
632            .unwrap()
633            .to_string();
634
635        let ret = self
636            .request::<(String, u32)>(Method::GET, relative_path.as_str(), None, None)
637            .await;
638
639        if ret.is_err() {
640            return Err(APIError {
641                err_type: ErrType::RequestErr,
642                reason: ret.unwrap_err().to_string(),
643                ..APIError::default()
644            });
645        }
646
647        let (request_id, response) = ret.unwrap();
648        let status_code = response.status();
649        match status_code {
650            StatusCode::OK => response
651                .json::<DeadLetterInfo>()
652                .await
653                .map(|info| (info.deadletter_size, info.deadletter_head))
654                .map_err(|e| APIError {
655                    err_type: ErrType::ResponseErr,
656                    reason: e.to_string(),
657                    request_id: request_id,
658                    ..APIError::default()
659                }),
660            _ => Err(APIError {
661                err_type: ErrType::ResponseErr,
662                reason: self.parse_response_error(response).await,
663                job_id: "".to_string(),
664                request_id: request_id,
665            }),
666        }
667    }
668
669    /// Respawn dead letter
670    ///
671    /// # Arguments
672    /// * `queue` - A string that holds the queue name
673    /// * `limit` - A i64 value that holds the limit
674    /// * `ttl` - A i64 value that holds the time-to-live value in second
675    pub async fn respawn_dead_letter(&self, queue: String, limit: i64, ttl: i64) -> Result<u32> {
676        if limit <= 0 {
677            return Err(APIError {
678                err_type: ErrType::RequestErr,
679                reason: "limit should be > 0".to_string(),
680                ..APIError::default()
681            });
682        }
683
684        if ttl <= 0 {
685            return Err(APIError {
686                err_type: ErrType::RequestErr,
687                reason: "ttl should be > 0".to_string(),
688                ..APIError::default()
689            });
690        }
691
692        let relative_path = Path::new(&queue)
693            .join("deadletter")
694            .to_str()
695            .unwrap()
696            .to_string();
697
698        let query = [("limit", limit), ("ttl", ttl)];
699        let ret = self
700            .request(Method::PUT, relative_path.as_str(), Some(&query), None)
701            .await;
702
703        if ret.is_err() {
704            return Err(APIError {
705                err_type: ErrType::RequestErr,
706                reason: ret.unwrap_err().to_string(),
707                ..APIError::default()
708            });
709        }
710
711        let (request_id, response) = ret.unwrap();
712        let status_code = response.status();
713        match status_code {
714            StatusCode::OK => response
715                .json::<RespawnResult>()
716                .await
717                .map(|rr| rr.count)
718                .map_err(|e| APIError {
719                    err_type: ErrType::ResponseErr,
720                    reason: e.to_string(),
721                    request_id: request_id,
722                    ..APIError::default()
723                }),
724            _ => Err(APIError {
725                err_type: ErrType::ResponseErr,
726                reason: self.parse_response_error(response).await,
727                job_id: "".to_string(),
728                request_id: request_id,
729            }),
730        }
731    }
732}