#[cfg(not(feature = "testmock"))]
use crate::vmimg_path;
use crate::{nat, pause, resume, vm};
use lazy_static::lazy_static;
use myutil::{err::*, *};
use parking_lot::{Mutex, RwLock};
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
sync::atomic::{AtomicI32, AtomicU16, Ordering},
sync::{Arc, Weak},
};
#[cfg(not(feature = "testmock"))]
use std::{thread, time};
pub(crate) use ttcore_def::*;
// Hard cap (seconds) on an env's lifetime — 6 hours — unless the caller
// passes the privileged flag to `Env::update_life`.
const MAX_LIFE_TIME: u64 = 6 * 3600;
// Minimum interval (seconds) between start/stop operations on one env;
// enforced in `Serv::start_env` / `Serv::stop_env`.
const MIN_START_STOP_ITV: u64 = 20;
pub type OsName = String;
pub type ImagePath = String;
/// Top-level service state: the per-client env registry plus global
/// id/port reservations and resource accounting.
///
/// Every field is wrapped in `Arc<Mutex/RwLock<..>>` so a `Serv` can be
/// shared across threads; `Env`/`Vm` keep a `Weak<Serv>` back-reference.
#[derive(Debug, Default)]
pub struct Serv {
    // client id -> (env id -> env)
    cli: Arc<RwLock<HashMap<CliId, HashMap<EnvId, Env>>>>,
    // env ids currently reserved (taken in `Env::new`, freed in `Drop for Env`)
    env_id_inuse: Arc<Mutex<HashSet<EnvId>>>,
    // vm ids currently reserved (taken in `Vm::alloc_id`, freed in `Drop for Vm`)
    vm_id_inuse: Arc<Mutex<HashSet<VmId>>>,
    // public ports currently mapped to some VM port
    pub_port_inuse: Arc<Mutex<HashSet<PubPort>>>,
    // global quota + usage counters
    resource: Arc<RwLock<Resource>>,
}
impl Serv {
    /// Create an empty service with zeroed resource accounting.
    #[inline(always)]
    pub fn new() -> Serv {
        Serv::default()
    }

    /// Replace the resource quota; all usage counters restart from zero.
    #[inline(always)]
    pub fn set_resource(&self, rsc: Resource) {
        *self.resource.write() =
            Resource::new(rsc.cpu_total, rsc.mem_total, rsc.disk_total);
    }

    /// Snapshot of the current quota/usage (`Resource` is `Copy`).
    #[inline(always)]
    pub fn get_resource(&self) -> Resource {
        *self.resource.read()
    }

    /// Remove every env whose deadline has passed, then reap zombie VM
    /// processes via the backend.
    pub fn clean_expired_env(&self) {
        let ts = ts!();
        let cli = self.cli.read();
        // Collect the expired (client-id, env-id) pairs under the read lock.
        let expired = cli
            .iter()
            .flat_map(|(cli_id, env)| {
                env.iter()
                    .filter(|(_, v)| v.end_timestamp < ts)
                    .map(move |(k, _)| (cli_id.clone(), k.clone()))
            })
            .collect::<Vec<_>>();
        if !expired.is_empty() {
            // Upgrade to a write lock only when there is work to do. The map
            // may change between the two locks; missing keys are just skipped.
            drop(cli);
            let mut cli = self.cli.write();
            expired.iter().for_each(|(cli_id, k)| {
                if let Some(env_set) = cli.get_mut(cli_id.as_str()) {
                    env_set.remove(k);
                }
            });
        }
        vm::zobmie_clean();
    }

    /// Register a new client id with an empty env set.
    ///
    /// # Errors
    /// Fails if the client already exists.
    #[inline(always)]
    pub fn add_client(&self, id: CliId) -> Result<()> {
        let mut cli = self.cli.write();
        if cli.contains_key(&id) {
            Err(eg!("Client already exists!"))
        } else {
            cli.insert(id, map! {});
            Ok(())
        }
    }

    /// Drop a client and every env it owns (a no-op for unknown ids).
    #[inline(always)]
    pub fn del_client(&self, id: &CliIdRef) {
        self.cli.write().remove(id);
    }

    /// Attach an env to a client, creating the client entry on demand.
    ///
    /// # Errors
    /// Fails if the client already has an env with the same id.
    #[inline(always)]
    pub fn register_env(&self, id: CliId, env: Env) -> Result<()> {
        let mut cli = self.cli.write();
        let env_set = cli.entry(id).or_default();
        if env_set.contains_key(&env.id) {
            Err(eg!("Env already exists!"))
        } else {
            env_set.insert(env.id.clone(), env);
            Ok(())
        }
    }

