use nix::unistd::getpid;
use rain_core::errors::Result;
use rain_core::sys::get_hostname;
use std::fs::File;
use std::io::BufRead;
use std::io::BufReader;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::Command;
use start::common::Readiness;
use start::process::Process;
use start::ssh::RemoteProcess;
/// Settings that drive a [`Starter`] run: which governors to launch, where the
/// server listens and where logs are written.
///
/// All fields are plain standard-library values, so the struct derives `Debug`
/// (for diagnostics) and `Clone`.
#[derive(Debug, Clone)]
pub struct StarterConfig {
    /// One entry per local governor to start; `Some(n)` is forwarded to the
    /// governor as `--cpus n`, `None` passes no `--cpus` option.
    pub local_governors: Vec<Option<u32>>,

    /// Address handed to the server via `--listen`; its port is also used to
    /// build the address governors connect to.
    pub server_listen_address: SocketAddr,

    /// Address handed to the server via `--http-listen`.
    pub server_http_listen_address: SocketAddr,

    /// Directory under which logs of the started processes are placed.
    pub log_dir: PathBuf,

    /// Optional file listing hosts for remote governors (one host per line;
    /// blank lines and lines starting with `#` are skipped).
    pub governor_host_file: Option<PathBuf>,

    /// Optional value forwarded to every governor as `--config`.
    pub governor_config: Option<String>,

    /// Shell text placed on its own line before the governor command in the
    /// script run on remote hosts.
    pub remote_init: String,

    /// When set, the remote start script picks `--cpus=-1` on a host where the
    /// server pid is found running and `--cpus=detect` elsewhere.
    pub reserve_cpu_on_server: bool,

    /// Optional command prefix; if non-empty, its first item becomes the
    /// program and the rain executable is appended after the remaining items.
    pub run_prefix: Vec<String>,
}
impl StarterConfig {
    /// Creates a configuration with no governor host file and no governor
    /// config; both can be filled in afterwards through the public fields.
    ///
    /// `log_dir` is stored as an absolute path: a relative path is resolved
    /// against the current working directory, an absolute one is kept as is.
    ///
    /// # Panics
    ///
    /// Panics if `log_dir` is relative and the current working directory
    /// cannot be determined.
    pub fn new(
        local_governors: Vec<Option<u32>>,
        server_listen_address: SocketAddr,
        server_http_listen_address: SocketAddr,
        log_dir: &Path,
        remote_init: String,
        reserve_cpu_on_server: bool,
        run_prefix: Vec<String>,
    ) -> Self {
        // Joining an absolute path onto the cwd just yields the absolute path,
        // so skip the (fallible) cwd lookup when it cannot affect the result.
        let log_dir = if log_dir.is_absolute() {
            log_dir.to_path_buf()
        } else {
            ::std::env::current_dir()
                .expect("cannot determine the current directory to resolve a relative log dir")
                .join(log_dir)
        };
        Self {
            local_governors,
            server_listen_address,
            server_http_listen_address,
            log_dir,
            governor_host_file: None,
            governor_config: None,
            remote_init,
            reserve_cpu_on_server,
            run_prefix,
        }
    }

    /// Takes the governor host file from the `PBS_NODEFILE` environment
    /// variable.
    ///
    /// # Errors
    ///
    /// Fails if a governor host file is already configured, or if
    /// `PBS_NODEFILE` cannot be read from the environment.
    pub fn autoconf_pbs(&mut self) -> Result<()> {
        info!("Configuring PBS environment");
        if self.governor_host_file.is_some() {
            bail!("Options --autoconf=pbs and --governor_host_file are not compatible");
        }
        let nodefile = match ::std::env::var("PBS_NODEFILE") {
            Ok(path) => path,
            Err(_) => bail!("Variable PBS_NODEFILE not defined, are you running inside PBS?"),
        };
        self.governor_host_file = Some(PathBuf::from(nodefile));
        Ok(())
    }
}
/// Launches the server and its governors and keeps track of everything it
/// started so that readiness can be polled and the processes killed later.
pub struct Starter {
    /// Settings this starter was created with.
    config: StarterConfig,

    /// Processes spawned on the local machine (the server and local governors).
    processes: Vec<Process>,

