1mod add_env;
10pub(crate) mod sync;
11
12use crate::{def::*, send_err, send_ok, CFG, PROXY, SOCK_MID};
13use add_env::add_env;
14use async_std::{net::SocketAddr, task};
15use lazy_static::lazy_static;
16use myutil::{err::*, *};
17use parking_lot::RwLock;
18use ppserver_def::*;
19use serde::Deserialize;
20use serde::Serialize;
21use std::{
22 collections::HashMap,
23 mem,
24 sync::{
25 atomic::{AtomicU64, Ordering},
26 Arc,
27 },
28};
29
30lazy_static! {
31 static ref ENV_MAP: Arc<RwLock<HashMap<EnvId, Vec<SocketAddr>>>> =
32 Arc::new(RwLock::new(map! {}));
33 static ref SLAVE_INFO: Arc<RwLock<HashMap<SocketAddr, RespGetServerInfo>>> =
34 Arc::new(RwLock::new(map! {}));
35}
36
/// Signature shared by the request handlers in this file: the operation id,
/// the address of the requesting peer and the raw request bytes.
type Ops = fn(usize, SocketAddr, Vec<u8>) -> Result<()>;
// Textually includes `included_ops_map.rs` at this point.
// NOTE(review): presumably that file defines the table mapping operation ids
// to `Ops` handlers — confirm against the included file.
include!("included_ops_map.rs");
39
/// Forward a client request to slave servers and register the callback that
/// will handle their replies.
///
/// The arms build on each other, from the outermost to the innermost:
///
/// - `(ops_id, raw_request, ReqType, peeraddr)`: the `@` arm with
///   `resp_cb_simple` as the reply callback.
/// - `@`: deserializes `raw_request` as `ReqType` (propagating a parse error
///   with `?`), looks `req.msg.env_id` up in `ENV_MAP` and forwards to a clone
///   of the recorded address list. If the environment is unknown it sends a
///   "Not exists!" error to the peer and `return`s from the enclosing function.
/// - `@@`: takes an already parsed request plus an address set, forwards it,
///   and if forwarding fails reports that error to the peer.
/// - `@@@`: registers the reply handler via `register_resp_hdr`, then sends a
///   new request (fresh proxy uuid, the peer's IP as client id, the original
///   `msg` moved out with `mem::take`) to every address via
///   `send_req_to_slave`.
///
/// Because the `@` arm uses `?` and `return`, it can only be expanded inside a
/// function returning `Result<()>`. The expansion refers to `ENV_MAP`, `mem`,
/// `Req`, `gen_proxy_uuid`, `register_resp_hdr`, `send_req_to_slave` and the
/// `send_err!`/`eg!`/`d!` macros by their bare names, so these must be in scope
/// at the call site.
// `macro_export` accepts no argument other than `local_inner_macros`; the
// plain form is used here so that newer compilers do not reject the attribute.
#[macro_export]
macro_rules! fwd_to_slave {
    (@@@$ops_id: expr, $req: expr, $peeraddr: expr, $cb: tt, $addr_set: expr) => {{
        // One reply is expected per address in the set.
        let num_to_wait = $addr_set.len();
        let proxy_uuid = gen_proxy_uuid();

        // Register the handler before anything is sent.
        register_resp_hdr(num_to_wait, $cb, $peeraddr, $req.uuid, proxy_uuid);

        let cli_id = $peeraddr.ip().to_string();
        send_req_to_slave(
            $ops_id,
            Req::newx(proxy_uuid, Some(cli_id), mem::take(&mut $req.msg)),
            $addr_set,
        )
        .c(d!())
    }};
    (@@$ops_id: expr, $req: expr, $peeraddr: expr, $cb: tt, $addr_set: expr) => {{
        // Bind mutably: the inner arm moves `msg` out of the request.
        let mut req = $req;
        fwd_to_slave!(@@@$ops_id, req, $peeraddr, $cb, $addr_set)
            .or_else(|e| send_err!(req.uuid, e, $peeraddr))
    }};
    (@$ops_id: expr, $orig_req: expr, $req_kind: ty, $peeraddr: expr, $cb: tt) => {{
        let req = serde_json::from_slice::<$req_kind>(&$orig_req).c(d!())?;
        // Clone the address list so forwarding works on an owned copy.
        let addr_set = if let Some(set) = ENV_MAP.read().get(&req.msg.env_id) {
            set.clone()
        } else {
            return send_err!(req.uuid, eg!("Not exists!"), $peeraddr);
        };
        fwd_to_slave!(@@$ops_id, req, $peeraddr, $cb, &addr_set)
    }};
    ($ops_id: expr, $orig_req: expr, $req_kind: ty, $peeraddr: expr) => {{
        fwd_to_slave!(@$ops_id, $orig_req, $req_kind, $peeraddr, resp_cb_simple)
    }};
}
72
73fn register_client_id(
77 _ops_id: usize,
78 peeraddr: SocketAddr,
79 request: Vec<u8>,
80) -> Result<()> {
81 let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
82 let resp = Resp {
83 uuid: req.uuid,
84 status: RetStatus::Success,
85 msg: vct![],
86 };
87 send_ok!(req.uuid, resp, peeraddr)
88}
89
90fn get_server_info(
93 ops_id: usize,
94 peeraddr: SocketAddr,
95 request: Vec<u8>,
96) -> Result<()> {
97 fn cb(r: &mut SlaveRes) {
99 info_omit!(resp_cb_merge::<RespGetServerInfo>(r));
100 }
101
102 let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
103 let addr_set = CFG.server_addr_set.clone();
104
105 fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
106}
107
108fn get_env_list(
110 ops_id: usize,
111 peeraddr: SocketAddr,
112 request: Vec<u8>,
113) -> Result<()> {
114 fn cb(r: &mut SlaveRes) {
116 info_omit!(resp_cb_merge::<RespGetEnvList>(r));
117 }
118
119 let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
120 let addr_set = CFG.server_addr_set.clone();
121
122 fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
123}
124
125fn get_env_info(
127 ops_id: usize,
128 peeraddr: SocketAddr,
129 request: Vec<u8>,
130) -> Result<()> {
131 #[derive(Deserialize)]
132 struct MyReq {
133 pub uuid: u64,
134 pub msg: ReqGetEnvInfo,
135 }
136
137 fn cb(r: &mut SlaveRes) {
139 info_omit!(resp_cb_merge::<RespGetEnvInfo>(r));
140 }
141
142 let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
143 let addr_set = {
144 let lk = ENV_MAP.read();
145 let res = req
146 .msg
147 .env_set
148 .iter()
149 .filter_map(|env_id| lk.get(env_id))
150 .flatten()
151 .copied()
152 .collect::<Vec<_>>();
153 if res.is_empty() {
154 let msg: HashMap<String, RespGetEnvInfo> = map! {};
155 return send_ok!(req.uuid, msg, peeraddr);
156 } else {
157 res
158 }
159 };
160
161 fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
162}
163
164fn update_env_kick_vm(
166 ops_id: usize,
167 peeraddr: SocketAddr,
168 request: Vec<u8>,
169) -> Result<()> {
170 #[derive(Deserialize)]
171 struct MyReq {
172 pub uuid: u64,
173 pub msg: ReqUpdateEnvKickVm,
174 }
175
176 fwd_to_slave!(ops_id, request, MyReq, peeraddr)
177}
178
179fn update_env_lifetime(
181 ops_id: usize,
182 peeraddr: SocketAddr,
183 request: Vec<u8>,
184) -> Result<()> {
185 #[derive(Deserialize)]
186 struct MyReq {
187 pub uuid: u64,
188 pub msg: ReqUpdateEnvLife,
189 }
190
191 let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
192
193 if let Some(set) = ENV_MAP.read().get(&req.msg.env_id) {
194 fwd_to_slave!(@@ops_id, req, peeraddr, resp_cb_simple, set)
195 } else {
196 send_err!(req.uuid, eg!("Not exists!"), peeraddr)
197 }
198}
199
200fn del_env(
202 ops_id: usize,
203 peeraddr: SocketAddr,
204 request: Vec<u8>,
205) -> Result<()> {
206 #[derive(Deserialize)]
207 struct MyReq {
208 pub uuid: u64,
209 pub msg: ReqDelEnv,
210 }
211
212 let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
213 let addr_set = if let Some(set) = ENV_MAP.write().remove(&req.msg.env_id) {
214 set
215 } else {
216 return send_ok!(req.uuid, "Success!", peeraddr);
217 };
218
219 fwd_to_slave!(@@ops_id, req, peeraddr, resp_cb_simple, &addr_set)
220}
221
222#[inline(always)]
224fn get_env_list_all(
225 ops_id: usize,
226 peeraddr: SocketAddr,
227 request: Vec<u8>,
228) -> Result<()> {
229 get_env_list(ops_id, peeraddr, request).c(d!())
230}
231
/// Return the next proxy-side request uuid.
///
/// Values come from a process-wide counter that starts at 9999 and grows by
/// one per call, so every call observes a distinct value (until `u64`
/// wrap-around).
fn gen_proxy_uuid() -> u64 {
    // `AtomicU64::new` is a `const fn`, so a plain `static` is enough and no
    // lazy initialization is needed.
    static NEXT_PROXY_UUID: AtomicU64 = AtomicU64::new(9999);

    // Only uniqueness of the returned values matters here; the counter does
    // not order any other memory accesses, hence `Relaxed`.
    NEXT_PROXY_UUID.fetch_add(1, Ordering::Relaxed)
}
239
240fn resp_cb_simple(r: &mut SlaveRes) {
242 if 0 < r.num_to_wait {
243 send_err!(
244 r.uuid,
245 eg!("Not all slave-server[s] reponsed!"),
246 r.peeraddr
247 )
248 .unwrap_or_else(|e| p(e));
249 } else if r.msg.values().any(|v| v.status == RetStatus::Fail) {
250 send_err!(r.uuid, eg!("Some slave-server[s] got error!"), r.peeraddr)
251 .unwrap_or_else(|e| p(e));
252 } else {
253 send_ok!(r.uuid, "Success!", r.peeraddr).unwrap_or_else(|e| p(e));
254 }
255}
256
257fn resp_cb_merge<'a, T: Serialize + Deserialize<'a>>(
260 r: &'a mut SlaveRes,
261) -> Result<()> {
262 if 0 < r.num_to_wait {
263 p(eg!("Not all slave-server[s] reponsed!"));
264 }
265
266 let res = r
271 .msg
272 .iter()
273 .filter(|(_, raw)| raw.status == RetStatus::Success)
274 .filter_map(|(slave, raw)| {
275 info!(serde_json::from_slice::<HashMap<ServerAddr, T>>(&raw.msg))
276 .ok()
277 .and_then(|resp| resp.into_iter().next())
278 .map(|resp| (slave.to_string(), resp.1))
279 })
280 .collect::<HashMap<_, _>>();
281
282 send_ok!(r.uuid, res, r.peeraddr)
283}
284
285fn send_req_to_slave<T: Serialize>(
287 ops_id: usize,
288 req: Req<T>,
289 slave_set: &[SocketAddr],
290) -> Result<()> {
291 let mut req_bytes = serde_json::to_vec(&req).c(d!())?;
292 let mut body =
293 format!("{id:>0width$}", id = ops_id, width = OPS_ID_LEN).into_bytes();
294 body.append(&mut req_bytes);
295
296 macro_rules! do_send {
297 ($body: expr, $slave: expr) => {
298 task::spawn(async move {
299 info_omit!(SOCK_MID.send_to(&$body, $slave).await);
300 });
301 };
302 }
303
304 if 1 < slave_set.len() {
305 for slave in slave_set.iter().skip(1).copied() {
306 let b = body.clone();
307 do_send!(b, slave);
308 }
309 } else if slave_set.is_empty() {
310 return Ok(());
311 }
312
313 let first_slave = slave_set[0];
315 do_send!(body, first_slave);
316
317 Ok(())
318}
319
320fn register_resp_hdr(
322 num_to_wait: usize,
323 cb: fn(&mut SlaveRes),
324 peeraddr: SocketAddr,
325 orig_uuid: UUID,
326 proxy_uuid: UUID,
327) {
328 let ts = ts!();
329 let idx = ts as usize % TIMEOUT_SECS;
330 let sr = SlaveRes {
331 msg: map! {},
332 num_to_wait,
333 start_ts: ts,
334 do_resp: cb,
335 peeraddr,
336 uuid: orig_uuid,
337 };
338 let mut proxy = PROXY.lock();
339 proxy.idx_map.insert(proxy_uuid, idx);
340 proxy.buckets[idx].ts = ts;
341 proxy.buckets[idx].res.insert(proxy_uuid, sr);
342}