use std::{
env,
io::{Error, Read},
os::unix::{io::AsRawFd, process::CommandExt},
process::{Child, Command},
sync::atomic::{AtomicBool, Ordering},
thread,
time::{Duration, Instant},
};
use bincode::serialize_into;
use crate::{
registry::ListenerInfo,
utils::{
clone_fd, close_fd_quiet, unset_cloexec, ENV_PIPE_FDS, ENV_PIPE_READY, ENV_UPGRADE,
UPGRADE_TRUE_VAL,
},
};
pub type UpgradeFinished = Result<(), UpgradeError>;
#[derive(derive_more::From, derive_more::Display)]
#[display("{_variant}")]
pub enum UpgradeError {
#[display("child exited unexpectedly")]
ChildExit,
#[display("timed out waiting for ready signal from child")]
ChildTimeout,
#[display("upgrade not started: {}", _0)]
NotStarted(String),
#[display("serialization error: {:?}", _0)]
#[from]
SerializationError(bincode::Error), }
pub fn upgrade(fds: Vec<ListenerInfo>) -> UpgradeFinished {
if UPGRADING.swap(true, Ordering::Acquire) {
return Err(UpgradeError::NotStarted(String::from("Already in upgrade")));
}
log::debug!("In child, inherited files should be:\n {:?}", fds);
let pipes = UpgradePipes::new()?;
let child = exec_upgraded(&pipes.fds, fds.clone())?;
let (recv_ready, send_listeners) = pipes.take_pipes();
let send = send_fds(send_listeners, fds);
let waitc = wait_child(child);
let waitr = wait_ready(recv_ready);
let mut res = match waitr.join() {
Ok(r) => r,
_ => Err(UpgradeError::ChildExit),
};
UPGRADING.store(false, Ordering::Release);
waitc.thread().unpark();
match waitc.join() {
Ok(Ok(())) => (), Ok(Err(e)) => {
res = Err(e); }
Err(_) => panic!("Thread error in upgrade!"),
}
let _ = send.join();
res
}
pub(crate) static UPGRADING: AtomicBool = AtomicBool::new(false);
impl From<Error> for UpgradeError {
fn from(e: Error) -> UpgradeError {
UpgradeError::NotStarted(format!("{:?}", e))
}
}
struct FdPair {
recv_listeners_fd: i32,
send_ready_fd: i32,
}
impl Drop for FdPair {
fn drop(&mut self) {
close_fd_quiet(self.recv_listeners_fd);
close_fd_quiet(self.send_ready_fd);
}
}
struct UpgradePipes {
recv_ready: os_pipe::PipeReader,
send_listeners: os_pipe::PipeWriter,
fds: FdPair,
}
impl UpgradePipes {
fn new() -> Result<UpgradePipes, UpgradeError> {
let (recv_listeners_fd, send_listeners) = listener_pipes()?;
let (recv_ready, send_ready_fd) = ready_pipes().inspect_err(|_| {
close_fd_quiet(recv_listeners_fd);
})?;
let fds = FdPair {
recv_listeners_fd,
send_ready_fd,
};
Ok(Self {
recv_ready,
send_listeners,
fds,
})
}
fn take_pipes(self) -> (os_pipe::PipeReader, os_pipe::PipeWriter) {
(self.recv_ready, self.send_listeners)
}
}
fn send_fds(
send_pipe: os_pipe::PipeWriter,
fds: Vec<ListenerInfo>,
) -> thread::JoinHandle<UpgradeFinished> {
thread::spawn(move || -> UpgradeFinished {
serialize_into(send_pipe, &fds).map_err(|e| e.into())
})
}
fn wait_child(mut child: Child) -> thread::JoinHandle<UpgradeFinished> {
thread::spawn(move || {
let start = Instant::now();
let timeout = Duration::from_secs(5);
while start.elapsed() < timeout {
thread::sleep(Duration::from_millis(500));
proc_wait(&mut child)?;
if !UPGRADING.load(Ordering::Acquire) {
return proc_wait(&mut child);
}
}
let _ = child.kill();
let _ = child.wait();
Err(UpgradeError::ChildTimeout)
})
}
fn proc_wait(child: &mut Child) -> UpgradeFinished {
match child.try_wait() {
Ok(None) => Ok(()),
_ => Err(UpgradeError::ChildExit),
}
}
fn wait_ready(mut recv_ready: os_pipe::PipeReader) -> thread::JoinHandle<UpgradeFinished> {
thread::spawn(move || -> UpgradeFinished {
let mut buf = [0; 2];
if recv_ready.read_exact(&mut buf).is_ok() && &buf == b"OK" {
return Ok(());
}
Err(UpgradeError::ChildExit)
})
}
fn exec_upgraded(pipe_fds: &FdPair, inherit_fds: Vec<ListenerInfo>) -> Result<Child, Error> {
let mut run_args: Vec<String> = env::args().collect();
let cmdline = run_args.remove(0);
let cwd = env::current_dir()?;
let mut cmd = Command::new(cmdline);
cmd.args(run_args)
.current_dir(cwd)
.env(ENV_UPGRADE, UPGRADE_TRUE_VAL)
.env(ENV_PIPE_FDS, format!("{}", pipe_fds.recv_listeners_fd))
.env(ENV_PIPE_READY, format!("{}", pipe_fds.send_ready_fd));
#[cfg(feature = "systemd_sockets")]
{
cmd.env_remove(crate::tokio_ecdysis::systemd_sockets::LISTEN_PID);
cmd.env_remove(crate::tokio_ecdysis::systemd_sockets::LISTEN_FDNAMES);
cmd.env_remove(crate::tokio_ecdysis::systemd_sockets::LISTEN_FDS);
}
unsafe {
cmd.pre_exec(move || {
for i in inherit_fds.iter() {
unset_cloexec(i.fd);
}
Ok(())
});
}
cmd.spawn()
}
fn listener_pipes() -> Result<(i32, os_pipe::PipeWriter), UpgradeError> {
let (recv_listeners, send_listeners) = os_pipe::pipe()?;
let recv_listeners_fd = clone_fd(recv_listeners.as_raw_fd())?;
unset_cloexec(recv_listeners_fd);
Ok((recv_listeners_fd, send_listeners))
}
fn ready_pipes() -> Result<(os_pipe::PipeReader, i32), UpgradeError> {
let (recv_ready, send_ready) = os_pipe::pipe()?;
let send_ready_fd = clone_fd(send_ready.as_raw_fd())?;
unset_cloexec(send_ready_fd);
Ok((recv_ready, send_ready_fd))
}