shenyu_client_rust/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::config::ShenYuConfig;
19use crate::error::ShenYuError;
20use crate::model::{EventType, UriInfo};
21use dashmap::DashMap;
22use serde_json::Value;
23use std::collections::HashMap;
24use std::io::{Error, ErrorKind};
25use std::net::IpAddr;
26use tracing::{error, info, warn};
27use ureq::OrAnyStatus;
28
/// Path appended to every Shenyu admin base url to build the
/// metadata registration endpoint.
pub const REGISTER_META_DATA_SUFFIX: &str = "/shenyu-client/register-metadata";

/// Path appended to every Shenyu admin base url to build the
/// uri registration endpoint.
pub const REGISTER_URI_SUFFIX: &str = "/shenyu-client/register-uri";

/// Path appended to every Shenyu admin base url to build the
/// discovery config registration endpoint.
pub const REGISTER_DISCOVERY_CONFIG_SUFFIX: &str = "/shenyu-client/register-discoveryConfig";

/// Path appended to every Shenyu admin base url to build the
/// offline endpoint.
pub const REGISTER_OFFLINE_SUFFIX: &str = "/shenyu-client/offline";

/// Path appended to every Shenyu admin base url to build the
/// login endpoint from which the register token is fetched.
pub const PLATFORM_LOGIN_SUFFIX: &str = "/platform/login";

