gateway_common/nacos/
mod.rs

1use std::sync::Arc;
2
3use fusen_proc_macro::builder;
4use nacos_sdk::api::{
5    config::{ConfigChangeListener, ConfigResponse, ConfigService, ConfigServiceBuilder},
6    error::Error,
7    props::ClientProps,
8};
9use tokio::sync::mpsc;
10use tracing::{error, info};
11
12use crate::{
13    config::{toml::get_toml_by_context, yaml::get_yaml_by_context, HotConfig},
14    error::BoxError,
15};
16
17#[builder]
18pub struct NacosConfig {
19    pub server_addr: String,
20    pub namespace: String,
21    pub group: Option<String>,
22    pub app_name: Option<String>,
23    pub username: String,
24    pub password: String,
25}
26
27#[derive(Clone)]
28pub struct NacosConfiguration {
29    config_service: Arc<Box<dyn ConfigService>>,
30    _config: Arc<NacosConfig>,
31}
32
33impl NacosConfiguration {
34    pub async fn init_nacos_configuration(
35        config: Arc<NacosConfig>,
36    ) -> Result<NacosConfiguration, Error> {
37        let mut client_props = ClientProps::new();
38        let app_name = config
39            .app_name
40            .as_ref()
41            .map_or("service".to_owned(), |e| e.to_owned());
42        client_props = client_props
43            .server_addr(config.server_addr.clone())
44            .namespace(config.namespace.clone())
45            .app_name(app_name.clone())
46            .auth_username(config.username.clone())
47            .auth_password(config.password.clone());
48        let builder = ConfigServiceBuilder::new(client_props);
49        let builder = if !config.username.is_empty() {
50            builder.enable_auth_plugin_http()
51        } else {
52            builder
53        };
54        Ok(NacosConfiguration {
55            config_service: Arc::new(Box::new(builder.build()?)),
56            _config: config,
57        })
58    }
59
60    pub async fn get_config<'a, T: serde::Deserialize<'a>>(
61        &self,
62        data_id: &str,
63        group: &str,
64    ) -> Result<T, BoxError> {
65        let config_response = self
66            .config_service
67            .get_config(data_id.to_owned(), group.to_owned())
68            .await?;
69        NacosConfiguration::config_build(config_response)
70    }
71
72    pub async fn get_receive_config<'a, T: serde::Deserialize<'a> + Send + 'static>(
73        &self,
74        data_id: &str,
75        group: &str,
76    ) -> Result<mpsc::Receiver<T>, BoxError> {
77        let ident: T = self.get_config(data_id, group).await?;
78        let (sender, receiver) = mpsc::channel(1);
79        sender
80            .send(ident)
81            .await
82            .map_err(|e| format!("get_receive_config error : {}", e))?;
83        let (config_listener, mut listener) = HotConfigChangeListener::new();
84        self.config_service
85            .add_listener(
86                data_id.to_owned(),
87                group.to_owned(),
88                Arc::new(config_listener),
89            )
90            .await?;
91        tokio::spawn(async move {
92            while let Some(response) = listener.recv().await {
93                if let Ok(ident) = NacosConfiguration::config_build::<T>(response) {
94                    let _ = sender.send(ident).await;
95                }
96            }
97        });
98        Ok(receiver)
99    }
100
101    pub fn config_build<'a, T: serde::Deserialize<'a>>(
102        config_response: ConfigResponse,
103    ) -> Result<T, BoxError> {
104        match config_response.content_type().as_str() {
105            "toml" => get_toml_by_context(config_response.content()),
106            "yaml" => get_yaml_by_context(config_response.content()),
107            _type => Err(format!("not support {:?}", _type).into()),
108        }
109    }
110
111    pub async fn get_hot_config<'a, T: serde::Deserialize<'a> + HotConfig>(
112        &self,
113        data_id: &str,
114        group: &str,
115    ) -> Result<T, BoxError> {
116        let temp_ident: T = self.get_config(data_id, group).await?;
117        let mut ident: T = self.get_config(data_id, group).await?;
118        let (config_listener, receiver) = HotConfigChangeListener::new();
119        ident.build_hot_config(temp_ident, receiver)?;
120        self.config_service
121            .add_listener(
122                data_id.to_owned(),
123                group.to_owned(),
124                Arc::new(config_listener),
125            )
126            .await?;
127        Ok(ident)
128    }
129}
130
131pub struct HotConfigChangeListener {
132    sender: mpsc::Sender<nacos_sdk::api::config::ConfigResponse>,
133}
134impl HotConfigChangeListener {
135    pub fn new() -> (Self, mpsc::Receiver<nacos_sdk::api::config::ConfigResponse>) {
136        let (sender, receiver) = mpsc::channel(1);
137        (Self { sender }, receiver)
138    }
139}
140
141impl ConfigChangeListener for HotConfigChangeListener {
142    fn notify(&self, config_resp: nacos_sdk::api::config::ConfigResponse) {
143        let sender = self.sender.clone();
144        tokio::spawn(async move {
145            info!("Listener ConfigResponse Change : {}", config_resp);
146            if let Err(error) = sender.send(config_resp).await {
147                error!("listener error : {}", error);
148            }
149        });
150    }
151}