use crate::{
CliCommand, ExecCommand, IpcClientError, IpcStream, Justfile, JustfileError, Message,
ProcStatus, Runner,
};
use chrono::{DateTime, Local, TimeZone};
use job_scheduler_ng::{self as job_scheduler, JobScheduler};
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
use thiserror::Error;
/// Identifier of a job registered with the dispatcher (key of the job table).
pub type JobId = u32;
/// Process id of a spawned child, as sent over the watcher channel.
pub type Pid = u32;
/// Central state of the job dispatcher: registered jobs, spawned process
/// runners and cron schedules.
pub struct Dispatcher<'a> {
/// Registered jobs, keyed by job id.
jobs: BTreeMap<JobId, JobInfo>,
/// Id given to the most recently added job; incremented for every new job.
last_job_id: JobId,
/// Scheduler handle (UUID) of each job that was registered as a cron job.
cronjobs: HashMap<JobId, job_scheduler::Uuid>,
/// Runners of spawned processes; shared with the child watcher thread and
/// with the callbacks of cron jobs.
procs: Arc<Mutex<Vec<Runner>>>,
/// Cron scheduler, shared with the thread that ticks it.
scheduler: Arc<Mutex<JobScheduler<'a>>>,
/// `sysinfo` handle used by `ps` to read per-process statistics.
system: System,
/// Sending end of the pid channel read by the child watcher; a clone is
/// passed to every `Runner::spawn` call.
channel: mpsc::Sender<Pid>,
}
/// Description of a job: its kind, the command line to run and its restart
/// settings.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct JobInfo {
/// Kind of job (shell command, service or cron job).
pub job_type: JobType,
/// Command line handed to `Runner::spawn` when the job is started.
pub args: Vec<String>,
/// Restart settings handed to `Runner::spawn` together with `args`.
pub restart: RestartInfo,
}
/// Kind of a job.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum JobType {
/// Plain command created by `JobInfo::new_shell_job`.
Shell,
/// Service job; the payload is the service name and its arguments joined
/// by single spaces (see `JobInfo::service_command`).
Service(String),
/// Cron job; the payload is the cron expression it was registered with.
Cron(String),
}
/// Restart policy of a job, selectable on the command line (`clap::ValueEnum`).
///
/// NOTE(review): the policy is only stored and forwarded in this file; the
/// decision whether to respawn is presumably made by `Runner` — confirm there.
#[derive(clap::ValueEnum, Clone, Serialize, Deserialize, Debug)]
pub enum RestartPolicy {
Always,
OnFailure,
Never,
}
/// Restart settings of a job.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RestartInfo {
/// Policy that governs restarts.
pub policy: RestartPolicy,
/// Delay in milliseconds the child watcher sleeps before respawning.
pub wait_time: u64,
}
/// A job id paired with its description; this is the element type of the
/// `Message::JobInfo` reply built by `Dispatcher::jobs`.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Job {
/// Id of the job.
pub id: JobId,
/// Description of the job.
pub info: JobInfo,
}
/// Errors reported by the dispatcher and its clients.
///
/// The user-facing text of every variant is defined by its `#[error]`
/// attribute; `transparent` variants forward the message of the wrapped error.
#[derive(Error, Debug)]
pub enum DispatcherError {
#[error(transparent)]
CliArgsError(#[from] clap::Error),
#[error("Failed to start `shell-composed`: {0}")]
DispatcherSpawnError(std::io::Error),
#[error("Connection to `shell-composed` failed")]
DispatcherSpawnTimeoutError,
#[error("Failed to spawn `{0}`: {1}")]
ProcSpawnError(String, std::io::Error),
// Returned by `stop` when terminating a running child fails.
#[error("Failed to terminate child process: {0}")]
KillError(std::io::Error),
// Returned when a job id is not present in the job table.
#[error("Job {0} not found")]
JobNotFoundError(JobId),
// Returned by `log` when no service job matches the given name.
#[error("Service `{0}` not found")]
ServiceNotFoundError(String),
// Returned by `spawn_job` when the new process is already in an error-exit state.
#[error("Process exit code: {0}")]
ProcExitError(i32),
#[error("Empty command")]
EmptyProcCommandError,
#[error(transparent)]
JustfileError(#[from] JustfileError),
#[error("Communication protocol error")]
UnexpectedMessageError,
#[error(transparent)]
IpcClientError(#[from] IpcClientError),
// Produced when parsing a cron expression fails (see `run_at`).
#[error("Cron error: {0}")]
CronError(#[from] cron::error::Error),
}
impl Default for RestartInfo {
    /// Restart on failure, waiting 50 ms before each respawn.
    fn default() -> Self {
        Self {
            wait_time: 50,
            policy: RestartPolicy::OnFailure,
        }
    }
}
impl JobInfo {
pub fn new_shell_job(args: Vec<String>, restart: Option<RestartPolicy>) -> Self {
let policy = restart.unwrap_or(RestartPolicy::Never);
JobInfo {
job_type: JobType::Shell,
args,
restart: RestartInfo {
policy,
..Default::default()
},
}
}
pub fn new_cron_job(cron: String, args: Vec<String>) -> Self {
JobInfo {
job_type: JobType::Cron(cron),
args,
restart: RestartInfo {
policy: RestartPolicy::Never,
..Default::default()
},
}
}
/// Builds the string that identifies a service invocation: the service name
/// followed by each argument, separated by single spaces.
pub fn service_command(service: &str, args: &[String]) -> String {
    let mut command = String::from(service);
    for arg in args {
        command.push(' ');
        command.push_str(arg);
    }
    command
}
pub fn new_service(service: &str, svc_args: &[String], restart: Option<RestartPolicy>) -> Self {
let mut args = vec!["just".to_string(), service.to_string()];
args.extend(svc_args.to_vec());
let policy = restart.unwrap_or(RestartPolicy::OnFailure);
JobInfo {
job_type: JobType::Service(Self::service_command(service, svc_args)),
args,
restart: RestartInfo {
policy,
..Default::default()
},
}
}
}
impl Dispatcher<'_> {
/// Creates a dispatcher and starts its two background threads: one that
/// ticks the cron scheduler and one that watches for terminated children.
///
/// Both threads are detached; their join handles are dropped.
pub fn create() -> Dispatcher<'static> {
    let (channel, exited_pids) = mpsc::channel();
    let procs = Arc::new(Mutex::new(Vec::new()));
    let scheduler = Arc::new(Mutex::new(JobScheduler::new()));

    let cron_handle = Arc::clone(&scheduler);
    let _cron_thread = thread::spawn(move || cron_scheduler(cron_handle));

    let watched_procs = Arc::clone(&procs);
    let watcher_sender = channel.clone();
    let _watcher_thread =
        thread::spawn(move || child_watcher(watched_procs, watcher_sender, exited_pids));

    let refresh = RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing());
    Dispatcher {
        jobs: BTreeMap::new(),
        last_job_id: 0,
        cronjobs: HashMap::new(),
        procs,
        scheduler,
        system: System::new_with_specifics(refresh),
        channel,
    }
}
pub fn exec_command(&mut self, cmd: ExecCommand, cwd: PathBuf) -> Message {
info!("Executing `{cmd:?}`");
std::env::set_current_dir(&cwd).unwrap();
let res = match cmd {
ExecCommand::Run { args, restart } => self.run(&args, restart),
ExecCommand::Runat { at, args } => self.run_at(&at, cwd, &args),
ExecCommand::Start {
service,
args,
restart,
} => self.start(&service, &args, restart),
ExecCommand::Up { group } => self.up(&group),
};
match res {
Err(e) => {
error!("{e}");
Message::Err(format!("{e}"))
}
Ok(job_ids) => Message::JobsStarted(job_ids),
}
}
pub fn cli_command(&mut self, cmd: CliCommand, stream: &mut IpcStream) {
info!("Executing `{cmd:?}`");
let res = match cmd {
CliCommand::Stop { job_id } => self.stop(job_id),
CliCommand::Down { group } => self.down(&group),
CliCommand::Ps => self.ps(stream),
CliCommand::Jobs => self.jobs(stream),
CliCommand::Logs { job_or_service } => self.log(job_or_service, stream),
CliCommand::Exit => std::process::exit(0),
};
if let Err(e) = &res {
error!("{e}");
}
let _ = stream.send_message(&res.into());
}
/// Registers `job` under the next free job id and returns that id.
fn add_job(&mut self, job: JobInfo) -> JobId {
    let id = self.last_job_id + 1;
    self.last_job_id = id;
    self.jobs.insert(id, job);
    id
}
/// Returns the id of the first service job (in ascending id order) whose
/// service command equals `service` plus `args`, if there is one.
fn find_job(&self, service: &str, args: &[String]) -> Option<JobId> {
    let wanted = JobInfo::service_command(service, args);
    self.jobs.iter().find_map(|(id, info)| match &info.job_type {
        JobType::Service(command) if *command == wanted => Some(*id),
        _ => None,
    })
}
/// Registers a shell job for `args` and spawns it right away.
///
/// Returns the id of the new job. The job stays registered even when
/// spawning fails.
fn run(
    &mut self,
    args: &[String],
    policy: Option<RestartPolicy>,
) -> Result<Vec<JobId>, DispatcherError> {
    let job_id = self.add_job(JobInfo::new_shell_job(args.to_vec(), policy));
    self.spawn_job(job_id).map(|()| vec![job_id])
}
/// Spawns a process for the registered job `job_id` and records its runner.
///
/// # Errors
/// * `JobNotFoundError` if no job with that id is registered.
/// * Whatever `Runner::spawn` reports when the process cannot be started.
/// * `ProcExitError` if, about 10 ms after spawning, the runner of this job
///   is already in the `ExitErr` state.
fn spawn_job(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
    let job = self
        .jobs
        .get(&job_id)
        .ok_or(DispatcherError::JobNotFoundError(job_id))?;
    let child = Runner::spawn(job_id, &job.args, job.restart.clone(), self.channel.clone())?;
    self.procs.lock().expect("lock").push(child);
    // Short grace period so that an immediately failing command can be
    // reported to the caller.
    thread::sleep(Duration::from_millis(10));
    // The runner list is shared with the cron and watcher threads, which may
    // have appended other runners while the lock was released. Look up the
    // newest runner of *this* job instead of assuming it is still the last
    // element.
    let procs = self.procs.lock().expect("lock");
    let state = procs
        .iter()
        .rev()
        .find(|child| child.info.job_id == job_id)
        .map(|child| &child.info.state);
    match state {
        Some(ProcStatus::ExitErr(code)) => Err(DispatcherError::ProcExitError(*code)),
        _ => Ok(()),
    }
}
/// Stops job `job_id`: unschedules it if it is a cron job, terminates its
/// running processes and removes it from the job table.
///
/// # Errors
/// * `KillError` if terminating a process fails (the job then stays
///   registered).
/// * `JobNotFoundError` if no job with that id was registered.
fn stop(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
    if let Some(uuid) = self.cronjobs.remove(&job_id) {
        info!("Removing cron job {job_id}");
        self.scheduler.lock().expect("lock").remove(uuid);
    }
    {
        let mut procs = self.procs.lock().expect("lock");
        for child in procs.iter_mut() {
            if child.info.job_id != job_id || !child.is_running() {
                continue;
            }
            // Mark the termination as requested by the user before killing.
            child.user_terminated = true;
            child.terminate().map_err(DispatcherError::KillError)?;
        }
    }
    match self.jobs.remove(&job_id) {
        Some(_) => Ok(()),
        None => Err(DispatcherError::JobNotFoundError(job_id)),
    }
}
fn run_at(
&mut self,
cron: &str,
cwd: PathBuf,
args: &[String],
) -> Result<Vec<JobId>, DispatcherError> {
let job_info = JobInfo::new_cron_job(cron.to_string(), args.to_vec());
let restart_info = job_info.restart.clone();
let job_id = self.add_job(job_info);
let job_args = args.to_vec();
let procs = self.procs.clone();
let channel = self.channel.clone();
let uuid = self
.scheduler
.lock()
.expect("lock")
.add(job_scheduler::Job::new(cron.parse()?, move || {
std::env::set_current_dir(&cwd).unwrap();
let child = Runner::spawn(job_id, &job_args, restart_info.clone(), channel.clone())
.unwrap();
procs.lock().expect("lock").push(child);
}));
self.cronjobs.insert(job_id, uuid);
Ok(vec![job_id])
}
/// Starts service `service` with `args`, reusing an existing job for the
/// same service command if there is one.
///
/// Returns the job id when a process was spawned, or an empty list when
/// the service already has a running process. `policy` is only applied
/// when a new job is created.
fn start(
    &mut self,
    service: &str,
    args: &[String],
    policy: Option<RestartPolicy>,
) -> Result<Vec<JobId>, DispatcherError> {
    let job_id = match self.find_job(service, args) {
        Some(existing) => existing,
        None => self.add_job(JobInfo::new_service(service, args, policy)),
    };
    let already_running = {
        let mut procs = self.procs.lock().expect("lock");
        procs
            .iter_mut()
            .any(|child| child.info.job_id == job_id && child.is_running())
    };
    if already_running {
        return Ok(vec![]);
    }
    self.spawn_job(job_id)?;
    Ok(vec![job_id])
}
/// Starts every recipe of justfile group `group` as a service (without
/// arguments and with the default restart policy).
///
/// Returns the ids of the jobs for which a process was spawned; stops at
/// the first service that fails to start.
fn up(&mut self, group: &str) -> Result<Vec<JobId>, DispatcherError> {
    let justfile = Justfile::parse()?;
    let mut started = Vec::new();
    for service in justfile.group_recipes(group) {
        let ids = self.start(&service, &[], None)?;
        started.extend(ids);
    }
    Ok(started)
}
/// Stops every service job whose service command equals a recipe name of
/// justfile group `group`.
///
/// Stops at the first job that cannot be stopped.
fn down(&mut self, group: &str) -> Result<(), DispatcherError> {
    let justfile = Justfile::parse()?;
    // Collect the ids first: `stop` needs mutable access to the job table.
    let mut to_stop = Vec::new();
    for service in justfile.group_recipes(group) {
        for (id, info) in self.jobs.iter() {
            if matches!(&info.job_type, JobType::Service(name) if *name == service) {
                to_stop.push(*id);
            }
        }
    }
    for job_id in to_stop {
        self.stop(job_id)?;
    }
    Ok(())
}
/// Refreshes resource statistics of all tracked processes and sends them to
/// the client as a `Message::PsInfo`, newest runner first.
///
/// The call blocks for `sysinfo::MINIMUM_CPU_UPDATE_INTERVAL` between two
/// process refreshes.
///
/// # Errors
/// Fails if sending the reply over `stream` fails.
fn ps(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
// Start of the measurement window used for the per-second disk rates below.
let ts = Local::now();
// First refresh: CPU information only, for all processes.
self.system.refresh_processes_specifics(
ProcessesToUpdate::All,
true,
ProcessRefreshKind::nothing().with_cpu(),
);
// Pids to refresh in the second pass: every tracked runner pid plus the
// pids of all processes whose parent is that runner.
let pids: Vec<sysinfo::Pid> = self
.procs
.lock()
.expect("lock")
.iter()
.flat_map(|proc| {
let parent_pid = sysinfo::Pid::from(proc.info.pid as usize);
self.system
.processes()
.iter()
.filter(move |(_pid, process)| {
process.parent().unwrap_or(0.into()) == parent_pid
})
.map(|(pid, _process)| *pid)
.chain([parent_pid])
})
.collect();
// Wait between the two refreshes, then measure the elapsed milliseconds.
// NOTE(review): presumably required so the second refresh yields a
// meaningful CPU usage — confirm against the sysinfo documentation.
std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); let duration = (Local::now() - ts).num_milliseconds();
// Scales a value measured over `ms` milliseconds to a per-second rate.
fn per_second(value: u64, ms: i64) -> u64 {
(value as f64 * 1000.0 / ms as f64) as u64
}
// Second refresh: CPU, disk usage and memory, only for the collected pids.
self.system.refresh_processes_specifics(
ProcessesToUpdate::Some(&pids),
true,
ProcessRefreshKind::nothing()
.with_cpu()
.with_disk_usage()
.with_memory(),
);
let mut proc_infos = Vec::new();
for child in &mut self.procs.lock().expect("lock").iter_mut().rev() {
let parent_pid = sysinfo::Pid::from(child.info.pid as usize);
// For runners whose program is `just`, report the statistics of the first
// child process found that is not named "ctrl-c"; fall back to the runner's
// own pid when there is no such child.
let main_pid = if child.info.program() == "just" {
self.system
.processes()
.iter()
.find(|(_pid, process)| {
process.parent().unwrap_or(0.into()) == parent_pid
&& process.name() != "ctrl-c"
})
.map(|(pid, _process)| *pid)
.unwrap_or(parent_pid)
} else {
parent_pid
};
if let Some(process) = self.system.process(main_pid) {
child.info.cpu = process.cpu_usage();
child.info.memory = process.memory();
child.info.virtual_memory = process.virtual_memory();
let disk = process.disk_usage();
child.info.total_written_bytes = disk.total_written_bytes;
child.info.written_bytes = per_second(disk.written_bytes, duration);
child.info.total_read_bytes = disk.total_read_bytes;
child.info.read_bytes = per_second(disk.read_bytes, duration);
} else {
// Process not present in the snapshot: reset the current readings. The
// `total_*` counters keep their previous values.
child.info.cpu = 0.0;
child.info.memory = 0;
child.info.virtual_memory = 0;
child.info.written_bytes = 0;
child.info.read_bytes = 0;
}
let info = child.update_proc_state();
proc_infos.push(info.clone());
}
stream.send_message(&Message::PsInfo(proc_infos))?;
Ok(())
}
fn jobs(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
let mut job_infos = Vec::new();
for (id, info) in self.jobs.iter().rev() {
job_infos.push(Job {
id: *id,
info: info.clone(),
});
}
stream.send_message(&Message::JobInfo(job_infos))?;
Ok(())
}
/// Streams log lines to the client until the stream fails.
///
/// `job_or_service` optionally restricts the output to one job: a value
/// that parses as a job id selects that job, anything else is looked up as
/// a service name (without arguments). Every 100 ms the new lines of all
/// runners are collected, sorted by timestamp and sent as
/// `Message::LogLine`; when there is nothing new the stream is probed with
/// `alive()` instead.
///
/// # Errors
/// * `JobNotFoundError` / `ServiceNotFoundError` if the filter matches no job.
/// * The error of `stream.alive()` when the client has gone away while idle.
///
/// A failure while sending a log line ends the command with `Ok(())`.
fn log(
    &mut self,
    job_or_service: Option<String>,
    stream: &mut IpcStream,
) -> Result<(), DispatcherError> {
    let job_id_filter = match job_or_service {
        None => None,
        Some(target) => match JobId::from_str(&target) {
            Ok(job_id) if self.jobs.contains_key(&job_id) => Some(job_id),
            Ok(job_id) => return Err(DispatcherError::JobNotFoundError(job_id)),
            Err(_) => match self.find_job(&target, &[]) {
                Some(job_id) => Some(job_id),
                None => return Err(DispatcherError::ServiceNotFoundError(target)),
            },
        },
    };
    // Per-process position in the log output, keyed by process id.
    let mut last_seen_ts: HashMap<Pid, DateTime<Local>> = HashMap::new();
    loop {
        let mut pending = Vec::new();
        {
            let mut procs = self.procs.lock().expect("lock");
            for child in procs.iter_mut() {
                if let Ok(output) = child.output.lock() {
                    let last_seen = last_seen_ts
                        .entry(child.proc.id())
                        .or_insert(Local.timestamp_millis_opt(0).single().expect("ts"));
                    for entry in output.lines_since(last_seen) {
                        let wanted = match job_id_filter {
                            Some(job_id) => entry.job_id == job_id,
                            None => true,
                        };
                        if wanted {
                            pending.push(entry.clone());
                        }
                    }
                }
            }
        }
        if pending.is_empty() {
            stream.alive()?;
        } else {
            pending.sort_by_key(|entry| entry.ts);
            for entry in pending {
                if stream.send_message(&Message::LogLine(entry)).is_err() {
                    info!("Aborting log command (stream error)");
                    return Ok(());
                }
            }
        }
        thread::sleep(Duration::from_millis(100));
    }
}
}
/// Body of the cron thread: ticks the shared scheduler forever, sleeping
/// between ticks for the time the scheduler reports until its next job.
///
/// If the scheduler lock cannot be taken, the tick is skipped and the
/// thread retries after 50 ms.
fn cron_scheduler(scheduler: Arc<Mutex<JobScheduler<'static>>>) {
    loop {
        let pause = match scheduler.lock() {
            Ok(mut guard) => {
                guard.tick();
                guard.time_till_next_job()
            }
            Err(_) => Duration::from_millis(50),
        };
        std::thread::sleep(pause);
    }
}
/// Body of the watcher thread: waits for pids of terminated children on
/// `recv`, records their end, and respawns them when the runner asks for it.
///
/// * `procs` – shared list of runners; respawned runners are appended to it.
/// * `sender` – sending end of the pid channel, handed to respawned runners.
/// * `recv` – receiving end of the pid channel.
///
/// The restart delay (`wait_time` milliseconds) is slept on this thread, so
/// further notifications are handled only afterwards.
fn child_watcher(
    procs: Arc<Mutex<Vec<Runner>>>,
    sender: mpsc::Sender<Pid>,
    recv: mpsc::Receiver<Pid>,
) {
    loop {
        let pid = recv.recv().expect("recv");
        let ts = Local::now();
        // Search newest-first: the list keeps runners of finished processes,
        // and the operating system may reuse their pids. The runner that just
        // terminated is always the most recent one with this pid.
        let respawn_child = if let Some(child) = procs
            .lock()
            .expect("lock")
            .iter_mut()
            .rev()
            .find(|p| p.info.pid == pid)
        {
            let exit_code = child.proc.wait().ok().and_then(|st| st.code());
            let _ = child.update_proc_state();
            child.info.end = Some(ts);
            if let Some(code) = exit_code {
                info!(target: &format!("{pid}"), "Process terminated with exit code {code}");
            } else {
                info!(target: &format!("{pid}"), "Process terminated");
            }
            child.restart_infos()
        } else {
            info!(target: &format!("{pid}"), "(Unknown) process terminated");
            None
        };
        // The lock on `procs` is released here, before sleeping and respawning.
        if let Some(spawn_info) = respawn_child {
            thread::sleep(Duration::from_millis(spawn_info.restart_info.wait_time));
            let result = Runner::spawn(
                spawn_info.job_id,
                &spawn_info.args,
                spawn_info.restart_info,
                sender.clone(),
            );
            match result {
                Ok(child) => procs.lock().expect("lock").push(child),
                Err(e) => error!("Error trying to respawn failed process: {e}"),
            }
        }
    }
}