h3_util/
client.rs

1use std::net::SocketAddr;
2
3use futures::future::BoxFuture;
4use hyper::body::{Body, Bytes};
5use hyper::{Request, Response, Uri};
6
7use crate::client_body::H3IncomingClient;
8use crate::client_conn;
9
10pub trait H3Connector: Send + 'static + Clone {
11    type CONN: h3::quic::Connection<
12            Bytes,
13            OpenStreams = Self::OS,
14            SendStream = Self::SS,
15            RecvStream = Self::RS,
16        > + Send;
17    type OS: h3::quic::OpenStreams<Bytes, BidiStream = Self::BS> + Clone + Send; // Clone is needed for cloning send_request
18    type SS: h3::quic::SendStream<Bytes> + Send;
19    type RS: h3::quic::RecvStream + Send;
20    type BS: h3::quic::BidiStream<Bytes, RecvStream = Self::RS, SendStream = Self::SS> + Send;
21
22    fn connect(
23        &self,
24    ) -> impl std::future::Future<Output = Result<Self::CONN, crate::Error>> + std::marker::Send;
25}
26
27/// Use the host:port portion of the uri and resolve to an sockaddr.
28/// If uri host portion is an ip string, then directly use the ip addr without
29/// dns lookup.
30pub async fn dns_resolve(uri: &Uri) -> std::io::Result<Vec<SocketAddr>> {
31    let host_port = uri
32        .authority()
33        .ok_or(std::io::Error::from(std::io::ErrorKind::InvalidInput))?
34        .as_str();
35    match host_port.parse::<SocketAddr>() {
36        Ok(addr) => Ok(vec![addr]),
37        Err(_) => {
38            // uri is using a dns name. try resolve it and return the first.
39            tokio::net::lookup_host(host_port)
40                .await
41                .map(|a| a.collect::<Vec<_>>())
42        }
43    }
44}
45
46/// h3 client connection, wrapping inner types for ease of use.
47/// All request will be sent to the connection established using the connector.
48/// Currently connector can only connect to a fixed server (to support grpc use case).
49/// Expand connector to do resolve different server based on uri can be added in future.
50pub struct H3Connection<C, B>
51where
52    C: H3Connector,
53    B: Body + Send + 'static + Unpin,
54    B::Data: Send,
55    B::Error: Into<crate::Error>,
56{
57    #[allow(clippy::type_complexity)]
58    inner:
59        tower::util::BoxService<Request<B>, Response<H3IncomingClient<C::RS, Bytes>>, crate::Error>,
60}
61
62impl<C, B> H3Connection<C, B>
63where
64    C: H3Connector,
65    B: Body + Send + 'static + Unpin,
66    B::Data: Send,
67    B::Error: Into<crate::Error> + Send,
68{
69    pub fn new(connector: C, uri: Uri) -> Self {
70        let sender = client_conn::RequestSender::new(connector, uri);
71        Self {
72            inner: tower::util::BoxService::new(sender),
73        }
74    }
75}
76
77impl<C, B> tower::Service<Request<B>> for H3Connection<C, B>
78where
79    C: H3Connector,
80    B: Body + Send + 'static + Unpin,
81    B::Data: Send,
82    B::Error: Into<crate::Error>,
83{
84    type Response = Response<H3IncomingClient<C::RS, Bytes>>;
85    type Error = crate::Error;
86    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
87
88    fn poll_ready(
89        &mut self,
90        cx: &mut std::task::Context<'_>,
91    ) -> std::task::Poll<Result<(), Self::Error>> {
92        tower::Service::poll_ready(&mut self.inner, cx)
93    }
94
95    fn call(&mut self, req: Request<B>) -> Self::Future {
96        self.inner.call(req)
97    }
98}
99
100/// http3 client.
101/// Note the client does not do dns resolve but blindly sends requests
102/// using connections created by the connector.
103/// Used for sending HTTP request directly.
104pub struct H3Client<C, B>
105where
106    C: H3Connector,
107    B: Body + Send + 'static + Unpin,
108    B::Data: Send,
109    B::Error: Into<crate::Error> + Send,
110{
111    channel: H3Connection<C, B>,
112}
113
114impl<C, B> H3Client<C, B>
115where
116    C: H3Connector,
117    B: Body + Send + 'static + Unpin,
118    B::Data: Send,
119    B::Error: Into<crate::Error> + Send,
120{
121    pub fn new(inner: H3Connection<C, B>) -> Self {
122        Self { channel: inner }
123    }
124
125    pub async fn send(
126        &mut self,
127        req: Request<B>,
128    ) -> Result<Response<H3IncomingClient<C::RS, Bytes>>, crate::Error> {
129        use tower::Service;
130        // wait for ready
131        futures::future::poll_fn(|cx| self.channel.poll_ready(cx)).await?;
132        self.channel.call(req).await
133    }
134}