chimes_utils/utils/
gpt_sdk.rs

1use futures_util::Future;
2use serde_derive::{Deserialize, Serialize};
3use serde_json::Value;
4use std::cell::RefCell;
5use std::collections::HashMap;
6use std::pin::Pin;
7use std::sync::Mutex;
8
9use crate::{
10    bool_from_str, get_task_queue, i64_from_str, queue_registry_handler, AppConfig, ChimesClient,
11    ChimesError, ChimesPerformanceInfo, ProcessTask,
12};
13
14lazy_static! {
15    pub static ref MAP_GATEWAY_PROXY: Mutex<RefCell<HashMap<String, GatewayProxyManagement>>> =
16        Mutex::new(RefCell::new(HashMap::new()));
17}
18
19#[derive(Debug, Clone, Deserialize, Serialize)]
20pub struct ApiResult<T> {
21    pub status: i32,
22    pub message: String,
23    pub data: Option<T>,
24    pub timestamp: Option<u64>,
25}
26
27pub struct GatewayProxyManagement {
28    pub app_name: String,
29    pub proxy_list: RefCell<Vec<GatewayProxyInfo>>,
30}
31
32impl GatewayProxyManagement {
33    fn insert_or_update_proxy_info(&self, dt: &GatewayProxyInfo) {
34        let mut found = false;
35        let len = self.proxy_list.borrow().len();
36        for i in 0..len {
37            let bit = &mut self.proxy_list.borrow_mut()[i];
38            if bit.app_name == dt.app_name && bit.proxy_address == dt.proxy_address {
39                bit.health_status = dt.health_status;
40                bit.load_avg = dt.load_avg;
41                bit.proxy_token = dt.proxy_token.clone();
42                found = true;
43            }
44        }
45
46        if !found {
47            self.proxy_list.borrow_mut().push(dt.clone());
48        }
49    }
50
51    pub fn add_proxy_info(dt: &GatewayProxyInfo) -> bool {
52        let proxy_map = MAP_GATEWAY_PROXY.lock().unwrap();
53        let app_name = dt.app_name.clone().unwrap_or_default();
54        if proxy_map.borrow_mut().contains_key(&app_name) {
55            proxy_map.borrow_mut()[&app_name].insert_or_update_proxy_info(dt);
56        } else {
57            proxy_map.borrow_mut().insert(
58                app_name.clone(),
59                GatewayProxyManagement {
60                    app_name: app_name.clone(),
61                    proxy_list: RefCell::new(vec![dt.clone()]),
62                },
63            );
64        }
65
66        true
67    }
68}
69
70#[derive(Debug, Clone, Default, Deserialize, Serialize)]
71pub struct GatewayProxyInfo {
72    pub app_id: Option<String>,
73    pub app_name: Option<String>,
74    pub proxy_address: Option<String>,
75    #[serde(default)]
76    #[serde(deserialize_with = "bool_from_str")]
77    pub rewrite_url: Option<bool>, //  need to rewrite url by change the url when the style is prefix
78    pub echo_text: Option<String>,
79    pub proxy_prefix: Option<String>, // should be provide if style is prefix
80    pub proxy_token: Option<String>,
81    pub load_avg: Option<f64>, // CPU and memory coverage
82    pub load_count: Option<u64>,
83    pub health_status: Option<u64>,
84    pub living_time: Option<rbatis::DateTimeNative>,
85}
86
87#[derive(Debug, Clone, Default, Deserialize, Serialize)]
88pub struct GatewayRegisterInfo {
89    pub app_id: Option<String>,
90    pub app_secret: Option<String>,
91    pub server: Option<String>,
92    #[serde(default)]
93    #[serde(deserialize_with = "i64_from_str")]
94    pub port: Option<i64>,
95    #[serde(default)]
96    #[serde(deserialize_with = "bool_from_str")]
97    pub rewrite_url: Option<bool>, //  need to rewrite url by change the url when the style is prefix
98    pub protocol: Option<String>, // default is http
99    pub heath_check: Option<String>,
100    pub echo_text: Option<String>,
101    pub proxy_prefix: Option<String>, // should be provide if style is prefix
102}
103
104#[derive(Debug, Clone, Default, Deserialize, Serialize)]
105pub struct HealthCheckInfo {
106    pub app_id: Option<String>,
107    pub app_secret: Option<String>,
108    pub performance: Option<ChimesPerformanceInfo>,
109}
110
111pub fn build_register_info() -> GatewayRegisterInfo {
112    let conf = AppConfig::get().lock().unwrap();
113    GatewayRegisterInfo {
114        app_id: conf.app_id.clone(),
115        app_secret: conf.app_secret.clone(),
116        server: None,
117        port: Some(conf.webserver_conf.port),
118        rewrite_url: Some(false),
119        protocol: Some("http".to_string()),
120        heath_check: Some("/api/v1/healthcheck".to_string()),
121        echo_text: Some("Iamliving".to_string()),
122        proxy_prefix: None,
123    }
124}
125
126pub fn build_health_info(with_perf: bool) -> HealthCheckInfo {
127    let conf = AppConfig::get().lock().unwrap();
128    let perf = if with_perf {
129        match ChimesPerformanceInfo::get_performance_info() {
130            Ok(p) => Some(p),
131            Err(_) => None,
132        }
133    } else {
134        None
135    };
136    HealthCheckInfo {
137        app_id: conf.app_id.clone(),
138        app_secret: conf.app_secret.clone(),
139        performance: perf,
140    }
141}
142
143/**
144 * Global Gateway Proxy Server
145 * 向代理服务器进行注册
146 * 或向Gateway进行注册
147 */
148pub async fn ggp_register_proxy(gr: GatewayRegisterInfo) -> Result<(), ChimesError> {
149    let cc = ChimesClient::new_timeout(300);
150    let url = AppConfig::get().lock().unwrap().gateway_address.clone();
151    if url.is_none() {
152        return Err(ChimesError::custom(
153            10040,
154            "Gateway Server address was not defined.",
155        ));
156    }
157    let fullurl = format!(
158        "{}/gateway/api/v1/gateway/register",
159        url.unwrap_or_default()
160    );
161    let res = cc.post(&fullurl, &gr).await?;
162    log::info!("Register response: {}", res.clone());
163    let ret = serde_json::from_str::<Value>(res.as_str()).unwrap();
164    match ret {
165        Value::Object(mp) => match mp.get("status") {
166            Some(mtt) => match mtt {
167                Value::Number(tc) => {
168                    if tc.as_i64() == Some(200) || tc.as_i64() == Some(0) {
169                        Ok(())
170                    } else {
171                        Err(ChimesError::custom(
172                            10040,
173                            format!("Bad response. {}", res.clone()),
174                        ))
175                    }
176                }
177                Value::String(tcc) => {
178                    if tcc.clone() == *"200" || tcc.clone() == *"0" {
179                        Ok(())
180                    } else {
181                        Err(ChimesError::custom(
182                            10040,
183                            format!("Bad response. {}", res.clone()),
184                        ))
185                    }
186                }
187                _ => Err(ChimesError::custom(
188                    10040,
189                    format!("Bad response. {}", res.clone()),
190                )),
191            },
192            None => Err(ChimesError::custom(
193                10040,
194                format!("Bad response. {}", res.clone()),
195            )),
196        },
197        _ => Err(ChimesError::custom(
198            10043,
199            "Gateway Server response bad format.",
200        )),
201    }
202}
203
204/**
205 * Global Gateway Proxy Server
206 * 代理服务器的健康检查
207 * 时不时的向代理服务器自己的健康情况
208 */
209pub async fn ggp_health_check() -> Result<(), ChimesError> {
210    let cc = ChimesClient::new();
211    let url = AppConfig::get().lock().unwrap().gateway_address.clone();
212    if url.is_none() {
213        return Err(ChimesError::custom(
214            10040,
215            "Gateway Server address was not defined.",
216        ));
217    }
218
219    let fullurl = format!("{}/gateway/api/v1/healthcheck", url.unwrap_or_default());
220    let perf = build_health_info(true);
221    let res = cc.post(&fullurl, &perf).await?;
222    log::info!("Healthcheck response: {}", res.clone());
223    match serde_json::from_str::<ApiResult<Vec<GatewayProxyInfo>>>(res.as_str()) {
224        Ok(lts) => {
225            if lts.status == 200 || lts.status == 0 {
226                if lts.data.is_some() {
227                    for ts in lts.data.unwrap() {
228                        let _ = GatewayProxyManagement::add_proxy_info(&ts);
229                    }
230                }
231                Ok(())
232            } else {
233                Err(ChimesError::custom(
234                    10043,
235                    format!(
236                        "Gateway Server responses warning message. {}",
237                        lts.message.clone()
238                    ),
239                ))
240            }
241        }
242        Err(err) => Err(ChimesError::custom(
243            10043,
244            format!("Gateway Server response bad format. {}", err),
245        )),
246    }
247}
248
249fn execute_healthcheck(_task: &ProcessTask) -> Pin<Box<dyn Future<Output = ()>>> {
250    Box::pin(async move {
251        match ggp_health_check().await {
252            Ok(_) => {}
253            Err(err) => {
254                log::info!("Error for health check. {}", err);
255            }
256        }
257    })
258}
259
260fn execute_register(_task: &ProcessTask) -> Pin<Box<dyn Future<Output = ()>>> {
261    Box::pin(async move {
262        let gt = build_register_info();
263        match ggp_register_proxy(gt).await {
264            Ok(_) => {
265                log::info!("Gateway was registered.");
266            }
267            Err(err) => {
268                log::info!("Unable to register in gateway. {}", err);
269            }
270        }
271    })
272}
273
274pub async fn ggp_add_health_check_task() {
275    let conf = AppConfig::get().lock().unwrap().clone();
276    if conf.app_id.is_some() && conf.app_secret.is_some() {
277        queue_registry_handler("execute_healthcheck", execute_healthcheck).await;
278        let task = ProcessTask::new_with_cookie_fn("health_check", true, 60, "execute_healthcheck");
279        get_task_queue().queue_add(task).await;
280    }
281}
282
283pub async fn ggp_delay_register() {
284    let conf = AppConfig::get().lock().unwrap().clone();
285    if conf.app_id.is_some() && conf.app_secret.is_some() {
286        queue_registry_handler("execute_register", execute_register).await;
287        let task = ProcessTask::new_with_cookie_fn("register", false, 10, "execute_register");
288        get_task_queue().queue_add(task).await;
289    }
290}