use crate::{nat, vm};
use lazy_static::lazy_static;
use myutil::{err::*, *};
use parking_lot::{Mutex, RwLock};
pub(crate) use ppcore_def::*;
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
sync::atomic::{AtomicI32, AtomicU16, Ordering},
sync::{Arc, Weak},
};
const MAX_LIFE_TIME: u64 = 6 * 3600;
pub type OsName = String;
pub type ImagePath = String;
#[derive(Debug, Default)]
pub struct Serv {
cli: Arc<RwLock<HashMap<CliId, HashMap<EnvId, Env>>>>,
env_id_inuse: Arc<Mutex<HashSet<EnvId>>>,
vm_id_inuse: Arc<Mutex<HashSet<VmId>>>,
pub_port_inuse: Arc<Mutex<HashSet<PubPort>>>,
resource: Arc<RwLock<Resource>>,
}
impl Serv {
#[inline(always)]
pub fn new() -> Serv {
Serv::default()
}
#[inline(always)]
pub fn set_resource(&self, rsc: Resource) {
*self.resource.write() =
Resource::new(rsc.cpu_total, rsc.mem_total, rsc.disk_total);
}
#[inline(always)]
pub fn get_resource(&self) -> Resource {
*self.resource.read()
}
pub fn clean_expired_env(&self) {
let ts = ts!();
let cli = self.cli.read();
let expired = cli
.iter()
.map(|(cli_id, env)| {
env.iter()
.filter(|(_, v)| v.end_timestamp < ts)
.map(move |(k, _)| (cli_id.clone(), k.clone()))
})
.flatten()
.collect::<Vec<_>>();
if !expired.is_empty() {
drop(cli);
let mut cli = self.cli.write();
expired.iter().for_each(|(cli_id, k)| {
cli.get_mut(cli_id.as_str())
.map(|env_set| env_set.remove(k));
});
}
vm::zobmie_clean();
}
#[inline(always)]
pub fn add_client(&self, id: CliId) -> Result<()> {
let mut cli = self.cli.write();
if cli.get(&id).is_some() {
Err(eg!("Already exists!"))
} else {
cli.insert(id, map! {});
Ok(())
}
}
#[inline(always)]
pub fn del_client(&self, id: &CliIdRef) {
self.cli.write().remove(id);
}
#[inline(always)]
pub fn register_env(&self, id: CliId, env: Env) -> Result<()> {
let mut cli = self.cli.write();
let env_set = cli.entry(id).or_insert(map! {});
if env_set.get(&env.id).is_some() {
Err(eg!("Env already exists!"))
} else {
env_set.insert(env.id.clone(), env);
Ok(())
}
}
#[inline(always)]
pub fn del_env(&self, cli_id: &CliIdRef, env_id: &EnvIdRef) {
if let Some(env_set) = self.cli.write().get_mut(cli_id) {
env_set.remove(env_id);
}
}
#[inline(always)]
pub fn get_env_meta(&self, cli_id: &CliIdRef) -> Vec<EnvMeta> {
let get = |env: &HashMap<EnvId, Env>| {
env.values().map(|i| i.as_meta()).collect::<Vec<_>>()
};
self.cli.read().get(cli_id).map(get).unwrap_or_default()
}
#[inline(always)]
pub fn get_env_meta_all(&self) -> Vec<EnvMeta> {
self.cli
.read()
.values()
.map(|env| env.values().map(|i| i.as_meta()))
.flatten()
.collect::<Vec<_>>()
}
#[inline(always)]
pub fn get_env_detail(
&self,
cli_id: &CliIdRef,
env_set: Vec<EnvId>,
) -> Vec<EnvInfo> {
let get = |env: &HashMap<EnvId, Env>| {
env.values()
.filter(|v| env_set.iter().any(|vid| vid == &v.id))
.map(|env| env.as_info())
.collect::<Vec<_>>()
};
self.cli.read().get(cli_id).map(get).unwrap_or_default()
}
#[inline(always)]
pub fn update_env_life(
&self,
cli_id: &CliIdRef,
env_id: &EnvIdRef,
lifetime: u64,
is_fucker: bool,
) -> Result<()> {
let mut cli = self.cli.write();
if let Some(env_set) = cli.get_mut(cli_id) {
if let Some(env) = env_set.get_mut(env_id) {
env.update_life(lifetime, is_fucker).c(d!())
} else {
Err(eg!("Env NOT exists!"))
}
} else {
Err(eg!("Client NOT exists!"))
}
}
#[inline(always)]
pub fn update_env_del_vm(
&self,
cli_id: &CliIdRef,
env_id: &EnvIdRef,
vmid_set: &[VmId],
) -> Result<()> {
let mut cli = self.cli.write();
if let Some(env_set) = cli.get_mut(cli_id) {
if let Some(env) = env_set.get_mut(env_id) {
vmid_set.iter().for_each(|id| {
env.vm.remove(id);
});
Ok(())
} else {
Err(eg!("Env NOT exists!"))
}
} else {
Err(eg!("Client NOT exists!"))
}
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct Resource {
pub vm_active: u32,
pub cpu_total: u64,
pub cpu_used: u32,
pub mem_total: u64,
pub mem_used: u32,
pub disk_total: u64,
pub disk_used: u32,
}
impl Resource {
#[inline(always)]
pub fn new(cpu_total: u64, mem_total: u64, disk_total: u64) -> Resource {
let mut rsc = Resource::default();
rsc.cpu_total = cpu_total;
rsc.mem_total = mem_total;
rsc.disk_total = disk_total;
rsc
}
}
#[derive(Clone, Debug)]
pub struct Env {
id: EnvId,
start_timestamp: u64,
end_timestamp: u64,
vm: HashMap<VmId, Vm>,
serv_belong_to: Weak<Serv>,
}
impl Env {
#[inline(always)]
fn as_meta(&self) -> EnvMeta {
EnvMeta {
id: self.id.clone(),
start_timestamp: self.start_timestamp,
end_timestamp: self.end_timestamp,
vm_cnt: self.vm.len(),
}
}
#[inline(always)]
fn as_info(&self) -> EnvInfo {
EnvInfo {
id: self.id.clone(),
start_timestamp: self.start_timestamp,
end_timestamp: self.end_timestamp,
vm: self.vm.iter().map(|(&k, v)| (k, v.as_info())).collect(),
}
}
pub fn new(serv: &Arc<Serv>, id: &EnvIdRef) -> Result<Env> {
let mut inuse = serv.env_id_inuse.lock();
if inuse.get(id).is_none() {
inuse.insert(id.to_owned());
drop(inuse);
} else {
return Err(eg!("Already exists!"));
}
Ok(Env {
id: id.to_owned(),
vm: HashMap::new(),
start_timestamp: ts!(),
end_timestamp: 3600 + ts!(),
serv_belong_to: Arc::downgrade(serv),
})
}
#[inline(always)]
pub fn update_life(&mut self, secs: u64, is_fucker: bool) -> Result<()> {
if MAX_LIFE_TIME < secs && !is_fucker {
return Err(eg!("Life time too long!"));
}
self.end_timestamp = self.start_timestamp + secs;
Ok(())
}
#[inline(always)]
pub fn add_vm_set(&mut self, cfg_set: Vec<VmCfg>) -> Result<()> {
let mut vm = vct![];
self.check_resource(&cfg_set).c(d!())?;
for cfg in cfg_set.into_iter() {
vm.push(Vm::create(&self.serv_belong_to, cfg)?);
}
vm.into_iter().for_each(|vm| {
self.vm.insert(vm.id(), vm);
});
Ok(())
}
fn check_resource(&self, cfg_set: &[VmCfg]) -> Result<()> {
if let Some(s) = self.serv_belong_to.upgrade() {
let rsc;
{
rsc = *s.resource.read();
}
let (cpu, mem, disk) =
cfg_set.iter().fold((0u64, 0, 0), |mut b, vm| {
b.0 += vm.cpu_num.unwrap_or(CPU_DEFAULT) as u64;
b.1 += vm.mem_size.unwrap_or(MEM_DEFAULT) as u64;
b.2 += vm.disk_size.unwrap_or(DISK_DEFAULT) as u64;
b
});
if rsc.cpu_used as u64 + cpu > rsc.cpu_total {
return Err(eg!(format!(
"CPU resource busy: total {}, used {}, you want: {}",
rsc.cpu_total, rsc.cpu_used, cpu
)));
}
if rsc.mem_used as u64 + mem > rsc.mem_total {
return Err(eg!(format!(
"Memory resource busy: total {} MB, used {} MB, you want: {} MB",
rsc.mem_total, rsc.mem_used, mem
)));
}
if rsc.disk_used as u64 + disk > rsc.disk_total {
return Err(eg!(format!(
"Disk resource busy: total {} MB, used {} MB, you want: {} MB",
rsc.disk_total, rsc.disk_used, disk
)));
}
} else {
return Err(eg!("The fucking world is OVER!"));
}
Ok(())
}
}
impl Drop for Env {
fn drop(&mut self) {
if let Some(s) = self.serv_belong_to.upgrade() {
s.env_id_inuse.lock().remove(&self.id);
}
}
}
#[derive(Clone, Debug)]
pub struct VmCfg {
pub image_path: String,
pub port_list: Vec<VmPort>,
pub kind: Option<VmKind>,
pub cpu_num: Option<u32>,
pub mem_size: Option<u32>,
pub disk_size: Option<u32>,
}
#[derive(Clone, Debug)]
pub struct Vm {
pub(crate) image_path: PathBuf,
pub kind: VmKind,
pub cpu_num: u32,
pub mem_size: u32,
pub disk_size: u32,
serv_belong_to: Weak<Serv>,
pub(crate) id: VmId,
pub ip: Ipv4,
pub port_map: HashMap<VmPort, PubPort>,
}
impl Vm {
#[inline(always)]
pub(crate) fn as_info(&self) -> VmInfo {
VmInfo {
os: self
.image_path
.file_name()
.map(|f| f.to_str())
.flatten()
.unwrap_or_default()
.trim_end_matches(".qcow2")
.to_owned(),
kind: self.kind,
cpu_num: self.cpu_num,
mem_size: self.mem_size,
disk_size: self.disk_size,
ip: self.ip.clone(),
port_map: self.port_map.clone(),
}
}
pub(crate) fn create(serv: &Weak<Serv>, cfg: VmCfg) -> Result<Vm> {
let cpu_num = cfg.cpu_num.unwrap_or(CPU_DEFAULT);
let mem_size = cfg.mem_size.unwrap_or(MEM_DEFAULT);
let disk_size = cfg.disk_size.unwrap_or(DISK_DEFAULT);
let mut res = Vm {
image_path: PathBuf::from(cfg.image_path),
kind: cfg.kind.unwrap_or_default(),
cpu_num,
mem_size,
disk_size,
serv_belong_to: Weak::clone(serv),
id: -1,
ip: Ipv4::default(),
port_map: cfg.port_list.into_iter().fold(
HashMap::new(),
|mut acc, new| {
acc.insert(new, 0);
acc
},
),
};
let cnt_it = |s: &Serv| {
let mut rsc = s.resource.write();
rsc.vm_active += 1;
rsc.cpu_used += cpu_num;
rsc.mem_used += mem_size;
rsc.disk_used += disk_size;
};
if let Some(s) = serv.upgrade() {
cnt_it(&s);
res.alloc_resource(&s).c(d!()).map(|_| res)
} else {
Err(eg!())
}
}
#[inline(always)]
fn alloc_resource(&mut self, serv: &Arc<Serv>) -> Result<()> {
self.alloc_id(&serv)
.c(d!())
.map(|id| self.ip = Self::gen_ip(id))
.and_then(|_| self.alloc_pub_port(&serv).c(d!()))
.and_then(|_| nat::set_rule(&self.port_map, &self.ip).c(d!()))
.and_then(|_| self.start_vm().c(d!()))
}
#[inline(always)]
fn start_vm(&self) -> Result<()> {
vm::start(self).c(d!())
}
#[inline(always)]
fn alloc_id(&mut self, serv: &Arc<Serv>) -> Result<VmId> {
const VM_ID_LIMIT: i32 = 0xffff;
lazy_static! {
static ref VM_ID: AtomicI32 = AtomicI32::new(0);
}
let vm_id = {
let mut cnter = 0;
let mut vmid_inuse = serv.vm_id_inuse.lock();
loop {
let id = VM_ID.fetch_add(1, Ordering::Relaxed) % VM_ID_LIMIT;
if vmid_inuse.get(&id).is_none() {
vmid_inuse.insert(id);
self.id = id;
break id;
}
cnter += 1;
if VM_ID_LIMIT < cnter {
return Err(eg!("The fucking world is over!!!"));
}
}
};
Ok(vm_id)
}
#[inline(always)]
fn gen_ip(vm_id: VmId) -> Ipv4 {
Ipv4::new(format!("10.10.{}.{}", vm_id / 256, vm_id % 256))
}
fn alloc_pub_port(&mut self, serv: &Arc<Serv>) -> Result<()> {
const PUB_PORT_LIMIT: u16 = 20000;
lazy_static! {
static ref PUB_PORT: AtomicU16 = AtomicU16::new(40000);
}
let mut cnter = 0;
let mut v_cnter = self.port_map.len();
let mut buf = vct![];
while 0 < v_cnter {
let mut port_inuse = serv.pub_port_inuse.lock();
let port = PUB_PORT.fetch_add(1, Ordering::Relaxed);
if port_inuse.get(&port).is_none() {
port_inuse.insert(port);
buf.push(port);
v_cnter -= 1;
}
cnter += 1;
if PUB_PORT_LIMIT < cnter {
return Err(eg!("The fucking world is over!!!"));
}
}
self.port_map.values_mut().zip(buf.into_iter()).for_each(
|(p, port)| {
*p = port;
},
);
Ok(())
}
#[inline(always)]
pub fn id(&self) -> VmId {
self.id
}
}
impl Drop for Vm {
fn drop(&mut self) {
if let Some(s) = self.serv_belong_to.upgrade() {
s.vm_id_inuse.lock().remove(&self.id);
{
let mut rsc = s.resource.write();
rsc.vm_active -= 1;
rsc.cpu_used -= self.cpu_num;
rsc.mem_used -= self.mem_size;
rsc.disk_used -= self.disk_size;
}
if !self.port_map.is_empty() {
let mut pub_port = vct![];
let mut inuse = s.pub_port_inuse.lock();
self.port_map.values().for_each(|port| {
inuse.remove(port);
pub_port.push(*port);
});
info_omit!(nat::clean_rule(&pub_port));
}
}
vm::post_clean(self);
}
}