rusoto_core/
request.rs

//! AWS API requests.
//!
//! Wraps the `hyper` library to send signed HTTP requests (e.g. PUT, POST, DELETE, GET and HEAD).
4
5use std::env;
6use std::error::Error;
7use std::fmt;
8use std::future::Future;
9use std::io;
10use std::io::Error as IoError;
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::Arc;
14use std::time::Duration;
15
16use bytes::{Bytes, BytesMut};
17use futures::{FutureExt, StreamExt};
18use http::header::{HeaderName, HeaderValue};
19use http::{HeaderMap, Method, StatusCode};
20use hyper::client::connect::Connect;
21use hyper::client::Builder as HyperBuilder;
22use hyper::client::HttpConnector;
23use hyper::Error as HyperError;
24use hyper::{Body, Client as HyperClient, Request as HyperRequest, Response as HyperResponse};
25use lazy_static::lazy_static;
26use tokio::time;
27
28use log::Level::Debug;
29use log::*;
30
31use crate::signature::SignedRequest;
32use crate::stream::ByteStream;
33use crate::tls::HttpsConnector;
34
35// Pulls in the statically generated rustc version.
36include!(concat!(env!("OUT_DIR"), "/user_agent_vars.rs"));
37
38// Use a lazy static to cache the default User-Agent header
39// because it never changes once it's been computed.
40lazy_static! {
41    static ref DEFAULT_USER_AGENT: String = format!(
42        "rusoto/{} rust/{} {}",
43        env!("CARGO_PKG_VERSION"),
44        RUST_VERSION,
45        env::consts::OS
46    );
47}
48
49/// Stores the response from a HTTP request.
50pub struct HttpResponse {
51    /// Status code of HTTP Request
52    pub status: StatusCode,
53    /// Contents of Response
54    pub body: ByteStream,
55    /// Response headers
56    pub headers: HeaderMap<String>,
57}
58
59/// Stores the buffered response from a HTTP request.
60#[derive(PartialEq)]
61pub struct BufferedHttpResponse {
62    /// Status code of HTTP Request
63    pub status: StatusCode,
64    /// Contents of Response
65    pub body: Bytes,
66    /// Response headers
67    pub headers: HeaderMap<String>,
68}
69
70impl BufferedHttpResponse {
71    ///! Best effort to turn response body into more readable &str.
72    pub fn body_as_str(&self) -> &str {
73        match std::str::from_utf8(&self.body) {
74            Ok(msg) => msg,
75            _ => "unknown error",
76        }
77    }
78}
79
80/// Best effort based Debug implementation to make generic error's body more readable.
81impl fmt::Debug for BufferedHttpResponse {
82    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83        match std::str::from_utf8(&self.body) {
84            Ok(msg) => write!(
85                f,
86                "BufferedHttpResponse {{status: {:?}, body: {:?}, headers: {:?} }}",
87                self.status, msg, self.headers
88            ),
89            _ => write!(
90                f,
91                "BufferedHttpResponse {{ status: {:?}, body: {:?}, headers: {:?} }}",
92                self.status, self.body, self.headers
93            ),
94        }
95    }
96}
97
98impl HttpResponse {
99    /// Buffer the full response body in memory, resulting in a `BufferedHttpResponse`.
100    pub async fn buffer(&mut self) -> Result<BufferedHttpResponse, HttpDispatchError> {
101        let mut bytes = BytesMut::new();
102        while let Some(try_chunk) = self.body.next().await {
103            let chunk = try_chunk.map_err(|e| HttpDispatchError {
104                message: format!("Error obtaining body: {}", e),
105            })?;
106            bytes.extend(chunk);
107        }
108        Ok(BufferedHttpResponse {
109            status: self.status,
110            headers: self.headers.clone(),
111            body: bytes.freeze(),
112        })
113    }
114
115    async fn from_hyper(hyper_response: HyperResponse<Body>) -> HttpResponse {
116        let status = hyper_response.status();
117        let headers = hyper_response
118            .headers()
119            .iter()
120            .map(|(h, v)| {
121                let value_string = v.to_str().unwrap().to_owned();
122                (h.clone(), value_string)
123            })
124            .collect();
125        let body = hyper_response.into_body().map(|try_chunk| {
126            try_chunk.map(|c| c).map_err(|e| {
127                IoError::new(
128                    io::ErrorKind::Other,
129                    format!("Error obtaining chunk: {}", e),
130                )
131            })
132        });
133
134        HttpResponse {
135            status,
136            headers,
137            body: ByteStream::new(body),
138        }
139    }
140}
141
/// An error produced when sending the request, such as a timeout error.
///
/// The error carries only a human-readable message.
#[derive(Clone, Debug, PartialEq)]
pub struct HttpDispatchError {
    message: String,
}

