use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use url::Host;
use crate::client::name_resolution::Resolver;
use crate::client::name_resolution::ResolverOptions;
use crate::client::name_resolution::Target;
use crate::client::name_resolution::backoff::BackoffConfig;
use crate::client::name_resolution::backoff::DEFAULT_EXPONENTIAL_CONFIG;
use crate::client::name_resolution::dns::DnsOptions;
use crate::client::name_resolution::dns::DnsResolver;
use crate::client::name_resolution::dns::HostPort;
use crate::client::name_resolution::dns::ParseResult;
use crate::client::name_resolution::dns::get_min_resolution_interval;
use crate::client::name_resolution::dns::get_resolving_timeout;
use crate::client::name_resolution::dns::parse_endpoint_and_authority;
use crate::client::name_resolution::dns::reg;
use crate::client::name_resolution::global_registry;
use crate::client::name_resolution::test_utils::TestChannelController;
use crate::client::name_resolution::test_utils::TestWorkScheduler;
use crate::rt;
use crate::rt::GrpcEndpoint;
use crate::rt::GrpcRuntime;
use crate::rt::Runtime;
use crate::rt::Sleep;
use crate::rt::TaskHandle;
use crate::rt::TcpOptions;
use crate::rt::default_runtime;
use crate::rt::tokio::TokioRuntime;
const DEFAULT_TEST_SHORT_TIMEOUT: Duration = Duration::from_millis(10);
#[test]
pub(crate) fn target_parsing() {
struct TestCase {
input: &'static str,
want_result: Result<ParseResult, String>,
}
let test_cases = vec![
TestCase {
input: "dns:///grpc.io",
want_result: Ok(ParseResult {
endpoint: HostPort {
host: Host::Domain("grpc.io".to_string()),
port: 443,
},
authority: None,
}),
},
TestCase {
input: "dns:///grpc.io:1234",
want_result: Ok(ParseResult {
endpoint: HostPort {
host: Host::Domain("grpc.io".to_string()),
port: 1234,
},
authority: None,
}),
},
TestCase {
input: "dns://8.8.8.8/grpc.io:1234",
want_result: Ok(ParseResult {
endpoint: HostPort {
host: Host::Domain("grpc.io".to_string()),
port: 1234,
},
authority: Some("8.8.8.8:53".parse().unwrap()),
}),
},
TestCase {
input: "dns://8.8.8.8:5678/grpc.io:1234/abc",
want_result: Ok(ParseResult {
endpoint: HostPort {
host: Host::Domain("grpc.io".to_string()),
port: 1234,
},
authority: Some("8.8.8.8:5678".parse().unwrap()),
}),
},
TestCase {
input: "dns://[::1]:5678/grpc.io:1234/abc",
want_result: Ok(ParseResult {
endpoint: HostPort {
host: Host::Domain("grpc.io".to_string()),
port: 1234,
},
authority: Some("[::1]:5678".parse().unwrap()),
}),
},
TestCase {
input: "dns://[fe80::1]:5678/127.0.0.1:1234/abc",
want_result: Ok(ParseResult {
endpoint: HostPort {
host: Host::Ipv4("127.0.0.1".parse().unwrap()),
port: 1234,
},
authority: Some("[fe80::1]:5678".parse().unwrap()),
}),
},
TestCase {
input: "dns:///[fe80::1%80]:5678/abc",
want_result: Err("SocketAddr doesn't support IPv6 addresses with zones".to_string()),
},
TestCase {
input: "dns:///:5678/abc",
want_result: Err("Empty host with port".to_string()),
},
TestCase {
input: "dns:///grpc.io:abc/abc",
want_result: Err("Non numeric port".to_string()),
},
TestCase {
input: "dns:///grpc.io:/",
want_result: Ok(ParseResult {
endpoint: HostPort {
host: Host::Domain("grpc.io".to_string()),
port: 443,
},
authority: None,
}),
},
TestCase {
input: "dns:///:",
want_result: Err("No host and port".to_string()),
},
TestCase {
input: "dns:///[2001:db8:a0b:12f0::1",
want_result: Err("Invalid address".to_string()),
},
];
for tc in test_cases {
let target: Target = tc.input.parse().unwrap();
let got = parse_endpoint_and_authority(&target);
if got.is_err() != tc.want_result.is_err() {
panic!(
"Got error {:?}, want error: {:?}",
got.err(),
tc.want_result.err()
);
}
if got.is_err() {
continue;
}
assert_eq!(got.unwrap(), tc.want_result.unwrap());
}
}
#[tokio::test]
pub(crate) async fn dns_basic() {
reg();
let builder = global_registry().get("dns").unwrap();
let target = &"dns:///localhost:1234".parse().unwrap();
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
work_scheduler: work_scheduler.clone(),
};
let mut resolver = builder.build(target, opts);
work_rx.recv().await.unwrap();
let (mut channel_controller, mut update_rx) = TestChannelController::new_pair();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap().len() > 1);
}
#[tokio::test]
pub(crate) async fn invalid_target() {
reg();
let builder = global_registry().get("dns").unwrap();
let target = &"dns:///:1234".parse().unwrap();
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
work_scheduler: work_scheduler.clone(),
};
let mut resolver = builder.build(target, opts);
work_rx.recv().await.unwrap();
let (mut channel_controller, mut update_rx) = TestChannelController::new_pair();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(
update
.endpoints
.err()
.unwrap()
.contains(&target.to_string())
);
}
#[derive(Clone, Debug)]
struct FakeDns {
latency: Duration,
lookup_result: Result<Vec<std::net::IpAddr>, String>,
}
#[tonic::async_trait]
impl rt::DnsResolver for FakeDns {
async fn lookup_host_name(&self, _: &str) -> Result<Vec<std::net::IpAddr>, String> {
tokio::time::sleep(self.latency).await;
self.lookup_result.clone()
}
async fn lookup_txt(&self, _: &str) -> Result<Vec<String>, String> {
Err("unimplemented".to_string())
}
}
#[derive(Debug)]
struct FakeRuntime {
inner: TokioRuntime,
dns: FakeDns,
}
impl Runtime for FakeRuntime {
fn spawn(
&self,
task: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
) -> Box<dyn TaskHandle> {
self.inner.spawn(task)
}
fn get_dns_resolver(&self, _: rt::ResolverOptions) -> Result<Box<dyn rt::DnsResolver>, String> {
Ok(Box::new(self.dns.clone()))
}
fn sleep(&self, duration: std::time::Duration) -> Pin<Box<dyn Sleep>> {
self.inner.sleep(duration)
}
fn tcp_stream(
&self,
target: std::net::SocketAddr,
opts: TcpOptions,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn GrpcEndpoint>, String>> + Send>> {
self.inner.tcp_stream(target, opts)
}
}
#[tokio::test]
pub(crate) async fn dns_lookup_error() {
reg();
let builder = global_registry().get("dns").unwrap();
let target = &"dns:///grpc.io:1234".parse().unwrap();
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let runtime = FakeRuntime {
inner: TokioRuntime::default(),
dns: FakeDns {
latency: Duration::from_secs(0),
lookup_result: Err("test_error".to_string()),
},
};
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: GrpcRuntime::new(runtime),
work_scheduler: work_scheduler.clone(),
};
let mut resolver = builder.build(target, opts);
work_rx.recv().await.unwrap();
let (mut channel_controller, mut update_rx) = TestChannelController::new_pair();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap_err().contains("test_error"));
}
#[tokio::test]
pub(crate) async fn dns_lookup_timeout() {
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let runtime = FakeRuntime {
inner: TokioRuntime::default(),
dns: FakeDns {
latency: Duration::from_secs(20),
lookup_result: Ok(Vec::new()),
},
};
let dns_client = runtime.dns.clone();
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: GrpcRuntime::new(runtime),
work_scheduler: work_scheduler.clone(),
};
let dns_opts = DnsOptions {
min_resolution_interval: get_min_resolution_interval(),
resolving_timeout: DEFAULT_TEST_SHORT_TIMEOUT,
backoff_config: DEFAULT_EXPONENTIAL_CONFIG,
host: "grpc.io".to_string(),
port: 1234,
};
let mut resolver = DnsResolver::new(Box::new(dns_client), opts, dns_opts);
work_rx.recv().await.unwrap();
let (mut channel_controller, mut update_rx) = TestChannelController::new_pair();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap_err().contains("Timed out"));
}
#[tokio::test]
pub(crate) async fn rate_limit() {
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
work_scheduler: work_scheduler.clone(),
};
let dns_client = opts
.runtime
.get_dns_resolver(rt::ResolverOptions { server_addr: None })
.unwrap();
let dns_opts = DnsOptions {
min_resolution_interval: Duration::from_secs(20),
resolving_timeout: get_resolving_timeout(),
backoff_config: DEFAULT_EXPONENTIAL_CONFIG,
host: "localhost".to_string(),
port: 1234,
};
let mut resolver = DnsResolver::new(dns_client, opts, dns_opts);
work_rx.recv().await.unwrap();
let (mut channel_controller, mut update_rx) = TestChannelController::new_pair();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap().len() > 1);
for _ in 0..5 {
resolver.resolve_now();
tokio::select! {
_ = work_rx.recv() => {
panic!("Received unexpected work request from resolver");
}
_ = tokio::time::sleep(DEFAULT_TEST_SHORT_TIMEOUT) => {
println!("No work requested from resolver.");
}
};
}
}
#[tokio::test]
pub(crate) async fn re_resolution_after_success() {
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
work_scheduler: work_scheduler.clone(),
};
let dns_opts = DnsOptions {
min_resolution_interval: Duration::from_millis(1),
resolving_timeout: get_resolving_timeout(),
backoff_config: DEFAULT_EXPONENTIAL_CONFIG,
host: "localhost".to_string(),
port: 1234,
};
let dns_client = opts
.runtime
.get_dns_resolver(rt::ResolverOptions { server_addr: None })
.unwrap();
let mut resolver = DnsResolver::new(dns_client, opts, dns_opts);
work_rx.recv().await.unwrap();
let (mut channel_controller, mut update_rx) = TestChannelController::new_pair();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap().len() > 1);
resolver.resolve_now();
work_rx.recv().await.unwrap();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap().len() > 1);
}
#[tokio::test]
pub(crate) async fn backoff_on_error() {
let (work_scheduler, mut work_rx) = TestWorkScheduler::new_pair();
let opts = ResolverOptions {
authority: "ignored".to_string(),
runtime: default_runtime(),
work_scheduler: work_scheduler.clone(),
};
let dns_opts = DnsOptions {
min_resolution_interval: Duration::from_millis(1),
resolving_timeout: get_resolving_timeout(),
backoff_config: BackoffConfig {
base_delay: Duration::from_millis(1),
multiplier: 1.0,
jitter: 0.0,
max_delay: Duration::from_millis(1),
},
host: "localhost".to_string(),
port: 1234,
};
let dns_client = opts
.runtime
.get_dns_resolver(rt::ResolverOptions { server_addr: None })
.unwrap();
let mut resolver = DnsResolver::new(dns_client, opts, dns_opts);
let (mut channel_controller, mut update_rx) = TestChannelController::new_pair();
channel_controller.set_update_result(Err("test_error".to_string()));
for _ in 0..5 {
work_rx.recv().await.unwrap();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap().len() > 1);
}
channel_controller.set_update_result(Ok(()));
work_rx.recv().await.unwrap();
resolver.work(&mut channel_controller);
let update = update_rx.recv().await.unwrap();
assert!(update.endpoints.unwrap().len() > 1);
tokio::select! {
_ = work_rx.recv() => {
panic!("Received unexpected work request from resolver.");
}
_ = tokio::time::sleep(DEFAULT_TEST_SHORT_TIMEOUT) => {
println!("No event received from resolver.");
}
};
}