nacos-sdk 0.7.0

Nacos client in Rust.
Documentation
use std::{
    collections::HashMap,
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};

use tokio::{sync::Mutex, time::sleep};
use tracing::{Instrument, debug, error, info, instrument, warn};

use crate::common::{
    cache::Cache,
    executor,
    remote::{
        generate_request_id,
        grpc::{NacosGrpcClient, message::GrpcResponseMessage},
    },
};

use super::{
    dto::ServiceInfo,
    message::{request::ServiceQueryRequest, response::QueryServiceResponse},
    observable::service_info_observable::ServiceInfoEmitter,
};

pub(crate) struct ServiceInfoUpdater {
    service_info_emitter: Arc<ServiceInfoEmitter>,
    cache: Arc<Cache<ServiceInfo>>,
    nacos_grpc_client: Arc<NacosGrpcClient>,
    task_map: Mutex<HashMap<String, ServiceInfoUpdateTask>>,
}

impl ServiceInfoUpdater {
    pub(crate) fn new(
        service_info_emitter: Arc<ServiceInfoEmitter>,
        cache: Arc<Cache<ServiceInfo>>,
        nacos_grpc_client: Arc<NacosGrpcClient>,
    ) -> Self {
        Self {
            service_info_emitter,
            cache,
            nacos_grpc_client,
            task_map: Mutex::new(HashMap::default()),
        }
    }

    #[instrument(fields(group_name = group_name, cluster = cluster), skip_all)]
    pub(crate) async fn schedule_update(
        &self,
        namespace: String,
        service_name: String,
        group_name: String,
        cluster: String,
    ) {
        let task_key = ServiceInfoUpdater::update_task_key(&service_name, &group_name, &cluster);
        let mut lock = self.task_map.lock().await;

        let is_exist = lock.contains_key(&task_key);
        if !is_exist {
            let update_task = ServiceInfoUpdateTask::new(
                service_name,
                namespace,
                group_name,
                cluster,
                self.cache.clone(),
                self.nacos_grpc_client.clone(),
                self.service_info_emitter.clone(),
            );
            update_task.start();

            let _ = lock.insert(task_key, update_task);
        }
    }

    #[instrument(fields(group_name = group_name, cluster = cluster), skip_all)]
    pub(crate) async fn stop_update(
        &self,
        service_name: String,
        group_name: String,
        cluster: String,
    ) {
        let task_key = ServiceInfoUpdater::update_task_key(&service_name, &group_name, &cluster);
        let mut lock = self.task_map.lock().await;
        let ret = lock.remove(&task_key);
        if let Some(task) = ret {
            task.stop();
        }
    }

    fn update_task_key(service_name: &str, group_name: &str, cluster: &str) -> String {
        let group_service_name = ServiceInfo::get_grouped_service_name(service_name, group_name);
        ServiceInfo::get_key(&group_service_name, cluster)
    }
}

struct ServiceInfoUpdateTask {
    running: Arc<AtomicBool>,
    service_name: String,
    namespace: String,
    group_name: String,
    cluster: String,
    cache: Arc<Cache<ServiceInfo>>,
    nacos_grpc_client: Arc<NacosGrpcClient>,
    service_info_emitter: Arc<ServiceInfoEmitter>,
}

impl ServiceInfoUpdateTask {
    const DEFAULT_DELAY: u64 = 1000;
    const DEFAULT_UPDATE_CACHE_TIME_MULTIPLE: u8 = 6;
    const MAX_FAILED: u8 = 6;

    fn new(
        service_name: String,
        namespace: String,
        group_name: String,
        cluster: String,
        cache: Arc<Cache<ServiceInfo>>,
        nacos_grpc_client: Arc<NacosGrpcClient>,
        service_info_emitter: Arc<ServiceInfoEmitter>,
    ) -> Self {
        Self {
            running: Arc::new(AtomicBool::new(false)),
            service_name,
            namespace,
            group_name,
            cluster,
            cache,
            nacos_grpc_client,
            service_info_emitter,
        }
    }