    /// Remove one env of one client; a no-op if either id is unknown.
    #[inline(always)]
    pub fn del_env(&self, cli_id: &CliIdRef, env_id: &EnvIdRef) {
        if let Some(env_set) = self.cli.write().get_mut(cli_id) {
            env_set.remove(env_id);
        }
    }

    /// Pause every VM of an env and return its resources to the pool.
    ///
    /// Throttled: at most one start/stop per `MIN_START_STOP_ITV` seconds.
    ///
    /// # Errors
    /// Fails when throttled, or when pausing a VM fails — in which case
    /// the env may be left partially stopped.
    pub fn stop_env(
        &self,
        cli_id: &CliIdRef,
        env_id: &EnvIdRef,
    ) -> Result<()> {
        if let Some(env_set) = self.cli.write().get_mut(cli_id) {
            if let Some(env) = env_set.get_mut(env_id) {
                let ts = ts!();
                if env.last_mgmt_ts + MIN_START_STOP_ITV > ts {
                    return Err(eg!(
                        "start/stop too frequency! wait 20 seconds, and try again."
                    ));
                }
                env.last_mgmt_ts = ts;
                for vm in env.vm.values_mut() {
                    pause(vm.id()).c(d!()).map(|_| {
                        let mut rsc = self.resource.write();
                        rsc.vm_active -= 1;
                        rsc.cpu_used -= vm.cpu_num;
                        rsc.mem_used -= vm.mem_size;
                        rsc.disk_used -= vm.disk_size;
                        // mark so `Drop for Vm` does not give back twice
                        vm.during_stop = true;
                    })?;
                }
                env.is_stopped = true;
            }
        }
        Ok(())
    }

    /// Resume every VM of a stopped env, re-charging its resources.
    ///
    /// Throttled the same way as [`Serv::stop_env`].
    ///
    /// # Errors
    /// Fails when throttled, or when resuming a VM fails — in which case
    /// the env may be left partially started.
    pub fn start_env(
        &self,
        cli_id: &CliIdRef,
        env_id: &EnvIdRef,
    ) -> Result<()> {
        if let Some(env_set) = self.cli.write().get_mut(cli_id) {
            if let Some(env) = env_set.get_mut(env_id) {
                let ts = ts!();
                if env.last_mgmt_ts + MIN_START_STOP_ITV > ts {
                    return Err(eg!(
                        "start/stop too frequency! wait 20 seconds, and try again."
                    ));
                }
                env.last_mgmt_ts = ts;
                for vm in env.vm.values_mut() {
                    resume(vm).c(d!()).map(|_| {
                        let mut rsc = self.resource.write();
                        rsc.vm_active += 1;
                        rsc.cpu_used += vm.cpu_num;
                        rsc.mem_used += vm.mem_size;
                        rsc.disk_used += vm.disk_size;
                        vm.during_stop = false;
                    })?;
                }
                env.is_stopped = false;
            }
        }
        Ok(())
    }

    /// Meta info of every env owned by one client (empty for unknown ids).
    #[inline(always)]
    pub fn get_env_meta(&self, cli_id: &CliIdRef) -> Vec<EnvMeta> {
        let get = |env: &HashMap<EnvId, Env>| {
            env.values().map(|i| i.as_meta()).collect::<Vec<_>>()
        };
        self.cli.read().get(cli_id).map(get).unwrap_or_default()
    }

    /// Meta info of every env of every client.
    #[inline(always)]
    pub fn get_env_meta_all(&self) -> Vec<EnvMeta> {
        self.cli
            .read()
            .values()
            .flat_map(|env| env.values().map(|i| i.as_meta()))
            .collect::<Vec<_>>()
    }

    /// Full info of the requested envs of one client; unknown env ids are
    /// silently skipped.
    #[inline(always)]
    pub fn get_env_detail(
        &self,
        cli_id: &CliIdRef,
        env_set: Vec<EnvId>,
    ) -> Vec<EnvInfo> {
        let get = |env: &HashMap<EnvId, Env>| {
            env.values()
                .filter(|v| env_set.iter().any(|vid| vid == &v.id))
                .map(|env| env.as_info())
                .collect::<Vec<_>>()
        };
        self.cli.read().get(cli_id).map(get).unwrap_or_default()
    }

