1use std::env;
6use std::error::Error;
7use std::fmt;
8use std::future::Future;
9use std::io;
10use std::io::Error as IoError;
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::Arc;
14use std::time::Duration;
15
16use bytes::{Bytes, BytesMut};
17use futures::{FutureExt, StreamExt};
18use http::header::{HeaderName, HeaderValue};
19use http::{HeaderMap, Method, StatusCode};
20use hyper::client::connect::Connect;
21use hyper::client::Builder as HyperBuilder;
22use hyper::client::HttpConnector;
23use hyper::Error as HyperError;
24use hyper::{Body, Client as HyperClient, Request as HyperRequest, Response as HyperResponse};
25use lazy_static::lazy_static;
26use tokio::time;
27
28use log::Level::Debug;
29use log::*;
30
31use crate::signature::SignedRequest;
32use crate::stream::ByteStream;
33use crate::tls::HttpsConnector;
34
35include!(concat!(env!("OUT_DIR"), "/user_agent_vars.rs"));
37
38lazy_static! {
41 static ref DEFAULT_USER_AGENT: String = format!(
42 "rusoto/{} rust/{} {}",
43 env!("CARGO_PKG_VERSION"),
44 RUST_VERSION,
45 env::consts::OS
46 );
47}
48
49pub struct HttpResponse {
51 pub status: StatusCode,
53 pub body: ByteStream,
55 pub headers: HeaderMap<String>,
57}
58
59#[derive(PartialEq)]
61pub struct BufferedHttpResponse {
62 pub status: StatusCode,
64 pub body: Bytes,
66 pub headers: HeaderMap<String>,
68}
69
70impl BufferedHttpResponse {
71 pub fn body_as_str(&self) -> &str {
73 match std::str::from_utf8(&self.body) {
74 Ok(msg) => msg,
75 _ => "unknown error",
76 }
77 }
78}
79
80impl fmt::Debug for BufferedHttpResponse {
82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83 match std::str::from_utf8(&self.body) {
84 Ok(msg) => write!(
85 f,
86 "BufferedHttpResponse {{status: {:?}, body: {:?}, headers: {:?} }}",
87 self.status, msg, self.headers
88 ),
89 _ => write!(
90 f,
91 "BufferedHttpResponse {{ status: {:?}, body: {:?}, headers: {:?} }}",
92 self.status, self.body, self.headers
93 ),
94 }
95 }
96}
97
98impl HttpResponse {
99 pub async fn buffer(&mut self) -> Result<BufferedHttpResponse, HttpDispatchError> {
101 let mut bytes = BytesMut::new();
102 while let Some(try_chunk) = self.body.next().await {
103 let chunk = try_chunk.map_err(|e| HttpDispatchError {
104 message: format!("Error obtaining body: {}", e),
105 })?;
106 bytes.extend(chunk);
107 }
108 Ok(BufferedHttpResponse {
109 status: self.status,
110 headers: self.headers.clone(),
111 body: bytes.freeze(),
112 })
113 }
114
115 async fn from_hyper(hyper_response: HyperResponse<Body>) -> HttpResponse {
116 let status = hyper_response.status();
117 let headers = hyper_response
118 .headers()
119 .iter()
120 .map(|(h, v)| {
121 let value_string = v.to_str().unwrap().to_owned();
122 (h.clone(), value_string)
123 })
124 .collect();
125 let body = hyper_response.into_body().map(|try_chunk| {
126 try_chunk.map(|c| c).map_err(|e| {
127 IoError::new(
128 io::ErrorKind::Other,
129 format!("Error obtaining chunk: {}", e),
130 )
131 })
132 });
133
134 HttpResponse {
135 status,
136 headers,
137 body: ByteStream::new(body),
138 }
139 }
140}
141
/// Error returned when dispatching an HTTP request fails.
#[derive(Clone, Debug, PartialEq)]
pub struct HttpDispatchError {
    /// Human-readable description of what went wrong.
    message: String,
}

impl HttpDispatchError {
    /// Creates an error carrying the given message.
    pub fn new(message: String) -> HttpDispatchError {
        Self { message }
    }
}

impl Error for HttpDispatchError {}

