fusen-common 0.8.12

fusen-common
Documentation
use crate::{
    config::{ConfigManager, ConfigResponse, HotConfigChangeListener, config_build},
    nacos::NacosConfig,
};
use nacos_sdk::api::{
    config::{ConfigChangeListener, ConfigService, ConfigServiceBuilder},
    error::Error,
    props::ClientProps,
};
use std::sync::Arc;
use tracing::{debug, error};

/// Handle to a Nacos configuration client.
///
/// The SDK's [`ConfigService`] is held behind an [`Arc`], so the derived
/// `Clone` produces another handle to the same underlying service instead of
/// building a new one.
#[derive(Clone)]
pub struct NacosConfiguration {
    /// Shared Nacos config service that every clone of this handle points to.
    config_service: Arc<ConfigService>,
}

impl NacosConfiguration {
    /// Builds a [`NacosConfiguration`] from the given Nacos connection settings.
    ///
    /// `namespace`, `username` and `password` are optional in `config`; when
    /// one is absent its `Default` value is passed to the client properties.
    /// The HTTP auth plugin is enabled only when a username is configured.
    ///
    /// # Errors
    ///
    /// Returns the SDK [`Error`] if the config service cannot be built.
    pub async fn init_nacos_configuration(
        config: Arc<NacosConfig>,
    ) -> Result<NacosConfiguration, Error> {
        // Build the properties in one chain: each setter consumes and returns
        // the props, so no mutable binding or extra clone is required.
        let client_props = ClientProps::new()
            .server_addr(config.server_addr.clone())
            .namespace(config.namespace.clone().unwrap_or_default())
            .auth_username(config.username.clone().unwrap_or_default())
            .auth_password(config.password.clone().unwrap_or_default());

        let builder = ConfigServiceBuilder::new(client_props);
        // Authentication is opt-in: only turn the plugin on when credentials
        // were actually supplied.
        let builder = if config.username.is_some() {
            builder.enable_auth_plugin_http()
        } else {
            builder
        };

        let config_service = Arc::new(builder.build()?);
        Ok(NacosConfiguration { config_service })
    }

    /// Fetches the configuration identified by `data_id` and `group` and
    /// converts it into `T` via [`config_build`].
    ///
    /// # Errors
    ///
    /// Returns [`crate::error::Error::ConfigError`] wrapping the SDK error if
    /// the fetch fails, or whatever error [`config_build`] reports for the
    /// fetched content.
    pub async fn get_config<T: serde::de::DeserializeOwned>(
        &self,
        data_id: &str,
        group: &str,
    ) -> Result<T, crate::error::Error> {
        let nacos_config_response = self
            .config_service
            .get_config(data_id.to_owned(), group.to_owned())
            .await
            .map_err(|error| crate::error::Error::ConfigError(Box::new(error)))?;

        // Translate the SDK response into the crate's own response type
        // before handing it to the shared builder.
        config_build(ConfigResponse {
            content_type: nacos_config_response.content_type().to_owned(),
            content: nacos_config_response.content().to_owned(),
        })
    }

    /// Creates a [`ConfigManager`] seeded with the current configuration and
    /// registers a listener for subsequent changes to the same
    /// `data_id`/`group`.
    ///
    /// # Errors
    ///
    /// Returns an error if the initial fetch fails, if
    /// [`ConfigManager::build_hot_config`] fails, or
    /// [`crate::error::Error::ConfigError`] if registering the listener with
    /// the config service fails.
    pub async fn get_config_manager<T: serde::de::DeserializeOwned + Send + Sync + 'static>(
        &self,
        data_id: &str,
        group: &str,
    ) -> Result<ConfigManager<T>, crate::error::Error> {
        // Seed the manager with the value currently stored in Nacos.
        let initial = self.get_config::<T>(data_id, group).await?;

        let (config_listener, receiver) = HotConfigChangeListener::new();
        let config_manager = ConfigManager::build_hot_config(initial, receiver)?;

        // NOTE(review): the listener is registered after the initial fetch, so
        // a change published in between may go unnoticed unless the SDK
        // replays the current value on registration — confirm.
        self.config_service
            .add_listener(
                data_id.to_owned(),
                group.to_owned(),
                Arc::new(config_listener),
            )
            .await
            .map_err(|error| crate::error::Error::ConfigError(Box::new(error)))?;

        Ok(config_manager)
    }
}

impl ConfigChangeListener for HotConfigChangeListener {
    fn notify(&self, config_resp: nacos_sdk::api::config::ConfigResponse) {
        let sender = self.sender.clone();
        tokio::spawn(async move {
            debug!("Listener ConfigResponse Change : {}", config_resp);
            if let Err(error) = sender
                .send(ConfigResponse {
                    content_type: config_resp.content_type().to_owned(),
                    content: config_resp.content().to_owned(),
                })
                .await
            {
                error!("listener error : {}", error);
            }
        });
    }
}