/// Namespace id used when the configuration provides no namespace id
/// or an empty one.
pub const SYS_DEFAULT_NAMESPACE_ID: &str = "649330b6-c2d7-4edc-be8e-8a54df9eb385";
46
/// The shenyu client.
///
/// Holds the configuration, the detected local host address and the
/// endpoint lists (one entry per configured Shenyu admin server) used to
/// register this application with, or take it offline from, Shenyu admin.
#[derive(Debug)]
// NOTE(review): `warn` is already the default level for `dead_code`;
// `allow` may have been intended here — confirm.
#[warn(dead_code)]
pub struct ShenyuClient {
    /// HTTP headers set on every `POST` sent to Shenyu admin. Seeded with
    /// `Content-Type`; `register` adds `X-Access-Token` after login.
    pub(super) headers: DashMap<String, String>,
    /// Application name, sent as `appName` / `serviceName`.
    app_name: String,
    /// The client configuration.
    env: ShenYuConfig,
    /// Local IP address in textual form, detected while the client is built.
    host: Option<String>,
    /// Port sent as `port` in the registration payloads.
    port: u16,
    /// Namespace ids every registration is repeated for.
    namespace_ids: Vec<String>,
    /// Shenyu admin base urls, taken from the comma separated
    /// `register.servers` configuration value.
    gateway_base_urls: Vec<String>,
    /// Metadata registration endpoints, one per base url.
    register_meta_data_path_list: Vec<String>,
    /// Uri registration endpoints, one per base url.
    register_uri_list: Vec<String>,
    /// Login endpoints, one per base url.
    register_token_servers: Vec<String>,
    /// Discovery config registration endpoints, one per base url.
    register_discover_config_servers: Vec<String>,
    /// Offline endpoints, one per base url.
    register_offline_servers: Vec<String>,
    /// Routes registered as metadata by `register_all_metadata`.
    uri_infos: Vec<UriInfo>,
}
65
66impl ShenyuClient {
67    /// Register to shenyu admin.
68    pub fn register(&self) -> Result<(), Error> {
69        if let Ok(token) = self.get_register_token() {
70            info!(
71                "[SUCCESS], get register token success, register token: {:#?}",
72                &token
73            );
74            _ = self
75                .headers
76                .insert("X-Access-Token".to_string(), token.to_string());
77        } else {
78            error!("Can't get register token");
79        }
80        self.register_all_metadata(true);
81        self.register_uri();
82        self.register_discovery_config();
83        Ok(())
84    }
85
86    /// Create a new `ShenyuClient`.
87    pub fn new(
88        config: ShenYuConfig,
89        app_name: &str,
90        uri_infos: &[UriInfo],
91        port: u16,
92    ) -> Result<Self, String> {
93        let headers = DashMap::new();
94        _ = headers.insert(
95            "Content-Type".to_string(),
96            "application/json;charset=UTF-8".to_string(),
97        );
98        let namespace_ids: Vec<String> = config
99            .register
100            .namespace_id
101            .clone()
102            .filter(|x| !x.is_empty())
103            .map_or(vec![SYS_DEFAULT_NAMESPACE_ID.to_string()], |x| {
104                x.split(';').map(ToString::to_string).collect()
105            });
106
107        let mut client = ShenyuClient {
108            headers,
109            app_name: app_name.to_string(),
110            env: config,
111            host: None,
112            port,
113            namespace_ids,
114            gateway_base_urls: vec![],
115            register_meta_data_path_list: vec![],
116            register_uri_list: vec![],
117            register_token_servers: vec![],
118            register_discover_config_servers: vec![],
119            register_offline_servers: vec![],
120            uri_infos: uri_infos.to_owned(),
121        };
122        client.set_up_gateway_service_url()?;
123        Ok(client)
124    }
125}
126
127impl ShenyuClient {
128    fn set_up_gateway_service_url(&mut self) -> Result<(), String> {
129        self.gateway_base_urls = self
130            .env
131            .register
132            .servers
133            .split(',')
134            .map(ToString::to_string)
135            .collect();
136        if self.gateway_base_urls.is_empty() {
137            return Err(String::from("shenyu.register.servers is empty"));
138        }
139
140        self.register_meta_data_path_list = self
141            .gateway_base_urls
142            .iter()
143            .map(|url| format!("{url}{REGISTER_META_DATA_SUFFIX}"))
144            .collect();
145        self.register_uri_list = self
146            .gateway_base_urls
147            .iter()
148            .map(|url| format!("{url}{REGISTER_URI_SUFFIX}"))
149            .collect();
150        self.register_token_servers = self
151            .gateway_base_urls
152            .iter()
153            .map(|url| format!("{url}{PLATFORM_LOGIN_SUFFIX}"))
154            .collect();
155        self.register_discover_config_servers = self
156            .gateway_base_urls
157            .iter()
158            .map(|url| format!("{url}{REGISTER_DISCOVERY_CONFIG_SUFFIX}"))
159            .collect();
160        self.register_offline_servers = self
161            .gateway_base_urls
162            .iter()
163            .map(|url| format!("{url}{REGISTER_OFFLINE_SUFFIX}"))
164            .collect();
165
166        #[allow(unused_assignments)]
167        let mut host = None;
168        #[cfg(not(target_os = "macos"))]
169        {
170            host = match local_ip_address::local_ip() {
171                Ok(IpAddr::V4(ipv4)) => Some(IpAddr::V4(ipv4)),
172                Ok(IpAddr::V6(ipv6)) => Some(IpAddr::from(ipv6.to_ipv4().unwrap())),
173                _ => None,
174            };
175        }
176        #[cfg(target_os = "macos")]
177        {
178            use local_ip_address::macos;
179            for (_, ipaddr) in macos::list_afinet_netifas().unwrap() {
180                if IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)).eq(&ipaddr) {
181                    continue;
182                }
183                host = match ipaddr {
184                    IpAddr::V4(ipv4) => Some(IpAddr::from(ipv4)),
185                    IpAddr::V6(_) => continue,
186                };
187            }
188        }
189        if let Some(host) = host {
190            self.host = Some(host.to_string());
191            Ok(())
192        } else {
193            Err("Failed to determine local IP address".to_string())
194        }
195    }
196
197    fn request(&self, url: &str, json_data: &Value) -> Result<bool, Error> {
198        let mut builder = ureq::post(url);
199        // 遍历header, 添加到builder中
200        for r in &self.headers {
201            builder = builder.set(r.key(), r.value());
202        }
203        let res = builder.send_json(json_data).map_err(|e| {
204            Error::new(ErrorKind::Other, format!("request {url} failed, cause {e}"))
205        })?;
206        let status_code = res.status();
207        let msg = res.into_string()?;
208
209        if msg == "success" {
210            Ok(true)
211        } else {
212            warn!(
213                "Request ({}) failed, status code: {}, msg: {}",
214                url, status_code, msg
215            );
216            Ok(false)
217        }
218    }
219
220    pub(crate) fn get_register_token(&self) -> Result<String, Error> {
221        let hashmap = &self.env.register.props;
222        let params = [
223            ("userName", hashmap.get("username").unwrap().as_str()),
224            ("password", hashmap.get("password").unwrap().as_str()),
225        ];
226
227        let result = Err(ShenYuError::new(500, "Can't get register token".to_string()).into());
228        for url in &self.register_token_servers {
229            let res_data: Value = ureq::get(url)
230                .query_pairs(params)
231                .call()
232                .or_any_status()
233                .map_err(|e| Error::new(ErrorKind::Other, format!("{e}")))?
234                .into_json()?;
235            match res_data
236                .get("data")
237                .and_then(|data| data.get("token"))
238                .and_then(|token| token.as_str())
239            {
240                Some(token) => return Ok(token.to_string()),
241                None => continue,
242            }
243        }
244        result
245    }
246
247    /// Register uri.
248    pub fn register_uri(&self) {
249        let app_name = &self.app_name.clone();
250        let rpc_type = &self.env.uri.rpc_type.clone();
251        let context_path = &self.env.uri.context_path.clone();
252        let namespace_ids = &self.namespace_ids.clone();
253        namespace_ids.iter().for_each(|namespace_id| {
254            self._register_uri(app_name, rpc_type, context_path, namespace_id);
255        });
256    }
257
258    fn _register_uri(
259        &self,
260        app_name: &str,
261        rpc_type: &str,
262        context_path: &str,
263        namespace_id: &str,
264    ) {
265        let port = &self.port;
266        let host = &self.host;
267
268        let json_data = serde_json::json!({
269            "appName": app_name,
270            "contextPath": context_path,
271            "protocol": rpc_type,
272            "rpcType": rpc_type,
273            "host": host.clone().unwrap(),
274            "port": port,
275            "namespaceId": namespace_id,
276            "eventType": EventType::REGISTER.to_string(),
277        });
278
279        // Broadcast to all shenyu admin.
280        for url in &self.register_uri_list {
281            if let Ok(true) = self.request(url, &json_data) {
282                info!(
283                    "[SUCCESS], register uri success, register data: {:#?}",
284                    json_data
285                );
286                continue;
287            }
288            error!(
289                "[ERROR], register uri to {} failed, app_name: {}, host: {}, port: {}",
290                url,
291                app_name,
292                host.clone().unwrap(),
293                port
294            );
295        }
296    }
297
298    /// Register metadata.
299    pub fn register_all_metadata(&self, enabled: bool) {
300        for x in &self.uri_infos {
301            self.register_metadata(
302                false,
303                Some(&x.path),
304                Some(&x.method_name),
305                Some(&x.rule_name),
306                enabled,
307            );
308        }
309    }
310
311    fn register_metadata(
312        &self,
313        register_all: bool,
314        path: Option<&str>,
315        method: Option<&str>,
316        rule_name: Option<&str>,
317        enabled: bool,
318    ) {
319        let context_path = &self.env.uri.context_path.clone();
320        let app_name = &self.app_name.clone();
321        let namespace_ids = &self.namespace_ids.clone();
322        let rpc_type = &self.env.uri.rpc_type.clone();
323        let path = if register_all {
324            format!("{context_path}**")
325        } else {
326            path.unwrap_or("").to_string()
327        };
328
329        let rule_name = rule_name.unwrap_or(&path).to_string();
330        namespace_ids.iter().for_each(|namespace_id| {
331            self._register_metadata(
332                app_name,
333                rpc_type,
334                context_path.to_string(),
335                path.as_str(),
336                method,
337                rule_name.clone(),
338                namespace_id,
339                enabled,
340            );
341        });
342    }
343
344    fn _register_metadata(
345        &self,
346        app_name: &str,
347        rpc_type: &str,
348        context_path: String,
349        path: &str,
350        method: Option<&str>,
351        rule_name: String,
352        namespace_id: &str,
353        enabled: bool,
354    ) {
355        let json_data = serde_json::json!({
356            "appName": app_name,
357            "contextPath": context_path.clone(),
358            "path": context_path.clone() + path,
359            "pathDesc": "",
360            "rpcType": rpc_type,
361            "ruleName": context_path.clone() + rule_name.as_str(),
362            "serviceName": app_name,
363            "methodName": method.unwrap_or("").to_string(),
364            "parameterTypes": "",
365            "rpcExt": "",
366            "host": self.host.clone().unwrap(),
367            "port": self.port,
368            "namespaceId": namespace_id,
369            "enabled": enabled,
370            "registerMetaData": "",
371            "pluginNames": []
372        });
373
374        for url in &self.register_meta_data_path_list {
375            if let Ok(true) = self.request(url, &json_data) {
376                info!(
377                    "[SUCCESS], register metadata success, register data: {:#?}",
378                    &json_data
379                );
380                continue;
381            }
382            error!(
383                "[ERROR], register metadata to {} failed, app_name: {}, path: {}, contextPath: {}",
384                url, app_name, path, context_path
385            );
386        }
387    }
388
389    /// Register discovery config.
390    pub fn register_discovery_config(&self) {
391        let discovery_type = &self.env.discovery.discovery_type.clone();
392        let register_path = &self.env.discovery.register_path.clone();
393        let server_lists = &self.env.discovery.server_lists.clone();
394        let props = &self.env.discovery.props.clone();
395        let plugin_name = &self.env.discovery.plugin_name.clone();
396        let context_path = &self.env.uri.context_path.clone();
397        let namespace_ids = &self.namespace_ids.clone();
398
399        let port = &self.port;
400        let host = &self.host;
401        namespace_ids.iter().for_each(|namespace_id| {
402            self._register_discovery_config(
403                discovery_type,
404                register_path,
405                server_lists,
406                props,
407                plugin_name,
408                context_path,
409                namespace_id,
410                host,
411                port,
412            );
413        });
414    }
415
416    fn _register_discovery_config(
417        &self,
418        discovery_type: &str,
419        register_path: &str,
420        server_lists: &str,
421        props: &HashMap<String, String>,
422        plugin_name: &str,
423        context_path: &str,
424        namespace_id: &str,
425        host: &Option<String>,
426        port: &u16,
427    ) {
428        let json_data = serde_json::json!({
429            "name": "default".to_string() + discovery_type,
430            "selectorName": context_path,
431            "handler": "{}",
432            "listenerNode":register_path,
433            "serverList": server_lists,
434            "props": props,
435            "discoveryType": discovery_type,
436            "pluginName": plugin_name,
437            "namespaceId": namespace_id,
438        });
439
440        // Broadcast to all shenyu admin.
441        for url in &self.register_discover_config_servers {
442            if let Ok(true) = self.request(url, &json_data) {
443                info!(
444                    "[SUCCESS], register discover config success, register data: {:#?}",
445                    &json_data
446                );
447                continue;
448            }
449            error!(
450                "[ERROR], register discover config to {} failed, discovery_type: {}, host: {}, port: {}",
451                url, discovery_type, host.clone().unwrap(), port
452            );
453        }
454    }
455
456    /// Offline from shenyu.
457    pub fn offline_register(&self) {
458        let app_name = &self.app_name.clone();
459        let namespace_ids = &self.namespace_ids.clone();
460        let rpc_type = &self.env.uri.rpc_type.clone();
461        let context_path = &self.env.uri.context_path.clone();
462
463        let port = &self.port;
464        let host = &self.host;
465        namespace_ids.iter().for_each(|namespace_id| {
466            self._offline_register(app_name, rpc_type, context_path, namespace_id, port, host);
467        });
468    }
469    fn _offline_register(
470        &self,
471        app_name: &str,
472        rpc_type: &str,
473        context_path: &str,
474        namespace_id: &str,
475        port: &u16,
476        host: &Option<String>,
477    ) {
478        let json_data = serde_json::json!({
479            "appName": app_name,
480            "contextPath": context_path,
481            "protocol": rpc_type,
482            "host": host.clone().unwrap(),
483            "port": port,
484            "namespaceId": namespace_id,
485            "eventType": EventType::OFFLINE.to_string(),
486        });
487
488        // Broadcast offline to all shenyu admin.
489        for url in &self.register_offline_servers {
490            if let Ok(true) = self.request(url, &json_data) {
491                info!(
492                    "[SUCCESS], offline success, register data: {:#?}",
493                    &json_data
494                );
495                continue;
496            }
497            error!(
498                "[ERROR], offline from {} failed, app_name: {}, host: {}, port: {}",
499                url,
500                app_name,
501                host.clone().unwrap(),
502                port
503            );
504        }
505    }
506}