    /// Extend (or shrink) the lifetime of an env; see `Env::update_life`.
    ///
    /// # Errors
    /// Fails when client or env is unknown, or the lifetime is rejected.
    #[inline(always)]
    pub fn update_env_life(
        &self,
        cli_id: &CliIdRef,
        env_id: &EnvIdRef,
        lifetime: u64,
        is_fucker: bool,
    ) -> Result<()> {
        let mut cli = self.cli.write();
        if let Some(env_set) = cli.get_mut(cli_id) {
            if let Some(env) = env_set.get_mut(env_id) {
                env.update_life(lifetime, is_fucker).c(d!())
            } else {
                Err(eg!("Env NOT exists!"))
            }
        } else {
            Err(eg!("Client NOT exists!"))
        }
    }

    /// Remove the given VMs from an env; unknown VM ids are ignored.
    ///
    /// # Errors
    /// Fails when client or env is unknown.
    #[inline(always)]
    pub fn update_env_del_vm(
        &self,
        cli_id: &CliIdRef,
        env_id: &EnvIdRef,
        vmid_set: &[VmId],
    ) -> Result<()> {
        let mut cli = self.cli.write();
        if let Some(env_set) = cli.get_mut(cli_id) {
            if let Some(env) = env_set.get_mut(env_id) {
                vmid_set.iter().for_each(|id| {
                    env.vm.remove(id);
                });
                Ok(())
            } else {
                Err(eg!("Env NOT exists!"))
            }
        } else {
            Err(eg!("Client NOT exists!"))
        }
    }

    /// Update hardware spec / port mappings / outgoing policy of an env;
    /// see `Env::update_hardware` for the detailed contract.
    ///
    /// # Errors
    /// Fails when client or env is unknown, or the update is rejected.
    #[inline(always)]
    pub fn update_env_hardware(
        &self,
        cli_id: &CliIdRef,
        env_id: &EnvIdRef,
        cpu_mem_disk: (Option<u32>, Option<u32>, Option<u32>),
        vm_port: &[Port],
        deny_outgoing: Option<bool>,
    ) -> Result<()> {
        let mut cli = self.cli.write();
        if let Some(env_set) = cli.get_mut(cli_id) {
            if let Some(env) = env_set.get_mut(env_id) {
                let (cpu_num, mem_size, disk_size) = cpu_mem_disk;
                env.update_hardware(
                    cpu_num,
                    mem_size,
                    disk_size,
                    vm_port,
                    deny_outgoing,
                )
                .c(d!())
            } else {
                Err(eg!("Env NOT exists!"))
            }
        } else {
            Err(eg!("Client NOT exists!"))
        }
    }
}
/// Global resource quota plus current usage counters.
///
/// `*_total` are quotas, `*_used` the amounts currently charged; memory
/// and disk figures appear to be MB (see the error messages in
/// `check_resource`) — units are not enforced here.
#[derive(Clone, Copy, Debug, Default)]
pub struct Resource {
    /// Number of VMs currently running (not paused).
    pub vm_active: u32,
    pub cpu_total: u64,
    pub cpu_used: u32,
    pub mem_total: u64,
    pub mem_used: u32,
    pub disk_total: u64,
    pub disk_used: u32,
}

impl Resource {
    /// Build a quota with the given totals; every usage counter starts at
    /// zero. Uses struct-update syntax instead of field-by-field mutation
    /// (clippy: `field_reassign_with_default`).
    #[inline(always)]
    pub fn new(cpu_total: u64, mem_total: u64, disk_total: u64) -> Resource {
        Resource {
            cpu_total,
            mem_total,
            disk_total,
            ..Resource::default()
        }
    }
}
/// A client-owned environment: a set of VMs sharing one lifecycle.
#[derive(Clone, Debug)]
pub struct Env {
    id: EnvId,
    // creation time (unix seconds, via `ts!`)
    start_timestamp: u64,
    // expiry deadline; enforced by `Serv::clean_expired_env`
    end_timestamp: u64,
    is_stopped: bool,
    // whether outgoing traffic is currently blocked by NAT rules
    outgoing_denied: bool,
    // timestamp of the last start/stop, throttled by `MIN_START_STOP_ITV`
    last_mgmt_ts: u64,
    vm: HashMap<VmId, Vm>,
    // back-reference to the owning service (Weak avoids an Arc cycle)
    serv_belong_to: Weak<Serv>,
}
impl Env {
    /// Cheap summary used by the listing APIs.
    #[inline(always)]
    fn as_meta(&self) -> EnvMeta {
        EnvMeta {
            id: self.id.clone(),
            start_timestamp: self.start_timestamp,
            end_timestamp: self.end_timestamp,
            vm_cnt: self.vm.len(),
            is_stopped: self.is_stopped,
        }
    }