impl HttpDispatchError {
    /// Construct a new HttpDispatchError for testing purposes
    pub fn new(message: String) -> HttpDispatchError {
        Self { message }
    }
}

impl Error for HttpDispatchError {}

impl fmt::Display for HttpDispatchError {
    /// Writes the stored message verbatim.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.message)
    }
}
162
163impl From<HyperError> for HttpDispatchError {
164    fn from(err: HyperError) -> HttpDispatchError {
165        HttpDispatchError {
166            message: err.to_string(),
167        }
168    }
169}
170
171impl From<IoError> for HttpDispatchError {
172    fn from(err: IoError) -> HttpDispatchError {
173        HttpDispatchError {
174            message: err.to_string(),
175        }
176    }
177}
178
179/// Type returned from `dispatch` for a `DispatchSignedRequest` implementor
180pub type DispatchSignedRequestFuture =
181    Pin<Box<dyn Future<Output = Result<HttpResponse, HttpDispatchError>> + Send>>;
182
183/// Trait for implementing HTTP Request/Response
184pub trait DispatchSignedRequest {
185    /// Dispatch Request, and then return a Response
186    fn dispatch(
187        &self,
188        request: SignedRequest,
189        timeout: Option<Duration>,
190    ) -> DispatchSignedRequestFuture;
191}
192
193impl<D: DispatchSignedRequest> DispatchSignedRequest for Rc<D> {
194    fn dispatch(
195        &self,
196        request: SignedRequest,
197        timeout: Option<Duration>,
198    ) -> DispatchSignedRequestFuture {
199        D::dispatch(&*self, request, timeout)
200    }
201}
202
203impl<D: DispatchSignedRequest> DispatchSignedRequest for Arc<D> {
204    fn dispatch(
205        &self,
206        request: SignedRequest,
207        timeout: Option<Duration>,
208    ) -> DispatchSignedRequestFuture {
209        D::dispatch(&*self, request, timeout)
210    }
211}
212
213/// Http client for use with AWS services.
214pub struct HttpClient<C = HttpsConnector<HttpConnector>> {
215    inner: HyperClient<C, Body>,
216    local_agent: Option<String>,
217}
218
219impl HttpClient {
220    /// Create a tls-enabled http client.
221    pub fn new() -> Result<Self, TlsError> {
222        #[cfg(feature = "native-tls")]
223        let connector = HttpsConnector::new();
224
225        #[cfg(feature = "rustls")]
226        let connector = HttpsConnector::new();
227
228        Ok(Self::from_connector(connector))
229    }
230
231    /// Create a tls-enabled http client.
232    pub fn new_with_config(config: HttpConfig) -> Result<Self, TlsError> {
233        #[cfg(feature = "native-tls")]
234        let connector = HttpsConnector::new();
235
236        #[cfg(feature = "rustls")]
237        let connector = HttpsConnector::new();
238
239        Ok(Self::from_connector_with_config(connector, config))
240    }
241
242    /// Sets a local agent that is prepended to the default HTTP
243    /// `User-Agent` used by Rusoto.
244    pub fn local_agent(&mut self, local_agent: String) {
245        self.local_agent = Some(local_agent)
246    }
247}
248
249impl<C> HttpClient<C>
250where
251    C: Connect + Clone + Send + Sync,
252{
253    /// Allows for a custom connector to be used with the HttpClient
254    pub fn from_connector(connector: C) -> Self {
255        let inner = HyperClient::builder().build(connector);
256        HttpClient {
257            inner,
258            local_agent: None,
259        }
260    }
261
262    /// Allows for a custom connector to be used with the HttpClient
263    /// with extra configuration options
264    pub fn from_connector_with_config(connector: C, config: HttpConfig) -> Self {
265        let mut builder = HyperClient::builder();
266        config
267            .read_buf_size
268            .map(|sz| builder.http1_read_buf_exact_size(sz));
269        let inner = builder.build(connector);
270
271        HttpClient {
272            inner,
273            local_agent: None,
274        }
275    }
276
277    /// Alows for a custom builder and connector to be used with the HttpClient
278    pub fn from_builder(builder: HyperBuilder, connector: C) -> Self {
279        let inner = builder.build(connector);
280        HttpClient {
281            inner,
282            local_agent: None,
283        }
284    }
285}
286
/// Configuration options for the HTTP Client
pub struct HttpConfig {
    read_buf_size: Option<usize>,
}

