h3_util/
client.rs

1use std::net::SocketAddr;
2
3use futures::future::BoxFuture;
4use hyper::body::{Body, Bytes};
5use hyper::{Request, Response, Uri};
6
7use crate::{client_body::H3IncomingClient, connection::CacheSendRequestService};
8
9pub trait H3Connector: Send + 'static + Clone {
10    type CONN: h3::quic::Connection<
11            Bytes,
12            OpenStreams = Self::OS,
13            SendStream = Self::SS,
14            RecvStream = Self::RS,
15            OpenError = Self::OE,
16        > + Send;
17    type OS: h3::quic::OpenStreams<Bytes, OpenError = Self::OE, BidiStream = Self::BS>
18        + Clone
19        + Send; // Clone is needed for cloning send_request
20    type SS: h3::quic::SendStream<Bytes> + Send;
21    type RS: h3::quic::RecvStream + Send;
22    type OE: Into<Box<dyn std::error::Error>> + Send;
23    type BS: h3::quic::BidiStream<Bytes, RecvStream = Self::RS, SendStream = Self::SS> + Send;
24
25    fn connect(
26        &self,
27    ) -> impl std::future::Future<Output = Result<Self::CONN, crate::Error>> + std::marker::Send;
28}
29
30/// Use the host:port portion of the uri and resolve to an sockaddr.
31/// If uri host portion is an ip string, then directly use the ip addr without
32/// dns lookup.
33pub async fn dns_resolve(uri: &Uri) -> std::io::Result<Vec<SocketAddr>> {
34    let host_port = uri
35        .authority()
36        .ok_or(std::io::Error::from(std::io::ErrorKind::InvalidInput))?
37        .as_str();
38    match host_port.parse::<SocketAddr>() {
39        Ok(addr) => Ok(vec![addr]),
40        Err(_) => {
41            // uri is using a dns name. try resolve it and return the first.
42            tokio::net::lookup_host(host_port)
43                .await
44                .map(|a| a.collect::<Vec<_>>())
45        }
46    }
47}
48
49/// h3 client connection, wrapping inner types for ease of use.
50/// All request will be sent to the connection established using the connector.
51/// Currently connector can only connect to a fixed server (to support grpc use case).
52/// Expand connector to do resolve different server based on uri can be added in future.
53pub struct H3Connection<C, B>
54where
55    C: H3Connector,
56    B: Body + Send + 'static + Unpin,
57    B::Data: Send,
58    B::Error: Into<crate::Error>,
59{
60    #[allow(clippy::type_complexity)]
61    inner:
62        tower::util::BoxService<Request<B>, Response<H3IncomingClient<C::RS, Bytes>>, crate::Error>,
63}
64
65impl<C, B> H3Connection<C, B>
66where
67    C: H3Connector,
68    B: Body + Send + 'static + Unpin,
69    B::Data: Send,
70    B::Error: Into<crate::Error> + Send,
71{
72    pub fn new(connector: C, uri: Uri) -> Self {
73        let cache_mk_svc = CacheSendRequestService::new(connector);
74
75        let client_svc = ClientService {
76            inner: cache_mk_svc,
77            uri,
78        };
79
80        Self {
81            inner: tower::util::BoxService::new(client_svc),
82        }
83    }
84}
85
86impl<C, B> tower::Service<Request<B>> for H3Connection<C, B>
87where
88    C: H3Connector,
89    B: Body + Send + 'static + Unpin,
90    B::Data: Send,
91    B::Error: Into<crate::Error>,
92{
93    type Response = Response<H3IncomingClient<C::RS, Bytes>>;
94    type Error = crate::Error;
95    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
96
97    fn poll_ready(
98        &mut self,
99        cx: &mut std::task::Context<'_>,
100    ) -> std::task::Poll<Result<(), Self::Error>> {
101        tower::Service::poll_ready(&mut self.inner, cx)
102    }
103
104    fn call(&mut self, req: Request<B>) -> Self::Future {
105        self.inner.call(req)
106    }
107}
108
109/// Client service that includes cache and reconnection.
110pub struct ClientService<C>
111where
112    C: H3Connector,
113{
114    inner: CacheSendRequestService<C>,
115    uri: Uri,
116}
117
118impl<C, B> tower::Service<Request<B>> for ClientService<C>
119where
120    C: H3Connector,
121    B: Body + Send + 'static + Unpin,
122    B::Data: Send,
123    B::Error: Into<crate::Error> + Send,
124{
125    type Response = Response<H3IncomingClient<C::RS, Bytes>>;
126
127    type Error = crate::Error;
128
129    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
130
131    fn poll_ready(
132        &mut self,
133        cx: &mut std::task::Context<'_>,
134    ) -> std::task::Poll<Result<(), Self::Error>> {
135        self.inner.poll_ready(cx)
136    }
137
138    fn call(&mut self, mut req: Request<B>) -> Self::Future {
139        let uri = &self.uri;
140        // fix up uri with full uri.
141        let uri2 = Uri::builder()
142            .scheme(uri.scheme().unwrap().clone())
143            .authority(uri.authority().unwrap().clone())
144            .path_and_query(req.uri().path_and_query().unwrap().clone())
145            .build()
146            .unwrap();
147        *req.uri_mut() = uri2;
148
149        let fut = self.inner.call(());
150        Box::pin(async move {
151            let mut send_request = fut.await?;
152            send_request.call(req).await
153        })
154    }
155}
156
157/// http3 client.
158/// Note the client does not do dns resolve but blindly sends requests
159/// using connections created by the connector.
160pub struct H3Client<C, B>
161where
162    C: H3Connector,
163    B: Body + Send + 'static + Unpin,
164    B::Data: Send,
165    B::Error: Into<crate::Error> + Send,
166{
167    channel: H3Connection<C, B>,
168}
169
170impl<C, B> H3Client<C, B>
171where
172    C: H3Connector,
173    B: Body + Send + 'static + Unpin,
174    B::Data: Send,
175    B::Error: Into<crate::Error> + Send,
176{
177    pub fn new(inner: H3Connection<C, B>) -> Self {
178        Self { channel: inner }
179    }
180
181    pub async fn send(
182        &mut self,
183        req: Request<B>,
184    ) -> Result<Response<H3IncomingClient<C::RS, Bytes>>, crate::Error> {
185        use tower::Service;
186        // wait for ready
187        futures::future::poll_fn(|cx| self.channel.poll_ready(cx)).await?;
188        self.channel.call(req).await
189    }
190}