reqwest/async_impl/h3_client/
mod.rs

1#![cfg(feature = "http3")]
2
3pub(crate) mod connect;
4pub(crate) mod dns;
5mod pool;
6
7use crate::async_impl::body::ResponseBody;
8use crate::async_impl::h3_client::pool::{Key, Pool, PoolClient};
9#[cfg(feature = "cookies")]
10use crate::cookie;
11use crate::error::{BoxError, Error, Kind};
12use crate::{error, Body};
13use connect::H3Connector;
14use http::{Request, Response};
15use log::trace;
16use std::future::{self, Future};
17use std::pin::Pin;
18#[cfg(feature = "cookies")]
19use std::sync::Arc;
20use std::task::{Context, Poll};
21use std::time::Duration;
22use sync_wrapper::SyncWrapper;
23use tower::Service;
24
25#[derive(Clone)]
26pub(crate) struct H3Client {
27    pool: Pool,
28    connector: H3Connector,
29    #[cfg(feature = "cookies")]
30    cookie_store: Option<Arc<dyn cookie::CookieStore>>,
31}
32
33impl H3Client {
34    #[cfg(not(feature = "cookies"))]
35    pub fn new(connector: H3Connector, pool_timeout: Option<Duration>) -> Self {
36        H3Client {
37            pool: Pool::new(pool_timeout),
38            connector,
39        }
40    }
41
42    #[cfg(feature = "cookies")]
43    pub fn new(
44        connector: H3Connector,
45        pool_timeout: Option<Duration>,
46        cookie_store: Option<Arc<dyn cookie::CookieStore>>,
47    ) -> Self {
48        H3Client {
49            pool: Pool::new(pool_timeout),
50            connector,
51            cookie_store,
52        }
53    }
54
55    async fn get_pooled_client(&mut self, key: Key) -> Result<PoolClient, BoxError> {
56        if let Some(client) = self.pool.try_pool(&key) {
57            trace!("getting client from pool with key {key:?}");
58            return Ok(client);
59        }
60
61        trace!("did not find connection {key:?} in pool so connecting...");
62
63        let dest = pool::domain_as_uri(key.clone());
64
65        let lock = match self.pool.connecting(&key) {
66            pool::Connecting::InProgress(waiter) => {
67                trace!("connecting to {key:?} is already in progress, subscribing...");
68
69                match waiter.receive().await {
70                    Some(client) => return Ok(client),
71                    None => return Err("failed to establish connection for HTTP/3 request".into()),
72                }
73            }
74            pool::Connecting::Acquired(lock) => lock,
75        };
76        trace!("connecting to {key:?}...");
77        let (driver, tx) = self.connector.connect(dest).await?;
78        trace!("saving new pooled connection to {key:?}");
79        Ok(self.pool.new_connection(lock, driver, tx))
80    }
81
82    #[cfg(not(feature = "cookies"))]
83    async fn send_request(
84        mut self,
85        key: Key,
86        req: Request<Body>,
87    ) -> Result<Response<ResponseBody>, Error> {
88        let mut pooled = match self.get_pooled_client(key).await {
89            Ok(client) => client,
90            Err(e) => return Err(error::request(e)),
91        };
92        pooled
93            .send_request(req)
94            .await
95            .map_err(|e| Error::new(Kind::Request, Some(e)))
96    }
97
98    #[cfg(feature = "cookies")]
99    async fn send_request(
100        mut self,
101        key: Key,
102        mut req: Request<Body>,
103    ) -> Result<Response<ResponseBody>, Error> {
104        let mut pooled = match self.get_pooled_client(key).await {
105            Ok(client) => client,
106            Err(e) => return Err(error::request(e)),
107        };
108
109        let url = url::Url::parse(req.uri().to_string().as_str()).unwrap();
110        if let Some(cookie_store) = self.cookie_store.as_ref() {
111            if req.headers().get(crate::header::COOKIE).is_none() {
112                let headers = req.headers_mut();
113                crate::util::add_cookie_header(headers, &**cookie_store, &url);
114            }
115        }
116
117        let res = pooled
118            .send_request(req)
119            .await
120            .map_err(|e| Error::new(Kind::Request, Some(e)));
121
122        if let Some(ref cookie_store) = self.cookie_store {
123            if let Ok(res) = &res {
124                let mut cookies = cookie::extract_response_cookie_headers(res.headers()).peekable();
125                if cookies.peek().is_some() {
126                    cookie_store.set_cookies(&mut cookies, &url);
127                }
128            }
129        }
130
131        res
132    }
133
134    pub fn request(&self, mut req: Request<Body>) -> H3ResponseFuture {
135        let pool_key = match pool::extract_domain(req.uri_mut()) {
136            Ok(s) => s,
137            Err(e) => {
138                return H3ResponseFuture {
139                    inner: SyncWrapper::new(Box::pin(future::ready(Err(e)))),
140                }
141            }
142        };
143        H3ResponseFuture {
144            inner: SyncWrapper::new(Box::pin(self.clone().send_request(pool_key, req))),
145        }
146    }
147}
148
149impl Service<Request<Body>> for H3Client {
150    type Response = Response<ResponseBody>;
151    type Error = Error;
152    type Future = H3ResponseFuture;
153
154    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155        Poll::Ready(Ok(()))
156    }
157
158    fn call(&mut self, req: Request<Body>) -> Self::Future {
159        self.request(req)
160    }
161}
162
163pub(crate) struct H3ResponseFuture {
164    inner: SyncWrapper<Pin<Box<dyn Future<Output = Result<Response<ResponseBody>, Error>> + Send>>>,
165}
166
167impl Future for H3ResponseFuture {
168    type Output = Result<Response<ResponseBody>, Error>;
169
170    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171        self.inner.get_mut().as_mut().poll(cx)
172    }
173}