atomic_web_push/helpers/clients/
reqwest_client.rs

1use async_trait::async_trait;
2
3use http::header::{CONTENT_LENGTH, RETRY_AFTER};
4use http::Request;
5use reqwest::{Body, Client};
6
7use crate::helpers::clients::{request_builder, WebPushClient};
8use crate::helpers::error::{RetryAfter, WebPushError};
9use crate::helpers::message::WebPushMessage;
10
11/// An async client for sending the notification payload.
12///
13/// This client is thread-safe. Clones of this client will share the same underlying resources,
14/// so cloning is a cheap and effective method to provide access to the client.
15///
16/// This client is [`hyper`](https://crates.io/crates/hyper) based, and will only work in Tokio contexts.
17#[derive(Clone)]
18pub struct ReqwestWebPushClient {
19    client: Client,
20}
21
22impl Default for ReqwestWebPushClient {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl From<Client> for ReqwestWebPushClient {
29    /// Creates a new client from a custom hyper HTTP client.
30    fn from(client: Client) -> Self {
31        Self { client }
32    }
33}
34
35impl ReqwestWebPushClient {
36    /// Creates a new client.
37    pub fn new() -> Self {
38        Self {
39            client: Client::builder().use_rustls_tls().build().unwrap(),
40        }
41    }
42}
43
44#[async_trait]
45impl WebPushClient for ReqwestWebPushClient {
46    /// Sends a notification. Never times out.
47    async fn send(&self, message: WebPushMessage) -> Result<(), WebPushError> {
48        trace!("Message: {:?}", message);
49
50        let request: Request<Body> = request_builder::build_request(message);
51
52        let body = request.body().as_bytes().unwrap().to_vec();
53        debug!("Request: {:?}", request);
54        let mut builder = self.client.post(request.uri().to_string());
55        for (key, value) in request.headers() {
56            builder = builder.header(key, value);
57        }
58
59        let requesting = builder.body(body).send();
60
61        let mut response = requesting.await?;
62
63        trace!("Response: {:?}", response);
64
65        let retry_after = response
66            .headers()
67            .get(RETRY_AFTER)
68            .and_then(|ra| ra.to_str().ok())
69            .and_then(RetryAfter::from_str);
70
71        let response_status = response.status();
72        trace!("Response status: {}", response_status);
73
74        let content_length = response
75            .headers()
76            .get(CONTENT_LENGTH)
77            .and_then(|s| s.to_str().ok())
78            .and_then(|s| s.parse().ok())
79            .unwrap_or(0);
80
81        let mut body: Vec<u8> = Vec::with_capacity(content_length);
82
83        while let Ok(Some(chunk)) = response.chunk().await {
84            body.extend(&chunk);
85        }
86        trace!("Body: {:?}", body);
87
88        trace!("Body text: {:?}", std::str::from_utf8(&body));
89
90        let response = request_builder::parse_response(response_status, body.to_vec());
91
92        debug!("Response: {:?}", response);
93
94        if let Err(WebPushError::ServerError {
95            retry_after: None,
96            info,
97        }) = response
98        {
99            Err(WebPushError::ServerError { retry_after, info })
100        } else {
101            Ok(response?)
102        }
103    }
104}