slack_morphism_hyper/
connector.rs

1use crate::ratectl::SlackTokioRateController;
2use async_recursion::async_recursion;
3use bytes::Buf;
4use futures::future::TryFutureExt;
5use futures::future::{BoxFuture, FutureExt};
6use hyper::body::HttpBody;
7use hyper::client::*;
8use hyper::http::StatusCode;
9use hyper::{Body, Request, Response, Uri};
10use hyper_rustls::HttpsConnector;
11use mime::Mime;
12use rvstruct::ValueStruct;
13use slack_morphism::errors::*;
14use slack_morphism::prelude::{SlackApiMethodRateControlConfig, SlackApiRateControlConfig};
15use slack_morphism::signature_verifier::SlackEventAbsentSignatureError;
16use slack_morphism::signature_verifier::SlackEventSignatureVerifier;
17use slack_morphism::*;
18use slack_morphism_models::{SlackClientId, SlackClientSecret};
19use std::collections::HashMap;
20use std::io::Read;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::*;
24use url::Url;
25
26#[derive(Clone, Debug)]
27pub struct SlackClientHyperConnector<H: Send + Sync + Clone + connect::Connect> {
28    hyper_connector: Client<H>,
29    tokio_rate_controller: Option<Arc<SlackTokioRateController>>,
30}
31
32pub type SlackClientHyperHttpsConnector = SlackClientHyperConnector<HttpsConnector<HttpConnector>>;
33
34impl SlackClientHyperConnector<HttpsConnector<HttpConnector>> {
35    pub fn new() -> Self {
36        let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
37            .with_native_roots()
38            .https_only()
39            .enable_http2()
40            .build();
41        Self::with_connector(https_connector)
42    }
43}
44
45impl From<HttpsConnector<HttpConnector>>
46    for SlackClientHyperConnector<HttpsConnector<HttpConnector>>
47{
48    fn from(https_connector: hyper_rustls::HttpsConnector<HttpConnector>) -> Self {
49        Self::with_connector(https_connector)
50    }
51}
52
53impl<H: 'static + Send + Sync + Clone + connect::Connect> SlackClientHyperConnector<H> {
54    pub fn with_connector(connector: H) -> Self {
55        Self {
56            hyper_connector: Client::builder().build::<_, hyper::Body>(connector),
57            tokio_rate_controller: None,
58        }
59    }
60
61    pub fn with_rate_control(self, rate_control_config: SlackApiRateControlConfig) -> Self {
62        Self {
63            tokio_rate_controller: Some(Arc::new(SlackTokioRateController::new(
64                rate_control_config,
65            ))),
66            ..self
67        }
68    }
69
70    pub(crate) fn parse_query_params(request: &Request<Body>) -> HashMap<String, String> {
71        request
72            .uri()
73            .query()
74            .map(|v| {
75                url::form_urlencoded::parse(v.as_bytes())
76                    .into_owned()
77                    .collect()
78            })
79            .unwrap_or_else(HashMap::new)
80    }
81
82    pub(crate) fn hyper_redirect_to(
83        url: &str,
84    ) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
85        Response::builder()
86            .status(hyper::http::StatusCode::FOUND)
87            .header(hyper::header::LOCATION, url)
88            .body(Body::empty())
89            .map_err(|e| e.into())
90    }
91
92    fn setup_token_auth_header(
93        request_builder: hyper::http::request::Builder,
94        token: Option<&SlackApiToken>,
95    ) -> hyper::http::request::Builder {
96        if token.is_none() {
97            request_builder
98        } else {
99            let token_header_value = format!("Bearer {}", token.unwrap().token_value.value());
100            request_builder.header(hyper::header::AUTHORIZATION, token_header_value)
101        }
102    }
103
104    pub(crate) fn setup_basic_auth_header(
105        request_builder: hyper::http::request::Builder,
106        username: &str,
107        password: &str,
108    ) -> hyper::http::request::Builder {
109        let header_value = format!(
110            "Basic {}",
111            base64::encode(format!("{}:{}", username, password))
112        );
113        request_builder.header(hyper::header::AUTHORIZATION, header_value)
114    }
115
116    pub(crate) fn create_http_request(
117        url: Url,
118        method: hyper::http::Method,
119    ) -> hyper::http::request::Builder {
120        let uri: Uri = url.as_str().parse().unwrap();
121        hyper::http::request::Builder::new()
122            .method(method)
123            .uri(uri)
124            .header("accept-charset", "utf-8")
125    }
126
127    async fn http_body_to_string<T>(body: T) -> AnyStdResult<String>
128    where
129        T: HttpBody,
130        T::Error: std::error::Error + Sync + Send + 'static,
131    {
132        let http_body = hyper::body::aggregate(body).await?;
133        let mut http_reader = http_body.reader();
134        let mut http_body_str = String::new();
135        http_reader.read_to_string(&mut http_body_str)?;
136        Ok(http_body_str)
137    }
138
139    fn http_response_content_type<RS>(response: &Response<RS>) -> Option<Mime> {
140        let http_headers = response.headers();
141        http_headers.get(hyper::header::CONTENT_TYPE).map(|hv| {
142            let hvs = hv.to_str().unwrap();
143            hvs.parse::<Mime>().unwrap()
144        })
145    }
146
147    async fn send_http_request<RS>(&self, request: Request<Body>) -> ClientResult<RS>
148    where
149        RS: for<'de> serde::de::Deserialize<'de>,
150    {
151        let uri_str = request.uri().to_string();
152        debug!(
153            slack_uri = uri_str.as_str(),
154            "Sending HTTP request to {}",
155            request.uri()
156        );
157
158        let http_res = self
159            .hyper_connector
160            .request(request)
161            .await
162            .map_err(Self::map_http_error)?;
163        let http_status = http_res.status();
164        let http_headers = http_res.headers().clone();
165        let http_content_type = Self::http_response_content_type(&http_res);
166        let http_body_str = Self::http_body_to_string(http_res)
167            .map_err(Self::map_system_error)
168            .await?;
169        let http_content_is_json = http_content_type.iter().all(|response_mime| {
170            response_mime.type_() == mime::APPLICATION && response_mime.subtype() == mime::JSON
171        });
172
173        debug!(
174            slack_uri = uri_str.as_str(),
175            slack_http_status = http_status.as_u16(),
176            "Received HTTP response {}",
177            http_status
178        );
179
180        match http_status {
181            StatusCode::OK if http_content_is_json => {
182                let slack_message: SlackEnvelopeMessage =
183                    serde_json::from_str(http_body_str.as_str())
184                        .map_err(|err| Self::map_serde_error(err, Some(http_body_str.as_str())))?;
185                match slack_message.error {
186                    None => {
187                        let decoded_body =
188                            serde_json::from_str(http_body_str.as_str()).map_err(|err| {
189                                Self::map_serde_error(err, Some(http_body_str.as_str()))
190                            })?;
191                        Ok(decoded_body)
192                    }
193                    Some(slack_error) => Err(SlackClientError::ApiError(
194                        SlackClientApiError::new(slack_error)
195                            .opt_errors(slack_message.errors)
196                            .opt_warnings(slack_message.warnings)
197                            .with_http_response_body(http_body_str),
198                    )),
199                }
200            }
201            StatusCode::OK | StatusCode::NO_CONTENT => {
202                serde_json::from_str("{}").map_err(|err| Self::map_serde_error(err, Some("{}")))
203            }
204            StatusCode::TOO_MANY_REQUESTS if http_content_is_json => {
205                let slack_message: SlackEnvelopeMessage =
206                    serde_json::from_str(http_body_str.as_str())
207                        .map_err(|err| Self::map_serde_error(err, Some(http_body_str.as_str())))?;
208
209                Err(SlackClientError::RateLimitError(
210                    SlackRateLimitError::new()
211                        .opt_retry_after(
212                            http_headers
213                                .get(hyper::header::RETRY_AFTER)
214                                .and_then(|ra| ra.to_str().ok().and_then(|s| s.parse().ok()))
215                                .map(Duration::from_secs),
216                        )
217                        .opt_code(slack_message.error)
218                        .opt_warnings(slack_message.warnings)
219                        .with_http_response_body(http_body_str),
220                ))
221            }
222            StatusCode::TOO_MANY_REQUESTS => Err(SlackClientError::RateLimitError(
223                SlackRateLimitError::new()
224                    .opt_retry_after(
225                        http_headers
226                            .get(hyper::header::RETRY_AFTER)
227                            .and_then(|ra| ra.to_str().ok().and_then(|s| s.parse().ok()))
228                            .map(Duration::from_secs),
229                    )
230                    .with_http_response_body(http_body_str),
231            )),
232            _ => Err(SlackClientError::HttpError(
233                SlackClientHttpError::new(http_status).with_http_response_body(http_body_str),
234            )),
235        }
236    }
237
238    #[async_recursion]
239    async fn send_rate_controlled_request<'a, R, RS>(
240        &'a self,
241        request: R,
242        token: Option<&'a SlackApiToken>,
243        rate_control_params: Option<&'a SlackApiMethodRateControlConfig>,
244        delayed: Option<Duration>,
245        retried: usize,
246    ) -> ClientResult<RS>
247    where
248        R: Fn() -> ClientResult<Request<Body>> + Send + Sync,
249        RS: for<'de> serde::de::Deserialize<'de> + Send,
250    {
251        match (self.tokio_rate_controller.as_ref(), rate_control_params) {
252            (Some(rate_controller), maybe_method_rate_params) => {
253                rate_controller
254                    .throttle_delay(
255                        maybe_method_rate_params,
256                        token.and_then(|t| t.team_id.clone()),
257                        delayed,
258                    )
259                    .await;
260
261                self.retry_request_if_needed(
262                    rate_controller.clone(),
263                    self.send_http_request(request()?).await,
264                    retried,
265                    request,
266                    token,
267                    rate_control_params,
268                )
269                .await
270            }
271            (None, _) => self.send_http_request(request()?).await,
272        }
273    }
274
275    async fn retry_request_if_needed<R, RS>(
276        &self,
277        rate_controller: Arc<SlackTokioRateController>,
278        result: ClientResult<RS>,
279        retried: usize,
280        request: R,
281        token: Option<&SlackApiToken>,
282        rate_control_params: Option<&SlackApiMethodRateControlConfig>,
283    ) -> ClientResult<RS>
284    where
285        R: Fn() -> ClientResult<Request<Body>> + Send + Sync,
286        RS: for<'de> serde::de::Deserialize<'de> + Send,
287    {
288        match result {
289            Err(err) => match rate_controller.config.max_retries {
290                Some(max_retries) if max_retries > retried => match err {
291                    SlackClientError::RateLimitError(ref rate_error) => {
292                        debug!(
293                            "Rate limit error received: {}. Retrying: {}/{}",
294                            rate_error,
295                            retried + 1,
296                            max_retries
297                        );
298
299                        self.send_rate_controlled_request(
300                            request,
301                            token,
302                            rate_control_params,
303                            rate_error.retry_after,
304                            retried + 1,
305                        )
306                        .await
307                    }
308                    _ => Err(err),
309                },
310                _ => Err(err),
311            },
312            Ok(result) => Ok(result),
313        }
314    }
315
316    pub(crate) async fn decode_signed_response(
317        req: Request<Body>,
318        signature_verifier: &SlackEventSignatureVerifier,
319    ) -> AnyStdResult<String> {
320        let headers = &req.headers().clone();
321        let req_body = req.into_body();
322        match (
323            headers.get(SlackEventSignatureVerifier::SLACK_SIGNED_HASH_HEADER),
324            headers.get(SlackEventSignatureVerifier::SLACK_SIGNED_TIMESTAMP),
325        ) {
326            (Some(received_hash), Some(received_ts)) => {
327                Self::http_body_to_string(req_body)
328                    .and_then(|body| async {
329                        signature_verifier
330                            .verify(
331                                received_hash.to_str().unwrap(),
332                                &body,
333                                received_ts.to_str().unwrap(),
334                            )
335                            .map(|_| body)
336                            .map_err(|e| e.into())
337                    })
338                    .await
339            }
340            _ => Err(Box::new(SlackEventAbsentSignatureError::new())),
341        }
342    }
343
344    pub(crate) fn map_http_error(hyper_err: hyper::Error) -> SlackClientError {
345        SlackClientError::HttpProtocolError(
346            SlackClientHttpProtocolError::new().with_cause(Box::new(hyper_err)),
347        )
348    }
349
350    pub(crate) fn map_hyper_http_error(hyper_err: hyper::http::Error) -> SlackClientError {
351        SlackClientError::HttpProtocolError(
352            SlackClientHttpProtocolError::new().with_cause(Box::new(hyper_err)),
353        )
354    }
355
356    pub(crate) fn map_serde_error(
357        err: serde_json::Error,
358        tried_to_parse: Option<&str>,
359    ) -> SlackClientError {
360        SlackClientError::ProtocolError(
361            SlackClientProtocolError::new(err).opt_json_body(tried_to_parse.map(|s| s.to_string())),
362        )
363    }
364
365    pub(crate) fn map_system_error(
366        err: Box<dyn std::error::Error + Sync + Send>,
367    ) -> SlackClientError {
368        SlackClientError::SystemError(SlackClientSystemError::new().with_cause(err))
369    }
370}
371
372impl<H: 'static + Send + Sync + Clone + connect::Connect> SlackClientHttpConnector
373    for SlackClientHyperConnector<H>
374{
375    fn http_get_uri<'a, RS>(
376        &'a self,
377        full_uri: Url,
378        token: Option<&'a SlackApiToken>,
379        rate_control_params: Option<&'a SlackApiMethodRateControlConfig>,
380    ) -> BoxFuture<'a, ClientResult<RS>>
381    where
382        RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + Send,
383    {
384        async move {
385            let body = self
386                .send_rate_controlled_request(
387                    || {
388                        let base_http_request =
389                            Self::create_http_request(full_uri.clone(), hyper::http::Method::GET);
390
391                        let http_request = Self::setup_token_auth_header(base_http_request, token);
392
393                        http_request
394                            .body(Body::empty())
395                            .map_err(Self::map_hyper_http_error)
396                    },
397                    token,
398                    rate_control_params,
399                    None,
400                    0,
401                )
402                .await?;
403
404            Ok(body)
405        }
406        .boxed()
407    }
408
409    fn http_get_with_client_secret<'a, RS>(
410        &'a self,
411        full_uri: Url,
412        client_id: &'a SlackClientId,
413        client_secret: &'a SlackClientSecret,
414    ) -> BoxFuture<'a, ClientResult<RS>>
415    where
416        RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + 'a + Send,
417    {
418        async move {
419            self.send_rate_controlled_request(
420                || {
421                    Self::setup_basic_auth_header(
422                        Self::create_http_request(full_uri.clone(), hyper::http::Method::GET),
423                        client_id.value(),
424                        client_secret.value(),
425                    )
426                    .body(Body::empty())
427                    .map_err(Self::map_hyper_http_error)
428                },
429                None,
430                None,
431                None,
432                0,
433            )
434            .await
435        }
436        .boxed()
437    }
438
439    fn http_post_uri<'a, RQ, RS>(
440        &'a self,
441        full_uri: Url,
442        request_body: &'a RQ,
443        token: Option<&'a SlackApiToken>,
444        rate_control_params: Option<&'a SlackApiMethodRateControlConfig>,
445    ) -> BoxFuture<'a, ClientResult<RS>>
446    where
447        RQ: serde::ser::Serialize + Send + Sync,
448        RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + Send + 'a,
449    {
450        async move {
451            let post_json = serde_json::to_string(&request_body)
452                .map_err(|err| Self::map_serde_error(err, None))?;
453
454            let response_body = self
455                .send_rate_controlled_request(
456                    || {
457                        let base_http_request =
458                            Self::create_http_request(full_uri.clone(), hyper::http::Method::POST)
459                                .header("content-type", "application/json; charset=utf-8");
460
461                        let http_request = Self::setup_token_auth_header(base_http_request, token);
462
463                        http_request
464                            .body(post_json.clone().into())
465                            .map_err(Self::map_hyper_http_error)
466                    },
467                    token,
468                    rate_control_params,
469                    None,
470                    0,
471                )
472                .await?;
473
474            Ok(response_body)
475        }
476        .boxed()
477    }
478}