impl HttpConfig {
    /// Create a new HttpConfig with no options set.
    pub fn new() -> HttpConfig {
        Self::default()
    }

    /// Sets the size of the read buffer for inbound data.
    /// A larger buffer size might result in better performance
    /// by requiring fewer copies out of the socket buffer.
    pub fn read_buf_size(&mut self, sz: usize) {
        self.read_buf_size = Some(sz);
    }
}

impl Default for HttpConfig {
    /// Create a new HttpConfig. Same as HttpConfig::new().
    fn default() -> HttpConfig {
        HttpConfig {
            read_buf_size: None,
        }
    }
}
313
314async fn http_client_dispatch<'a, C>(
315    client: HyperClient<C, Body>,
316    request: SignedRequest,
317    timeout: Option<Duration>,
318    user_agent: HeaderValue,
319) -> Result<HttpResponse, HttpDispatchError>
320where
321    C: Connect + Send + Sync + Clone + 'static,
322{
323    let hyper_method = match request.method().as_ref() {
324        "POST" => Method::POST,
325        "PUT" => Method::PUT,
326        "DELETE" => Method::DELETE,
327        "GET" => Method::GET,
328        "HEAD" => Method::HEAD,
329        v => {
330            return Err(HttpDispatchError {
331                message: format!("Unsupported HTTP verb {}", v),
332            });
333        }
334    };
335
336    // translate the headers map to a format Hyper likes
337    let mut hyper_headers = HeaderMap::new();
338    for h in request.headers().iter() {
339        let header_name = match h.0.parse::<HeaderName>() {
340            Ok(name) => name,
341            Err(err) => {
342                return Err(HttpDispatchError {
343                    message: format!("error parsing header name: {}", err),
344                });
345            }
346        };
347        for v in h.1.iter() {
348            let header_value = match HeaderValue::from_bytes(v) {
349                Ok(value) => value,
350                Err(err) => {
351                    return Err(HttpDispatchError {
352                        message: format!("error parsing header value: {}", err),
353                    });
354                }
355            };
356            hyper_headers.append(&header_name, header_value);
357        }
358    }
359
360    // Add a default user-agent header if one is not already present.
361    if !hyper_headers.contains_key("user-agent") {
362        hyper_headers.insert("user-agent", user_agent);
363    }
364
365    let mut final_uri = format!(
366        "{}://{}{}",
367        request.scheme(),
368        request.hostname(),
369        request.canonical_path()
370    );
371    if !request.canonical_query_string().is_empty() {
372        final_uri = final_uri + &format!("?{}", request.canonical_query_string());
373    }
374
375    if log_enabled!(Debug) {
376        debug!(
377            "Full request: \n method: {}\n final_uri: {}\nHeaders:\n",
378            hyper_method, final_uri
379        );
380        for (h, v) in hyper_headers.iter() {
381            debug!("{}:{:?}", h.as_str(), v);
382        }
383    }
384
385    let http_request_builder = HyperRequest::builder().method(hyper_method).uri(final_uri);
386
387    let try_http_request = if let Some(p) = request.payload {
388        http_request_builder.body(p.into_body())
389    } else {
390        http_request_builder.body(Body::empty())
391    };
392
393    let mut http_request = try_http_request.map_err(|err| HttpDispatchError {
394        message: format!("error building request: {}", err),
395    })?;
396
397    *http_request.headers_mut() = hyper_headers;
398
399    let f = client.request(http_request);
400
401    let try_resp = match timeout {
402        None => f.await,
403        Some(duration) => match time::timeout(duration, f).await {
404            Err(_e) => {
405                return Err(HttpDispatchError {
406                    message: "Timeout while dispatching request".to_owned(),
407                })
408            }
409            Ok(try_req) => try_req,
410        },
411    };
412    let resp = try_resp.map_err(|e| HttpDispatchError {
413        message: format!("Error during dispatch: {}", e),
414    })?;
415    Ok(HttpResponse::from_hyper(resp).await)
416}
417
418impl<C> DispatchSignedRequest for HttpClient<C>
419where
420    C: Connect + Clone + Send + Sync + 'static,
421{
422    fn dispatch(
423        &self,
424        request: SignedRequest,
425        timeout: Option<Duration>,
426    ) -> DispatchSignedRequestFuture {
427        let user_agent = self
428            .local_agent
429            .as_ref()
430            .map(|agent| format!("{} {}", agent, *DEFAULT_USER_AGENT).parse())
431            .unwrap_or_else(|| DEFAULT_USER_AGENT.parse())
432            .expect("failed to parse user-agent string");
433
434        http_client_dispatch::<C>(self.inner.clone(), request, timeout, user_agent).boxed()
435    }
436}
437
/// An error produced when the user has an invalid TLS client
///
/// The error carries only a human-readable message.
#[derive(Debug, PartialEq)]
pub struct TlsError {
    message: String,
}

