ppproxy/hdr/
mod.rs

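//!
//! # Network Service Handler
//!
//! Operations for handling requests from clients.
//!
//! Every interface that shares its name with a Server interface behaves,
//! as seen from the outside, exactly like its Server counterpart.
//!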
8
9mod add_env;
10pub(crate) mod sync;
11
12use crate::{def::*, send_err, send_ok, CFG, PROXY, SOCK_MID};
13use add_env::add_env;
14use async_std::{net::SocketAddr, task};
15use lazy_static::lazy_static;
16use myutil::{err::*, *};
17use parking_lot::RwLock;
18use ppserver_def::*;
19use serde::Deserialize;
20use serde::Serialize;
21use std::{
22    collections::HashMap,
23    mem,
24    sync::{
25        atomic::{AtomicU64, Ordering},
26        Arc,
27    },
28};
29
30lazy_static! {
31    static ref ENV_MAP: Arc<RwLock<HashMap<EnvId, Vec<SocketAddr>>>> =
32        Arc::new(RwLock::new(map! {}));
33    static ref SLAVE_INFO: Arc<RwLock<HashMap<SocketAddr, RespGetServerInfo>>> =
34        Arc::new(RwLock::new(map! {}));
35}
36
37type Ops = fn(usize, SocketAddr, Vec<u8>) -> Result<()>;
38include!("included_ops_map.rs");
39
/// Dispatch a client request to the backend slave servers.
///
/// The arms form a pipeline in which each arm delegates to the next one:
///
/// 1. `fwd_to_slave!(ops_id, raw, ReqType, peeraddr)` —
///    identical to the `@` arm with `resp_cb_simple` as the reply callback.
/// 2. `fwd_to_slave!(@ops_id, raw, ReqType, peeraddr, cb)` —
///    deserializes `raw` into `ReqType`, looks up `req.msg.env_id` in
///    `ENV_MAP` and forwards to a clone of that env's slave list; when the
///    env is unknown it returns early with a "Not exists!" error reply.
/// 3. `fwd_to_slave!(@@ops_id, req, peeraddr, cb, addr_set)` —
///    forwards an already parsed request; if forwarding fails, the error is
///    sent back to the client.
/// 4. `fwd_to_slave!(@@@ops_id, req, peeraddr, cb, addr_set)` —
///    registers `cb` to wait for `addr_set.len()` answers under a fresh proxy
///    uuid, then sends the request (its `msg` is moved out via `mem::take`)
///    with the client's IP as the client id.
///
/// The `@` arm contains `?` and `return`, so it (and the 4-argument form)
/// can only be expanded inside a `Result`-returning function.
// `#[macro_export]` takes no `crate` argument (only `local_inner_macros` is
// valid); the macro is exported at the crate root either way.
#[macro_export]
macro_rules! fwd_to_slave {
    (@@@$ops_id: expr, $req: expr, $peeraddr: expr, $cb: tt, $addr_set: expr) => {{
        let num_to_wait = $addr_set.len();
        let proxy_uuid = gen_proxy_uuid();

        // The response handler is registered before anything is sent.
        register_resp_hdr(num_to_wait, $cb, $peeraddr, $req.uuid, proxy_uuid);

        let cli_id = $peeraddr.ip().to_string();
        send_req_to_slave(
            $ops_id,
            Req::newx(proxy_uuid, Some(cli_id), mem::take(&mut $req.msg)),
            $addr_set,
        )
        .c(d!())
    }};
    (@@$ops_id: expr, $req: expr, $peeraddr: expr, $cb: tt, $addr_set: expr) => {{
        // A mutable local is needed because the inner arm moves `msg` out.
        let mut req = $req;
        fwd_to_slave!(@@@$ops_id, req, $peeraddr, $cb, $addr_set)
            .or_else(|e| send_err!(req.uuid, e, $peeraddr))
    }};
    (@$ops_id: expr, $orig_req: expr, $req_kind: ty, $peeraddr: expr, $cb: tt) => {{
        let req = serde_json::from_slice::<$req_kind>(&$orig_req).c(d!())?;
        let addr_set = if let Some(set) = ENV_MAP.read().get(&req.msg.env_id) {
            set.clone()
        } else {
            return send_err!(req.uuid, eg!("Not exists!"), $peeraddr);
        };
        fwd_to_slave!(@@$ops_id, req, $peeraddr, $cb, &addr_set)
    }};
    ($ops_id: expr, $orig_req: expr, $req_kind: ty, $peeraddr: expr) => {{
        fwd_to_slave!(@$ops_id, $orig_req, $req_kind, $peeraddr, resp_cb_simple)
    }};
}
72
73/// 注册 Cli, 一般无需调用,
74/// 创建 Env 时若发现 Cli 不存在会自动创建之,
75/// 此接口在 Proxy 中实现为"什么都不做, 直接返回成功".
76fn register_client_id(
77    _ops_id: usize,
78    peeraddr: SocketAddr,
79    request: Vec<u8>,
80) -> Result<()> {
81    let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
82    let resp = Resp {
83        uuid: req.uuid,
84        status: RetStatus::Success,
85        msg: vct![],
86    };
87    send_ok!(req.uuid, resp, peeraddr)
88}
89
90/// 获取服务端的资源分配信息,
91/// 直接从定时任务的结果中提取, 不做实时请求.
92fn get_server_info(
93    ops_id: usize,
94    peeraddr: SocketAddr,
95    request: Vec<u8>,
96) -> Result<()> {
97    // 汇聚各 Slave 的信息
98    fn cb(r: &mut SlaveRes) {
99        info_omit!(resp_cb_merge::<RespGetServerInfo>(r));
100    }
101
102    let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
103    let addr_set = CFG.server_addr_set.clone();
104
105    fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
106}
107
108/// 获取服务端已存在的 Env 概略信息
109fn get_env_list(
110    ops_id: usize,
111    peeraddr: SocketAddr,
112    request: Vec<u8>,
113) -> Result<()> {
114    // 汇聚各 Slave 的信息
115    fn cb(r: &mut SlaveRes) {
116        info_omit!(resp_cb_merge::<RespGetEnvList>(r));
117    }
118
119    let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
120    let addr_set = CFG.server_addr_set.clone();
121
122    fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
123}
124
125// 获取服务端已存在的 Env 详细信息
126fn get_env_info(
127    ops_id: usize,
128    peeraddr: SocketAddr,
129    request: Vec<u8>,
130) -> Result<()> {
131    #[derive(Deserialize)]
132    struct MyReq {
133        pub uuid: u64,
134        pub msg: ReqGetEnvInfo,
135    }
136
137    // 汇聚各 Slave 的信息
138    fn cb(r: &mut SlaveRes) {
139        info_omit!(resp_cb_merge::<RespGetEnvInfo>(r));
140    }
141
142    let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
143    let addr_set = {
144        let lk = ENV_MAP.read();
145        let res = req
146            .msg
147            .env_set
148            .iter()
149            .filter_map(|env_id| lk.get(env_id))
150            .flatten()
151            .copied()
152            .collect::<Vec<_>>();
153        if res.is_empty() {
154            let msg: HashMap<String, RespGetEnvInfo> = map! {};
155            return send_ok!(req.uuid, msg, peeraddr);
156        } else {
157            res
158        }
159    };
160
161    fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
162}
163
164/// 从已有 ENV 中踢出指定的 VM 实例
165fn update_env_kick_vm(
166    ops_id: usize,
167    peeraddr: SocketAddr,
168    request: Vec<u8>,
169) -> Result<()> {
170    #[derive(Deserialize)]
171    struct MyReq {
172        pub uuid: u64,
173        pub msg: ReqUpdateEnvKickVm,
174    }
175
176    fwd_to_slave!(ops_id, request, MyReq, peeraddr)
177}
178
179/// 更新已有 Env 的生命周期
180fn update_env_lifetime(
181    ops_id: usize,
182    peeraddr: SocketAddr,
183    request: Vec<u8>,
184) -> Result<()> {
185    #[derive(Deserialize)]
186    struct MyReq {
187        pub uuid: u64,
188        pub msg: ReqUpdateEnvLife,
189    }
190
191    let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
192
193    if let Some(set) = ENV_MAP.read().get(&req.msg.env_id) {
194        fwd_to_slave!(@@ops_id, req, peeraddr, resp_cb_simple, set)
195    } else {
196        send_err!(req.uuid, eg!("Not exists!"), peeraddr)
197    }
198}
199
200/// 删除 Env
201fn del_env(
202    ops_id: usize,
203    peeraddr: SocketAddr,
204    request: Vec<u8>,
205) -> Result<()> {
206    #[derive(Deserialize)]
207    struct MyReq {
208        pub uuid: u64,
209        pub msg: ReqDelEnv,
210    }
211
212    let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
213    let addr_set = if let Some(set) = ENV_MAP.write().remove(&req.msg.env_id) {
214        set
215    } else {
216        return send_ok!(req.uuid, "Success!", peeraddr);
217    };
218
219    fwd_to_slave!(@@ops_id, req, peeraddr, resp_cb_simple, &addr_set)
220}
221
222/// 获取服务端已存在的 Env 概略信息(全局)
223#[inline(always)]
224fn get_env_list_all(
225    ops_id: usize,
226    peeraddr: SocketAddr,
227    request: Vec<u8>,
228) -> Result<()> {
229    get_env_list(ops_id, peeraddr, request).c(d!())
230}
231
/// Generate a uuid that does not repeat within this process.
///
/// Values are taken from an atomic counter that starts at 9999 and grows by
/// one per call (wrapping around only after `u64::MAX`).
fn gen_proxy_uuid() -> u64 {
    // `AtomicU64::new` is a `const fn`, so a plain `static` is sufficient and
    // no lazy initialization is required.
    static NEXT_PROXY_UUID: AtomicU64 = AtomicU64::new(9999);
    // The atomic increment alone guarantees distinct values; no ordering
    // with respect to other memory operations is needed.
    NEXT_PROXY_UUID.fetch_add(1, Ordering::Relaxed)
}
239
240// 用于回复 Client 的通用回调
241fn resp_cb_simple(r: &mut SlaveRes) {
242    if 0 < r.num_to_wait {
243        send_err!(
244            r.uuid,
245            eg!("Not all slave-server[s] reponsed!"),
246            r.peeraddr
247        )
248        .unwrap_or_else(|e| p(e));
249    } else if r.msg.values().any(|v| v.status == RetStatus::Fail) {
250        send_err!(r.uuid, eg!("Some slave-server[s] got error!"), r.peeraddr)
251            .unwrap_or_else(|e| p(e));
252    } else {
253        send_ok!(r.uuid, "Success!", r.peeraddr).unwrap_or_else(|e| p(e));
254    }
255}
256
257// 汇聚各 Slave 的信息, 回复给 Client;
258// 该回调仅用于查询类接口, 采用 “尽力而为” 模式, 部分返回即视为成功.
259fn resp_cb_merge<'a, T: Serialize + Deserialize<'a>>(
260    r: &'a mut SlaveRes,
261) -> Result<()> {
262    if 0 < r.num_to_wait {
263        p(eg!("Not all slave-server[s] reponsed!"));
264    }
265
266    // if r.msg.values().any(|v| v.status == RetStatus::Fail) {
267    //     p(eg!("Some slave-server[s] got error!"));
268    // }
269
270    let res = r
271        .msg
272        .iter()
273        .filter(|(_, raw)| raw.status == RetStatus::Success)
274        .filter_map(|(slave, raw)| {
275            info!(serde_json::from_slice::<HashMap<ServerAddr, T>>(&raw.msg))
276                .ok()
277                .and_then(|resp| resp.into_iter().next())
278                .map(|resp| (slave.to_string(), resp.1))
279        })
280        .collect::<HashMap<_, _>>();
281
282    send_ok!(r.uuid, res, r.peeraddr)
283}
284
285/// 分发请求至各 Slave Server
286fn send_req_to_slave<T: Serialize>(
287    ops_id: usize,
288    req: Req<T>,
289    slave_set: &[SocketAddr],
290) -> Result<()> {
291    let mut req_bytes = serde_json::to_vec(&req).c(d!())?;
292    let mut body =
293        format!("{id:>0width$}", id = ops_id, width = OPS_ID_LEN).into_bytes();
294    body.append(&mut req_bytes);
295
296    macro_rules! do_send {
297        ($body: expr, $slave: expr) => {
298            task::spawn(async move {
299                info_omit!(SOCK_MID.send_to(&$body, $slave).await);
300            });
301        };
302    }
303
304    if 1 < slave_set.len() {
305        for slave in slave_set.iter().skip(1).copied() {
306            let b = body.clone();
307            do_send!(b, slave);
308        }
309    } else if slave_set.is_empty() {
310        return Ok(());
311    }
312
313    // 单独处理第一个 slave, 避免无谓的 clone
314    let first_slave = slave_set[0];
315    do_send!(body, first_slave);
316
317    Ok(())
318}
319
320/// 将待回复的 handler 注册到 Proxy 中
321fn register_resp_hdr(
322    num_to_wait: usize,
323    cb: fn(&mut SlaveRes),
324    peeraddr: SocketAddr,
325    orig_uuid: UUID,
326    proxy_uuid: UUID,
327) {
328    let ts = ts!();
329    let idx = ts as usize % TIMEOUT_SECS;
330    let sr = SlaveRes {
331        msg: map! {},
332        num_to_wait,
333        start_ts: ts,
334        do_resp: cb,
335        peeraddr,
336        uuid: orig_uuid,
337    };
338    let mut proxy = PROXY.lock();
339    proxy.idx_map.insert(proxy_uuid, idx);
340    proxy.buckets[idx].ts = ts;
341    proxy.buckets[idx].res.insert(proxy_uuid, sr);
342}