#[cfg(target_os = "freebsd")]
use std::ffi::c_void;
#[cfg(target_os = "macos")]
use std::ffi::CString;
#[cfg(target_os = "macos")]
use std::iter::repeat;
#[cfg(target_os = "freebsd")]
use std::iter::repeat;
#[cfg(target_os = "freebsd")]
use std::mem::size_of;
#[cfg(target_os = "macos")]
use std::ptr::null_mut;
use std::{
fs::File,
io::{Seek, SeekFrom},
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd},
os::unix::process::CommandExt,
process::Command,
};
use anyhow::{bail, Context};
use libc::{self, pid_t};
#[cfg(target_os = "macos")]
use libc::{c_char, PATH_MAX};
#[cfg(target_os = "freebsd")]
use libc::{sysctl, CTL_KERN, KERN_PROC, KERN_PROC_PATHNAME, PATH_MAX};
use mio::net::UnixStream;
use nix::{self, unistd::*};
use tempfile::tempfile;
use sozu::{metrics, server::Server};
use sozu_command_lib::{
channel::Channel,
config::Config,
logging::target_to_backend,
proxy::{ProxyRequest, ProxyRequestOrder, ProxyResponse},
ready::Ready,
scm_socket::{Listeners, ScmSocket},
state::ConfigState,
};
use crate::{command::Worker, logging, util};
pub fn start_workers(executable_path: String, config: &Config) -> anyhow::Result<Vec<Worker>> {
let state = ConfigState::new();
let mut workers = Vec::new();
for index in 0..config.worker_count {
let listeners = Some(Listeners {
http: Vec::new(),
tls: Vec::new(),
tcp: Vec::new(),
});
let (pid, command_channel, scm_socket) = fork_main_into_worker(
&index.to_string(),
config,
executable_path.clone(),
&state,
listeners,
)?;
let mut worker = Worker::new(index as u32, pid, command_channel, scm_socket, config);
if let Some(worker_channel) = worker.worker_channel.as_mut() {
if let Err(e) = worker_channel.blocking() {
error!("Could not block the worker channel: {}", e);
}
worker_channel
.write_message(&ProxyRequest {
id: format!("start-status-{}", index),
order: ProxyRequestOrder::Status,
})
.with_context(|| "Could not send status request to the worker")?;
if let Err(e) = worker_channel.nonblocking() {
error!("Could not unblock the worker channel: {}", e);
}
}
workers.push(worker);
}
info!("Created workers");
Ok(workers)
}
pub fn start_worker(
id: u32,
config: &Config,
executable_path: String,
state: &ConfigState,
listeners: Option<Listeners>,
) -> anyhow::Result<Worker> {
let (worker_pid, main_to_worker_channel, main_to_worker_scm) =
fork_main_into_worker(&id.to_string(), config, executable_path, state, listeners)?;
Ok(Worker::new(
id,
worker_pid,
main_to_worker_channel,
main_to_worker_scm,
config,
))
}
pub fn begin_worker_process(
worker_to_main_channel_fd: i32,
worker_to_main_scm_fd: i32,
configuration_state_fd: i32,
id: i32,
command_buffer_size: usize,
max_command_buffer_size: usize,
) -> Result<(), anyhow::Error> {
let mut worker_to_main_channel: Channel<ProxyResponse, Config> = Channel::new(
unsafe { UnixStream::from_raw_fd(worker_to_main_channel_fd) },
command_buffer_size,
max_command_buffer_size,
);
if let Err(e) = worker_to_main_channel.blocking() {
error!("Could not block the worker-to-main channel: {}", e);
}
let configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };
let config_state: ConfigState = serde_json::from_reader(configuration_state_file)
.with_context(|| "could not parse configuration state data")?;
let worker_config = worker_to_main_channel
.read_message()
.with_context(|| "worker could not read configuration from socket")?;
let worker_id = format!("{}-{:02}", "WRK", id);
logging::setup(
worker_id.clone(),
&worker_config.log_level,
&worker_config.log_target,
worker_config.log_access_target.as_deref(),
);
trace!(
"Creating worker {} with config: {:#?}",
worker_id,
worker_config
);
let backend = target_to_backend(&worker_config.log_target);
let access_backend = worker_config
.log_access_target
.as_deref()
.map(target_to_backend);
sozu_command_lib::logging::Logger::init(
worker_id.clone(),
&worker_config.log_level,
backend,
access_backend,
);
info!("worker {} starting...", id);
if let Err(e) = worker_to_main_channel.nonblocking() {
error!("Could not unblock the worker-to-main channel: {}", e);
}
let mut worker_to_main_channel: Channel<ProxyResponse, ProxyRequest> =
worker_to_main_channel.into();
worker_to_main_channel.readiness.insert(Ready::readable());
if let Some(metrics) = worker_config.metrics.as_ref() {
metrics::setup(
&metrics.address,
worker_id,
metrics.tagged_metrics,
metrics.prefix.clone(),
)
.with_context(|| "Could not setup metrics")?;
}
let worker_to_main_scm_socket = ScmSocket::new(worker_to_main_scm_fd)
.with_context(|| "could not create worker-to-main scm socket")?;
let mut server = Server::try_new_from_config(
worker_to_main_channel,
worker_to_main_scm_socket,
worker_config,
config_state,
true,
)
.with_context(|| "Could not create server from config")?;
info!("starting event loop");
server.run();
info!("ending event loop");
Ok(())
}
pub fn fork_main_into_worker(
worker_id: &str,
config: &Config,
executable_path: String,
state: &ConfigState,
listeners: Option<Listeners>,
) -> anyhow::Result<(pid_t, Channel<ProxyRequest, ProxyResponse>, ScmSocket)> {
trace!("parent({})", unsafe { libc::getpid() });
let mut state_file =
tempfile().with_context(|| "could not create temporary file for configuration state")?;
util::disable_close_on_exec(state_file.as_raw_fd())?;
serde_json::to_writer(&mut state_file, state)
.with_context(|| "could not write upgrade data to temporary file")?;
state_file
.seek(SeekFrom::Start(0))
.with_context(|| "could not seek to beginning of file")?;
let (main_to_worker, worker_to_main) = UnixStream::pair()?;
let (main_to_worker_scm, worker_to_main_scm) = UnixStream::pair()?;
let main_to_worker_scm = ScmSocket::new(main_to_worker_scm.into_raw_fd())
.with_context(|| "Could not create main-to-worker scm socket")?;
util::disable_close_on_exec(worker_to_main.as_raw_fd())?;
util::disable_close_on_exec(worker_to_main_scm.as_raw_fd())?;
let mut main_to_worker_channel: Channel<Config, ProxyResponse> = Channel::new(
main_to_worker,
config.command_buffer_size,
config.max_command_buffer_size,
);
if let Err(e) = main_to_worker_channel.blocking() {
error!("Could not block the main-to-worker channel: {}", e);
}
info!("{} launching worker", worker_id);
debug!("executable path is {}", executable_path);
match unsafe { fork() } {
Ok(ForkResult::Parent { child }) => {
info!("{} worker launched: {}", worker_id, child);
main_to_worker_channel
.write_message(config)
.with_context(|| "Could not send config to the new worker using the channel")?;
if let Err(e) = main_to_worker_channel.nonblocking() {
error!("Could not unblock the main-to-worker channel: {}", e);
}
if let Some(listeners) = listeners {
info!("sending listeners to new worker: {:?}", listeners);
let result = main_to_worker_scm.send_listeners(&listeners);
info!("sent listeners from main: {:?}", result);
listeners.close();
};
util::disable_close_on_exec(main_to_worker_scm.fd)?;
Ok((
child.into(),
main_to_worker_channel.into(),
main_to_worker_scm,
))
}
Ok(ForkResult::Child) => {
trace!("child({}):\twill spawn a child", unsafe { libc::getpid() });
Command::new(executable_path)
.arg("worker")
.arg("--id")
.arg(worker_id)
.arg("--fd")
.arg(worker_to_main.as_raw_fd().to_string())
.arg("--scm")
.arg(worker_to_main_scm.as_raw_fd().to_string())
.arg("--configuration-state-fd")
.arg(state_file.as_raw_fd().to_string())
.arg("--command-buffer-size")
.arg(config.command_buffer_size.to_string())
.arg("--max-command-buffer-size")
.arg(config.max_command_buffer_size.to_string())
.exec();
unreachable!();
}
Err(e) => {
error!("Error during fork(): {}", e);
Err(anyhow::Error::from(e))
}
}
}
#[cfg(target_os = "linux")]
pub unsafe fn get_executable_path() -> anyhow::Result<String> {
use std::fs;
let path = fs::read_link("/proc/self/exe").with_context(|| "/proc/self/exe doesn't exist")?;
let mut path_str = match path.into_os_string().into_string() {
Ok(s) => s,
Err(_) => bail!("Failed to convert PathBuf to String"),
};
if path_str.ends_with(" (deleted)") {
let len = path_str.len();
path_str.truncate(len - 10)
}
Ok(path_str)
}
#[cfg(target_os = "macos")]
extern "C" {
pub fn _NSGetExecutablePath(buf: *mut c_char, size: *mut u32) -> i32;
}
#[cfg(target_os = "macos")]
pub unsafe fn get_executable_path() -> String {
let capacity = PATH_MAX as usize;
let mut temp: Vec<u8> = Vec::with_capacity(capacity);
temp.extend(repeat(0).take(capacity));
let pathbuf = CString::from_vec_unchecked(temp);
let ptr = pathbuf.into_raw();
let mut size = capacity as u32;
if _NSGetExecutablePath(ptr, &mut size) == 0 {
let mut temp2: Vec<u8> = Vec::with_capacity(capacity);
temp2.extend(repeat(0).take(capacity));
let pathbuf2 = CString::from_vec_unchecked(temp2);
let ptr2 = pathbuf2.into_raw();
if libc::realpath(ptr, ptr2) != null_mut() {
let path = CString::from_raw(ptr2);
path.to_str()
.expect("failed to convert CString to String")
.to_string()
} else {
panic!();
}
} else {
panic!("buffer too small");
}
}
#[cfg(target_os = "freebsd")]
pub unsafe fn get_executable_path() -> String {
let mut capacity = PATH_MAX as usize;
let mut path: Vec<u8> = Vec::with_capacity(capacity);
path.extend(repeat(0).take(capacity));
let mib: Vec<i32> = vec![CTL_KERN, KERN_PROC, KERN_PROC_PATHNAME, -1];
let len = mib.len() * size_of::<i32>();
let element_size = size_of::<i32>();
let res = sysctl(
mib.as_ptr(),
(len / element_size) as u32,
path.as_mut_ptr() as *mut c_void,
&mut capacity,
std::ptr::null() as *const c_void,
0,
);
if res != 0 {
panic!("Could not retrieve the path of the executable");
}
String::from_raw_parts(path.as_mut_ptr(), capacity - 1, path.len())
}