nacos-sdk 0.8.0

Nacos client in Rust.
Documentation
use std::{
    collections::HashMap,
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};

use async_trait::async_trait;
use tokio::{
    sync::RwLock,
    time::{self, sleep},
};
use tracing::{Instrument, debug, debug_span, instrument};

use crate::api::error::Result;
use crate::common::{executor, remote::grpc::NacosGrpcClient};

pub(crate) mod automatic_request;

pub(crate) struct RedoTaskExecutor {
    map: Arc<RwLock<HashMap<String, Arc<dyn RedoTask>>>>,
    id: String,
}

impl RedoTaskExecutor {
    pub(crate) fn new(id: String) -> Self {
        let executor = Self {
            id,
            map: Arc::new(RwLock::new(HashMap::new())),
        };
        executor.start_schedule();
        executor
    }

    fn start_schedule(&self) {
        let _span_enter = debug_span!("RedoTaskExecutor-task", id = self.id).entered();
        debug!("start schedule automatic request task.");
        let map = self.map.clone();
        executor::spawn(
            async move {
                sleep(Duration::from_millis(3000)).await;
                let mut interval = time::interval(Duration::from_millis(3000));
                loop {
                    interval.tick().await;

                    let map = map.read().await;
                    let active_tasks: Vec<Arc<dyn RedoTask>> = map
                        .iter()
                        .filter(|(_, v)| v.is_active())
                        .map(|(_, v)| v.clone())
                        .collect();
                    if !active_tasks.is_empty() {
                        debug!("automatic request task triggered!");
                    }
                    for task in active_tasks {
                        debug!("automatic request task: {:?}", task.task_key());
                        task.run().await;
                    }
                }
            }
            .in_current_span(),
        );
    }

    #[instrument(skip_all)]
    pub(crate) async fn add_task(&self, task: Arc<dyn RedoTask>) {
        let mut map = self.map.write().await;
        let task_key = task.task_key();
        map.insert(task_key, task);
    }

    #[instrument(fields(task_key = task_key), skip_all)]
    pub(crate) async fn remove_task(&self, task_key: &str) {
        let mut map = self.map.write().await;
        map.remove(task_key);
    }

    #[instrument(fields(client_id = &self.id), skip_all)]
    pub(crate) async fn on_grpc_client_reconnect(&self) {
        let map = self.map.read().await;
        for (_, v) in map.iter() {
            v.active()
        }
    }

    #[instrument(fields(client_id = &self.id), skip_all)]
    pub(crate) async fn on_grpc_client_disconnect(&self) {
        let map = self.map.read().await;
        for (_, v) in map.iter() {
            v.frozen()
        }
    }
}

#[async_trait]
pub(crate) trait RedoTask: Send + Sync + 'static {
    fn task_key(&self) -> String;

    fn frozen(&self);

    fn active(&self);

    fn is_active(&self) -> bool;

    async fn run(&self);
}

type CallBack = Box<dyn Fn(Result<()>) + Send + Sync + 'static>;

#[async_trait]
pub(crate) trait AutomaticRequest: Send + Sync + 'static {
    async fn run(&self, grpc_client: Arc<NacosGrpcClient>, call_back: CallBack);

    fn name(&self) -> String;
}

pub(crate) struct NamingRedoTask {
    active: Arc<AtomicBool>,
    automatic_request: Arc<dyn AutomaticRequest>,
    grpc_client: Arc<NacosGrpcClient>,
}

impl NamingRedoTask {
    pub(crate) fn new(
        grpc_client: Arc<NacosGrpcClient>,
        automatic_request: Arc<dyn AutomaticRequest>,
    ) -> Self {
        Self {
            active: Arc::new(AtomicBool::new(false)),
            automatic_request,
            grpc_client,
        }
    }
}

#[async_trait]
impl RedoTask for NamingRedoTask {
    fn task_key(&self) -> String {
        self.automatic_request.name()
    }

    fn frozen(&self) {
        self.active.store(false, Ordering::Release)
    }

    fn active(&self) {
        self.active.store(true, Ordering::Release)
    }

    fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }

    #[instrument(skip_all)]
    async fn run(&self) {
        let active = self.active.clone();
        self.automatic_request
            .run(
                self.grpc_client.clone(),
                Box::new(move |ret| {
                    if ret.is_ok() {
                        active.store(false, Ordering::Release);
                    } else {
                        active.store(true, Ordering::Release);
                    }
                }),
            )
            .await;
    }
}