/// Displays the stored message verbatim.
impl fmt::Display for HttpDispatchError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.message)
    }
}
162
163impl From<HyperError> for HttpDispatchError {
164 fn from(err: HyperError) -> HttpDispatchError {
165 HttpDispatchError {
166 message: err.to_string(),
167 }
168 }
169}
170
171impl From<IoError> for HttpDispatchError {
172 fn from(err: IoError) -> HttpDispatchError {
173 HttpDispatchError {
174 message: err.to_string(),
175 }
176 }
177}
178
179pub type DispatchSignedRequestFuture =
181 Pin<Box<dyn Future<Output = Result<HttpResponse, HttpDispatchError>> + Send>>;
182
183pub trait DispatchSignedRequest {
185 fn dispatch(
187 &self,
188 request: SignedRequest,
189 timeout: Option<Duration>,
190 ) -> DispatchSignedRequestFuture;
191}
192
193impl<D: DispatchSignedRequest> DispatchSignedRequest for Rc<D> {
194 fn dispatch(
195 &self,
196 request: SignedRequest,
197 timeout: Option<Duration>,
198 ) -> DispatchSignedRequestFuture {
199 D::dispatch(&*self, request, timeout)
200 }
201}
202
203impl<D: DispatchSignedRequest> DispatchSignedRequest for Arc<D> {
204 fn dispatch(
205 &self,
206 request: SignedRequest,
207 timeout: Option<Duration>,
208 ) -> DispatchSignedRequestFuture {
209 D::dispatch(&*self, request, timeout)
210 }
211}
212
213pub struct HttpClient<C = HttpsConnector<HttpConnector>> {
215 inner: HyperClient<C, Body>,
216 local_agent: Option<String>,
217}
218
219impl HttpClient {
220 pub fn new() -> Result<Self, TlsError> {
222 #[cfg(feature = "native-tls")]
223 let connector = HttpsConnector::new();
224
225 #[cfg(feature = "rustls")]
226 let connector = HttpsConnector::new();
227
228 Ok(Self::from_connector(connector))
229 }
230
231 pub fn new_with_config(config: HttpConfig) -> Result<Self, TlsError> {
233 #[cfg(feature = "native-tls")]
234 let connector = HttpsConnector::new();
235
236 #[cfg(feature = "rustls")]
237 let connector = HttpsConnector::new();
238
239 Ok(Self::from_connector_with_config(connector, config))
240 }
241
242 pub fn local_agent(&mut self, local_agent: String) {
245 self.local_agent = Some(local_agent)
246 }
247}
248
249impl<C> HttpClient<C>
250where
251 C: Connect + Clone + Send + Sync,
252{
253 pub fn from_connector(connector: C) -> Self {
255 let inner = HyperClient::builder().build(connector);
256 HttpClient {
257 inner,
258 local_agent: None,
259 }
260 }
261
262 pub fn from_connector_with_config(connector: C, config: HttpConfig) -> Self {
265 let mut builder = HyperClient::builder();
266 config
267 .read_buf_size
268 .map(|sz| builder.http1_read_buf_exact_size(sz));
269 let inner = builder.build(connector);
270
271 HttpClient {
272 inner,
273 local_agent: None,
274 }
275 }
276
277 pub fn from_builder(builder: HyperBuilder, connector: C) -> Self {
279 let inner = builder.build(connector);
280 HttpClient {
281 inner,
282 local_agent: None,
283 }
284 }
285}
286
/// Optional settings applied when an `HttpClient` is built.
pub struct HttpConfig {
    /// Exact HTTP/1 read buffer size to request from hyper; `None` leaves
    /// hyper's own default in place.
    read_buf_size: Option<usize>,
}

impl HttpConfig {
    /// Creates a configuration with no overrides set.
    pub fn new() -> HttpConfig {
        Self::default()
    }

    /// Sets the read buffer size override to `sz`.
    pub fn read_buf_size(&mut self, sz: usize) {
        self.read_buf_size = Some(sz);
    }
}

