mod add_env;
pub(crate) mod sync;
use crate::{def::*, send_err, send_ok, CFG, PROXY, SOCK_MID};
use add_env::add_env;
use async_std::{net::SocketAddr, task};
use lazy_static::lazy_static;
use myutil::{err::*, *};
use parking_lot::RwLock;
use serde::Deserialize;
use serde::Serialize;
use std::{
collections::HashMap,
mem,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use ttserver_def::*;
lazy_static! {
static ref ENV_MAP: Arc<RwLock<HashMap<EnvId, Vec<SocketAddr>>>> =
Arc::new(RwLock::new(map! {}));
static ref SLAVE_INFO: Arc<RwLock<HashMap<SocketAddr, RespGetServerInfo>>> =
Arc::new(RwLock::new(map! {}));
}
type Ops = fn(usize, SocketAddr, Vec<u8>) -> Result<()>;
include!("included_ops_map.rs");
#[macro_export(crate)]
macro_rules! fwd_to_slave {
(@@@$ops_id: expr, $req: expr, $peeraddr: expr, $cb: tt, $addr_set: expr) => {{
let num_to_wait = $addr_set.len();
let proxy_uuid = gen_proxy_uuid();
register_resp_hdr(num_to_wait, $cb, $peeraddr, $req.uuid, proxy_uuid);
let cli_id = $req.cli_id.take().unwrap_or_else(||$peeraddr.ip().to_string());
send_req_to_slave($ops_id, Req::newx(proxy_uuid, Some(cli_id), mem::take(&mut $req.msg)), $addr_set)
.c(d!())
}};
(@@$ops_id: expr, $req: expr, $peeraddr: expr, $cb: tt, $addr_set: expr) => {{
let mut req = $req;
fwd_to_slave!(@@@$ops_id, req, $peeraddr, $cb, $addr_set)
.or_else(|e| send_err!(req.uuid, e, $peeraddr))
}};
(@$ops_id: expr, $orig_req: expr, $req_kind: ty, $peeraddr: expr, $cb: tt) => {{
let req = serde_json::from_slice::<$req_kind>(&$orig_req).c(d!())?;
let addr_set = if let Some(set) = ENV_MAP.read().get(&req.msg.env_id) {
set.clone()
} else {
return send_err!(req.uuid, eg!("ENV not exists!"), $peeraddr);
};
fwd_to_slave!(@@$ops_id, req, $peeraddr, $cb, &addr_set)
}};
($ops_id: expr, $orig_req: expr, $req_kind: ty, $peeraddr: expr) => {{
fwd_to_slave!(@$ops_id, $orig_req, $req_kind, $peeraddr, resp_cb_simple)
}};
}
fn register_client_id(
_ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
let resp = Resp {
uuid: req.uuid,
status: RetStatus::Success,
msg: vct![],
};
send_ok!(req.uuid, resp, peeraddr)
}
fn get_server_info(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
fn cb(r: &mut SlaveRes) {
info_omit!(resp_cb_merge::<RespGetServerInfo>(r));
}
let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
let addr_set = CFG.server_addr_set.clone();
fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
}
fn get_env_list(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
fn cb(r: &mut SlaveRes) {
info_omit!(resp_cb_merge::<RespGetEnvList>(r));
}
let req = serde_json::from_slice::<Req<&str>>(&request).c(d!())?;
let addr_set = CFG.server_addr_set.clone();
fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
}
fn get_env_info(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
#[derive(Deserialize)]
struct MyReq {
uuid: u64,
cli_id: Option<CliId>,
msg: ReqGetEnvInfo,
}
fn cb(r: &mut SlaveRes) {
info_omit!(resp_cb_merge::<RespGetEnvInfo>(r));
}
let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
let addr_set = {
let lk = ENV_MAP.read();
let res = req
.msg
.env_set
.iter()
.filter_map(|env_id| lk.get(env_id))
.flatten()
.copied()
.collect::<Vec<_>>();
if res.is_empty() {
let msg: HashMap<String, RespGetEnvInfo> = map! {};
return send_ok!(req.uuid, msg, peeraddr);
} else {
res
}
};
fwd_to_slave!(@@ops_id, req, peeraddr, cb, &addr_set)
}
fn update_env_kick_vm(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
#[derive(Deserialize)]
struct MyReq {
uuid: u64,
cli_id: Option<CliId>,
msg: ReqUpdateEnvKickVm,
}
fwd_to_slave!(ops_id, request, MyReq, peeraddr)
}
fn update_env_lifetime(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
#[derive(Deserialize)]
struct MyReq {
uuid: u64,
cli_id: Option<CliId>,
msg: ReqUpdateEnvLife,
}
let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
if let Some(set) = ENV_MAP.read().get(&req.msg.env_id) {
fwd_to_slave!(@@ops_id, req, peeraddr, resp_cb_simple, set)
} else {
send_err!(req.uuid, eg!("ENV not exists!"), peeraddr)
}
}
fn del_env(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
#[derive(Deserialize)]
struct MyReq {
uuid: u64,
cli_id: Option<CliId>,
msg: ReqDelEnv,
}
let req = serde_json::from_slice::<MyReq>(&request).c(d!())?;
let addr_set = if let Some(set) = ENV_MAP.write().remove(&req.msg.env_id) {
set
} else {
return send_ok!(req.uuid, "Success!", peeraddr);
};
fwd_to_slave!(@@ops_id, req, peeraddr, resp_cb_simple, &addr_set)
}
#[inline(always)]
fn get_env_list_all(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
get_env_list(ops_id, peeraddr, request).c(d!())
}
fn stop_env(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
#[derive(Deserialize)]
struct MyReq {
uuid: u64,
cli_id: Option<CliId>,
msg: ReqStopEnv,
}
fwd_to_slave!(ops_id, request, MyReq, peeraddr)
}
fn start_env(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
#[derive(Deserialize)]
struct MyReq {
uuid: u64,
cli_id: Option<CliId>,
msg: ReqStartEnv,
}
fwd_to_slave!(ops_id, request, MyReq, peeraddr)
}
fn update_env_resource(
ops_id: usize,
peeraddr: SocketAddr,
request: Vec<u8>,
) -> Result<()> {
#[derive(Deserialize)]
struct MyReq {
uuid: u64,
cli_id: Option<CliId>,
msg: ReqUpdateEnvResource,
}
fwd_to_slave!(ops_id, request, MyReq, peeraddr)
}
fn gen_proxy_uuid() -> u64 {
lazy_static! {
static ref UUID: AtomicU64 = AtomicU64::new(9999);
}
UUID.fetch_add(1, Ordering::Relaxed)
}
fn resp_cb_simple(r: &mut SlaveRes) {
if 0 < r.num_to_wait {
send_err!(
r.uuid,
eg!("Not all slave-server[s] reponsed!"),
r.peeraddr
)
.unwrap_or_else(|e| p(e));
} else if r.msg.values().any(|v| v.status == RetStatus::Fail) {
let msg = r
.msg
.values()
.filter(|v| v.status == RetStatus::Fail)
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(" ;; ");
send_err!(r.uuid, eg!(msg), r.peeraddr).unwrap_or_else(|e| p(e));
} else {
send_ok!(r.uuid, "Success!", r.peeraddr).unwrap_or_else(|e| p(e));
}
}
fn resp_cb_merge<'a, T: Serialize + Deserialize<'a>>(
r: &'a mut SlaveRes,
) -> Result<()> {
if 0 < r.num_to_wait {
p(eg!("Not all slave-server[s] reponsed!"));
}
let res = r
.msg
.iter()
.filter(|(_, raw)| raw.status == RetStatus::Success)
.filter_map(|(slave, raw)| {
info!(serde_json::from_slice::<HashMap<ServerAddr, T>>(&raw.msg))
.ok()
.and_then(|resp| resp.into_iter().next())
.map(|resp| (slave.to_string(), resp.1))
})
.collect::<HashMap<_, _>>();
send_ok!(r.uuid, res, r.peeraddr)
}
fn send_req_to_slave<T: Serialize>(
ops_id: usize,
req: Req<T>,
slave_set: &[SocketAddr],
) -> Result<()> {
let mut req_bytes = serde_json::to_vec(&req).c(d!())?;
let mut body =
format!("{id:>0width$}", id = ops_id, width = OPS_ID_LEN).into_bytes();
body.append(&mut req_bytes);
macro_rules! do_send {
($body: expr, $slave: expr) => {
task::spawn(async move {
info_omit!(SOCK_MID.send_to(&$body, $slave).await);
});
};
}
if 1 < slave_set.len() {
for slave in slave_set.iter().skip(1).copied() {
let b = body.clone();
do_send!(b, slave);
}
} else if slave_set.is_empty() {
return Ok(());
}
let first_slave = slave_set[0];
do_send!(body, first_slave);
Ok(())
}
fn register_resp_hdr(
num_to_wait: usize,
cb: fn(&mut SlaveRes),
peeraddr: SocketAddr,
orig_uuid: UUID,
proxy_uuid: UUID,
) {
let ts = ts!();
let idx = ts as usize % TIMEOUT_SECS;
let sr = SlaveRes {
msg: map! {},
num_to_wait,
start_ts: ts,
do_resp: cb,
peeraddr,
uuid: orig_uuid,
};
let mut proxy = PROXY.lock();
proxy.idx_map.insert(proxy_uuid, idx);
if ts != proxy.buckets[idx].ts {
mem::take(&mut proxy.buckets[idx]).res.keys().for_each(|k| {
proxy.idx_map.remove(k);
});
}
proxy.buckets[idx].ts = ts;
proxy.buckets[idx].res.insert(proxy_uuid, sr);
}