1use futures_util::Future;
2use serde_derive::{Deserialize, Serialize};
3use serde_json::Value;
4use std::cell::RefCell;
5use std::collections::HashMap;
6use std::pin::Pin;
7use std::sync::Mutex;
8
9use crate::{
10 bool_from_str, get_task_queue, i64_from_str, queue_registry_handler, AppConfig, ChimesClient,
11 ChimesError, ChimesPerformanceInfo, ProcessTask,
12};
13
lazy_static! {
    /// Process-wide registry of gateway proxies, keyed by application name.
    ///
    /// `GatewayProxyManagement::add_proxy_info` is the writer visible in this
    /// file: it locks the mutex, then mutably borrows the inner `RefCell` to
    /// look up or insert the entry for an application.
    pub static ref MAP_GATEWAY_PROXY: Mutex<RefCell<HashMap<String, GatewayProxyManagement>>> =
        Mutex::new(RefCell::new(HashMap::new()));
}
18
/// Generic response envelope used when deserializing gateway replies.
///
/// `ggp_health_check` parses the health-check response body into
/// `ApiResult<Vec<GatewayProxyInfo>>` and treats a `status` of `200` or `0`
/// as success.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ApiResult<T> {
    /// Status code reported in the response body.
    pub status: i32,
    /// Message accompanying the status; it is embedded in the error returned
    /// by `ggp_health_check` when `status` is not a success code.
    pub message: String,
    /// Optional payload of the response.
    pub data: Option<T>,
    /// Optional timestamp.
    /// NOTE(review): unit and epoch are not visible in this file — confirm.
    pub timestamp: Option<u64>,
}
26
/// The set of proxy entries known for one application.
///
/// Instances are stored in `MAP_GATEWAY_PROXY` under their application name.
pub struct GatewayProxyManagement {
    /// Application name; `add_proxy_info` uses the same string as the map key.
    pub app_name: String,
    /// Proxy entries for this application. Wrapped in a `RefCell` so that
    /// entries can be updated or appended through a shared `&self`.
    pub proxy_list: RefCell<Vec<GatewayProxyInfo>>,
}
31
32impl GatewayProxyManagement {
33 fn insert_or_update_proxy_info(&self, dt: &GatewayProxyInfo) {
34 let mut found = false;
35 let len = self.proxy_list.borrow().len();
36 for i in 0..len {
37 let bit = &mut self.proxy_list.borrow_mut()[i];
38 if bit.app_name == dt.app_name && bit.proxy_address == dt.proxy_address {
39 bit.health_status = dt.health_status;
40 bit.load_avg = dt.load_avg;
41 bit.proxy_token = dt.proxy_token.clone();
42 found = true;
43 }
44 }
45
46 if !found {
47 self.proxy_list.borrow_mut().push(dt.clone());
48 }
49 }
50
51 pub fn add_proxy_info(dt: &GatewayProxyInfo) -> bool {
52 let proxy_map = MAP_GATEWAY_PROXY.lock().unwrap();
53 let app_name = dt.app_name.clone().unwrap_or_default();
54 if proxy_map.borrow_mut().contains_key(&app_name) {
55 proxy_map.borrow_mut()[&app_name].insert_or_update_proxy_info(dt);
56 } else {
57 proxy_map.borrow_mut().insert(
58 app_name.clone(),
59 GatewayProxyManagement {
60 app_name: app_name.clone(),
61 proxy_list: RefCell::new(vec![dt.clone()]),
62 },
63 );
64 }
65
66 true
67 }
68}
69
/// One proxy entry as delivered in the `data` array of the gateway's
/// health-check response and kept in `MAP_GATEWAY_PROXY`.
///
/// All fields are optional. `app_name` together with `proxy_address` is what
/// `GatewayProxyManagement::insert_or_update_proxy_info` compares to decide
/// whether an incoming entry refers to an already stored one.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct GatewayProxyInfo {
    /// Application identifier.
    pub app_id: Option<String>,
    /// Application name; used as the registry key (empty string when `None`).
    pub app_name: Option<String>,
    /// Address of the proxy; part of the identity used when merging entries.
    pub proxy_address: Option<String>,
    /// Deserialized through `bool_from_str`; falls back to the default
    /// (`None`) when the key is absent from the input.
    #[serde(default)]
    #[serde(deserialize_with = "bool_from_str")]
    pub rewrite_url: Option<bool>,
    /// NOTE(review): presumably the text expected back from a liveness probe —
    /// confirm against the gateway.
    pub echo_text: Option<String>,
    /// NOTE(review): presumably the URL prefix routed to this proxy — confirm.
    pub proxy_prefix: Option<String>,
    /// Token for the proxy; refreshed whenever a matching entry is merged.
    pub proxy_token: Option<String>,
    /// Load average; refreshed whenever a matching entry is merged.
    pub load_avg: Option<f64>,
    /// Load counter; not touched by the merge in this file.
    pub load_count: Option<u64>,
    /// Health status value; refreshed whenever a matching entry is merged.
    /// NOTE(review): the meaning of individual values is not visible here.
    pub health_status: Option<u64>,
    /// NOTE(review): presumably the time the proxy was last seen alive —
    /// confirm.
    pub living_time: Option<rbatis::DateTimeNative>,
}
86
/// Payload posted to the gateway's register endpoint by `ggp_register_proxy`.
///
/// `build_register_info` produces the instance used by this file.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct GatewayRegisterInfo {
    /// Application identifier, copied from `AppConfig` by `build_register_info`.
    pub app_id: Option<String>,
    /// Application secret, copied from `AppConfig` by `build_register_info`.
    pub app_secret: Option<String>,
    /// Server host; `build_register_info` leaves this as `None`.
    pub server: Option<String>,
    /// Port of this service; deserialized through `i64_from_str` and
    /// defaulting to `None` when the key is absent.
    #[serde(default)]
    #[serde(deserialize_with = "i64_from_str")]
    pub port: Option<i64>,
    /// Deserialized through `bool_from_str`; defaults to `None` when absent.
    #[serde(default)]
    #[serde(deserialize_with = "bool_from_str")]
    pub rewrite_url: Option<bool>,
    /// Protocol name; `build_register_info` sets `"http"`.
    pub protocol: Option<String>,
    /// Health-check path; `build_register_info` sets `"/api/v1/healthcheck"`.
    /// NOTE(review): the name is spelled `heath_check`; it is also the
    /// serialized key, so renaming it would change the wire format.
    pub heath_check: Option<String>,
    /// Echo text; `build_register_info` sets `"Iamliving"`.
    pub echo_text: Option<String>,
    /// Proxy prefix; `build_register_info` leaves this as `None`.
    pub proxy_prefix: Option<String>,
}
103
/// Payload posted to the gateway's health-check endpoint by
/// `ggp_health_check`; built by `build_health_info`.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct HealthCheckInfo {
    /// Application identifier, copied from `AppConfig`.
    pub app_id: Option<String>,
    /// Application secret, copied from `AppConfig`.
    pub app_secret: Option<String>,
    /// Performance snapshot; `None` when not requested or when collecting it
    /// failed.
    pub performance: Option<ChimesPerformanceInfo>,
}
110
111pub fn build_register_info() -> GatewayRegisterInfo {
112 let conf = AppConfig::get().lock().unwrap();
113 GatewayRegisterInfo {
114 app_id: conf.app_id.clone(),
115 app_secret: conf.app_secret.clone(),
116 server: None,
117 port: Some(conf.webserver_conf.port),
118 rewrite_url: Some(false),
119 protocol: Some("http".to_string()),
120 heath_check: Some("/api/v1/healthcheck".to_string()),
121 echo_text: Some("Iamliving".to_string()),
122 proxy_prefix: None,
123 }
124}
125
126pub fn build_health_info(with_perf: bool) -> HealthCheckInfo {
127 let conf = AppConfig::get().lock().unwrap();
128 let perf = if with_perf {
129 match ChimesPerformanceInfo::get_performance_info() {
130 Ok(p) => Some(p),
131 Err(_) => None,
132 }
133 } else {
134 None
135 };
136 HealthCheckInfo {
137 app_id: conf.app_id.clone(),
138 app_secret: conf.app_secret.clone(),
139 performance: perf,
140 }
141}
142
143pub async fn ggp_register_proxy(gr: GatewayRegisterInfo) -> Result<(), ChimesError> {
149 let cc = ChimesClient::new_timeout(300);
150 let url = AppConfig::get().lock().unwrap().gateway_address.clone();
151 if url.is_none() {
152 return Err(ChimesError::custom(
153 10040,
154 "Gateway Server address was not defined.",
155 ));
156 }
157 let fullurl = format!(
158 "{}/gateway/api/v1/gateway/register",
159 url.unwrap_or_default()
160 );
161 let res = cc.post(&fullurl, &gr).await?;
162 log::info!("Register response: {}", res.clone());
163 let ret = serde_json::from_str::<Value>(res.as_str()).unwrap();
164 match ret {
165 Value::Object(mp) => match mp.get("status") {
166 Some(mtt) => match mtt {
167 Value::Number(tc) => {
168 if tc.as_i64() == Some(200) || tc.as_i64() == Some(0) {
169 Ok(())
170 } else {
171 Err(ChimesError::custom(
172 10040,
173 format!("Bad response. {}", res.clone()),
174 ))
175 }
176 }
177 Value::String(tcc) => {
178 if tcc.clone() == *"200" || tcc.clone() == *"0" {
179 Ok(())
180 } else {
181 Err(ChimesError::custom(
182 10040,
183 format!("Bad response. {}", res.clone()),
184 ))
185 }
186 }
187 _ => Err(ChimesError::custom(
188 10040,
189 format!("Bad response. {}", res.clone()),
190 )),
191 },
192 None => Err(ChimesError::custom(
193 10040,
194 format!("Bad response. {}", res.clone()),
195 )),
196 },
197 _ => Err(ChimesError::custom(
198 10043,
199 "Gateway Server response bad format.",
200 )),
201 }
202}
203
204pub async fn ggp_health_check() -> Result<(), ChimesError> {
210 let cc = ChimesClient::new();
211 let url = AppConfig::get().lock().unwrap().gateway_address.clone();
212 if url.is_none() {
213 return Err(ChimesError::custom(
214 10040,
215 "Gateway Server address was not defined.",
216 ));
217 }
218
219 let fullurl = format!("{}/gateway/api/v1/healthcheck", url.unwrap_or_default());
220 let perf = build_health_info(true);
221 let res = cc.post(&fullurl, &perf).await?;
222 log::info!("Healthcheck response: {}", res.clone());
223 match serde_json::from_str::<ApiResult<Vec<GatewayProxyInfo>>>(res.as_str()) {
224 Ok(lts) => {
225 if lts.status == 200 || lts.status == 0 {
226 if lts.data.is_some() {
227 for ts in lts.data.unwrap() {
228 let _ = GatewayProxyManagement::add_proxy_info(&ts);
229 }
230 }
231 Ok(())
232 } else {
233 Err(ChimesError::custom(
234 10043,
235 format!(
236 "Gateway Server responses warning message. {}",
237 lts.message.clone()
238 ),
239 ))
240 }
241 }
242 Err(err) => Err(ChimesError::custom(
243 10043,
244 format!("Gateway Server response bad format. {}", err),
245 )),
246 }
247}
248
249fn execute_healthcheck(_task: &ProcessTask) -> Pin<Box<dyn Future<Output = ()>>> {
250 Box::pin(async move {
251 match ggp_health_check().await {
252 Ok(_) => {}
253 Err(err) => {
254 log::info!("Error for health check. {}", err);
255 }
256 }
257 })
258}
259
260fn execute_register(_task: &ProcessTask) -> Pin<Box<dyn Future<Output = ()>>> {
261 Box::pin(async move {
262 let gt = build_register_info();
263 match ggp_register_proxy(gt).await {
264 Ok(_) => {
265 log::info!("Gateway was registered.");
266 }
267 Err(err) => {
268 log::info!("Unable to register in gateway. {}", err);
269 }
270 }
271 })
272}
273
274pub async fn ggp_add_health_check_task() {
275 let conf = AppConfig::get().lock().unwrap().clone();
276 if conf.app_id.is_some() && conf.app_secret.is_some() {
277 queue_registry_handler("execute_healthcheck", execute_healthcheck).await;
278 let task = ProcessTask::new_with_cookie_fn("health_check", true, 60, "execute_healthcheck");
279 get_task_queue().queue_add(task).await;
280 }
281}
282
283pub async fn ggp_delay_register() {
284 let conf = AppConfig::get().lock().unwrap().clone();
285 if conf.app_id.is_some() && conf.app_secret.is_some() {
286 queue_registry_handler("execute_register", execute_register).await;
287 let task = ProcessTask::new_with_cookie_fn("register", false, 10, "execute_register");
288 get_task_queue().queue_add(task).await;
289 }
290}