impl Default for HttpConfig {
    /// Same as `HttpConfig::new`: no overrides set.
    fn default() -> HttpConfig {
        HttpConfig {
            read_buf_size: None,
        }
    }
}
313
314async fn http_client_dispatch<'a, C>(
315 client: HyperClient<C, Body>,
316 request: SignedRequest,
317 timeout: Option<Duration>,
318 user_agent: HeaderValue,
319) -> Result<HttpResponse, HttpDispatchError>
320where
321 C: Connect + Send + Sync + Clone + 'static,
322{
323 let hyper_method = match request.method().as_ref() {
324 "POST" => Method::POST,
325 "PUT" => Method::PUT,
326 "DELETE" => Method::DELETE,
327 "GET" => Method::GET,
328 "HEAD" => Method::HEAD,
329 v => {
330 return Err(HttpDispatchError {
331 message: format!("Unsupported HTTP verb {}", v),
332 });
333 }
334 };
335
336 let mut hyper_headers = HeaderMap::new();
338 for h in request.headers().iter() {
339 let header_name = match h.0.parse::<HeaderName>() {
340 Ok(name) => name,
341 Err(err) => {
342 return Err(HttpDispatchError {
343 message: format!("error parsing header name: {}", err),
344 });
345 }
346 };
347 for v in h.1.iter() {
348 let header_value = match HeaderValue::from_bytes(v) {
349 Ok(value) => value,
350 Err(err) => {
351 return Err(HttpDispatchError {
352 message: format!("error parsing header value: {}", err),
353 });
354 }
355 };
356 hyper_headers.append(&header_name, header_value);
357 }
358 }
359
360 if !hyper_headers.contains_key("user-agent") {
362 hyper_headers.insert("user-agent", user_agent);
363 }
364
365 let mut final_uri = format!(
366 "{}://{}{}",
367 request.scheme(),
368 request.hostname(),
369 request.canonical_path()
370 );
371 if !request.canonical_query_string().is_empty() {
372 final_uri = final_uri + &format!("?{}", request.canonical_query_string());
373 }
374
375 if log_enabled!(Debug) {
376 debug!(
377 "Full request: \n method: {}\n final_uri: {}\nHeaders:\n",
378 hyper_method, final_uri
379 );
380 for (h, v) in hyper_headers.iter() {
381 debug!("{}:{:?}", h.as_str(), v);
382 }
383 }
384
385 let http_request_builder = HyperRequest::builder().method(hyper_method).uri(final_uri);
386
387 let try_http_request = if let Some(p) = request.payload {
388 http_request_builder.body(p.into_body())
389 } else {
390 http_request_builder.body(Body::empty())
391 };
392
393 let mut http_request = try_http_request.map_err(|err| HttpDispatchError {
394 message: format!("error building request: {}", err),
395 })?;
396
397 *http_request.headers_mut() = hyper_headers;
398
399 let f = client.request(http_request);
400
401 let try_resp = match timeout {
402 None => f.await,
403 Some(duration) => match time::timeout(duration, f).await {
404 Err(_e) => {
405 return Err(HttpDispatchError {
406 message: "Timeout while dispatching request".to_owned(),
407 })
408 }
409 Ok(try_req) => try_req,
410 },
411 };
412 let resp = try_resp.map_err(|e| HttpDispatchError {
413 message: format!("Error during dispatch: {}", e),
414 })?;
415 Ok(HttpResponse::from_hyper(resp).await)
416}
417
418impl<C> DispatchSignedRequest for HttpClient<C>
419where
420 C: Connect + Clone + Send + Sync + 'static,
421{
422 fn dispatch(
423 &self,
424 request: SignedRequest,
425 timeout: Option<Duration>,
426 ) -> DispatchSignedRequestFuture {
427 let user_agent = self
428 .local_agent
429 .as_ref()
430 .map(|agent| format!("{} {}", agent, *DEFAULT_USER_AGENT).parse())
431 .unwrap_or_else(|| DEFAULT_USER_AGENT.parse())
432 .expect("failed to parse user-agent string");
433
434 http_client_dispatch::<C>(self.inner.clone(), request, timeout, user_agent).boxed()
435 }
436}
437
/// Error type returned by the `HttpClient` constructors.
#[derive(Debug, PartialEq)]
pub struct TlsError {
    /// Human-readable description of the failure.
    message: String,
}

impl Error for TlsError {}

/// Displays the stored message verbatim.
impl fmt::Display for TlsError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.message)
    }
}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::signature::SignedRequest;
    use crate::Region;

    /// Builds a signed SQS `POST /` request for a custom region named
    /// `eu-west-3` that points at `endpoint`.
    fn request_for_endpoint(endpoint: &str) -> SignedRequest {
        let a_region = Region::Custom {
            endpoint: endpoint.to_owned(),
            name: "eu-west-3".to_owned(),
        };
        SignedRequest::new("POST", "sqs", &a_region, "/")
    }

    /// Compile-time check that the default client is `Send + Sync`.
    #[test]
    fn http_client_is_send_and_sync() {
        fn is_send_and_sync<T: Send + Sync>() {}

        is_send_and_sync::<HttpClient>();
    }

    /// An explicit `http://` endpoint keeps the `http` scheme.
    #[test]
    fn custom_region_http() {
        let request = request_for_endpoint("http://localhost");
        assert_eq!("http", request.scheme());
        assert_eq!("localhost", request.hostname());
    }

    /// An explicit `https://` endpoint keeps the `https` scheme.
    #[test]
    fn custom_region_https() {
        let request = request_for_endpoint("https://localhost");
        assert_eq!("https", request.scheme());
        assert_eq!("localhost", request.hostname());
    }

    /// A port in the endpoint is kept as part of the hostname.
    #[test]
    fn custom_region_with_port() {
        let request = request_for_endpoint("https://localhost:8000");
        assert_eq!("https", request.scheme());
        assert_eq!("localhost:8000", request.hostname());
    }

    /// An endpoint without a scheme is treated as `https`.
    #[test]
    fn custom_region_no_scheme() {
        let request = request_for_endpoint("localhost");
        assert_eq!("https", request.scheme());
        assert_eq!("localhost", request.hostname());
    }

    /// Converting an `io::Error` keeps its message as the display text.
    #[test]
    fn from_io_error_preserves_error_message() {
        let io_error = ::std::io::Error::new(::std::io::ErrorKind::Other, "my error message");
        let error = HttpDispatchError::from(io_error);
        assert_eq!(error.to_string(), "my error message")
    }
}