datafusion_table_providers/util/
ns_lookup.rs

1use std::net::{SocketAddr, TcpStream};
2use std::time::Duration;
3
4use snafu::prelude::*;
5use trust_dns_resolver::AsyncResolver;
6
7#[derive(Debug, Snafu)]
8pub enum Error {
9    #[snafu(display("Failed to connect to {host}:{port}, are the host and port correct?"))]
10    UnableToConnect { host: String, port: u16 },
11
12    #[snafu(display("Failed to parse endpoint {endpoint}: {source}"))]
13    UnableToParseUrl {
14        endpoint: String,
15        source: url::ParseError,
16    },
17
18    #[snafu(display("Invalid endpoint (no host provided): {endpoint}"))]
19    InvalidHost { endpoint: String },
20
21    #[snafu(display("Invalid endpoint (no port specified): {endpoint}"))]
22    InvalidPort { endpoint: String },
23}
24
25pub type Result<T, E = Error> = std::result::Result<T, E>;
26
27/// Verify NS lookup and TCP connect for the provided `endpoint`.
28///
29/// # Arguments
30///
31/// * `endpoint` - The endpoint to lookup.
32///
33/// # Errors
34///
35/// Returns `Error` if unable to parse endpoint or if the NS lookup or TCP connect fails.
36pub async fn verify_endpoint_connection(endpoint: &str) -> Result<()> {
37    let url = url::Url::parse(endpoint).context(UnableToParseUrlSnafu {
38        endpoint: endpoint.to_string(),
39    })?;
40
41    let host = url.host_str().context(InvalidHostSnafu {
42        endpoint: endpoint.to_string(),
43    })?;
44
45    let port = url.port_or_known_default().context(InvalidPortSnafu {
46        endpoint: endpoint.to_string(),
47    })?;
48
49    verify_ns_lookup_and_tcp_connect(host, port).await
50}
51
52/// Verify NS lookup and TCP connect of the provided `host` and `port`.
53///
54/// # Arguments
55///
56/// * `host` - The host to lookup.
57/// * `port` - The port to connect to.
58///
59/// # Errors
60///
61/// Returns an `Error` if the NS lookup or TCP connect fails.
62pub async fn verify_ns_lookup_and_tcp_connect(host: &str, port: u16) -> Result<()> {
63    // DefaultConfig uses google as upstream nameservers which won't work for kubernetes name
64    // resolving
65    let resolver = AsyncResolver::tokio_from_system_conf().map_err(|_| Error::UnableToConnect {
66        host: host.to_string(),
67        port,
68    })?;
69    match resolver.lookup_ip(host).await {
70        Ok(ips) => {
71            for ip in ips.iter() {
72                let addr = SocketAddr::new(ip, port);
73                if TcpStream::connect_timeout(&addr, Duration::from_secs(30)).is_ok() {
74                    return Ok(());
75                }
76            }
77
78            tracing::debug!("Failed to connect to {host}:{port}, connection timed out");
79
80            UnableToConnectSnafu {
81                host: host.to_string(),
82                port,
83            }
84            .fail()
85        }
86        Err(err) => {
87            tracing::debug!("Failed to resolve host: {err}");
88            UnableToConnectSnafu {
89                host: host.to_string(),
90                port,
91            }
92            .fail()
93        }
94    }
95}