    fn start(&self) {
        let running = self.running.clone();
        if self.running.load(Ordering::Acquire) {
            return;
        }
        self.running.store(true, Ordering::Release);

        let cluster = self.cluster.clone();
        let group_name = self.group_name.clone();
        let namespace = self.namespace.clone();
        let service_name = self.service_name.clone();

        let cache = self.cache.clone();
        let grpc_client = self.nacos_grpc_client.clone();
        let service_info_emitter = self.service_info_emitter.clone();

        executor::spawn(async move {
            let mut delay_time = ServiceInfoUpdateTask::DEFAULT_DELAY;

            let mut last_refresh_time = u64::MAX;

            let mut failed_count = 0;

            let request = ServiceQueryRequest {
                cluster,
                group_name: Some(group_name),
                healthy_only: false,
                udp_port: 0,
                namespace: Some(namespace),
                service_name: Some(service_name),
                ..Default::default()
            };

            let log_tag = format!(
                "{}:{}:{}:{}",
                request.namespace.as_deref().unwrap_or_default(),
                request.group_name.as_deref().unwrap_or_default(),
                request.service_name.as_deref().unwrap_or_default(),
                request.cluster
            );

            info!("{log_tag}:ServiceInfoUpdateTask started");

            while running.load(Ordering::Acquire) {
                let delay_time_millis = Duration::from_millis(
                    (delay_time << failed_count).min(ServiceInfoUpdateTask::DEFAULT_DELAY * 60),
                );
                debug!("{log_tag}:ServiceInfoUpdateTask delay sleep {delay_time_millis:?}");
                sleep(delay_time_millis).await;

                if !running.load(Ordering::Acquire) {
                    warn!("{log_tag}:ServiceInfoUpdateTask has been already stopped!");
                    break;
                }

                debug!("{log_tag}:ServiceInfoUpdateTask refreshing");

                let service_info = {
                    let group_name = request.group_name.as_deref().unwrap_or_default();
                    let service_name = request.service_name.as_deref().unwrap_or_default();
                    let cluster = &request.cluster;

                    let grouped_name =
                        ServiceInfo::get_grouped_service_name(service_name, group_name);
                    let key = ServiceInfo::get_key(&grouped_name, cluster);
                    cache.get(&key).map(|data| data.clone())
                };

                let mut need_query_service_info = true;

                if let Some(service_info) = service_info {
                    let is_outdate = last_refresh_time >= (service_info.last_ref_time as u64);
                    if is_outdate {
                        need_query_service_info = true;
                    } else {
                        need_query_service_info = false;
                        last_refresh_time = service_info.last_ref_time as u64;
                    }
                }

                if !need_query_service_info {
                    warn!("{log_tag}:ServiceInfoUpdateTask don't need to refresh service info");
                    continue;
                }

                let request = ServiceQueryRequest {
                    request_id: Some(generate_request_id()),
                    ..request.clone()
                };

                let ret = grpc_client
                    .send_request::<ServiceQueryRequest, QueryServiceResponse>(request)
                    .in_current_span()
                    .await;
                let Ok(response) = ret else {
                    error!("{log_tag}:ServiceInfoUpdateTask occur an error: {ret:?}");
                    if failed_count < ServiceInfoUpdateTask::MAX_FAILED {
                        failed_count += 1;
                    }
                    continue;
                };
                debug!("{log_tag}:ServiceInfoUpdateTask query service info response: {response:?}");
                if !response.is_success() {
                    let result_code = response.result_code;
                    let error_code = response.error_code;
                    let ret_message = response.message.unwrap_or_default();
                    error!("{log_tag}:ServiceInfoUpdateTask query services failed: resultCode: {result_code}, errorCode:{error_code}, message:{ret_message}");
                    if failed_count < ServiceInfoUpdateTask::MAX_FAILED {
                        failed_count += 1;
                    }
                    continue;
                }
                let service_info = response.service_info;
                last_refresh_time = service_info.last_ref_time as u64;
                delay_time = (service_info.cache_millis
                    * (ServiceInfoUpdateTask::DEFAULT_UPDATE_CACHE_TIME_MULTIPLE as i64))
                    as u64;

                service_info_emitter.emit(service_info).in_current_span().await;

                failed_count = 0;
                debug!("{log_tag}:ServiceInfoUpdateTask finish");
            }

            warn!("{log_tag}:ServiceInfoUpdateTask is stopped");
        }.in_current_span());
    }

    fn stop(&self) {
        self.running.store(false, Ordering::Release);
    }
}