    /// Full per-VM view used by the detail APIs.
    #[inline(always)]
    fn as_info(&self) -> EnvInfo {
        EnvInfo {
            id: self.id.clone(),
            start_timestamp: self.start_timestamp,
            end_timestamp: self.end_timestamp,
            vm: self.vm.iter().map(|(&k, v)| (k, v.as_info())).collect(),
            is_stopped: self.is_stopped,
        }
    }

    /// Create an empty env, reserving its id in `Serv::env_id_inuse`.
    ///
    /// The reservation is released again in `Drop for Env`. The initial
    /// lifetime is one hour; extend it via `update_life`.
    ///
    /// # Errors
    /// Fails if the id is already reserved.
    pub fn new(serv: &Arc<Serv>, id: &EnvIdRef) -> Result<Env> {
        let mut inuse = serv.env_id_inuse.lock();
        if inuse.get(id).is_none() {
            inuse.insert(id.to_owned());
            // release the lock before building the Env
            drop(inuse);
        } else {
            return Err(eg!("Env already exists!"));
        }
        Ok(Env {
            id: id.to_owned(),
            vm: HashMap::new(),
            start_timestamp: ts!(),
            // default lifetime: one hour from creation
            end_timestamp: 3600 + ts!(),
            last_mgmt_ts: 0,
            is_stopped: false,
            outgoing_denied: false,
            serv_belong_to: Arc::downgrade(serv),
        })
    }

    /// Set the deadline to `start_timestamp + secs`.
    ///
    /// # Errors
    /// Fails when `secs` exceeds `MAX_LIFE_TIME` and the caller is not
    /// privileged (`is_fucker`).
    #[inline(always)]
    pub fn update_life(&mut self, secs: u64, is_fucker: bool) -> Result<()> {
        if MAX_LIFE_TIME < secs && !is_fucker {
            return Err(eg!("Life time too long!"));
        }
        self.end_timestamp = self.start_timestamp + secs;
        Ok(())
    }

    /// Update hardware spec, port mappings and/or the outgoing-traffic
    /// policy of every VM in this env.
    ///
    /// # Errors
    /// - hardware can only change while the env is stopped;
    /// - resource checks or NAT operations may fail partway through, in
    ///   which case state may be partially updated.
    #[inline(always)]
    pub fn update_hardware(
        &mut self,
        cpu_num: Option<u32>,
        mem_size: Option<u32>,
        disk_size: Option<u32>,
        vm_port: &[Port],
        deny_outgoing: Option<bool>,
    ) -> Result<()> {
        // NOTE(review): `.and()` means this branch runs only when ALL THREE
        // of cpu/mem/disk are `Some`; the per-field `unwrap_or` fallbacks
        // below are then dead, which suggests partial updates (`.or()`)
        // may have been intended — confirm the intended contract.
        if cpu_num.and(mem_size).and(disk_size).is_some() {
            if !self.is_stopped {
                return Err(eg!(
                    "ENV must be stopped before updating it's hardware[s]."
                ));
            }
            // All VMs share one spec; take any VM as the baseline.
            let (cpu_new, mem_new, disk_new) =
                if let Some(vm) = self.vm.values().next() {
                    (
                        cpu_num.unwrap_or(vm.cpu_num),
                        mem_size.unwrap_or(vm.mem_size),
                        disk_size.unwrap_or(vm.disk_size),
                    )
                } else {
                    // no VMs: nothing to resize
                    return Ok(());
                };
            self.check_resource_and_set((cpu_new, mem_new, disk_new))
                .c(d!())?;
            self.vm.values_mut().for_each(|vm| {
                vm.cpu_num = cpu_new;
                vm.mem_size = mem_new;
                vm.disk_size = disk_new;
            });
        }
        if !vm_port.is_empty() {
            let mut port = vm_port.to_vec();
            if let Some(s) = self.serv_belong_to.upgrade() {
                {
                    // Release every currently-mapped public port and drop
                    // the old NAT rules before re-allocating below.
                    let mut inuse = s.pub_port_inuse.lock();
                    let vm_set =
                        self.vm.values().fold(vct![], |mut base, vm| {
                            vm.port_map.values().for_each(|port| {
                                inuse.remove(port);
                                base.push(vm);
                            });
                            base
                        });
                    nat::clean_rule(vm_set.as_slice()).c(d!())?;
                }
                // SSH and ttrexec ports are always exposed.
                port.push(SSH_PORT);
                port.push(TTREXEC_PORT);
                port.sort_unstable();
                port.dedup();
                for vm in self.vm.values_mut() {
                    // reset mappings, then allocate fresh public ports
                    vm.port_map = port.iter().map(|p| (*p, 0u16)).collect();
                    vm.alloc_pub_port(&s)
                        .c(d!())
                        .and_then(|_| nat::set_rule(vm).c(d!()))?;
                }
            } else {
                // the owning Serv has already been dropped
                return Err(eg!("The fucking world is over!"));
            }
        }
        if let Some(deny) = deny_outgoing {
            let vm_set = self.vm.values().collect::<Vec<_>>();
            // only touch the firewall when the policy actually flips
            if deny && !self.outgoing_denied {
                nat::deny_outgoing(vm_set.as_slice()).c(d!())?;
                self.outgoing_denied = true;
            } else if !deny && self.outgoing_denied {
                nat::allow_outgoing(vm_set.as_slice()).c(d!())?;
                self.outgoing_denied = false;
            }
        }
        Ok(())
    }

