shell_compose/
dispatcher.rsuse crate::{
ExecCommand, IpcClientError, IpcStream, Justfile, JustfileError, Message, ProcStatus,
QueryCommand, Runner,
};
use chrono::{DateTime, Local, TimeZone};
use job_scheduler_ng::{Job, JobScheduler};
use log::{error, info};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use thiserror::Error;
#[derive(Default)]
pub struct Dispatcher {
procs: Arc<Mutex<Vec<Runner>>>,
}
#[derive(Error, Debug)]
pub enum DispatcherError {
#[error(transparent)]
CliArgsError(#[from] clap::Error),
#[error("Failed to spawn process: {0}")]
ProcSpawnError(std::io::Error),
#[error("Failed to spawn process (timeout)")]
ProcSpawnTimeoutError,
#[error("Process exit code: {0}")]
ProcExitError(i32),
#[error("Empty command")]
EmptyProcCommandError,
#[error(transparent)]
JustfileError(#[from] JustfileError),
#[error("Communication protocol error")]
UnexpectedMessageError,
#[error(transparent)]
IpcClientError(#[from] IpcClientError),
#[error("Cron error: {0}")]
CronError(#[from] cron::error::Error),
}
impl Dispatcher {
pub fn new() -> Self {
Dispatcher::default()
}
pub fn exec_command(&mut self, cmd: ExecCommand) -> Message {
info!("Executing `{cmd:?}`");
let res = match cmd {
ExecCommand::Run { args } => self.run(&args),
ExecCommand::Runat { at, args } => self.run_at(&at, &args),
ExecCommand::Start { service } => self.start(&service),
ExecCommand::Up { group } => self.up(&group),
};
if let Err(e) = &res {
error!("{e}");
}
res.into()
}
pub fn query_command(&mut self, cmd: QueryCommand, stream: &mut IpcStream) {
info!("Executing `{cmd:?}`");
let res = match cmd {
QueryCommand::Exit => std::process::exit(0),
QueryCommand::Ps => self.ps(stream),
QueryCommand::Logs => self.log(stream),
};
if let Err(e) = &res {
error!("{e}");
}
let _ = stream.send_message(&res.into());
}
fn run(&mut self, args: &[String]) -> Result<(), DispatcherError> {
let mut child = Runner::spawn(args)?;
thread::sleep(Duration::from_millis(10));
let result = match child.update_proc_info().state {
ProcStatus::ExitErr(code) => Err(DispatcherError::ProcExitError(code)),
_ => Ok(()),
};
self.procs.lock().unwrap().insert(0, child);
result
}
fn run_at(&mut self, cron: &str, args: &[String]) -> Result<(), DispatcherError> {
let mut scheduler = JobScheduler::new();
let job: Vec<String> = args.into();
let procs = self.procs.clone();
scheduler.add(Job::new(cron.parse()?, move || {
let child = Runner::spawn(&job).unwrap();
procs.lock().unwrap().insert(0, child);
}));
let _handle = thread::spawn(move || loop {
scheduler.tick();
let wait_time = scheduler.time_till_next_job();
if wait_time == Duration::from_millis(0) {
info!("Ending cron job");
break;
}
std::thread::sleep(wait_time);
});
Ok(())
}
fn start(&mut self, service: &str) -> Result<(), DispatcherError> {
self.run(vec!["just".to_string(), service.to_string()].as_slice())
}
fn up(&mut self, group: &str) -> Result<(), DispatcherError> {
if let Ok(justfile) = Justfile::parse() {
let recipes = justfile.group_recipes(group);
for recipe in recipes {
self.start(&recipe)?;
}
}
Ok(())
}
fn ps(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
for child in &mut self.procs.lock().unwrap().iter_mut() {
let info = child.update_proc_info();
if stream.send_message(&Message::PsInfo(info.clone())).is_err() {
info!("Aborting ps command (stream error)");
break;
}
}
Ok(())
}
fn log(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
let mut last_seen_ts: HashMap<u32, DateTime<Local>> = HashMap::new(); 'cmd: loop {
for child in self.procs.lock().unwrap().iter_mut() {
if let Ok(output) = child.output.lock() {
let last_seen = last_seen_ts
.entry(child.proc.id())
.or_insert(Local.timestamp_opt(0, 0).unwrap());
for entry in output.lines_since(last_seen) {
if stream
.send_message(&Message::LogLine(entry.clone()))
.is_err()
{
info!("Aborting log command (stream error)");
break 'cmd;
}
}
}
}
thread::sleep(Duration::from_millis(50));
}
Ok(())
}
}