    /// Governors started on other hosts through `RemoteProcess`.
    remote_processes: Vec<RemoteProcess>,

    /// Pid of the spawned server process; `0` until the server has been started.
    server_pid: u32,
}
fn read_host_file(path: &Path) -> Result<Vec<String>> {
let file = BufReader::new(File::open(path).map_err(|e| {
format!(
"Cannot open governor host file {:?}: {}",
path,
::std::error::Error::description(&e)
)
})?);
let mut result = Vec::new();
for line in file.lines() {
let line = line?;
let trimmed_line = line.trim();
if !trimmed_line.is_empty() && !trimmed_line.starts_with('#') {
result.push(trimmed_line.to_string());
}
}
Ok(result)
}
impl Starter {
pub fn new(config: StarterConfig) -> Self {
Self {
config,
processes: Vec::new(),
remote_processes: Vec::new(),
server_pid: 0,
}
}
/// Returns `true` when at least one locally spawned process is tracked.
///
/// NOTE(review): only `processes` is consulted; `remote_processes` is not
/// taken into account here.
pub fn has_processes(&self) -> bool {
    match self.processes.len() {
        0 => false,
        _ => true,
    }
}
/// Starts the server, waits until it is ready, then starts the configured
/// governors (local or remote) and waits until all of them are ready.
///
/// # Errors
///
/// Fails if both local governors and a governor host file are configured,
/// if no governor is configured at all, if the host file cannot be read, or
/// if starting or polling any process fails.
pub fn start(&mut self) -> Result<()> {
    let has_local = !self.config.local_governors.is_empty();
    if has_local && self.config.governor_host_file.is_some() {
        bail!("Cannot combine remote & local governors");
    }

    let governor_hosts = match self.config.governor_host_file {
        Some(ref path) => read_host_file(path)?,
        None => Vec::new(),
    };
    let has_remote = !governor_hosts.is_empty();
    if !has_local && !has_remote {
        bail!("No governors are specified.");
    }

    // The server has to be up before any governor is launched.
    self.start_server()?;
    self.busy_wait_for_ready()?;

    if has_local {
        self.start_local_governors()?;
    }
    if has_remote {
        self.start_remote_governors(&governor_hosts)?;
    }
    self.busy_wait_for_ready()
}
/// Returns the program and the leading arguments used to invoke rain on this
/// machine.
///
/// Without a run prefix this is the current executable (argument 0 of this
/// process) and no arguments. With a run prefix, the prefix's first item is
/// the program, followed by the rest of the prefix and then the executable.
///
/// # Panics
///
/// Panics if the process has no argument 0.
pub fn local_rain_command(&self) -> (String, Vec<String>) {
    let rain_program = ::std::env::args().next().unwrap();
    match self.config.run_prefix.split_first() {
        None => (rain_program, Vec::new()),
        Some((wrapper, wrapper_args)) => {
            let mut args = wrapper_args.to_vec();
            args.push(rain_program);
            (wrapper.clone(), args)
        }
    }
}
/// Spawns `command` as a local process called `name`, registers it in
/// `processes` and returns a reference to the registered entry.
///
/// The process is created waiting for `ready_file` as its readiness marker
/// and is given the configured log directory.
///
/// # Errors
///
/// Propagates the error of `Process::spawn`; nothing is registered then.
fn spawn_process(
    &mut self,
    name: &str,
    ready_file: &Path,
    command: &mut Command,
) -> Result<&Process> {
    let readiness = Readiness::WaitingForReadyFile(ready_file.to_path_buf());
    let process = Process::spawn(&self.config.log_dir, name, readiness, command)?;
    self.processes.push(process);
    // Cannot be `None`: an element was pushed on the line above.
    Ok(self.processes.last().unwrap())
}
/// Builds the path `<temp dir>/rain-<pid>-<name>`, where `<pid>` is the pid
/// of the current process. The file itself is not created.
fn create_tmp_filename(&self, name: &str) -> PathBuf {
    let mut path = ::std::env::temp_dir();
    let file_name = format!("rain-{}-{}", getpid(), name);
    path.push(file_name);
    path
}
/// Spawns the local server process and records its pid in `server_pid`.
///
/// The server is invoked as `<rain command> server` with `--logdir`,
/// `--listen`, `--http-listen` and `--ready-file` options derived from the
/// configuration. The dashboard URL and the pid are logged once the process
/// has been spawned.
///
/// # Errors
///
/// Propagates a failure to spawn the process.
fn start_server(&mut self) -> Result<()> {
    let ready_file = self.create_tmp_filename("server-ready");
    let (program, program_args) = self.local_rain_command();
    let listen = self.config.server_listen_address.to_string();
    let http_listen = self.config.server_http_listen_address.to_string();
    let http_port = self.config.server_http_listen_address.port();
    info!("Starting local server ({})", listen);
    let log_dir = self.config.log_dir.join("server");

    let mut command = Command::new(program);
    command
        .args(program_args)
        .arg("server")
        .arg("--logdir")
        .arg(&log_dir)
        .arg("--listen")
        .arg(&listen)
        .arg("--http-listen")
        .arg(&http_listen)
        .arg("--ready-file")
        .arg(&ready_file);

    let server_pid = self.spawn_process("server", &ready_file, &mut command)?.id();
    let hostname = get_hostname();
    info!("Dashboard: http://{}:{}/", hostname, http_port);
    info!("Server pid = {}", server_pid);
    self.server_pid = server_pid;
    Ok(())
}
/// Starts one governor on every host in `governor_hosts` and registers the
/// resulting remote processes.
///
/// For each host a shell script is assembled from `remote_init`, the local
/// rain command, the server address (this machine's hostname plus the server
/// port) and a `--ready-file` option, plus `--config` when a governor config
/// is set. The script is started in the current working directory with the
/// configured log directory.
///
/// When `reserve_cpu_on_server` is set, the script first checks on the remote
/// host whether a process with the server's pid matching `rain` exists and
/// picks `--cpus=-1` in that case, `--cpus=detect` otherwise.
///
/// # Errors
///
/// Fails if the current working directory cannot be determined or if
/// starting a remote process fails; hosts after the failing one are not
/// started.
fn start_remote_governors(&mut self, governor_hosts: &[String]) -> Result<()> {
    info!("Starting {} remote governor(s)", governor_hosts.len());
    let (program, program_args) = self.local_rain_command();
    // Report an unavailable working directory as an error instead of panicking.
    let dir = ::std::env::current_dir()?;
    let server_address = self.server_address(false);
    // The joined prefix arguments are identical for every host.
    let prefix_args = program_args.join(" ");

    for (i, host) in governor_hosts.iter().enumerate() {
        info!(
            "Connecting to {} (remote log dir: {:?})",
            host, self.config.log_dir
        );
        let ready_file = self.create_tmp_filename(&format!("governor-{}-ready", i));
        let name = format!("governor-{}", i);
        let mut process = RemoteProcess::new(
            name,
            host,
            Readiness::WaitingForReadyFile(ready_file.clone()),
        );

        // The path is rendered with `{:?}`, i.e. wrapped in double quotes.
        let mut args = format!("--ready-file {:?}", ready_file);
        if let Some(ref config) = self.config.governor_config {
            args.push_str(" --config ");
            args.push_str(config);
        }

        let command = if self.config.reserve_cpu_on_server {
            // The empty line between the init text and the governor command
            // is part of the script as it has always been sent.
            format!(
                "if (ps --pid {server_pid} | grep rain); then \n\
                 CPUS=-1 \n\
                 else \n\
                 CPUS=detect \n\
                 fi \n\
                 {remote_init}\n\n\
                 {program} {program_args} governor {server_address} --cpus=$CPUS {args}",
                program = program,
                remote_init = self.config.remote_init,
                program_args = prefix_args,
                server_address = server_address,
                server_pid = self.server_pid,
                args = args
            )
        } else {
            format!(
                "{remote_init}\n{program} {program_args} governor {server_address} {args}",
                program = program,
                remote_init = self.config.remote_init,
                program_args = prefix_args,
                server_address = server_address,
                args = args
            )
        };

        process.start(&command, &dir, &self.config.log_dir)?;
        self.remote_processes.push(process);
    }
    Ok(())
}
/// Returns `<host>:<port>` for connecting to the server, where `<port>` is
/// the port of the configured listen address and `<host>` is `127.0.0.1`
/// when `localhost` is set, otherwise the result of `get_hostname()`.
fn server_address(&self, localhost: bool) -> String {
    let port = self.config.server_listen_address.port();
    if localhost {
        format!("{}:{}", "127.0.0.1", port)
    } else {
        format!("{}:{}", get_hostname(), port)
    }
}
/// Spawns one local governor process per entry of `local_governors`.
///
/// Governor `i` is named `governor-<i>`, connects to the server on
/// `127.0.0.1`, logs into `<log_dir>/governor-<i>` and reports readiness
/// through its own ready file. `--cpus` is passed when the entry carries a
/// value, `--config` when a governor config is set.
///
/// # Errors
///
/// Stops at and returns the first spawn failure.
fn start_local_governors(&mut self) -> Result<()> {
    // Work on a copy so the configuration is not borrowed while spawning.
    let resources = self.config.local_governors.clone();
    info!("Starting {} local governor(s)", resources.len());
    let server_address = self.server_address(true);
    let (program, program_args) = self.local_rain_command();

    for (i, resource) in resources.into_iter().enumerate() {
        let name = format!("governor-{}", i);
        let ready_file = self.create_tmp_filename(&format!("governor-{}-ready", i));

        let mut cmd = Command::new(&program);
        cmd.args(&program_args);
        cmd.arg("governor");
        cmd.arg(&server_address);
        cmd.arg("--logdir");
        cmd.arg(self.config.log_dir.join(format!("governor-{}", i)));
        cmd.arg("--ready-file");
        cmd.arg(&ready_file);
        if let Some(cpus) = resource {
            cmd.arg("--cpus").arg(cpus.to_string());
        }
        if let Some(ref config) = self.config.governor_config {
            cmd.arg("--config").arg(config);
        }

        self.spawn_process(&name, &ready_file, &mut cmd)?;
    }
    Ok(())
}
/// Blocks until `check_all_ready` reports that no process is pending.
///
/// Between polls the thread sleeps; the delay starts at 50 ms and grows by
/// 50 ms after every poll until it reaches 1500 ms. There is no overall
/// timeout: the loop ends only when everything is ready or a check fails.
///
/// # Errors
///
/// Propagates the first error returned by `check_all_ready`.
pub fn busy_wait_for_ready(&mut self) -> Result<()> {
    const STEP_MS: u64 = 50;
    const MAX_DELAY_MS: u64 = 1500;

    let mut delay_ms = STEP_MS;
    loop {
        if self.check_all_ready()? == 0 {
            return Ok(());
        }
        ::std::thread::sleep(::std::time::Duration::from_millis(delay_ms));
        if delay_ms < MAX_DELAY_MS {
            delay_ms += STEP_MS;
        }
    }
}
/// Polls every tracked process once and returns how many are not ready yet.
///
/// Local processes are checked first, then remote ones.
///
/// # Errors
///
/// Returns the first error reported by a readiness check; the remaining
/// processes are not polled in that case.
pub fn check_all_ready(&mut self) -> Result<u32> {
    let mut pending = 0u32;
    for local in self.processes.iter_mut() {
        let ready = local.check_ready()?;
        pending += if ready { 0 } else { 1 };
    }
    for remote in self.remote_processes.iter_mut() {
        let ready = remote.check_ready()?;
        pending += if ready { 0 } else { 1 };
    }
    Ok(pending)
}
/// Kills every tracked process, local ones first, and forgets all of them.
///
/// This is best effort: a failed kill is only logged at debug level and the
/// remaining processes are still handled.
pub fn kill_all(&mut self) {
    // Take the lists out so the starter tracks nothing afterwards.
    let local = ::std::mem::replace(&mut self.processes, Vec::new());
    for mut process in local {
        if let Err(e) = process.kill() {
            debug!("Kill failed: {}", e.description());
        }
    }

    let remote = ::std::mem::replace(&mut self.remote_processes, Vec::new());
    for mut process in remote {
        if let Err(e) = process.kill() {
            debug!("Kill failed: {}", e.description());
        }
    }
}
}