use crate::{
auth_service::AuthService,
connection_manager::{ConnectionManager, RpcConnection},
errors::{DanubeError, Result},
retry_manager::RetryManager,
};
use danube_core::proto::{
health_check_client::HealthCheckClient, health_check_request::ClientType,
health_check_response::ClientStatus, HealthCheckRequest,
};
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
use tokio::time::{sleep, Duration};
use tonic::transport::Uri;
use tracing::warn;
/// Pause, in seconds, between two consecutive health-check requests sent by
/// the background task spawned in `HealthCheckService::start_health_check`.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;
/// Client-side service that periodically sends health-check requests to a
/// broker over gRPC and raises a shared stop flag when the broker replies
/// with `ClientStatus::Close`.
#[derive(Debug, Clone)]
pub(crate) struct HealthCheckService {
/// Used to obtain the gRPC connection and to read the configured API key.
cnx_manager: Arc<ConnectionManager>,
/// Inserts the auth token into outgoing requests when one is needed.
auth_service: AuthService,
/// Counter supplying the `request_id` of each health-check request.
/// Held in an `Arc`, so clones of this service share the same counter.
request_id: Arc<AtomicU64>,
}
impl HealthCheckService {
    /// Creates a health-check service whose request-id counter starts at `0`.
    pub fn new(cnx_manager: Arc<ConnectionManager>, auth_service: AuthService) -> Self {
        HealthCheckService {
            cnx_manager,
            auth_service,
            request_id: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Starts a background task that sends a health-check request every
    /// `HEALTH_CHECK_INTERVAL_SECS` seconds.
    ///
    /// The connection is obtained up front; the task is then spawned with
    /// `tokio::spawn` and its handle is dropped, so this function returns as
    /// soon as the task has been started.
    ///
    /// The task ends when any of the following happens:
    /// * `stop_signal` is observed to be `true`;
    /// * a health-check round fails (the error is logged with `warn!`, not
    ///   returned to the caller);
    /// * the broker answers with `ClientStatus::Close`, which makes
    ///   `health_check` set `stop_signal`.
    ///
    /// # Parameters
    /// * `connect_url` - address used to obtain the gRPC connection and passed
    ///   to the auth service when inserting the token.
    /// * `broker_addr` / `proxy` - forwarded to
    ///   `RetryManager::insert_proxy_header` for every request.
    /// * `client_type` / `client_id` - copied into each `HealthCheckRequest`.
    /// * `stop_signal` - shared flag read by the loop and written by
    ///   `health_check`.
    ///
    /// # Errors
    /// Returns the error produced by `ConnectionManager::get_connection` if the
    /// connection cannot be obtained.
    pub(crate) async fn start_health_check(
        &self,
        connect_url: &Uri,
        broker_addr: &Uri,
        proxy: bool,
        client_type: ClientType,
        client_id: u64,
        stop_signal: Arc<AtomicBool>,
    ) -> Result<()> {
        // NOTE(review): `connect_url` is passed for both arguments and
        // `broker_addr` only feeds the proxy header below - confirm against
        // `ConnectionManager::get_connection` that this is intended.
        let grpc_cnx = self
            .cnx_manager
            .get_connection(connect_url, connect_url)
            .await?;

        // Owned copies for the spawned task, which must not borrow from `self`
        // or from the caller's references. `stop_signal` is already owned and
        // is moved into the task as-is.
        let request_id = Arc::clone(&self.request_id);
        let api_key = self.cnx_manager.connection_options.api_key.clone();
        let connect_url = connect_url.clone();
        let broker_addr = broker_addr.clone();
        let auth_service = self.auth_service.clone();

        tokio::spawn(async move {
            while !stop_signal.load(Ordering::Relaxed) {
                let outcome = Self::health_check(
                    Arc::clone(&request_id),
                    Arc::clone(&grpc_cnx),
                    client_type,
                    client_id,
                    Arc::clone(&stop_signal),
                    api_key.as_deref(),
                    &connect_url,
                    &broker_addr,
                    proxy,
                    auth_service.clone(),
                )
                .await;

                if let Err(e) = outcome {
                    warn!("Error in health check: {:?}", e);
                    break;
                }

                // The round that just finished may have raised the flag (broker
                // replied `Close`); leave now instead of idling for a full
                // interval before noticing it.
                if stop_signal.load(Ordering::Relaxed) {
                    break;
                }

                sleep(Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS)).await;
            }
        });

        Ok(())
    }

    /// Performs a single health-check round trip.
    ///
    /// Builds a `HealthCheckRequest` (taking the next value of `request_id`),
    /// lets the auth service insert a token if needed, adds the proxy header,
    /// and sends the request over a clone of the connection's channel.
    ///
    /// When the response status equals `ClientStatus::Close`, a warning is
    /// logged and `stop_signal` is set to `true`; the call still returns
    /// `Ok(())`.
    ///
    /// # Errors
    /// * Propagates any error from `AuthService::insert_token_if_needed`.
    /// * Returns `DanubeError::FromStatus` when the RPC itself fails.
    async fn health_check(
        request_id: Arc<AtomicU64>,
        grpc_cnx: Arc<RpcConnection>,
        client_type: ClientType,
        client_id: u64,
        stop_signal: Arc<AtomicBool>,
        api_key: Option<&str>,
        connect_url: &Uri,
        broker_addr: &Uri,
        proxy: bool,
        auth_service: AuthService,
    ) -> Result<()> {
        let health_request = HealthCheckRequest {
            // `fetch_add` yields the value before the increment.
            request_id: request_id.fetch_add(1, Ordering::SeqCst),
            client: client_type as i32,
            id: client_id,
        };

        let mut request = tonic::Request::new(health_request);
        auth_service
            .insert_token_if_needed(api_key, &mut request, connect_url)
            .await?;
        RetryManager::insert_proxy_header(&mut request, broker_addr, proxy);

        let mut client = HealthCheckClient::new(grpc_cnx.grpc_cnx.clone());
        let response = client
            .health_check(request)
            .await
            .map_err(DanubeError::FromStatus)?;

        if response.get_ref().status == ClientStatus::Close as i32 {
            warn!("Received stop signal from broker in health check response");
            stop_signal.store(true, Ordering::Relaxed);
        }

        Ok(())
    }
}