    /// Create and start a batch of VMs inside this env.
    ///
    /// # Errors
    /// Fails on resource exhaustion, missing images, or start failures;
    /// VMs already created are dropped and their `Drop` impl returns the
    /// resources that were charged.
    #[inline(always)]
    pub fn add_vm_set(&mut self, cfg_set: Vec<VmCfg>) -> Result<()> {
        let mut vm = vct![];
        self.check_resource(&cfg_set).c(d!())?;
        for cfg in cfg_set.into_iter() {
            vm.push(Vm::create_meta(&self.serv_belong_to, cfg)?);
        }
        Self::check_image(&vm).c(d!())?;
        for vm in vm.iter() {
            vm.start_vm().c(d!())?;
        }
        vm.into_iter().for_each(|vm| {
            self.vm.insert(vm.id(), vm);
        });
        Ok(())
    }

    /// Wait (bounded) for every VM image path to become resolvable on disk.
    ///
    /// Polls every `timeout_unit` ms, with a total budget of roughly
    /// `max(2s, 100ms * image_count)`.
    ///
    /// # Errors
    /// Returns the still-missing paths once the retry budget is exhausted.
    #[cfg(not(feature = "testmock"))]
    fn check_image(vm: &[Vm]) -> Result<()> {
        let mut cnter = 0;
        let path_set = vm.iter().map(|i| vmimg_path(i)).collect::<Vec<_>>();
        let mut timeout = (path_set.len() * 100) as u64;
        // enforce a floor of 2000 ms on the total budget
        alt!(2000 > timeout, timeout = 2000);
        let timeout_unit = 200;
        let nr_limit = timeout / timeout_unit;
        // `canonicalize` fails while the image file does not yet exist
        while path_set
            .iter()
            .map(|i| i.canonicalize())
            .any(|i| i.is_err())
        {
            if nr_limit < cnter {
                return Err(
                    eg!(@path_set.into_iter().filter(|i| i.canonicalize().is_err()).collect::<Vec<_>>()),
                );
            }
            cnter += 1;
            thread::sleep(time::Duration::from_millis(timeout_unit));
        }
        Ok(())
    }

    /// Mock build: image presence is not checked.
    #[cfg(feature = "testmock")]
    fn check_image(_vm: &[Vm]) -> Result<()> {
        Ok(())
    }

    /// Verify the pool can absorb the given VM configs (read-only check;
    /// nothing is reserved here — `Vm::create_meta` charges the counters).
    ///
    /// # Errors
    /// Fails with a descriptive message when cpu/mem/disk would exceed the
    /// quota, or when the owning `Serv` is gone.
    fn check_resource(&self, cfg_set: &[VmCfg]) -> Result<()> {
        if let Some(s) = self.serv_belong_to.upgrade() {
            let rsc;
            {
                // copy the snapshot so the read lock is released immediately
                rsc = *s.resource.read();
            }
            // sum the requested amounts, falling back to the defaults
            let (cpu, mem, disk) =
                cfg_set.iter().fold((0u64, 0, 0), |mut b, vm| {
                    b.0 += vm.cpu_num.unwrap_or(CPU_DEFAULT) as u64;
                    b.1 += vm.mem_size.unwrap_or(MEM_DEFAULT) as u64;
                    b.2 += vm.disk_size.unwrap_or(DISK_DEFAULT) as u64;
                    b
                });
            if rsc.cpu_used as u64 + cpu > rsc.cpu_total {
                return Err(eg!(format!(
                    "CPU resource busy: total {}, used {}, you want: {}",
                    rsc.cpu_total, rsc.cpu_used, cpu
                )));
            }
            if rsc.mem_used as u64 + mem > rsc.mem_total {
                return Err(eg!(format!(
                    "Memory resource busy: total {} MB, used {} MB, you want: {} MB",
                    rsc.mem_total, rsc.mem_used, mem
                )));
            }
            if rsc.disk_used as u64 + disk > rsc.disk_total {
                return Err(eg!(format!(
                    "Disk resource busy: total {} MB, used {} MB, you want: {} MB",
                    rsc.disk_total, rsc.disk_used, disk
                )));
            }
        } else {
            return Err(eg!("The fucking world is OVER!"));
        }
        Ok(())
    }

