reqwest/async_impl/h3_client/
mod.rs1#![cfg(feature = "http3")]
2
3pub(crate) mod connect;
4pub(crate) mod dns;
5mod pool;
6
7use crate::async_impl::body::ResponseBody;
8use crate::async_impl::h3_client::pool::{Key, Pool, PoolClient};
9#[cfg(feature = "cookies")]
10use crate::cookie;
11use crate::error::{BoxError, Error, Kind};
12use crate::{error, Body};
13use connect::H3Connector;
14use http::{Request, Response};
15use log::trace;
16use std::future::{self, Future};
17use std::pin::Pin;
18#[cfg(feature = "cookies")]
19use std::sync::Arc;
20use std::task::{Context, Poll};
21use std::time::Duration;
22use sync_wrapper::SyncWrapper;
23use tower::Service;
24
25#[derive(Clone)]
26pub(crate) struct H3Client {
27 pool: Pool,
28 connector: H3Connector,
29 #[cfg(feature = "cookies")]
30 cookie_store: Option<Arc<dyn cookie::CookieStore>>,
31}
32
33impl H3Client {
34 #[cfg(not(feature = "cookies"))]
35 pub fn new(connector: H3Connector, pool_timeout: Option<Duration>) -> Self {
36 H3Client {
37 pool: Pool::new(pool_timeout),
38 connector,
39 }
40 }
41
42 #[cfg(feature = "cookies")]
43 pub fn new(
44 connector: H3Connector,
45 pool_timeout: Option<Duration>,
46 cookie_store: Option<Arc<dyn cookie::CookieStore>>,
47 ) -> Self {
48 H3Client {
49 pool: Pool::new(pool_timeout),
50 connector,
51 cookie_store,
52 }
53 }
54
55 async fn get_pooled_client(&mut self, key: Key) -> Result<PoolClient, BoxError> {
56 if let Some(client) = self.pool.try_pool(&key) {
57 trace!("getting client from pool with key {key:?}");
58 return Ok(client);
59 }
60
61 trace!("did not find connection {key:?} in pool so connecting...");
62
63 let dest = pool::domain_as_uri(key.clone());
64
65 let lock = match self.pool.connecting(&key) {
66 pool::Connecting::InProgress(waiter) => {
67 trace!("connecting to {key:?} is already in progress, subscribing...");
68
69 match waiter.receive().await {
70 Some(client) => return Ok(client),
71 None => return Err("failed to establish connection for HTTP/3 request".into()),
72 }
73 }
74 pool::Connecting::Acquired(lock) => lock,
75 };
76 trace!("connecting to {key:?}...");
77 let (driver, tx) = self.connector.connect(dest).await?;
78 trace!("saving new pooled connection to {key:?}");
79 Ok(self.pool.new_connection(lock, driver, tx))
80 }
81
82 #[cfg(not(feature = "cookies"))]
83 async fn send_request(
84 mut self,
85 key: Key,
86 req: Request<Body>,
87 ) -> Result<Response<ResponseBody>, Error> {
88 let mut pooled = match self.get_pooled_client(key).await {
89 Ok(client) => client,
90 Err(e) => return Err(error::request(e)),
91 };
92 pooled
93 .send_request(req)
94 .await
95 .map_err(|e| Error::new(Kind::Request, Some(e)))
96 }
97
98 #[cfg(feature = "cookies")]
99 async fn send_request(
100 mut self,
101 key: Key,
102 mut req: Request<Body>,
103 ) -> Result<Response<ResponseBody>, Error> {
104 let mut pooled = match self.get_pooled_client(key).await {
105 Ok(client) => client,
106 Err(e) => return Err(error::request(e)),
107 };
108
109 let url = url::Url::parse(req.uri().to_string().as_str()).unwrap();
110 if let Some(cookie_store) = self.cookie_store.as_ref() {
111 if req.headers().get(crate::header::COOKIE).is_none() {
112 let headers = req.headers_mut();
113 crate::util::add_cookie_header(headers, &**cookie_store, &url);
114 }
115 }
116
117 let res = pooled
118 .send_request(req)
119 .await
120 .map_err(|e| Error::new(Kind::Request, Some(e)));
121
122 if let Some(ref cookie_store) = self.cookie_store {
123 if let Ok(res) = &res {
124 let mut cookies = cookie::extract_response_cookie_headers(res.headers()).peekable();
125 if cookies.peek().is_some() {
126 cookie_store.set_cookies(&mut cookies, &url);
127 }
128 }
129 }
130
131 res
132 }
133
134 pub fn request(&self, mut req: Request<Body>) -> H3ResponseFuture {
135 let pool_key = match pool::extract_domain(req.uri_mut()) {
136 Ok(s) => s,
137 Err(e) => {
138 return H3ResponseFuture {
139 inner: SyncWrapper::new(Box::pin(future::ready(Err(e)))),
140 }
141 }
142 };
143 H3ResponseFuture {
144 inner: SyncWrapper::new(Box::pin(self.clone().send_request(pool_key, req))),
145 }
146 }
147}
148
149impl Service<Request<Body>> for H3Client {
150 type Response = Response<ResponseBody>;
151 type Error = Error;
152 type Future = H3ResponseFuture;
153
154 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155 Poll::Ready(Ok(()))
156 }
157
158 fn call(&mut self, req: Request<Body>) -> Self::Future {
159 self.request(req)
160 }
161}
162
163pub(crate) struct H3ResponseFuture {
164 inner: SyncWrapper<Pin<Box<dyn Future<Output = Result<Response<ResponseBody>, Error>> + Send>>>,
165}
166
167impl Future for H3ResponseFuture {
168 type Output = Result<Response<ResponseBody>, Error>;
169
170 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 self.inner.get_mut().as_mut().poll(cx)
172 }
173}