apache_dubbo/triple/transport/connector/
http_connector.rs1use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
19use std::str::FromStr;
20
21use http::Uri;
22use hyper::client::connect::dns::Name;
23use tokio::net::TcpStream;
24use tower_service::Service;
25
26use crate::triple::transport::resolver::dns::DnsResolver;
27use crate::triple::transport::resolver::Resolve;
28
29#[derive(Clone, Default)]
30pub struct HttpConnector<R = DnsResolver> {
31 resolver: R,
32}
33
34impl HttpConnector {
35 pub fn new() -> Self {
36 Self {
37 resolver: DnsResolver::default(),
38 }
39 }
40}
41
42impl<R> HttpConnector<R> {
43 pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
44 Self { resolver }
45 }
46}
47
48impl<R> Service<Uri> for HttpConnector<R>
49where
50 R: Resolve + Clone + Send + Sync + 'static,
51 R::Future: Send,
52{
53 type Response = TcpStream;
54
55 type Error = crate::Error;
56
57 type Future = crate::BoxFuture<Self::Response, Self::Error>;
58
59 fn poll_ready(
60 &mut self,
61 cx: &mut std::task::Context<'_>,
62 ) -> std::task::Poll<Result<(), Self::Error>> {
63 self.resolver.poll_ready(cx).map_err(|err| err.into())
64 }
65
66 fn call(&mut self, uri: Uri) -> Self::Future {
67 let mut inner = self.clone();
68
69 Box::pin(async move { inner.call_async(uri).await })
70 }
71}
72
73impl<R> HttpConnector<R>
74where
75 R: Resolve + Send + Sync + 'static,
76{
77 async fn call_async(&mut self, uri: Uri) -> Result<TcpStream, crate::Error> {
78 let host = uri.host().unwrap();
79 let port = uri.port_u16().unwrap();
80
81 let addr = if let Ok(addr) = host.parse::<Ipv4Addr>() {
82 tracing::info!("host is ip address: {:?}", host);
83 SocketAddr::V4(SocketAddrV4::new(addr, port))
84 } else {
85 tracing::info!("host is dns: {:?}", host);
86 let addrs = self
87 .resolver
88 .resolve(Name::from_str(host).unwrap())
89 .await
90 .map_err(|err| err.into())?;
91 let addrs: Vec<SocketAddr> = addrs
92 .map(|mut addr| {
93 addr.set_port(port);
94 addr
95 })
96 .collect();
97 addrs[0]
98 };
99
100 let conn = TcpStream::connect(addr).await?;
101
102 Ok(conn)
103 }
104}