impl Error for TlsError {}

impl fmt::Display for TlsError {
    /// Writes the stored message verbatim.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.message)
    }
}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::signature::SignedRequest;
    use crate::Region;

    /// Builds the request every custom-region test inspects: a `POST /` to
    /// `sqs` in a custom region named `eu-west-3` with the given endpoint.
    fn request_for_endpoint(endpoint: &str) -> SignedRequest {
        let region = Region::Custom {
            endpoint: endpoint.to_owned(),
            name: "eu-west-3".to_owned(),
        };
        SignedRequest::new("POST", "sqs", &region, "/")
    }

    #[test]
    fn http_client_is_send_and_sync() {
        fn is_send_and_sync<T: Send + Sync>() {}

        is_send_and_sync::<HttpClient>();
    }

    #[test]
    fn custom_region_http() {
        let request = request_for_endpoint("http://localhost");
        assert_eq!("http", request.scheme());
        assert_eq!("localhost", request.hostname());
    }

    #[test]
    fn custom_region_https() {
        let request = request_for_endpoint("https://localhost");
        assert_eq!("https", request.scheme());
        assert_eq!("localhost", request.hostname());
    }

    #[test]
    fn custom_region_with_port() {
        let request = request_for_endpoint("https://localhost:8000");
        assert_eq!("https", request.scheme());
        assert_eq!("localhost:8000", request.hostname());
    }

    #[test]
    fn custom_region_no_scheme() {
        let request = request_for_endpoint("localhost");
        assert_eq!("https", request.scheme());
        assert_eq!("localhost", request.hostname());
    }

    #[test]
    fn from_io_error_preserves_error_message() {
        let source = ::std::io::Error::new(::std::io::ErrorKind::Other, "my error message");
        let converted = HttpDispatchError::from(source);
        assert_eq!(converted.to_string(), "my error message")
    }
}