use cpool::Error;
use cpool::backend::Address;
use hyper::Uri;
use hyper::http::uri::InvalidUri;
use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
/// Builds a request URI by appending the path (and query, when present) of `uri`
/// to `base_url`, then parsing the combined text.
///
/// # Errors
///
/// Returns [`InvalidUri`] when the concatenated string is not a valid URI.
pub(crate) fn new_uri(mut base_url: String, uri: &Uri) -> Result<Uri, InvalidUri> {
    base_url += uri.path();
    match uri.query() {
        Some(query) => {
            // The query is stored without its leading separator, so add it back.
            base_url.push('?');
            base_url += query;
        }
        None => {}
    }
    base_url.parse::<Uri>()
}
pub(crate) fn http_address(address: &Address) -> Result<Address, Error> {
if let Address::Ori(ori) = address {
let uri: Uri = ori.parse().map_err(|e| Error::from_other(e))?;
Ok(Address::from(format!(
"{}:{}",
uri.host().unwrap(),
uri.port_u16().unwrap_or(80)
)))
} else {
Ok(address.clone())
}
}
pub(crate) fn https_address(address: &Address) -> Result<Address, Error> {
if let Address::Ori(ori) = address {
let uri: Uri = ori.parse().map_err(|e| Error::from_other(e))?;
Ok(Address::from(format!(
"{}:{}",
uri.host().unwrap(),
uri.port_u16().unwrap_or(443)
)))
} else {
Ok(address.clone())
}
}
/// Creates a rustls client configuration that trusts the `webpki_roots`
/// certificate set and presents no client certificate.
fn tls_client_config() -> tokio_rustls::rustls::ClientConfig {
    let mut trusted = tokio_rustls::rustls::RootCertStore::empty();
    let anchors = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
    trusted.extend(anchors);
    tokio_rustls::rustls::ClientConfig::builder()
        .with_root_certificates(trusted)
        .with_no_client_auth()
}
/// Lazily built, shared TLS client configuration with no ALPN protocols set
/// beyond what `tls_client_config` produces.
pub(crate) static HTTP1_TLS_CLIENT_CFG: LazyLock<Arc<tokio_rustls::rustls::ClientConfig>> =
    LazyLock::new(|| {
        let config = tls_client_config();
        Arc::new(config)
    });
/// Lazily built, shared TLS client configuration that advertises the ALPN
/// identifiers `h2`, `http/1.1`, `grpc` and `websocket`, in that order.
pub(crate) static HTTP2_TLS_CLIENT_CFG: LazyLock<Arc<tokio_rustls::rustls::ClientConfig>> =
    LazyLock::new(|| {
        // Order matters: it is the order in which the protocols are offered.
        const ALPN: [&[u8]; 4] = [b"h2", b"http/1.1", b"grpc", b"websocket"];
        let mut config = tls_client_config();
        config.alpn_protocols = ALPN.iter().map(|proto| proto.to_vec()).collect();
        Arc::new(config)
    });
