danube-client 0.7.3

The async client for Danube Messaging Broker platform
Documentation
use crate::{
    auth_service::AuthService,
    connection_manager::{ConnectionManager, RpcConnection},
    errors::{DanubeError, Result},
    retry_manager::RetryManager,
};

use danube_core::proto::{
    health_check_client::HealthCheckClient, health_check_request::ClientType,
    health_check_response::ClientStatus, HealthCheckRequest,
};
use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering},
    Arc,
};
use tokio::time::{sleep, Duration};
use tonic::transport::Uri;
use tracing::warn;

/// Interval between health check pings to the broker.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

// HealthCheckService is used to validate that the producer/consumer are still served by the connected broker
#[derive(Debug, Clone)]
pub(crate) struct HealthCheckService {
    cnx_manager: Arc<ConnectionManager>,
    auth_service: AuthService,
    // unique identifier for every request sent by LookupService
    request_id: Arc<AtomicU64>,
}

impl HealthCheckService {
    pub fn new(cnx_manager: Arc<ConnectionManager>, auth_service: AuthService) -> Self {
        HealthCheckService {
            cnx_manager,
            auth_service,
            request_id: Arc::new(AtomicU64::new(0)),
        }
    }

    pub(crate) async fn start_health_check(
        &self,
        connect_url: &Uri,
        broker_addr: &Uri,
        proxy: bool,
        client_type: ClientType,
        client_id: u64,
        stop_signal: Arc<AtomicBool>,
    ) -> Result<()> {
        let grpc_cnx = self
            .cnx_manager
            .get_connection(connect_url, connect_url)
            .await?;
        let stop_signal = Arc::clone(&stop_signal);
        let request_id = Arc::clone(&self.request_id);
        let api_key = self.cnx_manager.connection_options.api_key.clone();
        let connect_url = connect_url.clone();
        let broker_addr = broker_addr.clone();
        let auth_service = self.auth_service.clone();
        tokio::spawn(async move {
            loop {
                if stop_signal.load(Ordering::Relaxed) {
                    break;
                }
                let stop_signal_clone = Arc::clone(&stop_signal);
                let request_id_clone = Arc::clone(&request_id);
                let grpc_cnx_clone = Arc::clone(&grpc_cnx);
                if let Err(e) = HealthCheckService::health_check(
                    request_id_clone,
                    grpc_cnx_clone,
                    client_type,
                    client_id,
                    stop_signal_clone,
                    api_key.as_deref(),
                    &connect_url,
                    &broker_addr,
                    proxy,
                    auth_service.clone(),
                )
                .await
                {
                    warn!("Error in health check: {:?}", e);
                    break;
                }
                sleep(Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS)).await;
            }
        });
        Ok(())
    }

    async fn health_check(
        request_id: Arc<AtomicU64>,
        grpc_cnx: Arc<RpcConnection>,
        client_type: ClientType,
        client_id: u64,
        stop_signal: Arc<AtomicBool>,
        api_key: Option<&str>,
        connect_url: &Uri,
        broker_addr: &Uri,
        proxy: bool,
        auth_service: AuthService,
    ) -> Result<()> {
        let health_request = HealthCheckRequest {
            request_id: request_id.fetch_add(1, Ordering::SeqCst),
            client: client_type as i32,
            id: client_id,
        };

        let mut request = tonic::Request::new(health_request);

        auth_service
            .insert_token_if_needed(api_key, &mut request, connect_url)
            .await?;
        RetryManager::insert_proxy_header(&mut request, broker_addr, proxy);

        let mut client = HealthCheckClient::new(grpc_cnx.grpc_cnx.clone());

        match client.health_check(request).await {
            Ok(response) => {
                if response.get_ref().status == ClientStatus::Close as i32 {
                    warn!("Received stop signal from broker in health check response");
                    stop_signal.store(true, Ordering::Relaxed);
                    Ok(())
                } else {
                    Ok(())
                }
            }
            Err(status) => Err(DanubeError::FromStatus(status)),
        }
    }
}