    /// Check that resizing every VM to `cfg` fits the quota, then commit
    /// the per-VM delta to the global usage counters.
    ///
    /// The env must have at least one VM — `update_hardware` returns early
    /// otherwise; the `/ vm_num` divisions below would panic on zero.
    ///
    /// # Errors
    /// Fails when the new totals would exceed the quota, or when the
    /// owning `Serv` is gone.
    fn check_resource_and_set(&self, cfg: (u32, u32, u32)) -> Result<()> {
        if let Some(s) = self.serv_belong_to.upgrade() {
            let rsc;
            {
                // copy the snapshot so the read lock is released immediately
                rsc = *s.resource.read();
            }
            let vm_num = self.vm.len() as u64;
            // current totals across all VMs of this env
            let (cpu, mem, disk) =
                self.vm.values().fold((0u64, 0, 0), |mut b, vm| {
                    b.0 += vm.cpu_num as u64;
                    b.1 += vm.mem_size as u64;
                    b.2 += vm.disk_size as u64;
                    b
                });
            // requested totals (same spec applied to every VM)
            let (cpu_new, mem_new, disk_new) = (
                cfg.0 as u64 * vm_num,
                cfg.1 as u64 * vm_num,
                cfg.2 as u64 * vm_num,
            );
            // only growth needs a quota check; shrinking always fits
            if cpu_new > cpu
                && rsc.cpu_used as u64 + cpu_new - cpu > rsc.cpu_total
            {
                return Err(eg!(format!(
                    "CPU resource busy: total {}, used {}, you want: {}",
                    rsc.cpu_total, rsc.cpu_used, cpu_new
                )));
            }
            if mem_new > mem
                && rsc.mem_used as u64 + mem_new - mem > rsc.mem_total
            {
                return Err(eg!(format!(
                    "Memory resource busy: total {} MB, used {} MB, you want: {} MB",
                    rsc.mem_total, rsc.mem_used, mem_new
                )));
            }
            if disk_new > disk
                && rsc.disk_used as u64 + disk_new - disk > rsc.disk_total
            {
                return Err(eg!(format!(
                    "Disk resource busy: total {} MB, used {} MB, you want: {} MB",
                    rsc.disk_total, rsc.disk_used, disk_new
                )));
            }
            // commit the per-VM delta (`x_new / vm_num` is the new per-VM
            // value, `x / vm_num` the average old one)
            let mut r = s.resource.write();
            r.cpu_used =
                r.cpu_used + (cpu_new / vm_num) as u32 - (cpu / vm_num) as u32;
            r.mem_used =
                r.mem_used + (mem_new / vm_num) as u32 - (mem / vm_num) as u32;
            r.disk_used = r.disk_used + (disk_new / vm_num) as u32
                - (disk / vm_num) as u32;
        } else {
            return Err(eg!("The fucking world is OVER!"));
        }
        Ok(())
    }
}
impl Drop for Env {
    /// Release this env's id reservation so the id can be reused by a
    /// later `Env::new`; a no-op when the owning `Serv` is already gone.
    fn drop(&mut self) {
        let serv = self.serv_belong_to.upgrade();
        if let Some(serv) = serv {
            serv.env_id_inuse.lock().remove(&self.id);
        }
    }
}
/// User-supplied spec for creating one VM; `None` hardware fields fall
/// back to `CPU_DEFAULT` / `MEM_DEFAULT` / `DISK_DEFAULT`.
#[derive(Clone, Debug)]
pub struct VmCfg {
    pub image_path: String,
    // VM-side ports to expose through allocated public ports
    pub port_list: Vec<VmPort>,
    pub kind: VmKind,
    pub cpu_num: Option<u32>,
    pub mem_size: Option<u32>,
    pub disk_size: Option<u32>,
}
/// A single virtual machine and everything reserved for it.
#[derive(Clone, Debug)]
pub struct Vm {
    // path of the backing image file
    pub(crate) image_path: PathBuf,
    pub kind: VmKind,
    pub cpu_num: u32,
    // presumably MB, matching the resource error messages — TODO confirm
    pub mem_size: u32,
    pub disk_size: u32,
    // back-reference to the owning service (Weak avoids an Arc cycle)
    serv_belong_to: Weak<Serv>,
    // process-unique id; -1 until `alloc_id` assigns one
    pub(crate) id: VmId,
    // private address derived from `id` (see `gen_ip`)
    pub ip: Ipv4,
    // VM port -> allocated public port (0 until `alloc_pub_port` runs)
    pub port_map: HashMap<VmPort, PubPort>,
    // true while paused via `Serv::stop_env`; suppresses the resource
    // give-back in `Drop for Vm`
    pub during_stop: bool,
}
impl Vm {
    /// User-facing view; the OS name is taken from the image file name.
    #[inline(always)]
    pub(crate) fn as_info(&self) -> VmInfo {
        VmInfo {
            os: self
                .image_path
                .file_name()
                .map(|f| f.to_str())
                .flatten()
                .unwrap_or("Unknown")
                .to_owned(),
            cpu_num: self.cpu_num,
            mem_size: self.mem_size,
            disk_size: self.disk_size,
            ip: self.ip.clone(),
            port_map: self.port_map.clone(),
        }
    }

