use core::fmt;
use std::fmt::Display;
use std::fmt::Formatter;
use std::hash::Hash;
use std::str::FromStr;
use std::sync::Arc;
use url::Url;
use crate::attributes::Attributes;
use crate::byte_str::ByteStr;
use crate::client::service_config::ServiceConfig;
use crate::rt::GrpcRuntime;
mod backoff;
mod registry;
#[cfg(test)]
pub(crate) mod test_utils;
pub(crate) mod dns;
#[cfg(unix)]
pub(crate) mod unix;
#[cfg(target_os = "linux")]
pub(crate) mod unix_abstract;
pub(crate) use registry::global_registry;
#[derive(Debug, Clone)]
pub(crate) struct Target {
url: Url,
}
impl FromStr for Target {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.parse::<Url>() {
Ok(url) => Ok(Target { url }),
Err(err) => Err(err.to_string()),
}
}
}
impl From<url::Url> for Target {
fn from(url: url::Url) -> Self {
Target { url }
}
}
impl Target {
pub fn scheme(&self) -> &str {
self.url.scheme()
}
pub fn authority_host(&self) -> &str {
self.url.host_str().unwrap_or("")
}
pub fn authority_port(&self) -> Option<u16> {
self.url.port()
}
pub fn authority_host_port(&self) -> String {
let host = self.authority_host();
let port = self.authority_port();
if let Some(port) = port {
format!("{host}:{port}")
} else {
host.to_owned()
}
}
pub fn path(&self) -> &str {
self.url.path()
}
}
impl Display for Target {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}://{}{}",
self.scheme(),
self.authority_host_port(),
self.path()
)
}
}
pub(crate) trait ResolverBuilder: Send + Sync {
fn build(&self, target: &Target, options: ResolverOptions) -> Box<dyn Resolver>;
fn scheme(&self) -> &str;
fn default_authority(&self, target: &Target) -> String {
let path = target.path();
path.strip_prefix("/").unwrap_or(path).to_string()
}
fn is_valid_uri(&self, uri: &Target) -> bool;
}
#[non_exhaustive]
pub(crate) struct ResolverOptions {
pub authority: String,
pub runtime: GrpcRuntime,
pub work_scheduler: Arc<dyn WorkScheduler>,
}
pub(crate) trait WorkScheduler: Send + Sync {
fn schedule_work(&self);
}
pub(crate) trait Resolver: Send + Sync {
fn resolve_now(&mut self);
fn work(&mut self, channel_controller: &mut dyn ChannelController);
}
pub(crate) trait ChannelController: Send + Sync {
fn update(&mut self, update: ResolverUpdate) -> Result<(), String>;
fn parse_service_config(&self, config: &str) -> Result<ServiceConfig, String>;
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub(crate) struct ResolverUpdate {
pub attributes: Attributes,
pub endpoints: Result<Vec<Endpoint>, String>,
pub service_config: Result<Option<ServiceConfig>, String>,
pub resolution_note: Option<String>,
}
impl Default for ResolverUpdate {
fn default() -> Self {
ResolverUpdate {
service_config: Ok(Default::default()),
attributes: Default::default(),
endpoints: Ok(Default::default()),
resolution_note: Default::default(),
}
}
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub(crate) struct Endpoint {
pub addresses: Vec<Address>,
pub attributes: Attributes,
}
impl Hash for Endpoint {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.addresses.hash(state);
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq, Ord, PartialOrd)]
pub(crate) struct Address {
pub network_type: &'static str,
pub address: ByteStr,
pub attributes: Attributes,
}
impl Hash for Address {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.network_type.hash(state);
self.address.hash(state);
}
}
impl Display for Address {
#[allow(clippy::to_string_in_format_args)]
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.network_type, self.address.to_string())
}
}
pub(crate) static TCP_IP_NETWORK_TYPE: &str = "tcp";
pub(crate) static UNIX_NETWORK_TYPE: &str = "unix";
struct NopResolver {
pub update: Option<ResolverUpdate>,
}
impl Resolver for NopResolver {
fn resolve_now(&mut self) {}
fn work(&mut self, channel_controller: &mut dyn ChannelController) {
if let Some(update) = self.update.take() {
let _ = channel_controller.update(update);
}
}
}
impl NopResolver {
fn new_with_err(err: String, options: ResolverOptions) -> Box<dyn Resolver> {
options.work_scheduler.schedule_work();
Box::new(NopResolver {
update: Some(ResolverUpdate {
endpoints: Err(err),
..Default::default()
}),
})
}
fn new_with_addr(addr: Address, options: ResolverOptions) -> Box<dyn Resolver> {
options.work_scheduler.schedule_work();
Box::new(NopResolver {
update: Some(ResolverUpdate {
endpoints: Ok(vec![Endpoint {
addresses: vec![addr],
..Default::default()
}]),
..Default::default()
}),
})
}
}
#[cfg(test)]
mod test {
use super::Target;
use crate::attributes::Attributes;
use crate::byte_str::ByteStr;
use crate::client::name_resolution::Address;
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
#[test]
pub fn parse_target() {
#[derive(Default)]
struct TestCase {
input: &'static str,
want_scheme: &'static str,
want_host: &'static str,
want_port: Option<u16>,
want_host_port: &'static str,
want_path: &'static str,
want_str: &'static str,
}
let test_cases = vec![
TestCase {
input: "dns:///grpc.io",
want_scheme: "dns",
want_host_port: "",
want_host: "",
want_port: None,
want_path: "/grpc.io",
want_str: "dns:///grpc.io",
},
TestCase {
input: "dns://8.8.8.8:53/grpc.io/docs",
want_scheme: "dns",
want_host_port: "8.8.8.8:53",
want_host: "8.8.8.8",
want_port: Some(53),
want_path: "/grpc.io/docs",
want_str: "dns://8.8.8.8:53/grpc.io/docs",
},
TestCase {
input: "unix:path/to/file",
want_scheme: "unix",
want_host_port: "",
want_host: "",
want_port: None,
want_path: "path/to/file",
want_str: "unix://path/to/file",
},
TestCase {
input: "unix:///run/containerd/containerd.sock",
want_scheme: "unix",
want_host_port: "",
want_host: "",
want_port: None,
want_path: "/run/containerd/containerd.sock",
want_str: "unix:///run/containerd/containerd.sock",
},
];
for tc in test_cases {
let target: Target = tc.input.parse().unwrap();
assert_eq!(target.scheme(), tc.want_scheme);
assert_eq!(target.authority_host(), tc.want_host);
assert_eq!(target.authority_port(), tc.want_port);
assert_eq!(target.authority_host_port(), tc.want_host_port);
assert_eq!(target.path(), tc.want_path);
assert_eq!(&target.to_string(), tc.want_str);
}
}
#[test]
fn test_address_hashmap_asymmetric_collision() {
let addr_base = "127.0.0.1:8080";
let addr_a = Address {
network_type: "tcp",
address: ByteStr::from(addr_base.to_string()),
attributes: Attributes::new(),
};
let attrs = Attributes::new().add("metadata_payload".to_string());
let addr_b = Address {
network_type: "tcp",
address: ByteStr::from(addr_base.to_string()),
attributes: attrs,
};
let mut hasher_a = DefaultHasher::new();
let mut hasher_b = DefaultHasher::new();
addr_a.hash(&mut hasher_a);
addr_b.hash(&mut hasher_b);
assert_eq!(
hasher_a.finish(),
hasher_b.finish(),
"Identical Address hashes must route to the same HashMap memory bucket!"
);
let hash_a = hasher_a.finish();
let addr_diff_net = Address {
network_type: "uds",
address: ByteStr::from(addr_base.to_string()),
attributes: Attributes::new(),
};
let mut hasher_diff_net = DefaultHasher::new();
addr_diff_net.hash(&mut hasher_diff_net);
assert_ne!(
hash_a,
hasher_diff_net.finish(),
"Changing network_type must change the hash!"
);
let addr_diff_addr = Address {
network_type: "tcp",
address: ByteStr::from("127.0.0.1:8081".to_string()),
attributes: Attributes::new(),
};
let mut hasher_diff_addr = DefaultHasher::new();
addr_diff_addr.hash(&mut hasher_diff_addr);
assert_ne!(
hash_a,
hasher_diff_addr.finish(),
"Changing address must change the hash!"
);
let mut map = HashMap::new();
map.insert(addr_a.clone(), "subchannel_a");
assert!(
map.remove(&addr_b).is_none(),
"HashMap incorrectly matched key despite mismatched attributes!"
);
assert_eq!(map.remove(&addr_a), Some("subchannel_a"));
assert!(map.is_empty());
}
}