1use crate::ratectl::SlackTokioRateController;
2use async_recursion::async_recursion;
3use bytes::Buf;
4use futures::future::TryFutureExt;
5use futures::future::{BoxFuture, FutureExt};
6use hyper::body::HttpBody;
7use hyper::client::*;
8use hyper::http::StatusCode;
9use hyper::{Body, Request, Response, Uri};
10use hyper_rustls::HttpsConnector;
11use mime::Mime;
12use rvstruct::ValueStruct;
13use slack_morphism::errors::*;
14use slack_morphism::prelude::{SlackApiMethodRateControlConfig, SlackApiRateControlConfig};
15use slack_morphism::signature_verifier::SlackEventAbsentSignatureError;
16use slack_morphism::signature_verifier::SlackEventSignatureVerifier;
17use slack_morphism::*;
18use slack_morphism_models::{SlackClientId, SlackClientSecret};
19use std::collections::HashMap;
20use std::io::Read;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::*;
24use url::Url;
25
26#[derive(Clone, Debug)]
27pub struct SlackClientHyperConnector<H: Send + Sync + Clone + connect::Connect> {
28 hyper_connector: Client<H>,
29 tokio_rate_controller: Option<Arc<SlackTokioRateController>>,
30}
31
32pub type SlackClientHyperHttpsConnector = SlackClientHyperConnector<HttpsConnector<HttpConnector>>;
33
34impl SlackClientHyperConnector<HttpsConnector<HttpConnector>> {
35 pub fn new() -> Self {
36 let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
37 .with_native_roots()
38 .https_only()
39 .enable_http2()
40 .build();
41 Self::with_connector(https_connector)
42 }
43}
44
45impl From<HttpsConnector<HttpConnector>>
46 for SlackClientHyperConnector<HttpsConnector<HttpConnector>>
47{
48 fn from(https_connector: hyper_rustls::HttpsConnector<HttpConnector>) -> Self {
49 Self::with_connector(https_connector)
50 }
51}
52
53impl<H: 'static + Send + Sync + Clone + connect::Connect> SlackClientHyperConnector<H> {
54 pub fn with_connector(connector: H) -> Self {
55 Self {
56 hyper_connector: Client::builder().build::<_, hyper::Body>(connector),
57 tokio_rate_controller: None,
58 }
59 }
60
61 pub fn with_rate_control(self, rate_control_config: SlackApiRateControlConfig) -> Self {
62 Self {
63 tokio_rate_controller: Some(Arc::new(SlackTokioRateController::new(
64 rate_control_config,
65 ))),
66 ..self
67 }
68 }
69
70 pub(crate) fn parse_query_params(request: &Request<Body>) -> HashMap<String, String> {
71 request
72 .uri()
73 .query()
74 .map(|v| {
75 url::form_urlencoded::parse(v.as_bytes())
76 .into_owned()
77 .collect()
78 })
79 .unwrap_or_else(HashMap::new)
80 }
81
82 pub(crate) fn hyper_redirect_to(
83 url: &str,
84 ) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
85 Response::builder()
86 .status(hyper::http::StatusCode::FOUND)
87 .header(hyper::header::LOCATION, url)
88 .body(Body::empty())
89 .map_err(|e| e.into())
90 }
91
92 fn setup_token_auth_header(
93 request_builder: hyper::http::request::Builder,
94 token: Option<&SlackApiToken>,
95 ) -> hyper::http::request::Builder {
96 if token.is_none() {
97 request_builder
98 } else {
99 let token_header_value = format!("Bearer {}", token.unwrap().token_value.value());
100 request_builder.header(hyper::header::AUTHORIZATION, token_header_value)
101 }
102 }
103
104 pub(crate) fn setup_basic_auth_header(
105 request_builder: hyper::http::request::Builder,
106 username: &str,
107 password: &str,
108 ) -> hyper::http::request::Builder {
109 let header_value = format!(
110 "Basic {}",
111 base64::encode(format!("{}:{}", username, password))
112 );
113 request_builder.header(hyper::header::AUTHORIZATION, header_value)
114 }
115
116 pub(crate) fn create_http_request(
117 url: Url,
118 method: hyper::http::Method,
119 ) -> hyper::http::request::Builder {
120 let uri: Uri = url.as_str().parse().unwrap();
121 hyper::http::request::Builder::new()
122 .method(method)
123 .uri(uri)
124 .header("accept-charset", "utf-8")
125 }
126
127 async fn http_body_to_string<T>(body: T) -> AnyStdResult<String>
128 where
129 T: HttpBody,
130 T::Error: std::error::Error + Sync + Send + 'static,
131 {
132 let http_body = hyper::body::aggregate(body).await?;
133 let mut http_reader = http_body.reader();
134 let mut http_body_str = String::new();
135 http_reader.read_to_string(&mut http_body_str)?;
136 Ok(http_body_str)
137 }
138
139 fn http_response_content_type<RS>(response: &Response<RS>) -> Option<Mime> {
140 let http_headers = response.headers();
141 http_headers.get(hyper::header::CONTENT_TYPE).map(|hv| {
142 let hvs = hv.to_str().unwrap();
143 hvs.parse::<Mime>().unwrap()
144 })
145 }
146
147 async fn send_http_request<RS>(&self, request: Request<Body>) -> ClientResult<RS>
148 where
149 RS: for<'de> serde::de::Deserialize<'de>,
150 {
151 let uri_str = request.uri().to_string();
152 debug!(
153 slack_uri = uri_str.as_str(),
154 "Sending HTTP request to {}",
155 request.uri()
156 );
157
158 let http_res = self
159 .hyper_connector
160 .request(request)
161 .await
162 .map_err(Self::map_http_error)?;
163 let http_status = http_res.status();
164 let http_headers = http_res.headers().clone();
165 let http_content_type = Self::http_response_content_type(&http_res);
166 let http_body_str = Self::http_body_to_string(http_res)
167 .map_err(Self::map_system_error)
168 .await?;
169 let http_content_is_json = http_content_type.iter().all(|response_mime| {
170 response_mime.type_() == mime::APPLICATION && response_mime.subtype() == mime::JSON
171 });
172
173 debug!(
174 slack_uri = uri_str.as_str(),
175 slack_http_status = http_status.as_u16(),
176 "Received HTTP response {}",
177 http_status
178 );
179
180 match http_status {
181 StatusCode::OK if http_content_is_json => {
182 let slack_message: SlackEnvelopeMessage =
183 serde_json::from_str(http_body_str.as_str())
184 .map_err(|err| Self::map_serde_error(err, Some(http_body_str.as_str())))?;
185 match slack_message.error {
186 None => {
187 let decoded_body =
188 serde_json::from_str(http_body_str.as_str()).map_err(|err| {
189 Self::map_serde_error(err, Some(http_body_str.as_str()))
190 })?;
191 Ok(decoded_body)
192 }
193 Some(slack_error) => Err(SlackClientError::ApiError(
194 SlackClientApiError::new(slack_error)
195 .opt_errors(slack_message.errors)
196 .opt_warnings(slack_message.warnings)
197 .with_http_response_body(http_body_str),
198 )),
199 }
200 }
201 StatusCode::OK | StatusCode::NO_CONTENT => {
202 serde_json::from_str("{}").map_err(|err| Self::map_serde_error(err, Some("{}")))
203 }
204 StatusCode::TOO_MANY_REQUESTS if http_content_is_json => {
205 let slack_message: SlackEnvelopeMessage =
206 serde_json::from_str(http_body_str.as_str())
207 .map_err(|err| Self::map_serde_error(err, Some(http_body_str.as_str())))?;
208
209 Err(SlackClientError::RateLimitError(
210 SlackRateLimitError::new()
211 .opt_retry_after(
212 http_headers
213 .get(hyper::header::RETRY_AFTER)
214 .and_then(|ra| ra.to_str().ok().and_then(|s| s.parse().ok()))
215 .map(Duration::from_secs),
216 )
217 .opt_code(slack_message.error)
218 .opt_warnings(slack_message.warnings)
219 .with_http_response_body(http_body_str),
220 ))
221 }
222 StatusCode::TOO_MANY_REQUESTS => Err(SlackClientError::RateLimitError(
223 SlackRateLimitError::new()
224 .opt_retry_after(
225 http_headers
226 .get(hyper::header::RETRY_AFTER)
227 .and_then(|ra| ra.to_str().ok().and_then(|s| s.parse().ok()))
228 .map(Duration::from_secs),
229 )
230 .with_http_response_body(http_body_str),
231 )),
232 _ => Err(SlackClientError::HttpError(
233 SlackClientHttpError::new(http_status).with_http_response_body(http_body_str),
234 )),
235 }
236 }
237
238 #[async_recursion]
239 async fn send_rate_controlled_request<'a, R, RS>(
240 &'a self,
241 request: R,
242 token: Option<&'a SlackApiToken>,
243 rate_control_params: Option<&'a SlackApiMethodRateControlConfig>,
244 delayed: Option<Duration>,
245 retried: usize,
246 ) -> ClientResult<RS>
247 where
248 R: Fn() -> ClientResult<Request<Body>> + Send + Sync,
249 RS: for<'de> serde::de::Deserialize<'de> + Send,
250 {
251 match (self.tokio_rate_controller.as_ref(), rate_control_params) {
252 (Some(rate_controller), maybe_method_rate_params) => {
253 rate_controller
254 .throttle_delay(
255 maybe_method_rate_params,
256 token.and_then(|t| t.team_id.clone()),
257 delayed,
258 )
259 .await;
260
261 self.retry_request_if_needed(
262 rate_controller.clone(),
263 self.send_http_request(request()?).await,
264 retried,
265 request,
266 token,
267 rate_control_params,
268 )
269 .await
270 }
271 (None, _) => self.send_http_request(request()?).await,
272 }
273 }
274
275 async fn retry_request_if_needed<R, RS>(
276 &self,
277 rate_controller: Arc<SlackTokioRateController>,
278 result: ClientResult<RS>,
279 retried: usize,
280 request: R,
281 token: Option<&SlackApiToken>,
282 rate_control_params: Option<&SlackApiMethodRateControlConfig>,
283 ) -> ClientResult<RS>
284 where
285 R: Fn() -> ClientResult<Request<Body>> + Send + Sync,
286 RS: for<'de> serde::de::Deserialize<'de> + Send,
287 {
288 match result {
289 Err(err) => match rate_controller.config.max_retries {
290 Some(max_retries) if max_retries > retried => match err {
291 SlackClientError::RateLimitError(ref rate_error) => {
292 debug!(
293 "Rate limit error received: {}. Retrying: {}/{}",
294 rate_error,
295 retried + 1,
296 max_retries
297 );
298
299 self.send_rate_controlled_request(
300 request,
301 token,
302 rate_control_params,
303 rate_error.retry_after,
304 retried + 1,
305 )
306 .await
307 }
308 _ => Err(err),
309 },
310 _ => Err(err),
311 },
312 Ok(result) => Ok(result),
313 }
314 }
315
316 pub(crate) async fn decode_signed_response(
317 req: Request<Body>,
318 signature_verifier: &SlackEventSignatureVerifier,
319 ) -> AnyStdResult<String> {
320 let headers = &req.headers().clone();
321 let req_body = req.into_body();
322 match (
323 headers.get(SlackEventSignatureVerifier::SLACK_SIGNED_HASH_HEADER),
324 headers.get(SlackEventSignatureVerifier::SLACK_SIGNED_TIMESTAMP),
325 ) {
326 (Some(received_hash), Some(received_ts)) => {
327 Self::http_body_to_string(req_body)
328 .and_then(|body| async {
329 signature_verifier
330 .verify(
331 received_hash.to_str().unwrap(),
332 &body,
333 received_ts.to_str().unwrap(),
334 )
335 .map(|_| body)
336 .map_err(|e| e.into())
337 })
338 .await
339 }
340 _ => Err(Box::new(SlackEventAbsentSignatureError::new())),
341 }
342 }
343
344 pub(crate) fn map_http_error(hyper_err: hyper::Error) -> SlackClientError {
345 SlackClientError::HttpProtocolError(
346 SlackClientHttpProtocolError::new().with_cause(Box::new(hyper_err)),
347 )
348 }
349
350 pub(crate) fn map_hyper_http_error(hyper_err: hyper::http::Error) -> SlackClientError {
351 SlackClientError::HttpProtocolError(
352 SlackClientHttpProtocolError::new().with_cause(Box::new(hyper_err)),
353 )
354 }
355
356 pub(crate) fn map_serde_error(
357 err: serde_json::Error,
358 tried_to_parse: Option<&str>,
359 ) -> SlackClientError {
360 SlackClientError::ProtocolError(
361 SlackClientProtocolError::new(err).opt_json_body(tried_to_parse.map(|s| s.to_string())),
362 )
363 }
364
365 pub(crate) fn map_system_error(
366 err: Box<dyn std::error::Error + Sync + Send>,
367 ) -> SlackClientError {
368 SlackClientError::SystemError(SlackClientSystemError::new().with_cause(err))
369 }
370}
371
372impl<H: 'static + Send + Sync + Clone + connect::Connect> SlackClientHttpConnector
373 for SlackClientHyperConnector<H>
374{
375 fn http_get_uri<'a, RS>(
376 &'a self,
377 full_uri: Url,
378 token: Option<&'a SlackApiToken>,
379 rate_control_params: Option<&'a SlackApiMethodRateControlConfig>,
380 ) -> BoxFuture<'a, ClientResult<RS>>
381 where
382 RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + Send,
383 {
384 async move {
385 let body = self
386 .send_rate_controlled_request(
387 || {
388 let base_http_request =
389 Self::create_http_request(full_uri.clone(), hyper::http::Method::GET);
390
391 let http_request = Self::setup_token_auth_header(base_http_request, token);
392
393 http_request
394 .body(Body::empty())
395 .map_err(Self::map_hyper_http_error)
396 },
397 token,
398 rate_control_params,
399 None,
400 0,
401 )
402 .await?;
403
404 Ok(body)
405 }
406 .boxed()
407 }
408
409 fn http_get_with_client_secret<'a, RS>(
410 &'a self,
411 full_uri: Url,
412 client_id: &'a SlackClientId,
413 client_secret: &'a SlackClientSecret,
414 ) -> BoxFuture<'a, ClientResult<RS>>
415 where
416 RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + 'a + Send,
417 {
418 async move {
419 self.send_rate_controlled_request(
420 || {
421 Self::setup_basic_auth_header(
422 Self::create_http_request(full_uri.clone(), hyper::http::Method::GET),
423 client_id.value(),
424 client_secret.value(),
425 )
426 .body(Body::empty())
427 .map_err(Self::map_hyper_http_error)
428 },
429 None,
430 None,
431 None,
432 0,
433 )
434 .await
435 }
436 .boxed()
437 }
438
439 fn http_post_uri<'a, RQ, RS>(
440 &'a self,
441 full_uri: Url,
442 request_body: &'a RQ,
443 token: Option<&'a SlackApiToken>,
444 rate_control_params: Option<&'a SlackApiMethodRateControlConfig>,
445 ) -> BoxFuture<'a, ClientResult<RS>>
446 where
447 RQ: serde::ser::Serialize + Send + Sync,
448 RS: for<'de> serde::de::Deserialize<'de> + Send + 'a + Send + 'a,
449 {
450 async move {
451 let post_json = serde_json::to_string(&request_body)
452 .map_err(|err| Self::map_serde_error(err, None))?;
453
454 let response_body = self
455 .send_rate_controlled_request(
456 || {
457 let base_http_request =
458 Self::create_http_request(full_uri.clone(), hyper::http::Method::POST)
459 .header("content-type", "application/json; charset=utf-8");
460
461 let http_request = Self::setup_token_auth_header(base_http_request, token);
462
463 http_request
464 .body(post_json.clone().into())
465 .map_err(Self::map_hyper_http_error)
466 },
467 token,
468 rate_control_params,
469 None,
470 0,
471 )
472 .await?;
473
474 Ok(response_body)
475 }
476 .boxed()
477 }
478}