    /// Build a VM record and allocate everything it needs (id, IP, public
    /// ports, NAT rules, pre-start hook) — without starting it.
    ///
    /// Resource counters are charged up-front; on failure the partially
    /// built `Vm` is dropped and `Drop for Vm` returns what was charged.
    ///
    /// # Errors
    /// Fails when the owning `Serv` is gone or an allocation step fails.
    pub(crate) fn create_meta(serv: &Weak<Serv>, cfg: VmCfg) -> Result<Vm> {
        let cpu_num = cfg.cpu_num.unwrap_or(CPU_DEFAULT);
        let mem_size = cfg.mem_size.unwrap_or(MEM_DEFAULT);
        let disk_size = cfg.disk_size.unwrap_or(DISK_DEFAULT);
        let mut res = Vm {
            image_path: PathBuf::from(cfg.image_path),
            kind: cfg.kind,
            cpu_num,
            mem_size,
            disk_size,
            serv_belong_to: Weak::clone(serv),
            // placeholder; a real id is assigned in `alloc_id`
            id: -1,
            ip: Ipv4::default(),
            // every VM port starts mapped to public port 0 until
            // `alloc_pub_port` fills in real ports
            port_map: cfg.port_list.into_iter().fold(
                HashMap::new(),
                |mut acc, new| {
                    acc.insert(new, 0);
                    acc
                },
            ),
            during_stop: false,
        };
        // charge the global resource counters for this VM
        let cnt_it = |s: &Serv| {
            let mut rsc = s.resource.write();
            rsc.vm_active += 1;
            rsc.cpu_used += cpu_num;
            rsc.mem_used += mem_size;
            rsc.disk_used += disk_size;
        };
        if let Some(s) = serv.upgrade() {
            cnt_it(&s);
            res.alloc_resource(&s).c(d!()).map(|_| res)
        } else {
            Err(eg!())
        }
    }

    /// Allocate id → IP → public ports → NAT rules, then run the
    /// backend-specific pre-start hook, stopping at the first failure.
    #[inline(always)]
    fn alloc_resource(&mut self, serv: &Arc<Serv>) -> Result<()> {
        self.alloc_id(&serv)
            .c(d!())
            .map(|id| self.ip = Self::gen_ip(id))
            .and_then(|_| self.alloc_pub_port(&serv).c(d!()))
            .and_then(|_| nat::set_rule(self).c(d!()))
            .and_then(|_| self.pre_start().c(d!()))
    }

    /// Look up and invoke the kind-specific pre-start hook for this VM.
    #[inline(always)]
    fn pre_start(&self) -> Result<()> {
        vm::get_pre_starter(self)?(self).c(d!())
    }

    /// Boot the VM via the backend.
    #[inline(always)]
    fn start_vm(&self) -> Result<()> {
        vm::start(self).c(d!())
    }

