gateway_common/nacos/
mod.rs1use std::sync::Arc;
2
3use fusen_proc_macro::builder;
4use nacos_sdk::api::{
5 config::{ConfigChangeListener, ConfigResponse, ConfigService, ConfigServiceBuilder},
6 error::Error,
7 props::ClientProps,
8};
9use tokio::sync::mpsc;
10use tracing::{error, info};
11
12use crate::{
13 config::{toml::get_toml_by_context, yaml::get_yaml_by_context, HotConfig},
14 error::BoxError,
15};
16
17#[builder]
18pub struct NacosConfig {
19 pub server_addr: String,
20 pub namespace: String,
21 pub group: Option<String>,
22 pub app_name: Option<String>,
23 pub username: String,
24 pub password: String,
25}
26
27#[derive(Clone)]
28pub struct NacosConfiguration {
29 config_service: Arc<Box<dyn ConfigService>>,
30 _config: Arc<NacosConfig>,
31}
32
33impl NacosConfiguration {
34 pub async fn init_nacos_configuration(
35 config: Arc<NacosConfig>,
36 ) -> Result<NacosConfiguration, Error> {
37 let mut client_props = ClientProps::new();
38 let app_name = config
39 .app_name
40 .as_ref()
41 .map_or("service".to_owned(), |e| e.to_owned());
42 client_props = client_props
43 .server_addr(config.server_addr.clone())
44 .namespace(config.namespace.clone())
45 .app_name(app_name.clone())
46 .auth_username(config.username.clone())
47 .auth_password(config.password.clone());
48 let builder = ConfigServiceBuilder::new(client_props);
49 let builder = if !config.username.is_empty() {
50 builder.enable_auth_plugin_http()
51 } else {
52 builder
53 };
54 Ok(NacosConfiguration {
55 config_service: Arc::new(Box::new(builder.build()?)),
56 _config: config,
57 })
58 }
59
60 pub async fn get_config<'a, T: serde::Deserialize<'a>>(
61 &self,
62 data_id: &str,
63 group: &str,
64 ) -> Result<T, BoxError> {
65 let config_response = self
66 .config_service
67 .get_config(data_id.to_owned(), group.to_owned())
68 .await?;
69 NacosConfiguration::config_build(config_response)
70 }
71
72 pub async fn get_receive_config<'a, T: serde::Deserialize<'a> + Send + 'static>(
73 &self,
74 data_id: &str,
75 group: &str,
76 ) -> Result<mpsc::Receiver<T>, BoxError> {
77 let ident: T = self.get_config(data_id, group).await?;
78 let (sender, receiver) = mpsc::channel(1);
79 sender
80 .send(ident)
81 .await
82 .map_err(|e| format!("get_receive_config error : {}", e))?;
83 let (config_listener, mut listener) = HotConfigChangeListener::new();
84 self.config_service
85 .add_listener(
86 data_id.to_owned(),
87 group.to_owned(),
88 Arc::new(config_listener),
89 )
90 .await?;
91 tokio::spawn(async move {
92 while let Some(response) = listener.recv().await {
93 if let Ok(ident) = NacosConfiguration::config_build::<T>(response) {
94 let _ = sender.send(ident).await;
95 }
96 }
97 });
98 Ok(receiver)
99 }
100
101 pub fn config_build<'a, T: serde::Deserialize<'a>>(
102 config_response: ConfigResponse,
103 ) -> Result<T, BoxError> {
104 match config_response.content_type().as_str() {
105 "toml" => get_toml_by_context(config_response.content()),
106 "yaml" => get_yaml_by_context(config_response.content()),
107 _type => Err(format!("not support {:?}", _type).into()),
108 }
109 }
110
111 pub async fn get_hot_config<'a, T: serde::Deserialize<'a> + HotConfig>(
112 &self,
113 data_id: &str,
114 group: &str,
115 ) -> Result<T, BoxError> {
116 let temp_ident: T = self.get_config(data_id, group).await?;
117 let mut ident: T = self.get_config(data_id, group).await?;
118 let (config_listener, receiver) = HotConfigChangeListener::new();
119 ident.build_hot_config(temp_ident, receiver)?;
120 self.config_service
121 .add_listener(
122 data_id.to_owned(),
123 group.to_owned(),
124 Arc::new(config_listener),
125 )
126 .await?;
127 Ok(ident)
128 }
129}
130
131pub struct HotConfigChangeListener {
132 sender: mpsc::Sender<nacos_sdk::api::config::ConfigResponse>,
133}
134impl HotConfigChangeListener {
135 pub fn new() -> (Self, mpsc::Receiver<nacos_sdk::api::config::ConfigResponse>) {
136 let (sender, receiver) = mpsc::channel(1);
137 (Self { sender }, receiver)
138 }
139}
140
141impl ConfigChangeListener for HotConfigChangeListener {
142 fn notify(&self, config_resp: nacos_sdk::api::config::ConfigResponse) {
143 let sender = self.sender.clone();
144 tokio::spawn(async move {
145 info!("Listener ConfigResponse Change : {}", config_resp);
146 if let Err(error) = sender.send(config_resp).await {
147 error!("listener error : {}", error);
148 }
149 });
150 }
151}