1use crate::config::ShenYuConfig;
19use crate::error::ShenYuError;
20use crate::model::{EventType, UriInfo};
21use dashmap::DashMap;
22use serde_json::Value;
23use std::collections::HashMap;
24use std::io::{Error, ErrorKind};
25use std::net::IpAddr;
26use tracing::{error, info, warn};
27use ureq::OrAnyStatus;
28
/// Path appended to every gateway base URL to build the metadata-registration endpoint.
pub const REGISTER_META_DATA_SUFFIX: &str = "/shenyu-client/register-metadata";

/// Path appended to every gateway base URL to build the URI-registration endpoint.
pub const REGISTER_URI_SUFFIX: &str = "/shenyu-client/register-uri";

/// Path appended to every gateway base URL to build the discovery-config registration endpoint.
pub const REGISTER_DISCOVERY_CONFIG_SUFFIX: &str = "/shenyu-client/register-discoveryConfig";

/// Path appended to every gateway base URL to build the offline (de-registration) endpoint.
pub const REGISTER_OFFLINE_SUFFIX: &str = "/shenyu-client/offline";

/// Path appended to every gateway base URL to build the login endpoint that issues the
/// register token.
pub const PLATFORM_LOGIN_SUFFIX: &str = "/platform/login";

/// Namespace id used when the configuration supplies no (or an empty) namespace id.
pub const SYS_DEFAULT_NAMESPACE_ID: &str = "649330b6-c2d7-4edc-be8e-8a54df9eb385";
46
47#[derive(Debug)]
49#[warn(dead_code)]
50pub struct ShenyuClient {
51 pub(super) headers: DashMap<String, String>,
52 app_name: String,
53 env: ShenYuConfig,
54 host: Option<String>,
55 port: u16,
56 namespace_ids: Vec<String>,
57 gateway_base_urls: Vec<String>,
58 register_meta_data_path_list: Vec<String>,
59 register_uri_list: Vec<String>,
60 register_token_servers: Vec<String>,
61 register_discover_config_servers: Vec<String>,
62 register_offline_servers: Vec<String>,
63 uri_infos: Vec<UriInfo>,
64}
65
66impl ShenyuClient {
67 pub fn register(&self) -> Result<(), Error> {
69 if let Ok(token) = self.get_register_token() {
70 info!(
71 "[SUCCESS], get register token success, register token: {:#?}",
72 &token
73 );
74 _ = self
75 .headers
76 .insert("X-Access-Token".to_string(), token.to_string());
77 } else {
78 error!("Can't get register token");
79 }
80 self.register_all_metadata(true);
81 self.register_uri();
82 self.register_discovery_config();
83 Ok(())
84 }
85
86 pub fn new(
88 config: ShenYuConfig,
89 app_name: &str,
90 uri_infos: &[UriInfo],
91 port: u16,
92 ) -> Result<Self, String> {
93 let headers = DashMap::new();
94 _ = headers.insert(
95 "Content-Type".to_string(),
96 "application/json;charset=UTF-8".to_string(),
97 );
98 let namespace_ids: Vec<String> = config
99 .register
100 .namespace_id
101 .clone()
102 .filter(|x| !x.is_empty())
103 .map_or(vec![SYS_DEFAULT_NAMESPACE_ID.to_string()], |x| {
104 x.split(';').map(ToString::to_string).collect()
105 });
106
107 let mut client = ShenyuClient {
108 headers,
109 app_name: app_name.to_string(),
110 env: config,
111 host: None,
112 port,
113 namespace_ids,
114 gateway_base_urls: vec![],
115 register_meta_data_path_list: vec![],
116 register_uri_list: vec![],
117 register_token_servers: vec![],
118 register_discover_config_servers: vec![],
119 register_offline_servers: vec![],
120 uri_infos: uri_infos.to_owned(),
121 };
122 client.set_up_gateway_service_url()?;
123 Ok(client)
124 }
125}
126
127impl ShenyuClient {
128 fn set_up_gateway_service_url(&mut self) -> Result<(), String> {
129 self.gateway_base_urls = self
130 .env
131 .register
132 .servers
133 .split(',')
134 .map(ToString::to_string)
135 .collect();
136 if self.gateway_base_urls.is_empty() {
137 return Err(String::from("shenyu.register.servers is empty"));
138 }
139
140 self.register_meta_data_path_list = self
141 .gateway_base_urls
142 .iter()
143 .map(|url| format!("{url}{REGISTER_META_DATA_SUFFIX}"))
144 .collect();
145 self.register_uri_list = self
146 .gateway_base_urls
147 .iter()
148 .map(|url| format!("{url}{REGISTER_URI_SUFFIX}"))
149 .collect();
150 self.register_token_servers = self
151 .gateway_base_urls
152 .iter()
153 .map(|url| format!("{url}{PLATFORM_LOGIN_SUFFIX}"))
154 .collect();
155 self.register_discover_config_servers = self
156 .gateway_base_urls
157 .iter()
158 .map(|url| format!("{url}{REGISTER_DISCOVERY_CONFIG_SUFFIX}"))
159 .collect();
160 self.register_offline_servers = self
161 .gateway_base_urls
162 .iter()
163 .map(|url| format!("{url}{REGISTER_OFFLINE_SUFFIX}"))
164 .collect();
165
166 #[allow(unused_assignments)]
167 let mut host = None;
168 #[cfg(not(target_os = "macos"))]
169 {
170 host = match local_ip_address::local_ip() {
171 Ok(IpAddr::V4(ipv4)) => Some(IpAddr::V4(ipv4)),
172 Ok(IpAddr::V6(ipv6)) => Some(IpAddr::from(ipv6.to_ipv4().unwrap())),
173 _ => None,
174 };
175 }
176 #[cfg(target_os = "macos")]
177 {
178 use local_ip_address::macos;
179 for (_, ipaddr) in macos::list_afinet_netifas().unwrap() {
180 if IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)).eq(&ipaddr) {
181 continue;
182 }
183 host = match ipaddr {
184 IpAddr::V4(ipv4) => Some(IpAddr::from(ipv4)),
185 IpAddr::V6(_) => continue,
186 };
187 }
188 }
189 if let Some(host) = host {
190 self.host = Some(host.to_string());
191 Ok(())
192 } else {
193 Err("Failed to determine local IP address".to_string())
194 }
195 }
196
197 fn request(&self, url: &str, json_data: &Value) -> Result<bool, Error> {
198 let mut builder = ureq::post(url);
199 for r in &self.headers {
201 builder = builder.set(r.key(), r.value());
202 }
203 let res = builder.send_json(json_data).map_err(|e| {
204 Error::new(ErrorKind::Other, format!("request {url} failed, cause {e}"))
205 })?;
206 let status_code = res.status();
207 let msg = res.into_string()?;
208
209 if msg == "success" {
210 Ok(true)
211 } else {
212 warn!(
213 "Request ({}) failed, status code: {}, msg: {}",
214 url, status_code, msg
215 );
216 Ok(false)
217 }
218 }
219
220 pub(crate) fn get_register_token(&self) -> Result<String, Error> {
221 let hashmap = &self.env.register.props;
222 let params = [
223 ("userName", hashmap.get("username").unwrap().as_str()),
224 ("password", hashmap.get("password").unwrap().as_str()),
225 ];
226
227 let result = Err(ShenYuError::new(500, "Can't get register token".to_string()).into());
228 for url in &self.register_token_servers {
229 let res_data: Value = ureq::get(url)
230 .query_pairs(params)
231 .call()
232 .or_any_status()
233 .map_err(|e| Error::new(ErrorKind::Other, format!("{e}")))?
234 .into_json()?;
235 match res_data
236 .get("data")
237 .and_then(|data| data.get("token"))
238 .and_then(|token| token.as_str())
239 {
240 Some(token) => return Ok(token.to_string()),
241 None => continue,
242 }
243 }
244 result
245 }
246
247 pub fn register_uri(&self) {
249 let app_name = &self.app_name.clone();
250 let rpc_type = &self.env.uri.rpc_type.clone();
251 let context_path = &self.env.uri.context_path.clone();
252 let namespace_ids = &self.namespace_ids.clone();
253 namespace_ids.iter().for_each(|namespace_id| {
254 self._register_uri(app_name, rpc_type, context_path, namespace_id);
255 });
256 }
257
258 fn _register_uri(
259 &self,
260 app_name: &str,
261 rpc_type: &str,
262 context_path: &str,
263 namespace_id: &str,
264 ) {
265 let port = &self.port;
266 let host = &self.host;
267
268 let json_data = serde_json::json!({
269 "appName": app_name,
270 "contextPath": context_path,
271 "protocol": rpc_type,
272 "rpcType": rpc_type,
273 "host": host.clone().unwrap(),
274 "port": port,
275 "namespaceId": namespace_id,
276 "eventType": EventType::REGISTER.to_string(),
277 });
278
279 for url in &self.register_uri_list {
281 if let Ok(true) = self.request(url, &json_data) {
282 info!(
283 "[SUCCESS], register uri success, register data: {:#?}",
284 json_data
285 );
286 continue;
287 }
288 error!(
289 "[ERROR], register uri to {} failed, app_name: {}, host: {}, port: {}",
290 url,
291 app_name,
292 host.clone().unwrap(),
293 port
294 );
295 }
296 }
297
298 pub fn register_all_metadata(&self, enabled: bool) {
300 for x in &self.uri_infos {
301 self.register_metadata(
302 false,
303 Some(&x.path),
304 Some(&x.method_name),
305 Some(&x.rule_name),
306 enabled,
307 );
308 }
309 }
310
311 fn register_metadata(
312 &self,
313 register_all: bool,
314 path: Option<&str>,
315 method: Option<&str>,
316 rule_name: Option<&str>,
317 enabled: bool,
318 ) {
319 let context_path = &self.env.uri.context_path.clone();
320 let app_name = &self.app_name.clone();
321 let namespace_ids = &self.namespace_ids.clone();
322 let rpc_type = &self.env.uri.rpc_type.clone();
323 let path = if register_all {
324 format!("{context_path}**")
325 } else {
326 path.unwrap_or("").to_string()
327 };
328
329 let rule_name = rule_name.unwrap_or(&path).to_string();
330 namespace_ids.iter().for_each(|namespace_id| {
331 self._register_metadata(
332 app_name,
333 rpc_type,
334 context_path.to_string(),
335 path.as_str(),
336 method,
337 rule_name.clone(),
338 namespace_id,
339 enabled,
340 );
341 });
342 }
343
344 fn _register_metadata(
345 &self,
346 app_name: &str,
347 rpc_type: &str,
348 context_path: String,
349 path: &str,
350 method: Option<&str>,
351 rule_name: String,
352 namespace_id: &str,
353 enabled: bool,
354 ) {
355 let json_data = serde_json::json!({
356 "appName": app_name,
357 "contextPath": context_path.clone(),
358 "path": context_path.clone() + path,
359 "pathDesc": "",
360 "rpcType": rpc_type,
361 "ruleName": context_path.clone() + rule_name.as_str(),
362 "serviceName": app_name,
363 "methodName": method.unwrap_or("").to_string(),
364 "parameterTypes": "",
365 "rpcExt": "",
366 "host": self.host.clone().unwrap(),
367 "port": self.port,
368 "namespaceId": namespace_id,
369 "enabled": enabled,
370 "registerMetaData": "",
371 "pluginNames": []
372 });
373
374 for url in &self.register_meta_data_path_list {
375 if let Ok(true) = self.request(url, &json_data) {
376 info!(
377 "[SUCCESS], register metadata success, register data: {:#?}",
378 &json_data
379 );
380 continue;
381 }
382 error!(
383 "[ERROR], register metadata to {} failed, app_name: {}, path: {}, contextPath: {}",
384 url, app_name, path, context_path
385 );
386 }
387 }
388
389 pub fn register_discovery_config(&self) {
391 let discovery_type = &self.env.discovery.discovery_type.clone();
392 let register_path = &self.env.discovery.register_path.clone();
393 let server_lists = &self.env.discovery.server_lists.clone();
394 let props = &self.env.discovery.props.clone();
395 let plugin_name = &self.env.discovery.plugin_name.clone();
396 let context_path = &self.env.uri.context_path.clone();
397 let namespace_ids = &self.namespace_ids.clone();
398
399 let port = &self.port;
400 let host = &self.host;
401 namespace_ids.iter().for_each(|namespace_id| {
402 self._register_discovery_config(
403 discovery_type,
404 register_path,
405 server_lists,
406 props,
407 plugin_name,
408 context_path,
409 namespace_id,
410 host,
411 port,
412 );
413 });
414 }
415
416 fn _register_discovery_config(
417 &self,
418 discovery_type: &str,
419 register_path: &str,
420 server_lists: &str,
421 props: &HashMap<String, String>,
422 plugin_name: &str,
423 context_path: &str,
424 namespace_id: &str,
425 host: &Option<String>,
426 port: &u16,
427 ) {
428 let json_data = serde_json::json!({
429 "name": "default".to_string() + discovery_type,
430 "selectorName": context_path,
431 "handler": "{}",
432 "listenerNode":register_path,
433 "serverList": server_lists,
434 "props": props,
435 "discoveryType": discovery_type,
436 "pluginName": plugin_name,
437 "namespaceId": namespace_id,
438 });
439
440 for url in &self.register_discover_config_servers {
442 if let Ok(true) = self.request(url, &json_data) {
443 info!(
444 "[SUCCESS], register discover config success, register data: {:#?}",
445 &json_data
446 );
447 continue;
448 }
449 error!(
450 "[ERROR], register discover config to {} failed, discovery_type: {}, host: {}, port: {}",
451 url, discovery_type, host.clone().unwrap(), port
452 );
453 }
454 }
455
456 pub fn offline_register(&self) {
458 let app_name = &self.app_name.clone();
459 let namespace_ids = &self.namespace_ids.clone();
460 let rpc_type = &self.env.uri.rpc_type.clone();
461 let context_path = &self.env.uri.context_path.clone();
462
463 let port = &self.port;
464 let host = &self.host;
465 namespace_ids.iter().for_each(|namespace_id| {
466 self._offline_register(app_name, rpc_type, context_path, namespace_id, port, host);
467 });
468 }
469 fn _offline_register(
470 &self,
471 app_name: &str,
472 rpc_type: &str,
473 context_path: &str,
474 namespace_id: &str,
475 port: &u16,
476 host: &Option<String>,
477 ) {
478 let json_data = serde_json::json!({
479 "appName": app_name,
480 "contextPath": context_path,
481 "protocol": rpc_type,
482 "host": host.clone().unwrap(),
483 "port": port,
484 "namespaceId": namespace_id,
485 "eventType": EventType::OFFLINE.to_string(),
486 });
487
488 for url in &self.register_offline_servers {
490 if let Ok(true) = self.request(url, &json_data) {
491 info!(
492 "[SUCCESS], offline success, register data: {:#?}",
493 &json_data
494 );
495 continue;
496 }
497 error!(
498 "[ERROR], offline from {} failed, app_name: {}, host: {}, port: {}",
499 url,
500 app_name,
501 host.clone().unwrap(),
502 port
503 );
504 }
505 }
506}