datafusion_table_providers/util/
ns_lookup.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

use snafu::prelude::*;
use trust_dns_resolver::AsyncResolver;

/// Errors produced while validating that an endpoint is reachable.
#[derive(Debug, Snafu)]
pub enum Error {
    /// The resolver could not be created, the host could not be resolved, or no
    /// resolved address accepted a TCP connection on `port`.
    #[snafu(display("Failed to connect to {host}:{port}, are the host and port correct?"))]
    UnableToConnect { host: String, port: u16 },

    /// The endpoint string could not be parsed as a URL; `source` carries the
    /// underlying parse error.
    #[snafu(display("Failed to parse endpoint {endpoint}: {source}"))]
    UnableToParseUrl {
        endpoint: String,
        source: url::ParseError,
    },

    /// The endpoint parsed as a URL but has no host component.
    #[snafu(display("Invalid endpoint (no host provided): {endpoint}"))]
    InvalidHost { endpoint: String },

    /// The endpoint parsed as a URL but `port_or_known_default()` returned no port.
    #[snafu(display("Invalid endpoint (no port specified): {endpoint}"))]
    InvalidPort { endpoint: String },
}

/// Convenience alias for `std::result::Result` whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Parse `endpoint` as a URL, then check that its host resolves and accepts a TCP connection.
///
/// The host and port are extracted from the parsed URL and handed to
/// [`verify_ns_lookup_and_tcp_connect`].
///
/// # Arguments
///
/// * `endpoint` - The endpoint URL to check.
///
/// # Errors
///
/// * [`Error::UnableToParseUrl`] if `endpoint` is not a valid URL.
/// * [`Error::InvalidHost`] if the URL has no host.
/// * [`Error::InvalidPort`] if the URL yields no port from `port_or_known_default()`.
/// * Any error returned by [`verify_ns_lookup_and_tcp_connect`].
pub async fn verify_endpoint_connection(endpoint: &str) -> Result<()> {
    // The selectors take `endpoint` by reference; snafu converts it into the
    // owned `String` field only when an error is actually built.
    let parsed = url::Url::parse(endpoint).context(UnableToParseUrlSnafu { endpoint })?;

    let host = match parsed.host_str() {
        Some(host) => host,
        None => return InvalidHostSnafu { endpoint }.fail(),
    };

    let port = match parsed.port_or_known_default() {
        Some(port) => port,
        None => return InvalidPortSnafu { endpoint }.fail(),
    };

    verify_ns_lookup_and_tcp_connect(host, port).await
}

/// Verify NS lookup and TCP connect of the provided `host` and `port`.
///
/// # Arguments
///
/// * `host` - The host to lookup.
/// * `port` - The port to connect to.
///
/// # Errors
///
/// Returns an `Error` if the NS lookup or TCP connect fails.
pub async fn verify_ns_lookup_and_tcp_connect(host: &str, port: u16) -> Result<()> {
    // DefaultConfig uses google as upstream nameservers which won't work for kubernetes name
    // resolving
    let resolver = AsyncResolver::tokio_from_system_conf().map_err(|_| Error::UnableToConnect {
        host: host.to_string(),
        port,
    })?;
    match resolver.lookup_ip(host).await {
        Ok(ips) => {
            for ip in ips.iter() {
                let addr = SocketAddr::new(ip, port);
                if TcpStream::connect_timeout(&addr, Duration::from_secs(30)).is_ok() {
                    return Ok(());
                }
            }

            tracing::debug!("Failed to connect to {host}:{port}, connection timed out");

            UnableToConnectSnafu {
                host: host.to_string(),
                port,
            }
            .fail()
        }
        Err(err) => {
            tracing::debug!("Failed to resolve host: {err}");
            UnableToConnectSnafu {
                host: host.to_string(),
                port,
            }
            .fail()
        }
    }
}