hreq/client/agent.rs
1//! Connection pooling, redirects, cookies etc.
2
3use super::conn::BodyBuf;
4use super::connect;
5use super::cookies::Cookies;
6use super::Connection;
7use crate::async_impl::AsyncRuntime;
8use crate::params::resolve_hreq_params;
9use crate::params::HReqParams;
10use crate::params::QueryParams;
11use crate::uri_ext::UriExt;
12use crate::Body;
13use crate::Error;
14use crate::ResponseExt;
15use cookie::Cookie;
16use std::fmt;
17use std::future::Future;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20use std::time::Duration;
21
22/// Agents provide redirects, connection pooling, cookies and retries.
23///
24/// Every request is sent through an agent, also when using the extension trait
25/// (`Request.send()`). When using the trait, the agent is intantiated with default
26/// parameters and lives for the length of the request.
27///
28/// To amend the default parameters, or reuse an agent over many requests, use `Agent::new()`
29/// and send the request using `agent.send(req)`.
30///
31/// The default agent have the following settings:
32///
33/// * Redirects: 5
34/// * Retries: 5
35/// * Connection pooling: on
36/// * Cookies: on
37///
38/// The settings can be changed, and are used for the next `.send()` call. It is possible
39/// to change the settings between calls.
40///
41/// ```
42/// use hreq::prelude::*;
43/// use hreq::Agent;
44///
45/// let mut agent = Agent::new();
46/// agent.retries(0); // disable all retries
47///
48/// let req = Request::get("https://httpbin.org/get")
49/// .with_body(()).unwrap();
50///
51/// let res = agent.send(req).block();
52/// ```
53#[derive(Default)]
54pub struct Agent {
55 connections: Vec<Connection>,
56 cookies: Option<Cookies>,
57 redirects: i8,
58 retries: i8,
59 pooling: bool,
60 use_cookies: bool,
61}
62
63impl Agent {
64 /// Creates a new agent with default parameters.
65 ///
66 /// ```
67 /// use hreq::Agent;
68 ///
69 /// let agent = Agent::new();
70 /// ```
71 pub fn new() -> Self {
72 Agent {
73 connections: vec![],
74 cookies: None,
75 redirects: 5,
76 retries: 5,
77 pooling: true,
78 use_cookies: true,
79 }
80 }
81
82 /// Changes number of redirects.
83 ///
84 /// Defaults to `5`. Set to `0` to disable redirects.
85 ///
86 /// The number of redirects will be used for the next call to `.send()`.
87 ///
88 /// ```
89 /// use hreq::Agent;
90 ///
91 /// let mut agent = Agent::new();
92 /// agent.redirects(0);
93 /// ```
94 pub fn redirects(&mut self, amount: u8) {
95 self.redirects = amount as i8;
96 }
97
98 /// Changes the number of retry attempts.
99 ///
100 /// Defaults to `5`. Set to `0` to disable retries.
101 ///
102 /// The number of retries will be used for the next call to `.send()`.
103 ///
104 /// ```
105 /// use hreq::Agent;
106 ///
107 /// let mut agent = Agent::new();
108 /// agent.retries(0);
109 /// ```
110 pub fn retries(&mut self, amount: u8) {
111 self.retries = amount as i8;
112 }
113
114 /// Turns connection pooling on or off.
115 ///
116 /// Defaults to `true`. Set to `false` to disable connection pooling.
117 ///
118 /// The setting will be used for the next call to `.send()`.
119 ///
120 /// When set to `false` any existing connection currently pooled will be dropped.
121 ///
122 /// ```
123 /// use hreq::Agent;
124 ///
125 /// let mut agent = Agent::new();
126 /// agent.pooling(false);
127 /// ```
128 pub fn pooling(&mut self, enabled: bool) {
129 self.pooling = enabled;
130 if !enabled {
131 self.connections.clear();
132 }
133 }
134
135 /// Turns on or off the use of cookies.
136 ///
137 /// Defaults to `true`. Set to `false` to disable use of cookies.
138 ///
139 /// The setting will be used for the next call to `.send()`.
140 ///
141 /// When set to `false`, any previous collected cookie will be dropped.
142 ///
143 /// ```
144 /// use hreq::Agent;
145 ///
146 /// let mut agent = Agent::new();
147 /// agent.cookies(false);
148 /// ```
149 pub fn cookies(&mut self, enabled: bool) {
150 self.use_cookies = enabled;
151 if !enabled {
152 self.cookies = None;
153 }
154 }
155
156 /// Get all cookies held in this agent matching the given uri.
157 pub fn get_cookies(&self, uri: &http::Uri) -> Vec<&Cookie<'static>> {
158 if let Some(cookies) = &self.cookies {
159 cookies.get(uri)
160 } else {
161 vec![]
162 }
163 }
164
165 fn reuse_from_pool(&mut self, uri: &http::Uri) -> Result<Option<&mut Connection>, Error> {
166 if !self.pooling {
167 return Ok(None);
168 }
169 let host_port = uri.host_port()?;
170 let ret = self
171 .connections
172 .iter_mut()
173 // http2 multiplexes over the same connection, http1 needs to finish previous req
174 .find(|c| {
175 c.host_port() == &host_port && (c.is_http2() || c.unfinished_requests() == 0)
176 });
177 if ret.is_some() {
178 debug!("Reuse from pool: {}", uri);
179 }
180 let ret = None;
181 Ok(ret)
182 }
183
184 pub(crate) fn send_future<'a>(mut self, req: http::Request<Body>) -> ResponseFuture {
185 let do_fut = async move { self.send(req).await };
186 ResponseFuture::new(do_fut)
187 }
188
189 /// Sends a request using this agent.
190 ///
191 /// The parameters configured in the agent are used for the request.
192 ///
193 /// Depending on agent settings, connections are pooled and cookies reused between
194 /// repeated calls to `send()`.
195 ///
196 /// ```
197 /// use hreq::prelude::*;
198 /// use hreq::Agent;
199 ///
200 /// let mut agent = Agent::new();
201 /// agent.retries(0);
202 /// agent.redirects(0);
203 ///
204 /// let req = Request::get("https://fails-badly-yeah")
205 /// .with_body(()).unwrap();
206 ///
207 /// let res = agent.send(req).block();
208 ///
209 /// assert!(res.is_err());
210 /// assert!(res.unwrap_err().is_io());
211 /// ```
212 pub async fn send<B: Into<Body>>(
213 &mut self,
214 req: http::Request<B>,
215 ) -> Result<http::Response<Body>, Error> {
216 let (parts, body) = req.into_parts();
217
218 let body = body.into();
219
220 // apply the parameters, query params affect the request uri.
221 let parts = resolve_hreq_params(parts);
222
223 let params = parts.extensions.get::<HReqParams>().unwrap().clone();
224
225 // Buffer of body data so we can handle resending the body on 307/308 redirects.
226 let mut body_buffer = BodyBuf::new(params.redirect_body_buffer);
227
228 // the request should be time limited regardless of retries. the entire do_send()
229 // is wrapped in a ticking timer...
230 let deadline = params.deadline();
231
232 // for lifetime reasons it's easier to handle the cookie storage separately
233 let mut cookies = self.cookies.take();
234
235 let ret = deadline
236 .race(self.do_send(parts, body, params, &mut cookies, &mut body_buffer))
237 .await;
238
239 self.cookies = cookies;
240
241 ret
242 }
243
244 async fn do_send(
245 &mut self,
246 parts: http::request::Parts,
247 body: Body,
248 params: HReqParams,
249 cookies: &mut Option<Cookies>,
250 body_buffer: &mut BodyBuf,
251 ) -> Result<http::Response<Body>, Error> {
252 trace!("Agent {} {}", parts.method, parts.uri);
253
254 let mut retries = self.retries;
255 let mut backoff_millis: u64 = 125;
256 let mut redirects = self.redirects;
257 let pooling = self.pooling;
258 let mut unpooled: Option<Connection> = None;
259 let use_cookies = self.use_cookies;
260
261 // if we have a param.with_override, whenever we are to open a connection,
262 // we check whether the current uri has an equal hostport to this, that
263 // way we can override also subsequent requests for the original uri.
264 let orig_hostport = parts.uri.host_port()?.to_owned();
265
266 let mut next_req = http::Request::from_parts(parts, body);
267
268 loop {
269 let mut req = next_req;
270 let uri = req.uri().clone();
271
272 // add cookies to send
273 if self.use_cookies {
274 if let Some(cookies) = cookies {
275 let cookies = cookies.get(&uri);
276 for cookie in cookies {
277 // TODO this is a bit inefficient, the .encoded() returns
278 // the full cookie including ;HttpOnly etc.
279 let no_param = Cookie::new(cookie.name(), cookie.value());
280 let cval = no_param.encoded().to_string();
281 let val = http::header::HeaderValue::from_str(&cval)
282 .expect("Cookie header value");
283 // TODO combine multiple cookies into less headers.
284 req.headers_mut().append("cookie", val);
285 }
286 }
287 }
288
289 // remember whether request is idempotent in case we are to retry
290 let is_idempotent = req.method().is_idempotent();
291
292 // next_req holds our (potential) next request in case of redirects.
293 next_req = clone_to_empty_body(&req);
294
295 // grab connection for the current request
296 let conn = match self.reuse_from_pool(&uri)? {
297 Some(conn) => conn,
298 None => {
299 let hostport_uri = uri.host_port()?;
300 let mut conn: Option<Connection> = None;
301
302 let HReqParams {
303 force_http2,
304 tls_disable_verify,
305 ..
306 } = params;
307
308 // if the current request is for the same uri (hostport part) as
309 // the original uri, we will use the override.
310 if orig_hostport == hostport_uri {
311 if let Some(arc) = params.with_override.clone() {
312 let hostport = &*arc;
313 debug!("Connect new: {} with override: {}", uri, hostport);
314 conn = Some(connect(hostport, force_http2, tls_disable_verify).await?);
315 }
316 }
317
318 let conn = match conn {
319 Some(conn) => conn,
320 // no override for this connection.
321 None => {
322 debug!("Connect new: {}", hostport_uri);
323 connect(&hostport_uri, force_http2, tls_disable_verify).await?
324 }
325 };
326
327 if pooling {
328 self.connections.push(conn);
329 let idx = self.connections.len() - 1;
330 self.connections.get_mut(idx).unwrap()
331 } else {
332 unpooled.replace(conn);
333 unpooled.as_mut().unwrap()
334 }
335 }
336 };
337
338 debug!("{} {}", req.method(), req.uri());
339
340 match conn.send_request(req, body_buffer).await {
341 Ok(mut res) => {
342 // whether we are to retain this connection in the pool.
343 let mut retain = true;
344
345 // squirrel away cookies (also in redirects)
346 if use_cookies {
347 for cookie_head in res.headers().get_all("set-cookie") {
348 if let Ok(v) = cookie_head.to_str() {
349 if let Ok(cookie) = Cookie::parse_encoded(v.to_string()) {
350 if cookies.is_none() {
351 *cookies = Some(Cookies::new());
352 }
353 cookies.as_mut().unwrap().add(&uri, cookie);
354 } else {
355 info!("Failed to parse cookie: {}", v);
356 }
357 } else {
358 info!("Failed to read cookie value: {:?}", cookie_head);
359 }
360 }
361 }
362
363 // We only handle redirections with Location header.
364 fn is_handled_redirect(status: http::StatusCode) -> bool {
365 match status.as_u16() {
366 301 | 302 | 307 | 308 => true,
367 _ => false,
368 }
369 }
370
371 // follow redirections
372 if is_handled_redirect(res.status()) {
373 redirects -= 1;
374
375 // no more redirections. return what we have.
376 if redirects < 0 {
377 trace!("Not following more redirections");
378 break Ok(res);
379 }
380
381 // amend uri in next_req relative to the old request.
382 let location = res.header("location").ok_or_else(|| {
383 Error::Proto("Redirect without Location header".into())
384 })?;
385
386 trace!("Redirect to: {}", location);
387
388 let (mut parts, body) = next_req.into_parts();
389 parts.uri = parts.uri.parse_relative(location)?;
390 next_req = http::Request::from_parts(parts, body);
391
392 let code = res.status_code();
393 let is_307ish = code > 303;
394
395 // 307/308 keep resends the body data, if the buffer is big enough.
396 if let Some(body) = body_buffer.reset(is_307ish) {
397 let (parts, _) = next_req.into_parts();
398 next_req = http::Request::from_parts(parts, body);
399 }
400
401 if is_307ish
402 && !conn.is_http2()
403 && conn.host_port() == &next_req.uri().host_port()?
404 {
405 // there's a big chance we started sending the body to the
406 // current host before we received the 307/308. for http1
407 // that means the upstream is "clogged" with a half body.
408 // drop and start over.
409 retain = false;
410 }
411
412 // exhaust the previous body before following the redirect.
413 // this is to ensure http1.1 connections are in a good state.
414 if res.body_mut().read_and_discard().await.is_err() {
415 // some servers just close the connection after a redirect.
416 retain = false;
417 }
418
419 // drop connection from pool if need be.
420 if !retain {
421 let conn_id = conn.id();
422 debug!("Remove from pool: {}", conn.host_port());
423 self.connections.retain(|c| c.id() != conn_id);
424 }
425
426 // following redirects means priming next_req and looping from the top
427 continue;
428 }
429
430 // a non-redirect is a ready response returned to the user
431 break Ok(res);
432 }
433 Err(err) => {
434 // remove this (failed) connection from the pool.
435 let conn_id = conn.id();
436 self.connections.retain(|c| c.id() != conn_id);
437
438 // retry?
439 retries -= 1;
440 if retries == 0 || !is_idempotent || !err.is_retryable() {
441 trace!("Abort with error, {}", err);
442 break Err(err);
443 }
444
445 trace!("Retrying on error, {}", err);
446 }
447 }
448 // retry backoff
449 trace!("Retry backoff: {}ms", backoff_millis);
450 AsyncRuntime::timeout(Duration::from_millis(backoff_millis)).await;
451 backoff_millis = (backoff_millis * 2).min(10_000);
452 }
453 }
454}
455
456/// On redirects, we need the entire request sans the original body.
457fn clone_to_empty_body(from: &http::Request<Body>) -> http::Request<Body> {
458 // most things can be cloned in the builder.
459 let req = http::Request::builder()
460 .method(from.method().clone())
461 .uri(from.uri().clone())
462 .version(from.version().clone())
463 .body(Body::empty())
464 .unwrap();
465
466 let (mut parts, body) = req.into_parts();
467
468 // headers can not be inserted as a complete cloned HeaderMap
469 parts.headers = from.headers().clone();
470
471 // there might be other extensions we're missing here.
472 if let Some(params) = from.extensions().get::<HReqParams>() {
473 parts.extensions.insert(params.clone());
474 }
475 if let Some(params) = from.extensions().get::<QueryParams>() {
476 parts.extensions.insert(params.clone());
477 }
478
479 http::Request::from_parts(parts, body)
480}
481
482impl fmt::Debug for Agent {
483 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
484 write!(f, "Agent")
485 }
486}
487
488/// A future hreq response.
489///
490/// Instances should be `.await` or `.block()`.
491pub struct ResponseFuture {
492 req: Box<dyn Future<Output = Result<http::Response<Body>, Error>> + Send>,
493}
494
495impl ResponseFuture {
496 pub(crate) fn new(
497 t: impl Future<Output = Result<http::Response<Body>, Error>> + Send + 'static,
498 ) -> Self {
499 ResponseFuture { req: Box::new(t) }
500 }
501}
502
503impl Future for ResponseFuture {
504 type Output = Result<http::Response<Body>, Error>;
505
506 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
507 let this = self.get_mut();
508 // this unsafe is ok because the boxed closure is not going to move anywhere.
509 unsafe { Pin::new_unchecked(&mut *this.req) }.poll(cx)
510 }
511}
512
513impl fmt::Debug for ResponseFuture {
514 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
515 write!(f, "ResponseFuture")
516 }
517}