use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
};
use hyper_util::rt::TokioIo;
use std::path::PathBuf;
use tokio::net::UnixStream;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::ClientTlsConfig;
use tonic::transport::{Channel, Endpoint, Uri};
use tonic_health::pb::{
health_client::HealthClient as HealthGRPCClient, HealthCheckRequest, HealthCheckResponse,
};
use tower::service_fn;
use tracing::{error, instrument};
use super::interceptor::InjectTracingInterceptor;
/// HealthClient is a wrapper around the tonic-health gRPC health client whose
/// channel is wrapped by `InjectTracingInterceptor`.
///
/// The type derives `Clone`; the methods on this type clone the inner client
/// before issuing each request.
#[derive(Clone)]
pub struct HealthClient {
// client is the generated gRPC health client running over the intercepted channel.
client: HealthGRPCClient<InterceptedService<Channel, InjectTracingInterceptor>>,
}
impl HealthClient {
pub async fn new(addr: &str, client_tls_config: Option<ClientTlsConfig>) -> Result<Self> {
let channel = match client_tls_config {
Some(client_tls_config) => Channel::from_shared(addr.to_string())
.map_err(|_| Error::InvalidURI(addr.into()))?
.tls_config(client_tls_config)?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.connect()
.await
.inspect_err(|err| {
error!("connect to {} failed: {}", addr, err);
})
.or_err(ErrorType::ConnectError)?,
None => Channel::from_shared(addr.to_string())
.map_err(|_| Error::InvalidURI(addr.into()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)
.connect()
.await
.inspect_err(|err| {
error!("connect to {} failed: {}", addr, err);
})
.or_err(ErrorType::ConnectError)?,
};
let client = HealthGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
Ok(Self { client })
}
pub async fn new_unix(socket_path: PathBuf) -> Result<Self> {
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(service_fn(move |_: Uri| {
let socket_path = socket_path.clone();
async move {
Ok::<_, std::io::Error>(TokioIo::new(
UnixStream::connect(socket_path.clone()).await?,
))
}
}))
.await
.inspect_err(|err| {
error!("connect failed: {}", err);
})
.or_err(ErrorType::ConnectError)?;
let client = HealthGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX);
Ok(Self { client })
}
#[instrument(skip_all)]
pub async fn check(&self) -> Result<HealthCheckResponse> {
let request = Self::make_request(HealthCheckRequest {
service: "".to_string(),
});
let response = self.client.clone().check(request).await?;
Ok(response.into_inner())
}
#[instrument(skip_all)]
pub async fn check_service(&self, service: String) -> Result<HealthCheckResponse> {
let request = Self::make_request(HealthCheckRequest { service });
let response = self.client.clone().check(request).await?;
Ok(response.into_inner())
}
#[instrument(skip_all)]
pub async fn check_dfdaemon_download(&self) -> Result<HealthCheckResponse> {
self.check_service("dfdaemon.v2.DfdaemonDownload".to_string())
.await
}
#[instrument(skip_all)]
pub async fn check_dfdaemon_upload(&self) -> Result<HealthCheckResponse> {
self.check_service("dfdaemon.v2.DfdaemonUpload".to_string())
.await
}
fn make_request<T>(request: T) -> tonic::Request<T> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
request
}
}