#![feature(backtrace)]
#![warn(
// missing_copy_implementations,
missing_debug_implementations,
missing_docs,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
unused_results,
clippy::pedantic,
)] #![allow(
dead_code,
clippy::similar_names,
clippy::type_complexity,
clippy::non_ascii_literal,
clippy::shadow_unrelated,
clippy::too_many_lines,
clippy::unnested_or_patterns
)]
mod args;
mod bridge;
#[cfg(feature = "kubernetes")]
mod kube;
mod master;
use either::Either;
#[cfg(unix)]
use nix::{fcntl, sys::signal, sys::socket, unistd};
use palaver::{
file::{execve, fexecve, move_fd, move_fds}, process::ChildHandle, socket::{socket, SockFlag}, valgrind
};
use std::{
collections::HashMap, convert::{TryFrom, TryInto}, env, io, io::Seek, net::{IpAddr, SocketAddr, TcpListener}, process, sync, sync::Arc, thread
};
#[cfg(unix)]
use std::{
ffi::{CStr, CString}, fs::File, os::unix::{ffi::OsStringExt, io::IntoRawFd}
};
#[cfg(feature = "kubernetes")]
use self::kube::kube_master;
use constellation_internal::{
abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, Cpu, FabricOutputEvent, Fd, Format, Mem, Pid, PidInternal, Trace
};
/// Parsed command-line configuration for this binary.
///
/// `main` obtains it from `Args::from_args`, fed with the process arguments minus the program
/// name.
#[derive(PartialEq, Debug)]
struct Args {
/// Output format handed to `Trace::new` for the trace written to stdout.
format: Format,
/// Verbosity flag handed to `Trace::new`.
verbose: bool,
/// Which mode `main` runs in; see `Role`.
role: Role,
}
/// The mode this process runs in; `main` dispatches on it.
#[derive(PartialEq, Debug)]
enum Role {
/// Master set up through `kube_master`. Only exists when the `kubernetes` feature is enabled.
#[cfg(feature = "kubernetes")]
KubeMaster {
/// Forwarded to `kube_master`; its IP is also the IP the fabric listener is bound on.
master_bind: SocketAddr,
/// Forwarded to `kube_master` unchanged.
bridge_bind: SocketAddr,
/// Forwarded to `kube_master` unchanged (presumably a per-replica memory figure; confirm in
/// the `kube` module).
mem: Mem,
/// Forwarded to `kube_master` unchanged (presumably a per-replica CPU figure; confirm in the
/// `kube` module).
cpu: Cpu,
/// Forwarded to `kube_master` unchanged.
replicas: u32,
},
/// Master: the listen address followed by the nodes. `main` binds the fabric listener on the
/// listen IP and treats the first node's `fabric` address as the master's own address, so the
/// list must not be empty (`main` indexes `nodes[0]`).
Master(SocketAddr, Vec<Node>),
/// Worker: the address the fabric listener is bound on.
Worker(SocketAddr),
/// Bridge: `main` hands control straight to `bridge::main()`.
Bridge,
}
/// One node listed for a `Role::Master`.
///
/// `main` converts every node into a `(fabric, (bridge, mem, cpu))` map entry and passes the
/// resulting map to `master::run`.
#[derive(PartialEq, Debug)]
struct Node {
/// Fabric address of the node; used as the key of the map given to `master::run`.
fabric: SocketAddr,
/// Optional bridge address for the node (presumably where a bridge should listen; confirm in
/// the `master` module).
bridge: Option<SocketAddr>,
/// Memory figure for the node (presumably what it offers to the scheduler; confirm in the
/// `master` module).
mem: Mem,
/// CPU figure for the node (presumably what it offers to the scheduler; confirm in the
/// `master` module).
cpu: Cpu,
}
/// Descriptor number onto which `spawn` moves the child's listening socket before exec.
const LISTENER_FD: Fd = 3;
/// Descriptor number onto which `spawn` moves the request's argument file before exec.
const ARG_FD: Fd = 4;
/// First descriptor number used for the sockets `spawn` binds on the child's behalf: the `i`-th
/// requested bind address ends up at `BOUND_FD_START + i`.
const BOUND_FD_START: Fd = 5;
/// Entry point: parses the command line and runs in the selected `Role`.
///
/// `Role::Bridge` is delegated to `bridge::main()`. Every other role sets up a fabric
/// `TcpListener` and then loops forever accepting connections on it. One connection is served at
/// a time: the `crossbeam::scope` below only returns once every thread spawned for that
/// connection has finished, and only then is the next connection accepted.
///
/// Any panic in any thread prints the panic and a backtrace to stderr and aborts the process.
fn main() {
// Install a panic hook that reports the panicking thread, the panic info and a forced backtrace
// on stderr, then aborts the whole process.
std::env::set_var("RUST_BACKTRACE", "full");
std::panic::set_hook(Box::new(|info| {
eprintln!(
"thread '{}' {}",
thread::current().name().unwrap_or("<unnamed>"),
info
);
eprintln!("{:?}", std::backtrace::Backtrace::force_capture());
std::process::abort();
}));
// When parsing yields `Err((message, success))`, the message is printed to stdout and the
// process exits with status 0 if `success` is true, 1 otherwise.
let args = Args::from_args(env::args().skip(1)).unwrap_or_else(|(message, success)| {
println!("{}", message);
process::exit(if success { 0 } else { 1 })
});
if args.role == Role::Bridge {
return bridge::main();
}
// Trace events are written to stdout in the requested format.
let trace = &Trace::new(io::stdout(), args.format, args.verbose);
// `listen` is the local IP that spawned processes bind on; `listener` is the fabric listener
// served by the accept loop below.
let (listen, listener) = match args.role {
#[cfg(feature = "kubernetes")]
Role::KubeMaster {
master_bind,
bridge_bind,
mem,
cpu,
replicas,
} => {
// Bind the fabric listener on the master IP with an OS-chosen port (port 0) and tell
// `kube_master` which port was actually assigned.
let fabric = TcpListener::bind(SocketAddr::new(master_bind.ip(), 0)).unwrap();
kube_master(
master_bind,
fabric.local_addr().unwrap().port(),
bridge_bind,
mem,
cpu,
replicas,
);
(master_bind.ip(), fabric)
}
Role::Master(listen, mut nodes) => {
// Bind the fabric listener on the listen IP with an OS-chosen port (port 0).
let fabric = TcpListener::bind(SocketAddr::new(listen.ip(), 0)).unwrap();
// Remember the address the first node was configured with (used below as the master's own
// address), then rewrite that node's port to the port the fabric listener really got.
let master_addr = nodes[0].fabric;
nodes[0]
.fabric
.set_port(fabric.local_addr().unwrap().port());
// Re-key the nodes by fabric address for `master::run`.
let nodes = nodes
.into_iter()
.map(
|Node {
fabric,
bridge,
mem,
cpu,
}| { (fabric, (bridge, mem, cpu)) },
)
.collect::<HashMap<_, _>>(); let _ = thread::Builder::new()
.name(String::from("master"))
.spawn(abort_on_unwind(move || {
// NOTE(review): the one-second delay presumably lets the accept loop below start before
// the master connects to the nodes — confirm.
std::thread::sleep(std::time::Duration::from_secs(1));
master::run(
SocketAddr::new(listen.ip(), master_addr.port()),
Pid::new(master_addr.ip(), master_addr.port()),
nodes,
)
}))
.unwrap();
(listen.ip(), fabric)
}
Role::Worker(listen) => (listen.ip(), TcpListener::bind(&listen).unwrap()),
// Handled by the early return above.
Role::Bridge => unreachable!(),
};
loop {
// A failed accept is ignored and the accept is simply retried.
let accepted = listener.accept();
let (stream, addr) = if let Ok(accepted) = accepted {
accepted
} else {
continue;
};
// Children spawned for this connection that have not been reaped yet, keyed by fabric pid.
let mut pending_inner = HashMap::new();
let pending = &sync::RwLock::new(&mut pending_inner);
// Reads go through a buffered view of the stream; writes are serialised by a mutex because
// the per-child threads below write exit notifications concurrently with this thread.
let (mut stream_read, stream_write) =
(BufferedStream::new(&stream), &sync::Mutex::new(&stream));
// Handshake, step 1: tell the peer which IP its connection appears to come from. The mutex
// was just created, so `try_lock` cannot find it held.
if bincode::serialize_into::<_, IpAddr>(&mut *stream_write.try_lock().unwrap(), &addr.ip())
.is_err()
{
continue;
}
// Handshake, step 2: the peer answers with the IP that `spawn` puts into the pids of the
// processes started for this connection.
let ip = bincode::deserialize_from::<_, IpAddr>(&mut stream_read);
let ip = if let Ok(ip) = ip { ip } else { continue };
crossbeam::scope(|scope| {
// Serve spawn requests until one cannot be read from the stream any more.
while let Ok(request) =
bincode_deserialize_from(&mut stream_read).map_err(map_bincode_err)
{
let request: FabricRequest<File, File> = request;
let (pid, child) = spawn(listen, ip, request);
// Two owners from here on: the `pending` map and the waiter thread spawned below.
let child = Arc::new(child);
let x = pending.write().unwrap().insert(pid, child.clone());
assert!(x.is_none());
trace.fabric(FabricOutputEvent::Init {
pid,
system_pid: nix::libc::pid_t::from(child.pid).try_into().unwrap(),
});
// Announce the new process to the peer as `Left(pid)`; stop serving if that fails.
if bincode::serialize_into(
*stream_write.lock().unwrap(),
&Either::Left::<Pid, Pid>(pid),
)
.map_err(map_bincode_err)
.is_err()
{
break;
}
// One waiter thread per child: reap it, unregister it, trace the exit, notify the peer.
let _ = scope.spawn(abort_on_unwind_1(move |_scope| {
match child.wait() {
// A clean exit and death by SIGKILL are both accepted silently.
Ok(palaver::process::WaitStatus::Exited(0))
| Ok(palaver::process::WaitStatus::Signaled(signal::Signal::SIGKILL, _)) => (),
// Anything else only panics (and thereby aborts) when built with the `strict` feature.
wait_status => {
if cfg!(feature = "strict") {
panic!("{:?}", wait_status)
}
}
}
// Unregister the child and check the map really held this very handle.
let x = pending.write().unwrap().remove(&pid).unwrap();
assert!(Arc::ptr_eq(&child, &x));
drop(x);
// With the map's reference gone this thread owns the only one, so unwrapping succeeds.
let child = Arc::try_unwrap(child).unwrap();
let child_pid = child.pid;
// The handle is dropped before the exit is traced and reported.
drop(child);
trace.fabric(FabricOutputEvent::Exit {
pid,
system_pid: nix::libc::pid_t::from(child_pid).try_into().unwrap(),
});
// Report the exit to the peer as `Right(pid)`; a failure here is deliberately ignored.
let _unchecked_error = bincode::serialize_into(
*stream_write.lock().unwrap(),
&Either::Right::<Pid, Pid>(pid),
)
.map_err(map_bincode_err);
}));
}
// The connection is finished: SIGKILL every child still registered, both via its process
// group (negative pid) and directly. Errors are ignored.
for (&_pid, child) in pending.read().unwrap().iter() {
let _unchecked_error = signal::kill(
nix::unistd::Pid::from_raw(-child.pid.as_raw()),
signal::Signal::SIGKILL,
);
let _unchecked_error = child.signal(signal::Signal::SIGKILL);
}
})
.unwrap();
// Every waiter thread has finished by now, so every child must have been unregistered.
assert_eq!(pending_inner.len(), 0);
}
}
/// Forks and execs the process described by `request`, returning its fabric `Pid` together with
/// a handle to the child.
///
/// * `listen` – local IP on which the child's listening socket is bound (OS-chosen port).
/// * `ip` – IP recorded in the returned `Pid`, paired with the port of that listening socket.
/// * `request` – what to run: arguments, environment, resources, addresses to bind, the argument
///   file and (with the `distribute_binaries` feature) the binary itself.
///
/// Before forking, the new `Pid` is appended (bincode-serialized) to the request's argument file,
/// which is then rewound to its start. The child receives that file at `ARG_FD`, the listening
/// socket at `LISTENER_FD` and one bound socket per requested address from `BOUND_FD_START`
/// upwards.
///
/// # Panics
///
/// Panics if any of the socket, file or fork operations fails, or if an argument or environment
/// entry contains an interior NUL byte.
fn spawn(listen: IpAddr, ip: IpAddr, request: FabricRequest<File, File>) -> (Pid, ChildHandle) {
    // Listening socket for the new process, bound on `listen` with an OS-chosen port (port 0).
    let process_listener = socket(
        socket::AddressFamily::Inet,
        socket::SockType::Stream,
        SockFlag::SOCK_NONBLOCK,
        socket::SockProtocol::Tcp,
    )
    .unwrap();
    socket::setsockopt(process_listener, socket::sockopt::ReuseAddr, &true).unwrap();
    socket::bind(
        process_listener,
        &socket::SockAddr::Inet(socket::InetAddr::from_std(&SocketAddr::new(listen, 0))),
    )
    .unwrap();
    socket::setsockopt(process_listener, socket::sockopt::ReusePort, &true).unwrap();
    // The port the OS picked becomes part of the process's fabric pid.
    let port = if let socket::SockAddr::Inet(inet) = socket::getsockname(process_listener).unwrap()
    {
        inet.to_std()
    } else {
        panic!()
    }
    .port();
    let pid = Pid::new(ip, port);

    // Append the pid to the argument file, then rewind so the child reads it from the start.
    let _ = (&request.arg).seek(std::io::SeekFrom::End(0)).unwrap();
    bincode::serialize_into(&request.arg, &pid).unwrap();
    let _ = (&request.arg).seek(std::io::SeekFrom::Start(0)).unwrap();

    let args = request
        .args
        .into_iter()
        .map(|x| CString::new(OsStringExt::into_vec(x)).unwrap())
        .collect::<Vec<_>>();

    // Environment for the child: the two reserved variables first, followed by the requested
    // variables with any same-named entries removed so they cannot shadow the reserved ones.
    let reserved_names: [&[u8]; 2] = [b"CONSTELLATION", b"CONSTELLATION_RESOURCES"];
    let vars = [
        (
            CString::new("CONSTELLATION").unwrap(),
            CString::new("fabric").unwrap(),
        ),
        (
            CString::new("CONSTELLATION_RESOURCES").unwrap(),
            CString::new(serde_json::to_string(&request.resources).unwrap()).unwrap(),
        ),
    ]
    .iter()
    .cloned()
    .chain(
        request
            .vars
            .into_iter()
            .map(|(x, y)| {
                (
                    CString::new(OsStringExt::into_vec(x)).unwrap(),
                    CString::new(OsStringExt::into_vec(y)).unwrap(),
                )
            })
            // Compared byte-wise: names are arbitrary OS bytes, not necessarily UTF-8.
            .filter(|&(ref x, _)| !reserved_names.contains(&x.as_bytes())),
    )
    .map(|(key, value)| {
        // Join the raw bytes as `key=value` instead of formatting through `str`, so that
        // non-UTF-8 names and values are passed through rather than causing a panic. Neither
        // half contains a NUL (both are `CString`s), so the final `unwrap` cannot fail.
        let (key, value) = (key.as_bytes(), value.as_bytes());
        let mut entry = Vec::with_capacity(key.len() + 1 + value.len());
        entry.extend_from_slice(key);
        entry.push(b'=');
        entry.extend_from_slice(value);
        CString::new(entry).unwrap()
    })
    .collect::<Vec<_>>();
    let args: Vec<&CStr> = args.iter().map(|x| &**x).collect();
    let vars: Vec<&CStr> = vars.iter().map(|x| &**x).collect();

    #[cfg(feature = "distribute_binaries")]
    let binary = request.binary;
    // The binary's descriptor is parked just past the range reserved for the bound sockets.
    let mut binary_desired_fd = BOUND_FD_START + Fd::try_from(request.bind.len()).unwrap();
    let arg = request.arg;
    let bind = request.bind;

    let child = match palaver::process::fork(false).expect("Fork failed") {
        palaver::process::ForkResult::Child => {
            // Everything the child needs was allocated above; the child side runs under
            // `forbid_alloc`.
            forbid_alloc(|| {
                // Put the child into its own process group (pgid == its pid).
                unistd::setpgid(unistd::Pid::from_raw(0), unistd::Pid::from_raw(0)).unwrap();
                #[cfg(feature = "distribute_binaries")]
                let binary = binary.into_raw_fd();
                let arg = arg.into_raw_fd();
                // Move the inherited descriptors onto the numbers the child expects.
                move_fds(
                    &mut [
                        (arg, ARG_FD),
                        (process_listener, LISTENER_FD),
                        #[cfg(feature = "distribute_binaries")]
                        (binary, binary_desired_fd),
                    ],
                    Some(fcntl::FdFlag::empty()),
                    true,
                );
                // Close every other descriptor from `BOUND_FD_START` up to 1023, sparing only the
                // binary's descriptor when binaries are distributed. Close errors are ignored.
                for fd in BOUND_FD_START..1024 {
                    if cfg!(feature = "distribute_binaries") && fd == binary_desired_fd {
                        continue;
                    }
                    let _ = unistd::close(fd);
                }
                // Create and bind one socket per requested address, at consecutive descriptors
                // starting from `BOUND_FD_START`.
                for (i, addr) in bind.iter().enumerate() {
                    let socket: Fd = BOUND_FD_START + Fd::try_from(i).unwrap();
                    let fd = socket::socket(
                        socket::AddressFamily::Inet,
                        socket::SockType::Stream,
                        socket::SockFlag::empty(),
                        socket::SockProtocol::Tcp,
                    )
                    .unwrap();
                    if fd != socket {
                        move_fd(fd, socket, Some(fcntl::FdFlag::empty()), true).unwrap();
                    }
                    socket::setsockopt(socket, socket::sockopt::ReuseAddr, &true).unwrap();
                    socket::bind(
                        socket,
                        &socket::SockAddr::Inet(socket::InetAddr::from_std(&addr)),
                    )
                    .unwrap();
                }
                if cfg!(feature = "distribute_binaries") {
                    if valgrind::is().unwrap_or(false) {
                        // Under valgrind, relocate the binary's descriptor to just below
                        // `valgrind::start_fd()` before exec.
                        let binary_desired_fd_ = valgrind::start_fd() - 1;
                        assert!(binary_desired_fd_ > binary_desired_fd);
                        move_fd(
                            binary_desired_fd,
                            binary_desired_fd_,
                            Some(fcntl::FdFlag::empty()),
                            true,
                        )
                        .unwrap();
                        binary_desired_fd = binary_desired_fd_;
                    }
                    fexecve(binary_desired_fd, &args, &vars).expect("Failed to fexecve for fabric");
                } else {
                    execve(&args[0], &args, &vars).expect("Failed to execve for fabric");
                }
                // A successful exec never returns, and a failed one panics above.
                unreachable!()
            })
        }
        palaver::process::ForkResult::Parent(child) => child,
    };
    // The parent does not need the child's listening socket.
    unistd::close(process_listener).unwrap();
    (pid, child)
}