use std::time::Duration;
use super::backoff::{BackOffStatus, BackOffWhenFail};
use http::{Request, Uri};
use hyper_openssl::client::legacy::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use openssl::{
error::ErrorStack,
pkey::PKey,
ssl::{SslConnector, SslMethod},
x509::X509,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::{channel::Change, Channel, Endpoint};
use tower::Service;
use tower::{balance::p2c::Balance, buffer::Buffer, discover::Change as TowerChange};
use crate::error::Result;
/// Alias for `openssl::ssl::SslConnectorBuilder`, the builder customized by
/// `OpenSslClientConfig`.
pub type SslConnectorBuilder = openssl::ssl::SslConnectorBuilder;
/// A `Result` whose error type is OpenSSL's `ErrorStack`.
pub type OpenSslResult<T> = std::result::Result<T, ErrorStack>;
/// The request type used throughout this module: an `http::Request` carrying a
/// `tonic::body::Body`.
pub type TonicRequest = Request<tonic::body::Body>;
/// tower's `p2c` `Balance` over the discover stream `T`, handling `TonicRequest`s.
pub type Balanced<T> = Balance<T, TonicRequest>;
/// The channel type returned by `balanced_channel`: a tower `Buffer` whose
/// response future is the one produced by the `Balanced` service over
/// `OpenSslDiscover<Uri>`.
pub type OpenSslChannel =
Buffer<TonicRequest, <Balanced<OpenSslDiscover<Uri>> as Service<TonicRequest>>::Future>;
/// Stream of endpoint changes keyed by `K`; every inserted service is a tonic
/// `Channel` wrapped in `BackOffWhenFail`. Items use this crate's `Result`.
pub type OpenSslDiscover<K> = ReceiverStream<Result<TowerChange<K, BackOffWhenFail<Channel>>>>;
/// Newtype around a `hyper_openssl` `HttpsConnector` layered on hyper-util's
/// `HttpConnector`.
///
/// Instances come from `OpenSslConnector::create_default` or
/// `OpenSslClientConfig::build`; the wrapped connector is private to this module.
#[derive(Clone)]
pub struct OpenSslConnector(HttpsConnector<HttpConnector>);
/// The debug representation is only the type name; the wrapped connector is
/// intentionally not printed.
impl std::fmt::Debug for OpenSslConnector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less tuple struct renders as its bare name.
        f.write_str("OpenSslConnector")
    }
}
impl OpenSslConnector {
    /// Builds a connector from `OpenSslClientConfig::default()` without any
    /// further customization.
    ///
    /// # Errors
    ///
    /// Returns the OpenSSL `ErrorStack` produced while creating the default
    /// configuration or while turning it into a connector.
    pub fn create_default() -> OpenSslResult<Self> {
        OpenSslClientConfig::default().build()
    }
}
// Build-time guard: if the `tls` feature is enabled while this file is being
// compiled, abort compilation with the explanation below instead of letting
// both TLS implementations be active at once.
#[cfg(feature = "tls")]
compile_error!(concat!(
"**You should only enable one of `tls` and `tls-openssl`.** Reason: ",
"For now, `tls-openssl` would take over the transport layer (sockets) to implement TLS based connection. ",
"As a result, once using with `tonic`'s internal TLS implementation (which based on `rustls`), ",
"we may create TLS tunnels over TLS tunnels or directly fail because of some sorts of misconfiguration.")
);
/// Creates a load-balanced, buffered channel whose endpoints are dialed through
/// `connector`.
///
/// Returns the channel together with the sender used to insert or remove
/// endpoints. Endpoint changes travel through a queue of capacity 16 and the
/// request buffer is bounded at 1024.
///
/// The discover stream is fed by a task started with `tokio::task::spawn`, so
/// this presumably has to be called from within a tokio runtime (TODO confirm).
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub fn balanced_channel(
    connector: OpenSslConnector,
) -> Result<(OpenSslChannel, Sender<Change<Uri, Endpoint>>)> {
    let (change_tx, change_rx) = tokio::sync::mpsc::channel(16);
    let discover = create_openssl_discover(connector, change_rx);
    let channel = Buffer::new(Balance::new(discover), 1024);
    Ok((channel, change_tx))
}
fn create_openssl_connector(builder: SslConnectorBuilder) -> OpenSslResult<OpenSslConnector> {
let mut http = HttpConnector::new();
http.enforce_http(false);
let https = HttpsConnector::with_connector(http, builder)?;
Ok(OpenSslConnector(https))
}
/// Adapts a receiver of tonic endpoint changes into the tower discover stream
/// consumed by the balancer.
///
/// A background task (started with `tokio::task::spawn`) reads every change from
/// `incoming`, translates it and forwards it through a queue of capacity 16:
///
/// * `Change::Insert(key, endpoint)` becomes `TowerChange::Insert` holding a
///   lazily connected channel (dialed through a clone of `connector`) wrapped in
///   `BackOffWhenFail`.
/// * `Change::Remove(key)` becomes `TowerChange::Remove(key)`.
///
/// The task ends when `incoming` is closed or when the returned stream has been
/// dropped.
fn create_openssl_discover<K: Send + 'static>(
    connector: OpenSslConnector,
    mut incoming: Receiver<Change<K, Endpoint>>,
) -> OpenSslDiscover<K> {
    let (change_tx, change_rx) = tokio::sync::mpsc::channel(16);
    tokio::task::spawn(async move {
        while let Some(change) = incoming.recv().await {
            let translated = match change {
                Change::Insert(key, endpoint) => {
                    let channel = endpoint.connect_with_connector_lazy(connector.0.clone());
                    // NOTE(review): 1s and 256s are presumably the initial and the
                    // maximum back-off delay; confirm against the `backoff` module.
                    let backoff =
                        BackOffStatus::new(Duration::from_secs(1), Duration::from_secs(256));
                    TowerChange::Insert(key, BackOffWhenFail::new(channel, backoff))
                }
                Change::Remove(key) => TowerChange::Remove(key),
            };
            // A failed send means the receiving stream is gone; stop forwarding.
            if change_tx.send(Ok(translated)).await.is_err() {
                break;
            }
        }
    });
    ReceiverStream::new(change_rx)
}
/// Chainable configuration for the OpenSSL client connector.
///
/// Holds either the `SslConnectorBuilder` being configured or the OpenSSL error
/// hit while configuring it. Once an error is stored, later configuration steps
/// are skipped and the error is returned by `build`.
pub struct OpenSslClientConfig(OpenSslResult<SslConnectorBuilder>);
impl Default for OpenSslClientConfig {
fn default() -> Self {
let get_builder = || {
let mut b = SslConnector::builder(SslMethod::tls_client())?;
b.set_alpn_protos(b"\x02h2")?;
OpenSslResult::Ok(b)
};
Self(get_builder())
}
}
/// The debug representation shows a fixed `"callbacks"` placeholder instead of
/// the wrapped builder or error.
impl std::fmt::Debug for OpenSslClientConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_tuple("OpenSslClientConfig");
        repr.field(&"callbacks");
        repr.finish()
    }
}
impl OpenSslClientConfig {
    /// Applies an arbitrary customization `f` to the underlying
    /// `SslConnectorBuilder`.
    ///
    /// If an earlier step already failed, `f` is not called and the stored error
    /// is kept. If `f` itself fails, its error is stored and later returned by
    /// `build`.
    pub fn manually(self, f: impl FnOnce(&mut SslConnectorBuilder) -> OpenSslResult<()>) -> Self {
        Self(self.0.and_then(|mut builder| {
            f(&mut builder)?;
            Ok(builder)
        }))
    }

