reqwest_chain/
middleware.rs

1//! `ChainMiddleware` implements retrying requests on given conditions.
2
3use crate::chainable::{ChainMiddleware, Chainer};
4use anyhow::anyhow;
5use http::Extensions;
6use reqwest_middleware::reqwest::{Request, Response};
7use reqwest_middleware::{Error, Middleware, Next, Result};
8
9#[async_trait::async_trait]
10impl<T, S> Middleware for ChainMiddleware<T>
11where
12    T: Chainer<State = S> + Send + Sync + 'static,
13    S: Send + Sync + Default + 'static,
14{
15    async fn handle(
16        &self,
17        req: Request,
18        extensions: &mut Extensions,
19        next: Next<'_>,
20    ) -> Result<Response> {
21        // TODO: Ideally we should create a new instance of the `Extensions` map to pass
22        // downstream. This will guard against previous retries poluting `Extensions`.
23        // That is, we only return what's populated in the typemap for the last retry attempt
24        // and copy those into the the `global` Extensions map.
25        execute_with_chain(self.inner(), req, next, extensions).await
26    }
27}
28
29/// This function will try to execute the request, chaining any additional
30/// requests if required.
31async fn execute_with_chain<'a, T, S>(
32    chain_middleware: &'a T,
33    mut request: Request,
34    next: Next<'a>,
35    ext: &'a mut Extensions,
36) -> Result<Response>
37where
38    T: Chainer<State = S> + Sync,
39    S: Default,
40{
41    let mut request_state = S::default();
42    let mut n_past_retries = 0;
43    let max_chain_length = chain_middleware.max_chain_length();
44    loop {
45        if n_past_retries >= max_chain_length {
46            return Err(Error::Middleware(anyhow!(
47                "Maximum chain length {max_chain_length} exceeded"
48            )));
49        };
50
51        // Cloning the request object before-the-fact is not ideal..
52        // However, if the body of the request is not static, e.g of type `Bytes`,
53        // the Clone operation should be of constant complexity and not O(N)
54        // since the byte abstraction is a shared pointer over a buffer.
55        let duplicate_request = request.try_clone().ok_or_else(|| {
56            Error::Middleware(anyhow!(
57                "Request object is not clonable. Are you passing a streaming body?".to_string()
58            ))
59        })?;
60        let result = next.clone().run(duplicate_request, ext).await;
61
62        let action = chain_middleware
63            .chain(result, &mut request_state, &mut request)
64            .await?;
65        if let Some(response) = action {
66            return Ok(response);
67        } else {
68            n_past_retries += 1;
69        }
70    }
71}