1use std::net::SocketAddr;
2
3use futures::future::BoxFuture;
4use hyper::body::{Body, Bytes};
5use hyper::{Request, Response, Uri};
6
7use crate::{client_body::H3IncomingClient, connection::CacheSendRequestService};
8
9pub trait H3Connector: Send + 'static + Clone {
10 type CONN: h3::quic::Connection<
11 Bytes,
12 OpenStreams = Self::OS,
13 SendStream = Self::SS,
14 RecvStream = Self::RS,
15 OpenError = Self::OE,
16 > + Send;
17 type OS: h3::quic::OpenStreams<Bytes, OpenError = Self::OE, BidiStream = Self::BS>
18 + Clone
19 + Send; type SS: h3::quic::SendStream<Bytes> + Send;
21 type RS: h3::quic::RecvStream + Send;
22 type OE: Into<Box<dyn std::error::Error>> + Send;
23 type BS: h3::quic::BidiStream<Bytes, RecvStream = Self::RS, SendStream = Self::SS> + Send;
24
25 fn connect(
26 &self,
27 ) -> impl std::future::Future<Output = Result<Self::CONN, crate::Error>> + std::marker::Send;
28}
29
30pub async fn dns_resolve(uri: &Uri) -> std::io::Result<Vec<SocketAddr>> {
34 let host_port = uri
35 .authority()
36 .ok_or(std::io::Error::from(std::io::ErrorKind::InvalidInput))?
37 .as_str();
38 match host_port.parse::<SocketAddr>() {
39 Ok(addr) => Ok(vec![addr]),
40 Err(_) => {
41 tokio::net::lookup_host(host_port)
43 .await
44 .map(|a| a.collect::<Vec<_>>())
45 }
46 }
47}
48
49pub struct H3Connection<C, B>
54where
55 C: H3Connector,
56 B: Body + Send + 'static + Unpin,
57 B::Data: Send,
58 B::Error: Into<crate::Error>,
59{
60 #[allow(clippy::type_complexity)]
61 inner:
62 tower::util::BoxService<Request<B>, Response<H3IncomingClient<C::RS, Bytes>>, crate::Error>,
63}
64
65impl<C, B> H3Connection<C, B>
66where
67 C: H3Connector,
68 B: Body + Send + 'static + Unpin,
69 B::Data: Send,
70 B::Error: Into<crate::Error> + Send,
71{
72 pub fn new(connector: C, uri: Uri) -> Self {
73 let cache_mk_svc = CacheSendRequestService::new(connector);
74
75 let client_svc = ClientService {
76 inner: cache_mk_svc,
77 uri,
78 };
79
80 Self {
81 inner: tower::util::BoxService::new(client_svc),
82 }
83 }
84}
85
86impl<C, B> tower::Service<Request<B>> for H3Connection<C, B>
87where
88 C: H3Connector,
89 B: Body + Send + 'static + Unpin,
90 B::Data: Send,
91 B::Error: Into<crate::Error>,
92{
93 type Response = Response<H3IncomingClient<C::RS, Bytes>>;
94 type Error = crate::Error;
95 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
96
97 fn poll_ready(
98 &mut self,
99 cx: &mut std::task::Context<'_>,
100 ) -> std::task::Poll<Result<(), Self::Error>> {
101 tower::Service::poll_ready(&mut self.inner, cx)
102 }
103
104 fn call(&mut self, req: Request<B>) -> Self::Future {
105 self.inner.call(req)
106 }
107}
108
109pub struct ClientService<C>
111where
112 C: H3Connector,
113{
114 inner: CacheSendRequestService<C>,
115 uri: Uri,
116}
117
118impl<C, B> tower::Service<Request<B>> for ClientService<C>
119where
120 C: H3Connector,
121 B: Body + Send + 'static + Unpin,
122 B::Data: Send,
123 B::Error: Into<crate::Error> + Send,
124{
125 type Response = Response<H3IncomingClient<C::RS, Bytes>>;
126
127 type Error = crate::Error;
128
129 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
130
131 fn poll_ready(
132 &mut self,
133 cx: &mut std::task::Context<'_>,
134 ) -> std::task::Poll<Result<(), Self::Error>> {
135 self.inner.poll_ready(cx)
136 }
137
138 fn call(&mut self, mut req: Request<B>) -> Self::Future {
139 let uri = &self.uri;
140 let uri2 = Uri::builder()
142 .scheme(uri.scheme().unwrap().clone())
143 .authority(uri.authority().unwrap().clone())
144 .path_and_query(req.uri().path_and_query().unwrap().clone())
145 .build()
146 .unwrap();
147 *req.uri_mut() = uri2;
148
149 let fut = self.inner.call(());
150 Box::pin(async move {
151 let mut send_request = fut.await?;
152 send_request.call(req).await
153 })
154 }
155}
156
157pub struct H3Client<C, B>
161where
162 C: H3Connector,
163 B: Body + Send + 'static + Unpin,
164 B::Data: Send,
165 B::Error: Into<crate::Error> + Send,
166{
167 channel: H3Connection<C, B>,
168}
169
170impl<C, B> H3Client<C, B>
171where
172 C: H3Connector,
173 B: Body + Send + 'static + Unpin,
174 B::Data: Send,
175 B::Error: Into<crate::Error> + Send,
176{
177 pub fn new(inner: H3Connection<C, B>) -> Self {
178 Self { channel: inner }
179 }
180
181 pub async fn send(
182 &mut self,
183 req: Request<B>,
184 ) -> Result<Response<H3IncomingClient<C::RS, Bytes>>, crate::Error> {
185 use tower::Service;
186 futures::future::poll_fn(|cx| self.channel.poll_ready(cx)).await?;
188 self.channel.call(req).await
189 }
190}