// tin_nacos_wrapper/client.rs

use std::sync::Arc;

use nacos_sdk::api::{
    config::{ConfigService, ConfigServiceBuilder},
    naming::{NamingService, NamingServiceBuilder, ServiceInstance},
    props::ClientProps,
};

use crate::config::NacosConfig;
use crate::error::{Error, Result};
use crate::listener::{SimpleConfigChangeListener, SimpleInstanceChangeListener};

10#[derive(Clone)]
12pub struct NacosClient {
13 client_config: NacosConfig,
14 config_service: ConfigService,
15 naming_service: NamingService,
16}
17
18impl NacosClient {
19 pub async fn init(
21 client_config: NacosConfig,
22 ) -> Result<NacosClient> {
23 let client_props = Self::create_client_props(
24 client_config.server_addr.clone(),
25 client_config.namespace.clone(),
26 client_config.app_name.clone(),
27 client_config.auth_username.clone(),
28 client_config.auth_password.clone(),
29 );
30 let config_service = Self::create_config_service(
31 client_props.clone(),
32 client_config.app_name.clone(),
33 client_config.group_name.clone(),
34 )
35 .await?;
36 let naming_service = Self::create_naming_service(client_props).await?;
37
38 let nacos_client = NacosClient {
39 client_config,
40 config_service,
41 naming_service,
42 };
43
44 nacos_client.register_self().await?;
45
46 Ok(nacos_client)
47 }
48
49 fn create_client_props(
51 server_addr: String,
52 namespace: String,
53 app_name: String,
54 auth_username: Option<String>,
55 auth_password: Option<String>,
56 ) -> ClientProps {
57 let mut client_props = ClientProps::new()
58 .server_addr(server_addr)
59 .namespace(namespace)
60 .app_name(app_name)
61 .remote_grpc_port(9848);
62
63 if let Some(username) = auth_username {
64 client_props = client_props.auth_username(username);
65 }
66 if let Some(password) = auth_password {
67 client_props = client_props.auth_password(password);
68 }
69
70 client_props
71 }
72
73 async fn create_config_service(
75 client_props: ClientProps,
76 data_id: String,
77 group_name: String,
78 ) -> Result<ConfigService> {
79 let config_service = ConfigServiceBuilder::new(client_props)
80 .enable_auth_plugin_http()
81 .build()?;
82
83 let listen = config_service
84 .add_listener(
85 data_id,
86 group_name,
87 std::sync::Arc::new(SimpleConfigChangeListener {}),
88 )
89 .await;
90
91 match listen {
92 Ok(_) => log::info!("成功添加Nacos配置监听器"),
93 Err(err) => log::error!("添加Nacos配置监听器失败,错误信息: {err:?}"),
94 }
95
96 Ok(config_service)
97 }
98
99 async fn create_naming_service(
101 client_props: ClientProps,
102 ) -> Result<NamingService> {
103 let naming_service = NamingServiceBuilder::new(client_props)
104 .enable_auth_plugin_http()
105 .build();
106
107 let naming_service = match naming_service {
108 Ok(service) => {
109 log::info!("Nacos服务注册与发现客户端构建成功");
110 service
111 }
112 Err(e) => {
113 log::error!("构建NamingService失败: {e}");
114 return Err(Error::Nacos(e));
115 }
116 };
117
118 Ok(naming_service)
119 }
120
121 pub async fn register_instance(
123 &self,
124 service_name: String,
125 group_name: Option<String>,
126 ip: String,
127 port: i32,
128 ) -> Result<()> {
129 let service_instance = ServiceInstance {
130 ip,
131 port,
132 ..Default::default()
133 };
134
135 let register_result = self
136 .naming_service
137 .register_instance(service_name.clone(), group_name, service_instance)
138 .await;
139
140 match register_result {
141 Ok(_) => {
142 log::info!("成功注册服务实例: {service_name}");
143 Ok(())
144 }
145 Err(err) => {
146 log::error!("注册服务实例 {service_name} 失败,错误信息: {err:?}");
147 Err(Error::Nacos(err))
148 }
149 }
150 }
151
152 pub async fn subscribe_service(
154 &self,
155 group_name: Option<String>,
156 service_name: String,
157 ) -> Result<()> {
158 let subscribe_result = self
159 .naming_service
160 .subscribe(
161 service_name.clone(),
162 group_name,
163 vec![],
164 std::sync::Arc::new(SimpleInstanceChangeListener {}),
165 )
166 .await;
167
168 match subscribe_result {
169 Ok(_) => {
170 log::info!("成功订阅服务 '{service_name}'");
171 Ok(())
172 }
173 Err(err) => {
174 log::error!("订阅服务 '{service_name}' 失败,错误信息: {err:?}");
175 Err(Error::Nacos(err))
176 }
177 }
178 }
179
180 async fn register_self(&self) -> Result<()> {
182 let service_instance = ServiceInstance {
183 ip: self.client_config.client_ip.clone(),
184 port: self.client_config.client_port,
185 ..Default::default()
186 };
187
188 let register_result = self
189 .naming_service
190 .register_instance(
191 self.client_config.app_name.clone(),
192 Some(self.client_config.group_name.clone()),
193 service_instance,
194 )
195 .await;
196
197 let service_name = self.client_config.app_name.clone();
198
199 match register_result {
200 Ok(_) => {
201 log::info!("成功注册服务实例: {service_name}");
202 Ok(())
203 }
204 Err(err) => {
205 log::error!("注册服务实例失败 - 服务名: {service_name}, 错误信息: {err:?}");
206 Err(Error::Nacos(err))
207 }
208 }
209 }
210
211 pub fn get_config_service(&self) -> &ConfigService {
213 &self.config_service
214 }
215
216 pub fn get_naming_service(&self) -> &NamingService {
218 &self.naming_service
219 }
220
221 pub fn get_config(&self) -> &NacosConfig {
223 &self.client_config
224 }
225}