pub(crate) async fn run_conn<T, C: Future<Output = Result<(), hyper::Error>> + Send + 'static>(
ref_cnt: Arc<T>,
c: C,
ka: Option<Duration>,
) -> Result<(), hyper::Error> {
let r = if let Some(ka) = ka {
tokio::select! {
r = c => {
r
},
r = async {
let mut last = None;
loop {
if Arc::strong_count(&ref_cnt) <= 2 {
if last.is_none() {
last = Some(std::time::Instant::now());
}
if last.as_ref().unwrap().elapsed() > ka {
break;
}
} else {
last = None;
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
Ok(())
} => {
r
}
}
} else {
c.await
};
r
}
pub(crate) async fn create_tls_tcp(
tcp: tokio::net::TcpStream,
addrs: &Address,
cfg: Arc<tokio_rustls::rustls::ClientConfig>,
) -> Result<tokio_rustls::client::TlsStream<tokio::net::TcpStream>, Error> {
let name = server_name(addrs)?;
let conn = tokio_rustls::TlsConnector::from(cfg);
conn.connect(name, tcp)
.await
.map_err(|e| Error::from_other(e))
}
pub(crate) async fn create_tcp_stream(addrs: &Address) -> Result<tokio::net::TcpStream, Error> {
let a = to_socket_addrs(addrs).await?;
tokio::net::TcpStream::connect(&a[..])
.await
.map_err(|e| Error::from_other(e))
}
/// Opens a TCP connection for plain HTTP, normalising `addrs` with
/// `http_address` first.
pub(crate) async fn create_http_stream(addrs: &Address) -> Result<tokio::net::TcpStream, Error> {
    let target = http_address(addrs)?;
    let stream = create_tcp_stream(&target).await?;
    Ok(stream)
}
/// Opens the TCP connection underlying an HTTPS session, normalising `addrs`
/// with `https_address` first. No TLS handshake is performed here.
pub(crate) async fn create_https_stream(addrs: &Address) -> Result<tokio::net::TcpStream, Error> {
    let target = https_address(addrs)?;
    let stream = create_tcp_stream(&target).await?;
    Ok(stream)
}
/// Derives the TLS server name to present for `addrs`.
///
/// * [`Address::Ori`]: the string is parsed as a URI and its host is converted
///   into a server name. Square brackets around an IPv6 literal are removed
///   first so that the address is recognised as an IP.
/// * [`Address::Addr`]: the IP of the socket address is used directly.
///
/// # Errors
///
/// Returns an error when the origin string is not a valid URI, or when its
/// host (an empty string if the URI has none) is not a valid DNS name or IP
/// address.
pub(crate) fn server_name(
    addrs: &Address,
) -> Result<tokio_rustls::rustls::pki_types::ServerName<'static>, Error> {
    match addrs {
        Address::Ori(ori) => {
            let uri: hyper::Uri = ori.parse().map_err(|e| Error::from_other(e))?;
            let raw_host = uri.host().unwrap_or("");
            // `Uri::host` keeps the brackets of an IPv6 literal (`[::1]`), which
            // the server-name parser does not accept; hand it the bare address.
            let host = raw_host
                .strip_prefix('[')
                .and_then(|inner| inner.strip_suffix(']'))
                .unwrap_or(raw_host);
            tokio_rustls::rustls::pki_types::ServerName::try_from(host.to_string())
                .map_err(|e| Error::from_other(e))
        }
        Address::Addr(addr) => Ok(tokio_rustls::rustls::pki_types::ServerName::from(addr.ip())),
    }
}
/// Resolves `addr` to the list of socket addresses reported by
/// `tokio::net::lookup_host`.
///
/// # Errors
///
/// Propagates the I/O error returned by the lookup.
pub(crate) async fn to_socket_addrs(addr: &Address) -> io::Result<Vec<SocketAddr>> {
    // The two variants carry different types, so each arm performs its own
    // lookup and collects the resulting iterator.
    match addr {
        Address::Ori(ori) => {
            let resolved = tokio::net::lookup_host(ori).await?;
            Ok(resolved.collect())
        }
        Address::Addr(addr) => {
            let resolved = tokio::net::lookup_host(addr).await?;
            Ok(resolved.collect())
        }
    }
}
/// Returns the `scheme://host:port` prefix for `address`, using the `https`
/// form when `tls` is true and the `http` form otherwise.
pub(crate) fn base_url(tls: bool, address: &Address) -> String {
    match tls {
        true => https_scheme_host(address),
        false => http_scheme_host(address),
    }
}
/// Renders `address` as an `http://host:port` prefix.
///
/// * [`Address::Addr`]: built from the IP and port; an IPv6 address is wrapped
///   in square brackets so the result is a valid URI authority.
/// * [`Address::Ori`]: parsed as a URI and rebuilt from its host and port,
///   defaulting to port 80.
///
/// Returns an empty string when the origin cannot be parsed as a URI or has no
/// host component.
pub(crate) fn http_scheme_host(address: &Address) -> String {
    match address {
        Address::Addr(a) => {
            let ip = a.ip().to_string();
            // Only IPv6 text contains ':'; it must be bracketed to keep the port
            // separator unambiguous.
            if ip.contains(':') {
                format!("http://[{}]:{}", ip, a.port())
            } else {
                format!("http://{}:{}", ip, a.port())
            }
        }
        Address::Ori(ori) => {
            let Ok(uri) = ori.parse::<hyper::Uri>() else {
                return String::new();
            };
            // A host-less URI (e.g. `/path`) gets the same empty fallback as an
            // unparsable one instead of panicking.
            match uri.host() {
                Some(host) => format!("http://{}:{}", host, uri.port_u16().unwrap_or(80)),
                None => String::new(),
            }
        }
    }
}
/// Renders `address` as an `https://host:port` prefix.
///
/// * [`Address::Addr`]: built from the IP and port; an IPv6 address is wrapped
///   in square brackets so the result is a valid URI authority.
/// * [`Address::Ori`]: parsed as a URI and rebuilt from its host and port,
///   defaulting to port 443.
///
/// Returns an empty string when the origin cannot be parsed as a URI or has no
/// host component.
pub(crate) fn https_scheme_host(address: &Address) -> String {
    match address {
        Address::Addr(a) => {
            let ip = a.ip().to_string();
            // Only IPv6 text contains ':'; it must be bracketed to keep the port
            // separator unambiguous.
            if ip.contains(':') {
                format!("https://[{}]:{}", ip, a.port())
            } else {
                format!("https://{}:{}", ip, a.port())
            }
        }
        Address::Ori(ori) => {
            let Ok(uri) = ori.parse::<hyper::Uri>() else {
                return String::new();
            };
            // A host-less URI (e.g. `/path`) gets the same empty fallback as an
            // unparsable one instead of panicking.
            match uri.host() {
                Some(host) => format!("https://{}:{}", host, uri.port_u16().unwrap_or(443)),
                None => String::new(),
            }
        }
    }
}