#![warn(
// missing_copy_implementations,
missing_debug_implementations,
missing_docs,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
unused_results,
clippy::pedantic,
)] #![allow(
dead_code,
clippy::similar_names,
clippy::type_complexity,
clippy::non_ascii_literal,
clippy::shadow_unrelated,
clippy::too_many_lines
)]
mod args;
mod bridge;
#[cfg(feature = "kubernetes")]
mod kube;
mod master;
use either::Either;
#[cfg(unix)]
use nix::{fcntl, sys::signal, sys::socket, sys::wait, unistd};
use palaver::{
file::{copy_fd, execve, fexecve, move_fds}, socket::{socket, SockFlag}, valgrind
};
use std::{
collections::HashMap, convert::{TryFrom, TryInto}, env, io, net::{IpAddr, SocketAddr, TcpListener}, process, sync, thread
};
#[cfg(unix)]
use std::{
ffi::{CStr, CString}, fs::File, os::unix::{ffi::OsStringExt, io::IntoRawFd}
};
#[cfg(feature = "kubernetes")]
use self::kube::kube_master;
use constellation_internal::{
abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, FabricRequest}, BufferedStream, FabricOutputEvent, Fd, Format, Pid, PidInternal, Trace
};
/// Command-line configuration of the fabric binary, as produced by
/// `Args::from_args` from the process arguments.
#[derive(Debug, PartialEq)]
struct Args {
    /// Output format handed to `Trace::new` for the fabric event trace.
    format: Format,
    /// Verbosity flag handed to `Trace::new`.
    verbose: bool,
    /// Which role this process runs as (master, worker, bridge, ...).
    role: Role,
}
/// The role this process plays, selected on the command line.
#[derive(Debug, PartialEq)]
enum Role {
    /// Kubernetes master (only with the `kubernetes` feature). A fabric
    /// listener is bound on `master_bind`'s IP and all fields are passed on
    /// to `kube_master`.
    #[cfg(feature = "kubernetes")]
    KubeMaster {
        master_bind: SocketAddr,
        bridge_bind: SocketAddr,
        mem: u64,
        cpu: u32,
        replicas: u32,
    },
    /// Master: the listen address (`main` uses only its IP) and the cluster's
    /// nodes. The first node's address determines where `master::run` runs.
    Master(SocketAddr, Vec<Node>),
    /// Worker: the address the fabric listener is bound to.
    Worker(SocketAddr),
    /// Bridge: control is handed to `bridge::main()`.
    Bridge,
}
/// One cluster node as listed on the master's command line. `main` turns the
/// list into a map from `fabric` to `(bridge, mem, cpu)` for `master::run`.
#[derive(Debug, PartialEq)]
struct Node {
    /// Fabric address of the node; used as the map key.
    fabric: SocketAddr,
    /// Optional bridge address for the node.
    bridge: Option<SocketAddr>,
    /// Memory capacity of the node (units not visible here — TODO confirm).
    mem: u64,
    /// CPU capacity of the node (units not visible here — TODO confirm).
    cpu: u32,
}
/// Fd at which a spawned process finds its pre-bound, per-process listener socket.
const LISTENER_FD: Fd = 3;
/// Fd at which a spawned process finds the request's `arg` file.
const ARG_FD: Fd = LISTENER_FD + 1;
/// First fd of the contiguous range holding one socket per `request.bind`
/// address. With `distribute_binaries`, the binary fd goes right after it.
const BOUND_FD_START: Fd = ARG_FD + 1;
fn main() {
std::env::set_var("RUST_BACKTRACE", "full");
std::panic::set_hook(Box::new(|info| {
eprintln!(
"thread '{}' {}",
thread::current().name().unwrap_or("<unnamed>"),
info
);
eprintln!("{:?}", backtrace::Backtrace::new());
std::process::abort();
}));
let args = Args::from_args(env::args().skip(1)).unwrap_or_else(|(message, success)| {
println!("{}", message);
process::exit(if success { 0 } else { 1 })
});
if args.role == Role::Bridge {
return bridge::main();
}
let stdout = io::stdout();
let trace = &Trace::new(stdout, args.format, args.verbose);
let (listen, listener) = match args.role {
#[cfg(feature = "kubernetes")]
Role::KubeMaster {
master_bind,
bridge_bind,
mem,
cpu,
replicas,
} => {
let fabric = TcpListener::bind(SocketAddr::new(master_bind.ip(), 0)).unwrap();
kube_master(
master_bind,
fabric.local_addr().unwrap().port(),
bridge_bind,
mem,
cpu,
replicas,
);
(master_bind.ip(), fabric)
}
Role::Master(listen, mut nodes) => {
let fabric = TcpListener::bind(SocketAddr::new(listen.ip(), 0)).unwrap();
let master_addr = nodes[0].fabric;
nodes[0]
.fabric
.set_port(fabric.local_addr().unwrap().port());
let nodes = nodes
.into_iter()
.map(
|Node {
fabric,
bridge,
mem,
cpu,
}| { (fabric, (bridge, mem, cpu)) },
)
.collect::<HashMap<_, _>>(); let _ = thread::Builder::new()
.name(String::from("master"))
.spawn(abort_on_unwind(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
master::run(
SocketAddr::new(listen.ip(), master_addr.port()),
Pid::new(master_addr.ip(), master_addr.port()),
nodes,
)
}))
.unwrap();
(listen.ip(), fabric)
}
Role::Worker(listen) => (listen.ip(), TcpListener::bind(&listen).unwrap()),
Role::Bridge => unreachable!(),
};
loop {
let accepted = listener.accept();
if accepted.is_err() {
continue;
}
let (stream, addr) = accepted.unwrap();
let mut pending_inner = HashMap::new();
let pending = &sync::RwLock::new(&mut pending_inner);
let (mut stream_read, stream_write) =
(BufferedStream::new(&stream), &sync::Mutex::new(&stream));
if bincode::serialize_into::<_, IpAddr>(&mut *stream_write.try_lock().unwrap(), &addr.ip())
.is_err()
{
continue;
}
let ip = bincode::deserialize_from::<_, IpAddr>(&mut stream_read);
if ip.is_err() {
continue;
}
let ip = ip.unwrap();
crossbeam::scope(|scope| {
while let Ok(request) =
bincode_deserialize_from(&mut stream_read).map_err(map_bincode_err)
{
let request: FabricRequest<File, File> = request;
let process_listener = socket(
socket::AddressFamily::Inet,
socket::SockType::Stream,
SockFlag::SOCK_NONBLOCK,
socket::SockProtocol::Tcp,
)
.unwrap();
socket::setsockopt(process_listener, socket::sockopt::ReuseAddr, &true).unwrap();
socket::bind(
process_listener,
&socket::SockAddr::Inet(socket::InetAddr::from_std(&SocketAddr::new(
listen, 0,
))),
)
.unwrap();
socket::setsockopt(process_listener, socket::sockopt::ReusePort, &true).unwrap();
let port = if let socket::SockAddr::Inet(inet) =
socket::getsockname(process_listener).unwrap()
{
inet.to_std()
} else {
panic!()
}
.port();
let process_id = Pid::new(ip, port);
let args = request
.args
.into_iter()
.map(|x| CString::new(OsStringExt::into_vec(x)).unwrap())
.collect::<Vec<_>>();
let vars = [
(
CString::new("CONSTELLATION").unwrap(),
CString::new("fabric").unwrap(),
),
(
CString::new("CONSTELLATION_RESOURCES").unwrap(),
CString::new(serde_json::to_string(&request.resources).unwrap()).unwrap(),
),
]
.iter()
.cloned()
.chain(request.vars.into_iter().map(|(x, y)| {
(
CString::new(OsStringExt::into_vec(x)).unwrap(),
CString::new(OsStringExt::into_vec(y)).unwrap(),
)
}))
.map(|(key, value)| {
CString::new(format!(
"{}={}",
key.to_str().unwrap(),
value.to_str().unwrap()
))
.unwrap()
})
.collect::<Vec<_>>();
let args: Vec<&CStr> = args.iter().map(|x| &**x).collect();
let vars: Vec<&CStr> = vars.iter().map(|x| &**x).collect();
#[cfg(feature = "distribute_binaries")]
let binary = request.binary;
let mut binary_desired_fd =
BOUND_FD_START + Fd::try_from(request.bind.len()).unwrap();
let arg = request.arg;
let bind = request.bind;
let child = match unistd::fork().expect("Fork failed") {
unistd::ForkResult::Child => {
forbid_alloc(|| {
#[cfg(any(target_os = "android", target_os = "linux"))]
{
use nix::libc;
let err =
unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) };
assert_eq!(err, 0);
}
unistd::setpgid(unistd::Pid::from_raw(0), unistd::Pid::from_raw(0))
.unwrap();
#[cfg(feature = "distribute_binaries")]
let binary = binary.into_raw_fd(); let arg = arg.into_raw_fd();
move_fds(
&mut [
(arg, ARG_FD),
(process_listener, LISTENER_FD),
#[cfg(feature = "distribute_binaries")]
(binary, binary_desired_fd),
],
Some(fcntl::FdFlag::empty()),
true,
);
for (i, addr) in bind.iter().enumerate() {
let socket: Fd = BOUND_FD_START + Fd::try_from(i).unwrap();
let fd = socket::socket(
socket::AddressFamily::Inet,
socket::SockType::Stream,
socket::SockFlag::empty(),
socket::SockProtocol::Tcp,
)
.unwrap();
if fd != socket {
copy_fd(fd, socket, Some(fcntl::FdFlag::empty()), true)
.unwrap();
unistd::close(fd).unwrap();
}
socket::setsockopt(socket, socket::sockopt::ReuseAddr, &true)
.unwrap();
socket::bind(
socket,
&socket::SockAddr::Inet(socket::InetAddr::from_std(&addr)),
)
.unwrap();
}
if cfg!(feature = "distribute_binaries") {
if valgrind::is().unwrap_or(false) {
let binary_desired_fd_ = valgrind::start_fd() - 1;
assert!(binary_desired_fd_ > binary_desired_fd);
copy_fd(
binary_desired_fd,
binary_desired_fd_,
Some(fcntl::FdFlag::empty()),
true,
)
.unwrap();
unistd::close(binary_desired_fd).unwrap();
binary_desired_fd = binary_desired_fd_;
}
fexecve(binary_desired_fd, &args, &vars)
.expect("Failed to fexecve for fabric");
} else {
execve(&args[0], &args, &vars)
.expect("Failed to execve for fabric");
}
unreachable!()
})
}
unistd::ForkResult::Parent { child, .. } => child,
};
unistd::close(process_listener).unwrap();
let x = pending.write().unwrap().insert(process_id, child);
assert!(x.is_none());
trace.fabric(FabricOutputEvent::Init {
pid: process_id,
system_pid: nix::libc::pid_t::from(child).try_into().unwrap(),
});
if bincode::serialize_into(
*stream_write.lock().unwrap(),
&Either::Left::<Pid, Pid>(process_id),
)
.map_err(map_bincode_err)
.is_err()
{
break;
}
let _ = scope.spawn(abort_on_unwind_1(move |_scope| {
loop {
match wait::waitpid(child, None) {
Err(nix::Error::Sys(nix::errno::Errno::EINTR)) => (),
Ok(wait::WaitStatus::Exited(pid, code)) if code == 0 => {
assert_eq!(pid, child);
break;
}
Ok(wait::WaitStatus::Signaled(pid, signal, _))
if signal == signal::Signal::SIGKILL =>
{
assert_eq!(pid, child);
break;
}
wait_status => panic!("{:?}", wait_status),
}
}
let x = pending.write().unwrap().remove(&process_id).unwrap();
assert_eq!(x, child);
trace.fabric(FabricOutputEvent::Exit {
pid: process_id,
system_pid: nix::libc::pid_t::from(child).try_into().unwrap(),
});
let _unchecked_error = bincode::serialize_into(
*stream_write.lock().unwrap(),
&Either::Right::<Pid, Pid>(process_id),
)
.map_err(map_bincode_err);
}));
}
for (&_job, &pid) in pending.read().unwrap().iter() {
let _unchecked_error = signal::kill(pid, signal::Signal::SIGKILL);
}
})
.unwrap();
assert_eq!(pending_inner.len(), 0);
}
}