hreq/client/
agent.rs

1//! Connection pooling, redirects, cookies etc.
2
3use super::conn::BodyBuf;
4use super::connect;
5use super::cookies::Cookies;
6use super::Connection;
7use crate::async_impl::AsyncRuntime;
8use crate::params::resolve_hreq_params;
9use crate::params::HReqParams;
10use crate::params::QueryParams;
11use crate::uri_ext::UriExt;
12use crate::Body;
13use crate::Error;
14use crate::ResponseExt;
15use cookie::Cookie;
16use std::fmt;
17use std::future::Future;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20use std::time::Duration;
21
22/// Agents provide redirects, connection pooling, cookies and retries.
23///
24/// Every request is sent through an agent, also when using the extension trait
25/// (`Request.send()`). When using the trait, the agent is intantiated with default
26/// parameters and lives for the length of the request.
27///
28/// To amend the default parameters, or reuse an agent over many requests, use `Agent::new()`
29/// and send the request using `agent.send(req)`.
30///
31/// The default agent have the following settings:
32///
33///   * Redirects: 5
34///   * Retries: 5
35///   * Connection pooling: on
36///   * Cookies: on
37///
38/// The settings can be changed, and are used for the next `.send()` call. It is possible
39/// to change the settings between calls.
40///
41/// ```
42/// use hreq::prelude::*;
43/// use hreq::Agent;
44///
45/// let mut agent = Agent::new();
46/// agent.retries(0); // disable all retries
47///
48/// let req = Request::get("https://httpbin.org/get")
49///     .with_body(()).unwrap();
50///
51/// let res = agent.send(req).block();
52/// ```
53#[derive(Default)]
54pub struct Agent {
55    connections: Vec<Connection>,
56    cookies: Option<Cookies>,
57    redirects: i8,
58    retries: i8,
59    pooling: bool,
60    use_cookies: bool,
61}
62
63impl Agent {
64    /// Creates a new agent with default parameters.
65    ///
66    /// ```
67    /// use hreq::Agent;
68    ///
69    /// let agent = Agent::new();
70    /// ```
71    pub fn new() -> Self {
72        Agent {
73            connections: vec![],
74            cookies: None,
75            redirects: 5,
76            retries: 5,
77            pooling: true,
78            use_cookies: true,
79        }
80    }
81
82    /// Changes number of redirects.
83    ///
84    /// Defaults to `5`. Set to `0` to disable redirects.
85    ///
86    /// The number of redirects will be used for the next call to `.send()`.
87    ///
88    /// ```
89    /// use hreq::Agent;
90    ///
91    /// let mut agent = Agent::new();
92    /// agent.redirects(0);
93    /// ```
94    pub fn redirects(&mut self, amount: u8) {
95        self.redirects = amount as i8;
96    }
97
98    /// Changes the number of retry attempts.
99    ///
100    /// Defaults to `5`. Set to `0` to disable retries.
101    ///
102    /// The number of retries will be used for the next call to `.send()`.
103    ///
104    /// ```
105    /// use hreq::Agent;
106    ///
107    /// let mut agent = Agent::new();
108    /// agent.retries(0);
109    /// ```
110    pub fn retries(&mut self, amount: u8) {
111        self.retries = amount as i8;
112    }
113
114    /// Turns connection pooling on or off.
115    ///
116    /// Defaults to `true`. Set to `false` to disable connection pooling.
117    ///
118    /// The setting will be used for the next call to `.send()`.
119    ///
120    /// When set to `false` any existing connection currently pooled will be dropped.
121    ///
122    /// ```
123    /// use hreq::Agent;
124    ///
125    /// let mut agent = Agent::new();
126    /// agent.pooling(false);
127    /// ```
128    pub fn pooling(&mut self, enabled: bool) {
129        self.pooling = enabled;
130        if !enabled {
131            self.connections.clear();
132        }
133    }
134
135    /// Turns on or off the use of cookies.
136    ///
137    /// Defaults to `true`. Set to `false` to disable use of cookies.
138    ///
139    /// The setting will be used for the next call to `.send()`.
140    ///
141    /// When set to `false`, any previous collected cookie will be dropped.
142    ///
143    /// ```
144    /// use hreq::Agent;
145    ///
146    /// let mut agent = Agent::new();
147    /// agent.cookies(false);
148    /// ```
149    pub fn cookies(&mut self, enabled: bool) {
150        self.use_cookies = enabled;
151        if !enabled {
152            self.cookies = None;
153        }
154    }
155
156    /// Get all cookies held in this agent matching the given uri.
157    pub fn get_cookies(&self, uri: &http::Uri) -> Vec<&Cookie<'static>> {
158        if let Some(cookies) = &self.cookies {
159            cookies.get(uri)
160        } else {
161            vec![]
162        }
163    }
164
165    fn reuse_from_pool(&mut self, uri: &http::Uri) -> Result<Option<&mut Connection>, Error> {
166        if !self.pooling {
167            return Ok(None);
168        }
169        let host_port = uri.host_port()?;
170        let ret = self
171            .connections
172            .iter_mut()
173            // http2 multiplexes over the same connection, http1 needs to finish previous req
174            .find(|c| {
175                c.host_port() == &host_port && (c.is_http2() || c.unfinished_requests() == 0)
176            });
177        if ret.is_some() {
178            debug!("Reuse from pool: {}", uri);
179        }
180        let ret = None;
181        Ok(ret)
182    }
183
184    pub(crate) fn send_future<'a>(mut self, req: http::Request<Body>) -> ResponseFuture {
185        let do_fut = async move { self.send(req).await };
186        ResponseFuture::new(do_fut)
187    }
188
189    /// Sends a request using this agent.
190    ///
191    /// The parameters configured in the agent are used for the request.
192    ///
193    /// Depending on agent settings, connections are pooled and cookies reused between
194    /// repeated calls to `send()`.
195    ///
196    /// ```
197    /// use hreq::prelude::*;
198    /// use hreq::Agent;
199    ///
200    /// let mut agent = Agent::new();
201    /// agent.retries(0);
202    /// agent.redirects(0);
203    ///
204    /// let req = Request::get("https://fails-badly-yeah")
205    ///     .with_body(()).unwrap();
206    ///
207    /// let res = agent.send(req).block();
208    ///
209    /// assert!(res.is_err());
210    /// assert!(res.unwrap_err().is_io());
211    /// ```
212    pub async fn send<B: Into<Body>>(
213        &mut self,
214        req: http::Request<B>,
215    ) -> Result<http::Response<Body>, Error> {
216        let (parts, body) = req.into_parts();
217
218        let body = body.into();
219
220        // apply the parameters, query params affect the request uri.
221        let parts = resolve_hreq_params(parts);
222
223        let params = parts.extensions.get::<HReqParams>().unwrap().clone();
224
225        // Buffer of body data so we can handle resending the body on 307/308 redirects.
226        let mut body_buffer = BodyBuf::new(params.redirect_body_buffer);
227
228        // the request should be time limited regardless of retries. the entire do_send()
229        // is wrapped in a ticking timer...
230        let deadline = params.deadline();
231
232        // for lifetime reasons it's easier to handle the cookie storage separately
233        let mut cookies = self.cookies.take();
234
235        let ret = deadline
236            .race(self.do_send(parts, body, params, &mut cookies, &mut body_buffer))
237            .await;
238
239        self.cookies = cookies;
240
241        ret
242    }
243
244    async fn do_send(
245        &mut self,
246        parts: http::request::Parts,
247        body: Body,
248        params: HReqParams,
249        cookies: &mut Option<Cookies>,
250        body_buffer: &mut BodyBuf,
251    ) -> Result<http::Response<Body>, Error> {
252        trace!("Agent {} {}", parts.method, parts.uri);
253
254        let mut retries = self.retries;
255        let mut backoff_millis: u64 = 125;
256        let mut redirects = self.redirects;
257        let pooling = self.pooling;
258        let mut unpooled: Option<Connection> = None;
259        let use_cookies = self.use_cookies;
260
261        // if we have a param.with_override, whenever we are to open a connection,
262        // we check whether the current uri has an equal hostport to this, that
263        // way we can override also subsequent requests for the original uri.
264        let orig_hostport = parts.uri.host_port()?.to_owned();
265
266        let mut next_req = http::Request::from_parts(parts, body);
267
268        loop {
269            let mut req = next_req;
270            let uri = req.uri().clone();
271
272            // add cookies to send
273            if self.use_cookies {
274                if let Some(cookies) = cookies {
275                    let cookies = cookies.get(&uri);
276                    for cookie in cookies {
277                        // TODO this is a bit inefficient, the .encoded() returns
278                        // the full cookie including ;HttpOnly etc.
279                        let no_param = Cookie::new(cookie.name(), cookie.value());
280                        let cval = no_param.encoded().to_string();
281                        let val = http::header::HeaderValue::from_str(&cval)
282                            .expect("Cookie header value");
283                        // TODO combine multiple cookies into less headers.
284                        req.headers_mut().append("cookie", val);
285                    }
286                }
287            }
288
289            // remember whether request is idempotent in case we are to retry
290            let is_idempotent = req.method().is_idempotent();
291
292            // next_req holds our (potential) next request in case of redirects.
293            next_req = clone_to_empty_body(&req);
294
295            // grab connection for the current request
296            let conn = match self.reuse_from_pool(&uri)? {
297                Some(conn) => conn,
298                None => {
299                    let hostport_uri = uri.host_port()?;
300                    let mut conn: Option<Connection> = None;
301
302                    let HReqParams {
303                        force_http2,
304                        tls_disable_verify,
305                        ..
306                    } = params;
307
308                    // if the current request is for the same uri (hostport part) as
309                    // the original uri, we will use the override.
310                    if orig_hostport == hostport_uri {
311                        if let Some(arc) = params.with_override.clone() {
312                            let hostport = &*arc;
313                            debug!("Connect new: {} with override: {}", uri, hostport);
314                            conn = Some(connect(hostport, force_http2, tls_disable_verify).await?);
315                        }
316                    }
317
318                    let conn = match conn {
319                        Some(conn) => conn,
320                        // no override for this connection.
321                        None => {
322                            debug!("Connect new: {}", hostport_uri);
323                            connect(&hostport_uri, force_http2, tls_disable_verify).await?
324                        }
325                    };
326
327                    if pooling {
328                        self.connections.push(conn);
329                        let idx = self.connections.len() - 1;
330                        self.connections.get_mut(idx).unwrap()
331                    } else {
332                        unpooled.replace(conn);
333                        unpooled.as_mut().unwrap()
334                    }
335                }
336            };
337
338            debug!("{} {}", req.method(), req.uri());
339
340            match conn.send_request(req, body_buffer).await {
341                Ok(mut res) => {
342                    // whether we are to retain this connection in the pool.
343                    let mut retain = true;
344
345                    // squirrel away cookies (also in redirects)
346                    if use_cookies {
347                        for cookie_head in res.headers().get_all("set-cookie") {
348                            if let Ok(v) = cookie_head.to_str() {
349                                if let Ok(cookie) = Cookie::parse_encoded(v.to_string()) {
350                                    if cookies.is_none() {
351                                        *cookies = Some(Cookies::new());
352                                    }
353                                    cookies.as_mut().unwrap().add(&uri, cookie);
354                                } else {
355                                    info!("Failed to parse cookie: {}", v);
356                                }
357                            } else {
358                                info!("Failed to read cookie value: {:?}", cookie_head);
359                            }
360                        }
361                    }
362
363                    // We only handle redirections with Location header.
364                    fn is_handled_redirect(status: http::StatusCode) -> bool {
365                        match status.as_u16() {
366                            301 | 302 | 307 | 308 => true,
367                            _ => false,
368                        }
369                    }
370
371                    // follow redirections
372                    if is_handled_redirect(res.status()) {
373                        redirects -= 1;
374
375                        // no more redirections. return what we have.
376                        if redirects < 0 {
377                            trace!("Not following more redirections");
378                            break Ok(res);
379                        }
380
381                        // amend uri in next_req relative to the old request.
382                        let location = res.header("location").ok_or_else(|| {
383                            Error::Proto("Redirect without Location header".into())
384                        })?;
385
386                        trace!("Redirect to: {}", location);
387
388                        let (mut parts, body) = next_req.into_parts();
389                        parts.uri = parts.uri.parse_relative(location)?;
390                        next_req = http::Request::from_parts(parts, body);
391
392                        let code = res.status_code();
393                        let is_307ish = code > 303;
394
395                        // 307/308 keep resends the body data, if the buffer is big enough.
396                        if let Some(body) = body_buffer.reset(is_307ish) {
397                            let (parts, _) = next_req.into_parts();
398                            next_req = http::Request::from_parts(parts, body);
399                        }
400
401                        if is_307ish
402                            && !conn.is_http2()
403                            && conn.host_port() == &next_req.uri().host_port()?
404                        {
405                            // there's a big chance we started sending the body to the
406                            // current host before we received the 307/308. for http1
407                            // that means the upstream is "clogged" with a half body.
408                            // drop and start over.
409                            retain = false;
410                        }
411
412                        // exhaust the previous body before following the redirect.
413                        // this is to ensure http1.1 connections are in a good state.
414                        if res.body_mut().read_and_discard().await.is_err() {
415                            // some servers just close the connection after a redirect.
416                            retain = false;
417                        }
418
419                        // drop connection from pool if need be.
420                        if !retain {
421                            let conn_id = conn.id();
422                            debug!("Remove from pool: {}", conn.host_port());
423                            self.connections.retain(|c| c.id() != conn_id);
424                        }
425
426                        // following redirects means priming next_req and looping from the top
427                        continue;
428                    }
429
430                    // a non-redirect is a ready response returned to the user
431                    break Ok(res);
432                }
433                Err(err) => {
434                    // remove this (failed) connection from the pool.
435                    let conn_id = conn.id();
436                    self.connections.retain(|c| c.id() != conn_id);
437
438                    // retry?
439                    retries -= 1;
440                    if retries == 0 || !is_idempotent || !err.is_retryable() {
441                        trace!("Abort with error, {}", err);
442                        break Err(err);
443                    }
444
445                    trace!("Retrying on error, {}", err);
446                }
447            }
448            // retry backoff
449            trace!("Retry backoff: {}ms", backoff_millis);
450            AsyncRuntime::timeout(Duration::from_millis(backoff_millis)).await;
451            backoff_millis = (backoff_millis * 2).min(10_000);
452        }
453    }
454}
455
456/// On redirects, we need the entire request sans the original body.
457fn clone_to_empty_body(from: &http::Request<Body>) -> http::Request<Body> {
458    // most things can be cloned in the builder.
459    let req = http::Request::builder()
460        .method(from.method().clone())
461        .uri(from.uri().clone())
462        .version(from.version().clone())
463        .body(Body::empty())
464        .unwrap();
465
466    let (mut parts, body) = req.into_parts();
467
468    // headers can not be inserted as a complete cloned HeaderMap
469    parts.headers = from.headers().clone();
470
471    // there might be other extensions we're missing here.
472    if let Some(params) = from.extensions().get::<HReqParams>() {
473        parts.extensions.insert(params.clone());
474    }
475    if let Some(params) = from.extensions().get::<QueryParams>() {
476        parts.extensions.insert(params.clone());
477    }
478
479    http::Request::from_parts(parts, body)
480}
481
482impl fmt::Debug for Agent {
483    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
484        write!(f, "Agent")
485    }
486}
487
488/// A future hreq response.
489///
490/// Instances should be `.await` or `.block()`.
491pub struct ResponseFuture {
492    req: Box<dyn Future<Output = Result<http::Response<Body>, Error>> + Send>,
493}
494
495impl ResponseFuture {
496    pub(crate) fn new(
497        t: impl Future<Output = Result<http::Response<Body>, Error>> + Send + 'static,
498    ) -> Self {
499        ResponseFuture { req: Box::new(t) }
500    }
501}
502
503impl Future for ResponseFuture {
504    type Output = Result<http::Response<Body>, Error>;
505
506    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
507        let this = self.get_mut();
508        // this unsafe is ok because the boxed closure is not going to move anywhere.
509        unsafe { Pin::new_unchecked(&mut *this.req) }.poll(cx)
510    }
511}
512
513impl fmt::Debug for ResponseFuture {
514    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
515        write!(f, "ResponseFuture")
516    }
517}