#[macro_use]
extern crate serde_derive;
extern crate dirs;
extern crate hostname;
extern crate libc;
extern crate notify;
extern crate num_cpus;
extern crate serde;
extern crate serde_json;
extern crate shared_child;
extern crate unix_daemonize;
mod longthreads;
use notify::Watcher;
use std::collections::HashMap;
use std::io::{Read, Result, Write};
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::os::unix::process::CommandExt;
use std::sync::{Arc, Mutex};
// One gibibyte in bytes, as f64 for human-readable memory reporting.
const GB: f64 = (1u64 << 30) as f64;
/// A job that has started execution (or finished): the original `Job`
/// plus runtime bookkeeping about where and when it ran.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RunningJob {
pub job: Job,
// Wall-clock start time, as a duration since the Unix epoch.
pub started: Duration,
// Hostname of the node executing this job.
pub node: String,
pub pid: u32,
// Zero (the serde default) until the job finishes; then the completion
// time as a duration since the Unix epoch.
#[serde(default)]
pub completed: Duration,
#[serde(default)]
pub exit_code: Option<i32>,
}
impl RunningJob {
/// How long the job ran (if finished) or has been running so far.
/// A zero `completed` is the serde default meaning "still running".
/// Differences that would be negative (clock skew) clamp to zero.
pub fn duration(&self) -> Duration {
if self.completed != std::time::Duration::from_secs(0) {
if let Some(t) = self.completed.checked_sub(self.started) {
t
} else {
Duration::from_secs(0)
}
} else {
let dur = now();
if let Some(t) = dur.checked_sub(self.started) {
t
} else {
Duration::from_secs(0)
}
}
}
/// How long the job sat in the queue between submission and start.
pub fn wait_duration(&self) -> Duration {
self.started
.checked_sub(self.job.submitted)
.unwrap_or(Duration::from_secs(0))
}
/// Mark this job finished: move its state file to `failed/` if the
/// exit code is nonzero, otherwise to `completed/`, re-saving it with
/// the completion time stamped.
fn completed(&self) -> Result<()> {
if let Some(ec) = self.exit_code {
if ec != 0 {
std::fs::rename(
self.job.filepath(Path::new(RUNNING)),
self.job.filepath(Path::new(FAILED)),
)?;
let mut x = self.clone();
x.completed = now();
return x.save(&Path::new(FAILED));
}
}
// The state file may be in `cancel/` (a cancellation raced with
// normal completion) or in `running/`; try the former first.
if let Err(_) = std::fs::rename(
self.job.filepath(Path::new(CANCELING)),
self.job.filepath(Path::new(COMPLETED)),
) {
std::fs::rename(
self.job.filepath(Path::new(RUNNING)),
self.job.filepath(Path::new(COMPLETED)),
)?;
}
let mut x = self.clone();
x.completed = now();
x.save(&Path::new(COMPLETED))
}
/// Request cancellation: move the state file from `running/` to
/// `cancel/`, stamp the time, then try to kill the process.
pub fn cancel(&self) -> Result<()> {
std::fs::rename(
self.job.filepath(Path::new(RUNNING)),
self.job.filepath(Path::new(CANCELING)),
)?;
let mut x = self.clone();
x.completed = now();
x.save(&Path::new(CANCELING))?;
self.kill()
}
/// Mark a job whose process vanished without reporting completion:
/// move its state file from `running/` to `zombie/`.
pub fn zombie(&self) -> Result<()> {
std::fs::rename(
self.job.filepath(Path::new(RUNNING)),
self.job.filepath(Path::new(ZOMBIE)),
)?;
let mut x = self.clone();
x.completed = now();
x.save(&Path::new(ZOMBIE))
}
/// Finalize a cancellation: move the state file from `cancel/` to
/// `canceled/` and stamp the time.
fn canceled(&self) -> Result<()> {
std::fs::rename(
self.job.filepath(Path::new(CANCELING)),
self.job.filepath(Path::new(CANCELED)),
)?;
let mut x = self.clone();
x.completed = now();
x.save(&Path::new(CANCELED))
}
/// Parse a `RunningJob` from its JSON state file.  An empty command
/// vector is treated as a corrupt file.
fn read(fname: &Path) -> Result<RunningJob> {
let mut f = std::fs::File::open(fname)?;
let mut data = Vec::new();
f.read_to_end(&mut data)?;
match serde_json::from_slice::<RunningJob>(&data) {
Ok(job) => {
if job.job.command.len() == 0 {
Err(std::io::Error::new(std::io::ErrorKind::Other, "empty cmd?"))
} else {
Ok(job)
}
}
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
}
}
/// Write this record into `subdir` atomically: serialize into a
/// `.tmp` file, then rename over the final name so concurrent readers
/// never observe a partially-written file.
fn save(&self, subdir: &Path) -> Result<()> {
let mut f = std::fs::File::create(self.job.filepath(subdir).with_extension("tmp"))?;
f.write_all(&serde_json::to_string(self).unwrap().as_bytes())?;
std::fs::rename(
self.job.filepath(subdir).with_extension("tmp"),
self.job.filepath(subdir),
)
}
/// Kill this job's process.  Only legal on the node that owns the job.
/// Marks the job canceled, sends SIGTERM, waits 2 s, escalates to
/// SIGKILL, waits 2 s more, and errors if the process still exists.
pub fn kill(&self) -> Result<()> {
let host = hostname::get().unwrap().into_string().unwrap();
if self.node != host {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("cannot kill job on {} from host {}", &self.node, &host),
));
}
if self.exists() {
self.canceled()?;
unsafe {
libc::kill(self.pid as i32, libc::SIGTERM);
}
std::thread::sleep(Duration::from_secs(2));
if self.exists() {
let myself = DaemonInfo::new();
myself.log(format!(
"FAILED to kill {} (pid {}) with SIGTERM",
self.job.jobname, self.pid
));
unsafe {
libc::kill(self.pid as i32, libc::SIGKILL);
}
std::thread::sleep(Duration::from_secs(2));
if self.exists() {
myself.log(format!(
"FAILED to kill {} (pid {}) with SIGKILL",
self.job.jobname, self.pid
));
return Err(std::io::Error::new(std::io::ErrorKind::Other, "bad kill?"));
}
}
}
Ok(())
}
/// Is the job's process still alive?  Reads `/proc/<pid>/environ` for
/// the `RQ_SUBMIT_TIME` marker set at spawn time, which guards against
/// pid reuse.  If environ is unreadable (e.g. a process owned by
/// another user), falls back to a bare `/proc/<pid>` existence check.
pub fn exists(&self) -> bool {
if let Ok(mut f) = std::fs::File::open(format!("/proc/{}/environ", self.pid)) {
let mut data = Vec::new();
f.read_to_end(&mut data).ok();
let goal = format!("RQ_SUBMIT_TIME={}", self.job.submitted.as_secs());
let goal = goal.as_bytes();
if data.windows(goal.len()).any(|w| w == goal) {
return true;
}
} else {
return pid_exists(self.pid as i32);
}
false
}
/// Resident-set size of the job's process in bytes (RSS pages times
/// the system page size); 0 if the process cannot be inspected.
pub fn memory_in_use(&self) -> u64 {
if let Ok(p) = procfs::process::Process::new(self.pid as libc::pid_t) {
let page_size = procfs::page_size().unwrap() as u64;
(p.stat.rss as u64 * page_size) as u64
} else {
0
}
}
}
/// A job as submitted by a user, before (or independent of) execution.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Job {
// Home directory of the submitting user; also locates the state files.
pub home_dir: PathBuf,
// Working directory the command runs in.
pub directory: PathBuf,
pub command: Vec<String>,
pub jobname: String,
// Output file (relative to `directory`) receiving stdout and stderr.
pub output: PathBuf,
// Submission time as a duration since the Unix epoch; also used to
// derive a unique state-file name.
pub submitted: Duration,
// 0 means "all physical cores" (normalized elsewhere).
#[serde(default)]
pub cores: usize,
// Bytes of RAM the job claims to need.
#[serde(default)]
pub memory_required: u64,
// If set, the job is killed when its output file exceeds this size.
#[serde(default)]
pub max_output: Option<u64>,
// Restartable jobs may be preempted (requeued) for fairness.
#[serde(default)]
pub restartable: bool,
}
impl Job {
    /// Construct a job that will run `command` in the current working
    /// directory.  `submitted` is stamped with the current time; the job
    /// is not visible to the scheduler until `submit()` is called.
    ///
    /// # Errors
    /// Fails if the current working directory cannot be determined.
    pub fn new(
        command: Vec<String>,
        jobname: String,
        output: PathBuf,
        cores: usize,
        memory_required: u64,
        max_output: u64,
        restartable: bool,
    ) -> Result<Job> {
        Ok(Job {
            directory: std::env::current_dir()?,
            home_dir: dirs::home_dir().unwrap(),
            command,
            jobname,
            output,
            submitted: now(),
            cores,
            memory_required,
            max_output: Some(max_output),
            restartable,
        })
    }
    /// Render the command line for display, quoting (via Rust's `{:?}`
    /// escaping) any argument containing whitespace, quotes, or a
    /// backslash.
    fn pretty_command(&self) -> String {
        let mut out = String::new();
        for c in self.command.iter() {
            if !c.contains(' ') && !c.contains('"') && !c.contains('\'') && !c.contains('\\') {
                out.push_str(c);
            } else {
                out.push_str(&format!("{:?}", c));
            }
            out.push(' ');
        }
        out.pop(); // drop the trailing space (no-op when command is empty)
        out
    }
    /// Cancel a job that is still waiting: move its state file from
    /// `waiting/` directly to `canceled/`.
    pub fn cancel(&self) -> Result<()> {
        std::fs::rename(
            self.filepath(Path::new(WAITING)),
            self.filepath(Path::new(CANCELED)),
        )
    }
    /// How long the job has been waiting since submission (zero if the
    /// clock has gone backwards past the submission time).
    pub fn wait_duration(&self) -> Duration {
        now()
            .checked_sub(self.submitted)
            .unwrap_or(Duration::from_secs(0))
    }
    /// State-file name, unique per submission instant: `<secs>.<nanos>.job`.
    fn filename(&self) -> PathBuf {
        PathBuf::from(format!(
            "{}.{}.job",
            self.submitted.as_secs(),
            self.submitted.subsec_nanos()
        ))
    }
    /// Full path of this job's state file inside the given status
    /// subdirectory of the owner's `~/.roundqueue`.
    fn filepath(&self, subdir: &Path) -> PathBuf {
        self.home_dir.join(RQ).join(subdir).join(self.filename())
    }
    /// Read a job back from a state file.  Files are stored in the
    /// `RunningJob` envelope even while waiting; an empty command is
    /// treated as a corrupt file.
    fn read(fname: &Path) -> Result<Job> {
        let mut f = std::fs::File::open(fname)?;
        let mut data = Vec::new();
        f.read_to_end(&mut data)?;
        match serde_json::from_slice::<RunningJob>(&data) {
            Ok(rj) => {
                let job = rj.job;
                if job.command.is_empty() {
                    Err(std::io::Error::new(std::io::ErrorKind::Other, "empty cmd?"))
                } else {
                    Ok(job)
                }
            }
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
        }
    }
    /// Persist this job into `subdir`, wrapped in a placeholder
    /// `RunningJob` (node "NONE", pid 0).  Written to a `.tmp` file then
    /// renamed, so readers never observe a partially-written file.
    fn save(&self, subdir: &Path) -> Result<()> {
        let mut f = std::fs::File::create(self.filepath(subdir).with_extension("tmp"))?;
        let rj = RunningJob {
            job: self.clone(),
            node: String::from("NONE"),
            pid: 0,
            started: now(),
            completed: std::time::Duration::from_secs(0),
            exit_code: None,
        };
        f.write_all(serde_json::to_string(&rj).unwrap().as_bytes())?;
        std::fs::rename(
            self.filepath(subdir).with_extension("tmp"),
            self.filepath(subdir),
        )
    }
    /// Move this job's state file between two status subdirectories.
    fn change_status(&self, old_subdir: &Path, new_subdir: &Path) -> Result<()> {
        std::fs::rename(self.filepath(old_subdir), self.filepath(new_subdir))
    }
    /// Enqueue the job: ensure the queue directories exist, then save
    /// the job into `waiting/`.
    pub fn submit(&self) -> Result<()> {
        self.ensure_directories()?;
        self.save(Path::new(WAITING))
    }
    fn ensure_directories(&self) -> Result<()> {
        ensure_directories()
    }
}
// Root state directory under each user's home, and its per-status
// subdirectories.  A job's lifecycle is a sequence of atomic renames
// between these directories.
const RQ: &'static str = ".roundqueue";
const RUNNING: &'static str = "running";
const WAITING: &'static str = "waiting";
const FAILED: &'static str = "failed";
const ZOMBIE: &'static str = "zombie";
const COMPLETED: &'static str = "completed";
const CANCELED: &'static str = "canceled";
// Note: the "canceling" state lives in a directory literally named "cancel".
const CANCELING: &'static str = "cancel";
// Base poll interval for the output-size watchdog thread.
const SHORT_TIME: std::time::Duration = std::time::Duration::from_secs(1);
// A daemon whose lock file is older than this (and whose liveness cannot
// be checked directly) is presumed dead.
const LIVE_TIME: std::time::Duration = std::time::Duration::from_secs(10 * 60);
// Fallback rescan period for the runner's event loop.
const POLLING_TIME: std::time::Duration = std::time::Duration::from_secs(30);
/// A snapshot of the whole queue as seen from this host: every user's
/// waiting and running jobs, plus the known runner daemons.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Status {
// Home directories of users with a live runner daemon on this host.
pub homedirs_sharing_host: Vec<PathBuf>,
pub waiting: Vec<Job>,
pub running: Vec<RunningJob>,
pub nodes: Vec<DaemonInfo>,
}
impl Status {
/// Build a queue snapshot: read every daemon lock file under our own
/// `~/.roundqueue`, then scan every sibling home directory (all users)
/// for running and waiting jobs.  Side effect: running jobs owned by
/// this host whose process has vanished are moved to `zombie/`.
pub fn new() -> Result<Status> {
let mut status = Status {
homedirs_sharing_host: Vec::new(),
waiting: Vec::new(),
running: Vec::new(),
nodes: Vec::new(),
};
let host = hostname::get().unwrap().into_string().unwrap();
let my_homedir = dirs::home_dir().unwrap();
let mut root_home = my_homedir.clone();
root_home.pop();
let root_home = root_home;
// read_dir returns a Result; iterating it yields zero or one ReadDir.
for node in std::fs::read_dir(my_homedir.join(RQ)) {
for node in node.flat_map(|r| r.ok()) {
// NOTE(review): node.path() is already the absolute entry path, so
// this join() only works because joining an absolute path replaces
// the base entirely — confirm intentional.
if let Ok(dinfo) = DaemonInfo::read(&my_homedir.join(RQ).join(node.path())) {
status.nodes.push(dinfo);
}
}
}
for userdir in root_home.read_dir()? {
if let Ok(userdir) = userdir {
let rqdir = userdir.path().join(RQ);
// A user "shares" this host when their lock file names a live pid.
if let Ok(pid) = read_pid(&rqdir.join(&host)) {
if pid_exists(pid) {
status
.homedirs_sharing_host
.push(root_home.join(userdir.path()));
} else {
}
} else {
}
if let Ok(rr) = rqdir.join(RUNNING).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = RunningJob::read(&run.path()) {
if host == j.node && !j.exists() {
println!("Job {} appears to have failed!", j.job.jobname);
j.zombie().ok();
} else {
status.running.push(j);
}
} else {
eprintln!("Error reading {:?}", run.path());
}
}
}
// cores == 0 means "all physical cores"; normalize for accounting.
// (This re-runs every userdir iteration, which is harmless.)
for j in status.running.iter_mut().filter(|j| j.job.cores == 0) {
j.job.cores = num_cpus::get_physical();
}
if let Ok(rr) = rqdir.join(WAITING).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = Job::read(&run.path()) {
status.waiting.push(j);
} else {
eprintln!("Error reading {:?}", run.path());
}
}
}
}
}
Ok(status)
}
/// This user's waiting jobs, read from `~/.roundqueue/waiting`.
pub fn my_waiting_jobs() -> Vec<Job> {
let home = dirs::home_dir().unwrap();
let mut out = Vec::new();
if let Ok(rr) = home.join(".roundqueue").join(WAITING).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = Job::read(&run.path()) {
out.push(j);
}
}
}
out
}
/// This user's running jobs, read from `~/.roundqueue/running`.
pub fn my_running_jobs() -> Vec<RunningJob> {
let home = dirs::home_dir().unwrap();
let mut out = Vec::new();
if let Ok(rr) = home.join(".roundqueue").join(RUNNING).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = RunningJob::read(&run.path()) {
out.push(j);
}
}
}
out
}
/// This user's completed jobs.
/// NOTE(review): takes `&self` but never uses it, unlike the two
/// associated functions above — confirm whether that is intentional.
pub fn my_completed_jobs(&self) -> Vec<RunningJob> {
let home = dirs::home_dir().unwrap();
let mut out = Vec::new();
if let Ok(rr) = home.join(".roundqueue").join(COMPLETED).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = RunningJob::read(&run.path()) {
out.push(j);
}
}
}
out
}
/// This user's failed jobs (nonzero exit code).
pub fn my_failed_jobs(&self) -> Vec<RunningJob> {
let home = dirs::home_dir().unwrap();
let mut out = Vec::new();
if let Ok(rr) = home.join(".roundqueue").join(FAILED).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = RunningJob::read(&run.path()) {
out.push(j);
}
}
}
out
}
/// This user's jobs whose process disappeared without completing.
pub fn my_zombie_jobs(&self) -> Vec<RunningJob> {
let home = dirs::home_dir().unwrap();
let mut out = Vec::new();
if let Ok(rr) = home.join(".roundqueue").join(ZOMBIE).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = RunningJob::read(&run.path()) {
out.push(j);
}
}
}
out
}
/// This user's fully-canceled jobs.
pub fn my_canceled_jobs(&self) -> Vec<RunningJob> {
let home = dirs::home_dir().unwrap();
let mut out = Vec::new();
if let Ok(rr) = home.join(".roundqueue").join(CANCELED).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = RunningJob::read(&run.path()) {
out.push(j);
}
}
}
out
}
/// This user's jobs with a cancellation still in progress.
pub fn my_canceling_jobs(&self) -> Vec<RunningJob> {
let home = dirs::home_dir().unwrap();
let mut out = Vec::new();
if let Ok(rr) = home.join(".roundqueue").join(CANCELING).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = RunningJob::read(&run.path()) {
out.push(j);
}
}
}
out
}
/// Does any waiting or running job (any user) carry this job name?
pub fn has_jobname(&self, jn: &str) -> bool {
self.waiting.iter().any(|j| j.jobname == jn)
|| self.running.iter().any(|j| j.job.jobname == jn)
}
/// Try to start exactly one waiting job on this node.  Fairness policy:
/// the job must belong to the user (`home_dir`) currently running the
/// fewest cores among users with eligible waiting jobs, and must be the
/// earliest-submitted such job.  Core and memory headroom are
/// re-checked against a fresh snapshot before the job is claimed.
fn run_next(
self,
host: &str,
home_dir: &Path,
threads: &longthreads::Threads,
in_foreground: bool,
) {
let cpus = num_cpus::get_physical();
// MemAvailable if the kernel reports it, else MemTotal; 16 GB as a
// last-resort guess when procfs is unavailable.
let available_ram = if let Ok(m) = procfs::Meminfo::new() {
if let Some(ma) = m.mem_available {
ma
} else {
m.mem_total
}
} else {
16 * GB as u64
};
let myself = DaemonInfo::new();
// Home dirs of every eligible waiting job: the owner has a live
// daemon on this host, and the job fits this node.
let waiting: Vec<_> = self
.waiting
.iter()
.filter(|j| self.homedirs_sharing_host.contains(&j.home_dir))
.filter(|j| j.cores <= cpus)
.filter(|j| j.memory_required <= available_ram)
.map(|j| j.home_dir.clone())
.collect();
if waiting.len() == 0 {
return;
}
// Cores currently running (cluster-wide) per candidate user.
let run_counts: HashMap<_, _> = waiting
.iter()
.map(|hd| {
(
hd.clone(),
self.running
.iter()
.filter(|j| &j.job.home_dir == hd)
.map(|j| j.job.cores)
.sum::<usize>(),
)
})
.collect();
let least_running = run_counts.values().cloned().min().unwrap();
// Only the least-loaded user may start a job — and only if that's us.
if run_counts.get(home_dir) != Some(&least_running) {
return;
}
// Pick the earliest-submitted eligible job of a least-loaded user.
let mut job = self.waiting[0].clone();
let mut earliest_submitted = std::time::Duration::from_secs(0xffffffffffffff);
for j in self
.waiting
.into_iter()
.filter(|j| j.cores <= cpus)
.filter(|j| j.memory_required <= available_ram)
{
if run_counts.get(&j.home_dir) == Some(&least_running)
&& j.submitted < earliest_submitted
{
earliest_submitted = j.submitted;
job = j;
}
}
if &job.home_dir != home_dir {
return;
}
// Take a fresh snapshot right before committing.
let status = Status::new().unwrap();
let cores_in_use: usize = status
.running
.iter()
.filter(|&j| host == j.node)
.map(|j| j.job.cores)
.sum();
// RAM promised to running jobs but not yet consumed by them.
let mem_reserved: u64 = status
.running
.iter()
.filter(|&j| host == j.node)
.map(|j| {
let in_use = j.memory_in_use();
if j.job.memory_required < in_use {
0
} else {
j.job.memory_required - in_use
}
})
.sum();
let cores_available = cpus - cores_in_use;
if job.cores > cores_available || cores_available == 0 {
return;
}
let memory_still_available = if available_ram > mem_reserved {
available_ram - mem_reserved
} else {
0
};
if job.memory_required > memory_still_available {
myself.log(format!(
"Not enough memory available: {:.1}G memory available, with {:.1}G reserved, but I require {:.1}G",
available_ram as f64/GB, mem_reserved as f64/GB, job.memory_required as f64/GB,
));
return;
}
// Claim the job.  The rename is atomic, so when several daemons race
// for the same job only one of them succeeds here.
if let Err(e) = job.change_status(Path::new(WAITING), Path::new(RUNNING)) {
myself.log(format!(
"Unable to change status of job {} ({})",
&job.jobname, e
));
return;
}
myself.log(format!("starting {:?}", &job.jobname));
let mut f = match std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(job.directory.join(&job.output))
{
Ok(f) => f,
Err(e) => {
myself.log(format!(
"Error creating output {:?}: {}",
job.directory.join(&job.output),
e
));
return;
}
};
if let Err(e) = writeln!(
f,
"::::: Starting job {:?} on {}: {}",
&job.jobname,
&host,
job.pretty_command()
) {
myself.log(format!(
"Error writing to output {:?}: {}",
job.directory.join(&job.output),
e
));
return;
}
// NOTE(review): this daemonizes when the runner is in the FOREGROUND,
// presumably so the spawned job outlives the foreground runner's
// imminent exit — confirm intent (spawn_runner daemonizes on the
// opposite condition).
if in_foreground {
let home = dirs::home_dir().unwrap();
let host = hostname::get().unwrap().into_string().unwrap();
unix_daemonize::daemonize_redirect(
Some(home.join(RQ).join(&host).with_extension("log")),
Some(home.join(RQ).join(&host).with_extension("log")),
unix_daemonize::ChdirMode::ChdirRoot,
)
.unwrap();
}
let mut cmd = std::process::Command::new(&job.command[0]);
// Send both the child's stdout and stderr to the job's output file.
// NOTE(review): both Stdio handles wrap the SAME raw fd, so that fd
// ends up owned twice and closed twice — confirm this is benign here.
let fd = f.into_raw_fd();
let stderr = unsafe { std::process::Stdio::from_raw_fd(fd) };
let stdout = unsafe { std::process::Stdio::from_raw_fd(fd) };
cmd.args(&job.command[1..])
.current_dir(&job.directory)
.env("RQ_SUBMIT_TIME", format!("{}", job.submitted.as_secs()))
.stderr(stderr)
.stdout(stdout)
.stdin(std::process::Stdio::null());
unsafe {
cmd.pre_exec(|| {
// Run batch jobs at the lowest scheduling priority.
libc::nice(19);
Ok(())
});
}
let child = match shared_child::SharedChild::spawn(&mut cmd) {
Ok(c) => c,
Err(e) => {
myself.log(format!("Unable to spawn child: {}", e));
return;
}
};
let mut runningjob = RunningJob {
started: now(),
node: String::from(host),
job: job,
pid: child.id(),
completed: std::time::Duration::from_secs(0),
exit_code: None,
};
if runningjob.job.cores == 0 {
runningjob.job.cores = num_cpus::get_physical();
}
if let Err(e) = runningjob.save(Path::new(RUNNING)) {
myself.log(format!("Yikes, unable to save job? {}", e));
return;
}
let child = Arc::new(child);
let child_to_kill = child.clone();
let all_done = Arc::new(Mutex::new(false));
let all_done_setter = all_done.clone();
let output_path = runningjob.job.directory.join(&runningjob.job.output);
// Watchdog thread: kill the job if its output file grows past
// max_output, and note why in the output file itself.
if let Some(max_output) = runningjob.job.max_output {
threads.spawn(move || {
let output_path = &output_path;
loop {
std::thread::sleep(SHORT_TIME);
if *all_done.lock().unwrap() {
return;
}
if let Ok(md) = output_path.metadata() {
if md.len() > max_output {
child_to_kill.kill().ok();
if let Ok(mut f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(output_path)
{
writeln!(
f,
":::::: [{}] Job created too large an output file! {} > {}",
child_to_kill.id(),
md.len(),
max_output
)
.ok();
}
}
}
// NOTE(review): this makes the effective poll period
// SHORT_TIME + LIVE_TIME (~10 min) — confirm that is intended.
std::thread::sleep(LIVE_TIME);
}
});
}
// Reaper thread: wait for the child, append its exit status to the
// output file, then finalize the job's state file.
threads.spawn(move || {
match child.wait() {
Err(e) => {
myself.log(format!("Error running {:?}: {}", runningjob.job.command, e));
}
Ok(st) => {
if let Ok(mut f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(runningjob.job.directory.join(&runningjob.job.output))
{
writeln!(
f,
":::::: [{}] Job {:?} exited with status {:?}",
myself.pid,
&runningjob.job.jobname,
st.code()
)
.ok();
runningjob.exit_code = st.code();
}
myself.log(format!("Done running {:?}: {}", runningjob.job.jobname, st));
}
}
*all_done_setter.lock().unwrap() = true;
if let Err(e) = runningjob.completed() {
myself.log(format!(
"Unable to change status of completed job {} ({})",
&runningjob.job.jobname, e
));
return;
}
});
}
}
/// Run the per-host scheduler daemon.  Unless `in_foreground`, the
/// process daemonizes with stdout/stderr redirected to
/// `~/.roundqueue/<host>.log`.  The main loop wakes on filesystem
/// events (or a `POLLING_TIME` rescan tick), processes cancellation
/// requests, verifies it still owns the host lock file, and decides
/// whether to start a waiting job via `Status::run_next`.
pub fn spawn_runner(in_foreground: bool, quietly: bool) -> Result<()> {
ensure_directories()?;
std::env::set_var("RUST_BACKTRACE", "1");
let home = dirs::home_dir().unwrap();
let host = hostname::get().unwrap().into_string().unwrap();
let mut root_home = home.clone();
root_home.pop();
let root_home = root_home;
let cpus = num_cpus::get_physical();
let hyperthreads = num_cpus::get();
if !quietly {
println!(
"I am spawning a runner for {} with {} cpus in {:?}!",
&host, cpus, &home
);
}
if !in_foreground {
unix_daemonize::daemonize_redirect(
Some(home.join(RQ).join(&host).with_extension("log")),
Some(home.join(RQ).join(&host).with_extension("log")),
unix_daemonize::ChdirMode::ChdirRoot,
)
.unwrap();
}
// Claim (or re-claim) the host lock file with our own pid.
DaemonInfo::write()?;
let myself = DaemonInfo::new();
myself.log(format!(
"==================\nRestarting runner process {}!",
myself.pid
));
let mut old_status = Status::new()?;
let (notify_tx, notify_rx) = std::sync::mpsc::channel();
// Fallback: inject a Rescan event every POLLING_TIME in case an
// inotify event is missed or the watcher could not be created.
let notify_polling = notify_tx.clone();
std::thread::spawn(move || {
loop {
notify_polling.send(notify::DebouncedEvent::Rescan).unwrap();
std::thread::sleep(POLLING_TIME);
}
});
let mut watcher = notify::watcher(notify_tx.clone(), std::time::Duration::from_secs(1)).ok();
if let Some(watcher) = &mut watcher {
watcher
.watch(
home.join(RQ).join(WAITING),
notify::RecursiveMode::NonRecursive,
)
.ok();
}
// Watch every user's running/ directory so we notice freed cores.
for userdir in root_home.read_dir()? {
if let Ok(userdir) = userdir {
if let Some(watcher) = &mut watcher {
watcher
.watch(
userdir.path().join(RQ).join(RUNNING),
notify::RecursiveMode::NonRecursive,
)
.ok();
}
}
}
let threads = longthreads::Threads::new();
loop {
// Re-register the watch in case the directory was recreated.
if let Some(watcher) = &mut watcher {
watcher
.watch(
home.join(RQ).join(WAITING),
notify::RecursiveMode::NonRecursive,
)
.ok();
}
// Kill any of our jobs with a pending cancellation request.
if let Ok(rr) = home.join(RQ).join(CANCELING).read_dir() {
for run in rr.flat_map(|r| r.ok()) {
if let Ok(j) = RunningJob::read(&run.path()) {
j.kill().ok();
} else {
myself.log(format!("Error reading scancel {:?}", run.path()));
}
}
}
// If a newer daemon has overwritten our lock file, bow out.
let daemon_running =
DaemonInfo::read_my_own().expect("Lock file unreadable, I should exit");
if daemon_running.pid != myself.pid {
myself.log(format!(
"I {} have been replaced by {}.",
myself.pid, daemon_running.pid
));
return Ok(());
}
// Refresh restart_time so peer daemons keep considering us alive.
DaemonInfo::write().unwrap();
if Status::my_waiting_jobs().len() == 0 {
if in_foreground {
println!("There are no runnable jobs.");
return Ok(());
}
if let Some(watcher) = &mut watcher {
watcher
.watch(
home.join(RQ).join(CANCELING),
notify::RecursiveMode::NonRecursive,
)
.ok();
}
// Nothing to do: block until an event or the next rescan tick.
notify_rx.recv().unwrap();
continue;
}
for userdir in root_home.read_dir()? {
if let Ok(userdir) = userdir {
if let Some(watcher) = &mut watcher {
watcher
.watch(
userdir.path().join(RQ).join(RUNNING),
notify::RecursiveMode::NonRecursive,
)
.ok();
}
}
}
let status = Status::new().unwrap();
for j in status.running.iter().filter(|j| &j.node == &host) {
myself.log(format!(" Job {} has {} cores", j.job.jobname, j.job.cores));
}
let running = status
.running
.iter()
.filter(|j| &j.node == &host)
.map(|j| j.job.cores)
.sum();
if old_status != status {
myself.log(format!(
"Currently using {}/{} cores, with {} jobs waiting.",
running,
cpus,
status.waiting.len()
));
}
old_status = status.clone();
// Cluster-wide totals, counting only nodes with a live daemon.
let total_cpus: usize = status.nodes.iter().map(|di| di.physical_cores).sum();
let total_running: usize = status
.running
.iter()
.filter(|&j| status.nodes.iter().any(|d| d.hostname == j.node))
.map(|j| j.job.cores)
.sum();
if cpus > running && status.waiting.len() > 0 {
// Free physical cores on this node: try to start something.
status.run_next(&host, &home, &threads, in_foreground);
} else if status.waiting.len() > 0 && total_running >= total_cpus {
// Cluster saturated: consider preempting one of our restartable
// jobs, or oversubscribing onto hyperthreads, for fairness.
let mut user_running_cores = HashMap::new();
for j in status
.running
.iter()
.filter(|&j| status.nodes.iter().any(|d| d.hostname == j.node))
{
let hd = j.job.home_dir.clone();
let count = user_running_cores.get(&hd).unwrap_or(&0) + j.job.cores;
user_running_cores.insert(hd, count);
}
if !user_running_cores.contains_key(&home) {
user_running_cores.insert(home.clone(), 0);
}
let total_users = user_running_cores.len();
let cpus_per_user = total_cpus / total_users;
// Fewest cores held by any user who still has jobs waiting.
let fewest_running_waiting_user: usize = status
.waiting
.iter()
.map(|j| user_running_cores.get(&j.home_dir).unwrap_or(&0))
.min()
.map(|&n| n)
.unwrap_or(0);
if user_running_cores[&home] > cpus_per_user
&& user_running_cores[&home] > fewest_running_waiting_user
{
// We hold more than our share: requeue our oldest restartable
// job (resubmit first; cancel the resubmission if the kill
// fails) so a starved user can run.
let my_running = Status::my_running_jobs();
if let Some(j) = my_running
.into_iter()
.filter(|j| j.job.restartable)
.min_by_key(|j| j.started)
{
println!("Could consider restarting {}", j.job.jobname);
let restart_job = j.job.clone();
if restart_job.submit().is_ok() {
if j.kill().is_err() {
restart_job.cancel().ok();
}
}
}
}
if hyperthreads > running {
if user_running_cores[&home] < cpus_per_user {
myself.log(format!(
"Thinking about using hyperthreads: {} < {}.",
user_running_cores[&home], cpus_per_user
));
let politely_waiting = status
.waiting
.iter()
.filter(|j| user_running_cores[&j.home_dir] < cpus_per_user)
.count();
if politely_waiting > 0 {
myself.log(format!(
" Starting a job since we have some {} waiting.",
politely_waiting
));
status.run_next(&host, &home, &threads, in_foreground);
} else {
myself.log(format!(" I have no jobs waiting to run."));
}
}
}
}
if in_foreground {
return Ok(());
}
notify_rx.recv().unwrap();
}
}
/// Identity and liveness record for a runner daemon, serialized into
/// `~/.roundqueue/<hostname>` as a lock file.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct DaemonInfo {
pub hostname: String,
pub pid: libc::pid_t,
pub physical_cores: usize,
pub logical_cpus: usize,
// When this daemon last started or refreshed its lock file; used as a
// liveness heuristic for daemons on other hosts.
pub restart_time: Duration,
}
impl DaemonInfo {
/// Snapshot the current process as a daemon record.
fn new() -> DaemonInfo {
DaemonInfo {
hostname: hostname::get().unwrap().into_string().unwrap(),
pid: unsafe { libc::getpid() },
physical_cores: num_cpus::get_physical(),
logical_cpus: num_cpus::get(),
restart_time: now(),
}
}
/// Read and validate this host's own lock file.
fn read_my_own() -> Result<DaemonInfo> {
let home = dirs::home_dir().unwrap();
let host = hostname::get().unwrap().into_string().unwrap();
DaemonInfo::read(&home.join(RQ).join(&host))
}
/// Read a daemon record and verify the daemon is plausibly alive.
/// On this host the process is checked directly; for other hosts
/// (`exists()` returns None) we fall back to the age of
/// `restart_time`, presuming death after `LIVE_TIME`.
fn read(fname: &Path) -> Result<DaemonInfo> {
let mut f = std::fs::File::open(fname)?;
let mut data = Vec::new();
f.read_to_end(&mut data)?;
let dinfo = match serde_json::from_slice::<DaemonInfo>(&data) {
Ok(dinfo) => dinfo,
Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
};
let now = now();
match dinfo.exists() {
Some(true) => (),
Some(false) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Process does not exist",
));
}
None => {
if dinfo.restart_time < now && now - dinfo.restart_time > LIVE_TIME {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Must have died long ago",
));
}
}
}
Ok(dinfo)
}
/// Overwrite this host's lock file with a fresh record for the
/// current process (no atomic tmp+rename here, unlike job saves).
fn write() -> Result<()> {
let home = dirs::home_dir().unwrap();
let myself = DaemonInfo::new();
let mut f = std::fs::File::create(&home.join(RQ).join(&myself.hostname))?;
f.write_all(&serde_json::to_string(&myself).unwrap().as_bytes())
}
/// Log to stderr prefixed with our pid (stderr is redirected into the
/// daemon's log file once daemonized).
fn log(&self, msg: String) {
eprintln!("{}: {}", self.pid, msg);
}
/// Is this daemon's process alive?  Returns None when the record
/// belongs to a different host and we cannot tell.  On this host,
/// checks that /proc/<pid>/cmdline mentions "daemon", "restart", or
/// "run", guarding against pid reuse by an unrelated process.
fn exists(&self) -> Option<bool> {
let host = hostname::get().unwrap().into_string().unwrap();
if self.hostname != host {
return None;
}
if let Ok(mut f) = std::fs::File::open(format!("/proc/{}/cmdline", self.pid)) {
let mut data = Vec::new();
f.read_to_end(&mut data).ok();
if data.windows(b"daemon\0".len()).any(|w| w == b"daemon\0")
|| data.windows(b"restart".len()).any(|w| w == b"restart")
|| data.windows(b"run".len()).any(|w| w == b"run")
{
return Some(true);
}
}
Some(false)
}
}
/// Read the pid recorded in the daemon lock file at `fname`,
/// propagating any read/validation error from `DaemonInfo::read`.
fn read_pid(fname: &Path) -> Result<libc::pid_t> {
    DaemonInfo::read(fname).map(|info| info.pid)
}
/// Create the full `~/.roundqueue` state-directory tree, one
/// subdirectory per job status.  Idempotent: `create_dir_all` succeeds
/// when a directory already exists.
fn ensure_directories() -> Result<()> {
    let rq = dirs::home_dir().unwrap().join(RQ);
    for &subdir in &[WAITING, RUNNING, COMPLETED, FAILED, CANCELED, ZOMBIE, CANCELING] {
        std::fs::create_dir_all(rq.join(subdir))?;
    }
    Ok(())
}
/// Current wall-clock time, expressed as the `Duration` elapsed since
/// the Unix epoch.  Panics if the system clock is set before 1970.
pub fn now() -> Duration {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.unwrap()
}
/// Check whether a process with the given pid currently exists, by
/// probing for its `/proc/<pid>` directory (Linux-specific).
fn pid_exists(pid: libc::pid_t) -> bool {
    let proc_entry = format!("/proc/{}", pid);
    std::path::Path::new(&proc_entry).exists()
}