#![allow(clippy::module_name_repetitions)]
use ::kube::{
api::{Api, ListParams, PatchParams}, client::APIClient, config
};
use serde_json::json;
use std::{
collections::HashMap, env, net::{IpAddr, SocketAddr}, thread
};
use super::master;
use constellation_internal::{abort_on_unwind, Cpu, Mem, Pid, PidInternal};
pub fn kube_master(
master_bind: SocketAddr, fabric_port: u16, bridge_bind: SocketAddr, mem: Mem, cpu: Cpu,
replicas: u32,
) {
let config = config::incluster_config().expect("failed to load in-cluster kubeconfig");
let namespace = config.default_ns.clone();
let client = APIClient::new(config);
let jobs = Api::v1ReplicaSet(client.clone()).within(&namespace);
let replicas = replicas - 1;
let fs = json!({
"spec": { "replicas": replicas }
});
let _ = tokio::runtime::Runtime::new()
.unwrap()
.block_on(jobs.patch_scale(
"constellation",
&PatchParams::default(),
serde_json::to_vec(&fs).unwrap(),
))
.unwrap();
let pods = Api::v1Pod(client).within(&namespace);
let ips = loop {
let pods = tokio::runtime::Runtime::new()
.unwrap()
.block_on(pods.list(&ListParams {
label_selector: Some(format!("{}={}", "constellation", "node")),
..ListParams::default()
}))
.expect("failed to list pods")
.items;
let ips: Vec<IpAddr> = pods
.into_iter()
.filter_map(|pod| Some(pod.status?.pod_ip?.parse().unwrap()))
.collect();
if ips.len() == replicas as usize {
break ips;
}
std::thread::sleep(std::time::Duration::from_secs(2));
};
let _ = thread::Builder::new()
.name(String::from("master"))
.spawn(abort_on_unwind(move || {
std::thread::sleep(std::time::Duration::from_secs(10));
let master_addr = SocketAddr::new(
env::var("CONSTELLATION_IP").unwrap().parse().unwrap(),
master_bind.port(),
);
let mut nodes = ips
.into_iter()
.map(|ip| {
let fabric = SocketAddr::new(ip, master_bind.port());
let bridge = None;
(fabric, (bridge, mem, cpu))
})
.collect::<HashMap<_, _>>(); let _ = nodes.insert(
SocketAddr::new(master_addr.ip(), fabric_port),
(Some(bridge_bind), mem, cpu),
);
master::run(
SocketAddr::new(master_bind.ip(), master_addr.port()),
Pid::new(master_addr.ip(), master_addr.port()),
nodes,
)
}))
.unwrap();
}