    /// Adds every CA certificate found in the PEM data `s` to the certificate
    /// store.
    ///
    /// `s` may contain a single certificate or a bundle of several concatenated
    /// certificates; all of them are added. Empty input leaves the configuration
    /// unchanged. Parse and store errors are deferred until `build`.
    pub fn ca_cert_pem(self, s: &[u8]) -> Self {
        if s.is_empty() {
            return self;
        }
        self.manually(move |cb| {
            let certs = X509::stack_from_pem(s)?;
            if certs.is_empty() {
                // `stack_from_pem` reports "no certificate present" as an empty
                // list. Run the single-certificate parser so that input without
                // any certificate still yields the parse error it always did.
                let ca = X509::from_pem(s)?;
                return cb.cert_store_mut().add_cert(ca);
            }
            let store = cb.cert_store_mut();
            for ca in certs {
                store.add_cert(ca)?;
            }
            Ok(())
        })
    }

    /// Installs the client certificate and its private key, both PEM encoded.
    ///
    /// If either input is empty the configuration is left unchanged. Parse and
    /// installation errors are deferred until `build`.
    ///
    /// NOTE(review): the certificate is parsed with `X509::from_pem`, which
    /// reads a single certificate; any chain certificates following the leaf in
    /// `cert_pem` are presumably not installed. Confirm before relying on it.
    pub fn client_cert_pem_and_key(self, cert_pem: &[u8], key_pem: &[u8]) -> Self {
        if cert_pem.is_empty() || key_pem.is_empty() {
            return self;
        }
        self.manually(|cb| {
            let client = X509::from_pem(cert_pem)?;
            let client_key = PKey::private_key_from_pem(key_pem)?;
            cb.set_certificate(&client)?;
            cb.set_private_key(&client_key)?;
            Ok(())
        })
    }

    /// Finishes the configuration and creates the connector.
    ///
    /// # Errors
    ///
    /// Returns the first OpenSSL error recorded by any configuration step, or
    /// the error from creating the connector itself.
    pub(crate) fn build(self) -> OpenSslResult<OpenSslConnector> {
        self.0.and_then(create_openssl_connector)
    }
}