    /// Reserve a process-unique VM id in `[0, 0xffff)`.
    ///
    /// Candidate ids come from a global counter (mod `VM_ID_LIMIT`) and
    /// are checked against `Serv::vm_id_inuse`; the reservation is
    /// released again in `Drop for Vm`.
    ///
    /// # Errors
    /// Fails after `VM_ID_LIMIT` failed attempts (id space exhausted).
    #[inline(always)]
    fn alloc_id(&mut self, serv: &Arc<Serv>) -> Result<VmId> {
        const VM_ID_LIMIT: i32 = 0xffff;
        lazy_static! {
            static ref VM_ID: AtomicI32 = AtomicI32::new(0);
        }
        let vm_id = {
            let mut cnter = 0;
            let mut vmid_inuse = serv.vm_id_inuse.lock();
            loop {
                let id = VM_ID.fetch_add(1, Ordering::Relaxed) % VM_ID_LIMIT;
                if vmid_inuse.get(&id).is_none() {
                    vmid_inuse.insert(id);
                    self.id = id;
                    break id;
                }
                cnter += 1;
                if VM_ID_LIMIT < cnter {
                    return Err(eg!("The fucking world is over!!!"));
                }
            }
        };
        Ok(vm_id)
    }

    /// Derive the VM's private address from its id:
    /// `10.10.(id / 256).(id % 256)`.
    #[inline(always)]
    fn gen_ip(vm_id: VmId) -> Ipv4 {
        Ipv4::new(format!("10.10.{}.{}", vm_id / 256, vm_id % 256))
    }

    /// Reserve one public port (range `[40000, 60000)`) for each entry of
    /// `port_map` and write the allocations back into it.
    ///
    /// # Errors
    /// Fails after `PUB_PORT_LIMIT` failed attempts (port pool exhausted).
    fn alloc_pub_port(&mut self, serv: &Arc<Serv>) -> Result<()> {
        const PUB_PORT_LIMIT: u16 = 20000;
        const PUB_PORT_BASE: u16 = 40000;
        lazy_static! {
            static ref PUB_PORT: AtomicU16 = AtomicU16::new(PUB_PORT_BASE);
        }
        let mut cnter = 0;
        // number of ports still to allocate
        let mut v_cnter = self.port_map.len();
        let mut buf = vct![];
        while 0 < v_cnter {
            let mut port_inuse = serv.pub_port_inuse.lock();
            // advance the global counter, wrapping inside the port range;
            // `fetch_update` returns the PREVIOUS value as the candidate
            let port = PUB_PORT
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |x| {
                    Some(PUB_PORT_BASE + (1 + x) % PUB_PORT_LIMIT)
                })
                .map_err(|_| eg!(d!("The fucking world is over!!")))
                .c(d!())?;
            if port_inuse.get(&port).is_none() {
                port_inuse.insert(port);
                buf.push(port);
                v_cnter -= 1;
            }
            cnter += 1;
            if PUB_PORT_LIMIT < cnter {
                return Err(eg!("The fucking world is over!!!"));
            }
        }
        // pair the allocated ports with the map slots (iteration order of
        // a HashMap is arbitrary, but every slot gets exactly one port)
        self.port_map.values_mut().zip(buf.into_iter()).for_each(
            |(p, port)| {
                *p = port;
            },
        );
        Ok(())
    }

    /// The VM's reserved id (-1 before `alloc_id` has run).
    #[inline(always)]
    pub fn id(&self) -> VmId {
        self.id
    }
}
impl Drop for Vm {
    /// Release everything this VM reserved: its id, its share of the
    /// global resource counters (unless it is merely paused), its public
    /// ports and NAT rules; finally let the backend clean process state.
    fn drop(&mut self) {
        if let Some(s) = self.serv_belong_to.upgrade() {
            s.vm_id_inuse.lock().remove(&self.id);
            // A VM paused via `Serv::stop_env` already returned its share
            // there (`during_stop == true`); avoid a double give-back.
            if !self.during_stop {
                let mut rsc = s.resource.write();
                rsc.vm_active -= 1;
                rsc.cpu_used -= self.cpu_num;
                rsc.mem_used -= self.mem_size;
                rsc.disk_used -= self.disk_size;
            }
            if !self.port_map.is_empty() {
                // Return the public ports to the pool. (The old code also
                // collected them into a Vec that was never read; that dead
                // buffer is removed.)
                let mut inuse = s.pub_port_inuse.lock();
                self.port_map.values().for_each(|port| {
                    inuse.remove(port);
                });
                // best-effort: NAT cleanup failure is only logged
                info_omit!(nat::clean_rule(&[self]));
            }
        }
        vm::post_clean(self);
    }
}