apache_dubbo/triple/transport/connector/
http_connector.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
19use std::str::FromStr;
20
21use http::Uri;
22use hyper::client::connect::dns::Name;
23use tokio::net::TcpStream;
24use tower_service::Service;
25
26use crate::triple::transport::resolver::dns::DnsResolver;
27use crate::triple::transport::resolver::Resolve;
28
29#[derive(Clone, Default)]
30pub struct HttpConnector<R = DnsResolver> {
31    resolver: R,
32}
33
34impl HttpConnector {
35    pub fn new() -> Self {
36        Self {
37            resolver: DnsResolver::default(),
38        }
39    }
40}
41
42impl<R> HttpConnector<R> {
43    pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
44        Self { resolver }
45    }
46}
47
48impl<R> Service<Uri> for HttpConnector<R>
49where
50    R: Resolve + Clone + Send + Sync + 'static,
51    R::Future: Send,
52{
53    type Response = TcpStream;
54
55    type Error = crate::Error;
56
57    type Future = crate::BoxFuture<Self::Response, Self::Error>;
58
59    fn poll_ready(
60        &mut self,
61        cx: &mut std::task::Context<'_>,
62    ) -> std::task::Poll<Result<(), Self::Error>> {
63        self.resolver.poll_ready(cx).map_err(|err| err.into())
64    }
65
66    fn call(&mut self, uri: Uri) -> Self::Future {
67        let mut inner = self.clone();
68
69        Box::pin(async move { inner.call_async(uri).await })
70    }
71}
72
73impl<R> HttpConnector<R>
74where
75    R: Resolve + Send + Sync + 'static,
76{
77    async fn call_async(&mut self, uri: Uri) -> Result<TcpStream, crate::Error> {
78        let host = uri.host().unwrap();
79        let port = uri.port_u16().unwrap();
80
81        let addr = if let Ok(addr) = host.parse::<Ipv4Addr>() {
82            tracing::info!("host is ip address: {:?}", host);
83            SocketAddr::V4(SocketAddrV4::new(addr, port))
84        } else {
85            tracing::info!("host is dns: {:?}", host);
86            let addrs = self
87                .resolver
88                .resolve(Name::from_str(host).unwrap())
89                .await
90                .map_err(|err| err.into())?;
91            let addrs: Vec<SocketAddr> = addrs
92                .map(|mut addr| {
93                    addr.set_port(port);
94                    addr
95                })
96                .collect();
97            addrs[0]
98        };
99
100        let conn = TcpStream::connect(addr).await?;
101
102        Ok(conn)
103    }
104}