shell_compose/
dispatcher.rs

1use crate::{
2    CliCommand, ExecCommand, IpcClientError, IpcStream, Justfile, JustfileError, Message,
3    ProcStatus, Runner,
4};
5use chrono::{DateTime, Local, TimeZone};
6use job_scheduler_ng::{self as job_scheduler, JobScheduler};
7use log::{error, info};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use std::str::FromStr;
11use std::sync::{mpsc, Arc, Mutex};
12use std::thread;
13use std::time::Duration;
14use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
15use thiserror::Error;
16
/// Identifier handed out to every job registered with the dispatcher.
pub type JobId = u32;
/// Operating-system process id of a spawned child process.
pub type Pid = u32;
19
20pub struct Dispatcher<'a> {
21    jobs: BTreeMap<JobId, JobInfo>,
22    last_job_id: JobId,
23    cronjobs: HashMap<JobId, job_scheduler::Uuid>,
24    procs: Arc<Mutex<Vec<Runner>>>,
25    scheduler: Arc<Mutex<JobScheduler<'a>>>,
26    system: System,
27    /// Sender channel for Runner threads
28    channel: mpsc::Sender<Pid>,
29}
30
31#[derive(Clone, Serialize, Deserialize, Debug)]
32pub struct JobInfo {
33    pub job_type: JobType,
34    pub args: Vec<String>,
35    pub entrypoint: Option<String>,
36    pub restart: RestartInfo,
37    // stats: #Runs, #Success, #Restarts
38}
39
40#[derive(Clone, Serialize, Deserialize, Debug)]
41pub enum JobType {
42    Shell,
43    Service(String),
44    Cron(String),
45}
46
47#[derive(Clone, Serialize, Deserialize, Debug)]
48pub struct RestartInfo {
49    pub policy: Restart,
50    /// Waiting time before restart in ms
51    pub wait_time: u64,
52}
53
54/// Restart policy
55#[derive(Clone, Serialize, Deserialize, Debug)]
56pub enum Restart {
57    Always,
58    OnFailure,
59    Never,
60}
61
62struct JobSpawnInfo<'a> {
63    job_id: JobId,
64    args: &'a [String],
65    restart_info: RestartInfo,
66}
67
68#[derive(Clone, Serialize, Deserialize, Debug)]
69pub struct Job {
70    pub id: JobId,
71    pub info: JobInfo,
72}
73
74#[derive(Error, Debug)]
75pub enum DispatcherError {
76    #[error(transparent)]
77    CliArgsError(#[from] clap::Error),
78    #[error("Failed to start `shell-composed`: {0}")]
79    DispatcherSpawnError(std::io::Error),
80    #[error("Connection to `shell-composed` failed")]
81    DispatcherSpawnTimeoutError,
82    #[error("Failed to spawn process: {0}")]
83    ProcSpawnError(std::io::Error),
84    #[error("Failed to terminate child process: {0}")]
85    KillError(std::io::Error),
86    #[error("Job {0} not found")]
87    JobNotFoundError(JobId),
88    #[error("Service `{0}` not found")]
89    ServiceNotFoundError(String),
90    #[error("Process exit code: {0}")]
91    ProcExitError(i32),
92    #[error("Empty command")]
93    EmptyProcCommandError,
94    #[error(transparent)]
95    JustfileError(#[from] JustfileError),
96    #[error("Communication protocol error")]
97    UnexpectedMessageError,
98    #[error(transparent)]
99    IpcClientError(#[from] IpcClientError),
100    #[error("Cron error: {0}")]
101    CronError(#[from] cron::error::Error),
102}
103
104impl Default for RestartInfo {
105    fn default() -> Self {
106        RestartInfo {
107            policy: Restart::OnFailure,
108            wait_time: 50,
109        }
110    }
111}
112
113impl JobInfo {
114    pub fn new_shell_job(args: Vec<String>) -> Self {
115        JobInfo {
116            job_type: JobType::Shell,
117            args,
118            entrypoint: None,
119            restart: RestartInfo {
120                policy: Restart::Never,
121                ..Default::default()
122            },
123        }
124    }
125    pub fn new_cron_job(cron: String, args: Vec<String>) -> Self {
126        JobInfo {
127            job_type: JobType::Cron(cron),
128            args,
129            entrypoint: None,
130            restart: RestartInfo {
131                policy: Restart::Never,
132                ..Default::default()
133            },
134        }
135    }
136    pub fn new_service(service: String) -> Self {
137        JobInfo {
138            job_type: JobType::Service(service.clone()),
139            args: vec!["just".to_string(), service], // TODO: exclude entrypoint
140            entrypoint: Some("just".to_string()),
141            restart: RestartInfo::default(),
142        }
143    }
144}
145
146impl Dispatcher<'_> {
147    pub fn create() -> Dispatcher<'static> {
148        let procs = Arc::new(Mutex::new(Vec::new()));
149        let scheduler = Arc::new(Mutex::new(JobScheduler::new()));
150
151        let scheduler_spawn = scheduler.clone();
152        let _handle = thread::spawn(move || cron_scheduler(scheduler_spawn));
153
154        let (send, recv) = mpsc::channel();
155        let send_spawn = send.clone();
156        let procs_spawn = procs.clone();
157        let _watcher = thread::spawn(move || child_watcher(procs_spawn, send_spawn, recv));
158
159        let system = System::new_with_specifics(
160            RefreshKind::new().with_processes(ProcessRefreshKind::new()),
161        );
162
163        Dispatcher {
164            jobs: BTreeMap::new(),
165            last_job_id: 0,
166            cronjobs: HashMap::new(),
167            procs,
168            scheduler,
169            system,
170            channel: send,
171        }
172    }
173    pub fn exec_command(&mut self, cmd: ExecCommand) -> Message {
174        info!("Executing `{cmd:?}`");
175        let res = match cmd {
176            ExecCommand::Run { args } => self.run(&args),
177            ExecCommand::Runat { at, args } => self.run_at(&at, &args),
178            ExecCommand::Start { service } => self.start(&service),
179            ExecCommand::Up { group } => self.up(&group),
180        };
181        match res {
182            Err(e) => {
183                error!("{e}");
184                Message::Err(format!("{e}"))
185            }
186            Ok(job_ids) => Message::JobsStarted(job_ids),
187        }
188    }
189    pub fn cli_command(&mut self, cmd: CliCommand, stream: &mut IpcStream) {
190        info!("Executing `{cmd:?}`");
191        let res = match cmd {
192            CliCommand::Stop { job_id } => self.stop(job_id),
193            CliCommand::Down { group } => self.down(&group),
194            CliCommand::Ps => self.ps(stream),
195            CliCommand::Jobs => self.jobs(stream),
196            CliCommand::Logs { job_or_service } => self.log(job_or_service, stream),
197            CliCommand::Exit => std::process::exit(0),
198        };
199        if let Err(e) = &res {
200            error!("{e}");
201        }
202        let _ = stream.send_message(&res.into());
203    }
204    fn add_job(&mut self, job: JobInfo) -> JobId {
205        self.last_job_id += 1;
206        self.jobs.insert(self.last_job_id, job);
207        self.last_job_id
208    }
209    fn spawn_info(&self, job_id: JobId) -> Result<JobSpawnInfo<'_>, DispatcherError> {
210        let job = self
211            .jobs
212            .get(&job_id)
213            .ok_or(DispatcherError::JobNotFoundError(job_id))?;
214        Ok(JobSpawnInfo {
215            job_id,
216            args: &job.args,
217            restart_info: job.restart.clone(),
218        })
219    }
220    /// Find service job
221    fn find_job(&self, service: &str) -> Option<JobId> {
222        self.jobs
223            .iter()
224            .find(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if name == service))
225            .map(|(id, _info)| *id)
226    }
227    fn run(&mut self, args: &[String]) -> Result<Vec<JobId>, DispatcherError> {
228        let job_info = JobInfo::new_shell_job(args.to_vec());
229        let job_id = self.add_job(job_info);
230        self.spawn_job(job_id)?;
231        Ok(vec![job_id])
232    }
233    fn spawn_job(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
234        let job = self.spawn_info(job_id)?;
235        let child = Runner::spawn(job.job_id, job.args, job.restart_info, self.channel.clone())?;
236        self.procs.lock().expect("lock").push(child);
237        // Wait for startup failure
238        thread::sleep(Duration::from_millis(10));
239        if let Some(child) = self.procs.lock().expect("lock").last() {
240            return match child.info.state {
241                ProcStatus::ExitErr(code) => Err(DispatcherError::ProcExitError(code)),
242                // ProcStatus::Unknown(e) => Err(DispatcherError::ProcSpawnError(e)),
243                _ => Ok(()),
244            };
245        }
246        Ok(())
247    }
248    /// Stop job
249    fn stop(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
250        if let Some(uuid) = self.cronjobs.remove(&job_id) {
251            info!("Removing cron job {job_id}");
252            self.scheduler.lock().expect("lock").remove(uuid);
253        }
254        for child in self
255            .procs
256            .lock()
257            .expect("lock")
258            .iter_mut()
259            .filter(|child| child.info.job_id == job_id)
260        {
261            if child.is_running() {
262                child.user_terminated = true;
263                child.terminate().map_err(DispatcherError::KillError)?;
264            }
265        }
266        if self.jobs.remove(&job_id).is_some() {
267            Ok(())
268        } else {
269            Err(DispatcherError::JobNotFoundError(job_id))
270        }
271    }
272    /// Add cron job
273    fn run_at(&mut self, cron: &str, args: &[String]) -> Result<Vec<JobId>, DispatcherError> {
274        let job_info = JobInfo::new_cron_job(cron.to_string(), args.to_vec());
275        let restart_info = job_info.restart.clone();
276        let job_id = self.add_job(job_info);
277        let job_args = args.to_vec();
278        let procs = self.procs.clone();
279        let channel = self.channel.clone();
280        let uuid = self
281            .scheduler
282            .lock()
283            .expect("lock")
284            .add(job_scheduler::Job::new(cron.parse()?, move || {
285                let child = Runner::spawn(job_id, &job_args, restart_info.clone(), channel.clone())
286                    .unwrap();
287                procs.lock().expect("lock").push(child);
288            }));
289        self.cronjobs.insert(job_id, uuid);
290        Ok(vec![job_id])
291    }
292    /// Start service (just recipe)
293    fn start(&mut self, service: &str) -> Result<Vec<JobId>, DispatcherError> {
294        // Find existing job or add new
295        let job_id = self
296            .find_job(service)
297            .unwrap_or_else(|| self.add_job(JobInfo::new_service(service.to_string())));
298        // Check for existing process for this service
299        let running = self
300            .procs
301            .lock()
302            .expect("lock")
303            .iter_mut()
304            .any(|child| child.info.job_id == job_id && child.is_running());
305        if running {
306            Ok(vec![])
307        } else {
308            self.spawn_job(job_id)?;
309            Ok(vec![job_id])
310        }
311    }
312    /// Start service group (all just repipes in group)
313    fn up(&mut self, group: &str) -> Result<Vec<JobId>, DispatcherError> {
314        let mut job_ids = Vec::new();
315        let justfile = Justfile::parse()?;
316        let recipes = justfile.group_recipes(group);
317        for service in recipes {
318            let ids = self.start(&service)?;
319            job_ids.extend(ids);
320        }
321        Ok(job_ids)
322    }
323    /// Stop service group
324    fn down(&mut self, group: &str) -> Result<(), DispatcherError> {
325        let mut job_ids = Vec::new();
326        let justfile = Justfile::parse()?;
327        let recipes = justfile.group_recipes(group);
328        for service in recipes {
329            self.jobs
330                .iter()
331                .filter(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if *name == service))
332                .for_each(|(id, _info)| job_ids.push(*id));
333        }
334        for job_id in job_ids {
335            self.stop(job_id)?;
336        }
337        Ok(())
338    }
339    /// Return info about running and finished processes
340    fn ps(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
341        // Update system info
342        // For accurate CPU usage, a process needs to be refreshed twice
343        // https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.cpu_usage
344        let ts = Local::now();
345        self.system.refresh_processes_specifics(
346            ProcessesToUpdate::All,
347            true,
348            ProcessRefreshKind::new().with_cpu(),
349        );
350        // Collect pids and child pids
351        let pids: Vec<sysinfo::Pid> = self
352            .procs
353            .lock()
354            .expect("lock")
355            .iter()
356            .flat_map(|proc| {
357                let parent_pid = sysinfo::Pid::from(proc.info.pid as usize);
358                self.system
359                    .processes()
360                    .iter()
361                    .filter(move |(_pid, process)| {
362                        process.parent().unwrap_or(0.into()) == parent_pid
363                    })
364                    .map(|(pid, _process)| *pid)
365                    .chain([parent_pid])
366            })
367            .collect();
368        std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); // 200ms
369        let duration = (Local::now() - ts).num_milliseconds();
370        fn per_second(value: u64, ms: i64) -> u64 {
371            (value as f64 * 1000.0 / ms as f64) as u64
372        }
373        self.system.refresh_processes_specifics(
374            ProcessesToUpdate::Some(&pids),
375            true,
376            ProcessRefreshKind::new()
377                .with_cpu()
378                .with_disk_usage()
379                .with_memory(),
380        );
381
382        let mut proc_infos = Vec::new();
383        for child in &mut self.procs.lock().expect("lock").iter_mut().rev() {
384            let parent_pid = sysinfo::Pid::from(child.info.pid as usize);
385            // CPU usage has to be measured from just child processs
386            // For tasks spawning child processes, we should consider the whole process subtree!
387            let main_pid = if child.info.program() == "just" {
388                self.system
389                    .processes()
390                    .iter()
391                    .find(|(_pid, process)| {
392                        process.parent().unwrap_or(0.into()) == parent_pid
393                            && process.name() != "ctrl-c"
394                    })
395                    .map(|(pid, _process)| *pid)
396                    .unwrap_or(parent_pid)
397            } else {
398                parent_pid
399            };
400            if let Some(process) = self.system.process(main_pid) {
401                child.info.cpu = process.cpu_usage();
402                child.info.memory = process.memory();
403                child.info.virtual_memory = process.virtual_memory();
404                let disk = process.disk_usage();
405                child.info.total_written_bytes = disk.total_written_bytes;
406                child.info.written_bytes = per_second(disk.written_bytes, duration);
407                child.info.total_read_bytes = disk.total_read_bytes;
408                child.info.read_bytes = per_second(disk.read_bytes, duration);
409            } else {
410                child.info.cpu = 0.0;
411                child.info.memory = 0;
412                child.info.virtual_memory = 0;
413                child.info.written_bytes = 0;
414                child.info.read_bytes = 0;
415            }
416            let info = child.update_proc_state();
417            proc_infos.push(info.clone());
418        }
419        stream.send_message(&Message::PsInfo(proc_infos))?;
420        Ok(())
421    }
422    /// Return info about jobs
423    fn jobs(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
424        let mut job_infos = Vec::new();
425        for (id, info) in self.jobs.iter().rev() {
426            job_infos.push(Job {
427                id: *id,
428                info: info.clone(),
429            });
430        }
431        stream.send_message(&Message::JobInfo(job_infos))?;
432        Ok(())
433    }
434    /// Return log lines
435    fn log(
436        &mut self,
437        job_or_service: Option<String>,
438        stream: &mut IpcStream,
439    ) -> Result<(), DispatcherError> {
440        let mut job_id_filter = None;
441        if let Some(job_or_service) = job_or_service {
442            if let Ok(job_id) = JobId::from_str(&job_or_service) {
443                if self.jobs.contains_key(&job_id) {
444                    job_id_filter = Some(job_id);
445                } else {
446                    return Err(DispatcherError::JobNotFoundError(job_id));
447                }
448            } else {
449                job_id_filter = Some(
450                    self.find_job(&job_or_service)
451                        .ok_or(DispatcherError::ServiceNotFoundError(job_or_service))?,
452                );
453            }
454        }
455
456        let mut last_seen_ts: HashMap<Pid, DateTime<Local>> = HashMap::new();
457        'logwait: loop {
458            // Collect log entries from child proceses
459            let mut log_lines = Vec::new();
460            for child in self.procs.lock().expect("lock").iter_mut() {
461                if let Ok(output) = child.output.lock() {
462                    let last_seen = last_seen_ts
463                        .entry(child.proc.id())
464                        .or_insert(Local.timestamp_millis_opt(0).single().expect("ts"));
465                    for entry in output.lines_since(last_seen) {
466                        if let Some(job_id) = job_id_filter {
467                            if entry.job_id != job_id {
468                                continue;
469                            }
470                        }
471                        log_lines.push(entry.clone());
472                    }
473                }
474            }
475
476            if log_lines.is_empty() {
477                // Exit when client is disconnected
478                stream.alive()?;
479            } else {
480                log_lines.sort_by_key(|entry| entry.ts);
481                for entry in log_lines {
482                    if stream.send_message(&Message::LogLine(entry)).is_err() {
483                        info!("Aborting log command (stream error)");
484                        break 'logwait;
485                    }
486                }
487            }
488            // Wait for new output
489            thread::sleep(Duration::from_millis(100));
490        }
491        Ok(())
492    }
493}
494
495fn cron_scheduler(scheduler: Arc<Mutex<JobScheduler<'static>>>) {
496    loop {
497        let wait_time = if let Ok(mut scheduler) = scheduler.lock() {
498            scheduler.tick();
499            scheduler.time_till_next_job()
500        } else {
501            Duration::from_millis(50)
502        };
503        std::thread::sleep(wait_time);
504    }
505}
506
507// sender: Sender channel for Runner threads
508// recv: Watcher receiver channel
509fn child_watcher(
510    procs: Arc<Mutex<Vec<Runner>>>,
511    sender: mpsc::Sender<Pid>,
512    recv: mpsc::Receiver<Pid>,
513) {
514    loop {
515        // PID of terminated process sent from output_listener
516        let pid = recv.recv().expect("recv");
517        let ts = Local::now();
518        let mut respawn_child = None;
519        if let Some(child) = procs
520            .lock()
521            .expect("lock")
522            .iter_mut()
523            .find(|p| p.info.pid == pid)
524        {
525            // https://doc.rust-lang.org/std/process/struct.Child.html#warning
526            let exit_code = child.proc.wait().ok().and_then(|st| st.code());
527            let _ = child.update_proc_state();
528            child.info.end = Some(ts);
529            if let Some(code) = exit_code {
530                info!(target: &format!("{pid}"), "Process terminated with exit code {code}");
531            } else {
532                info!(target: &format!("{pid}"), "Process terminated");
533            }
534            let respawn = !child.user_terminated
535                && match child.restart_info.policy {
536                    Restart::Always => true,
537                    Restart::OnFailure => {
538                        matches!(child.info.state, ProcStatus::ExitErr(code) if code > 0)
539                    }
540                    Restart::Never => false,
541                };
542            if respawn {
543                respawn_child = Some((child.info.clone(), child.restart_info.clone()));
544            }
545        } else {
546            info!(target: &format!("{pid}"), "(Unknown) process terminated");
547        }
548        if let Some((child_info, restart_info)) = respawn_child {
549            thread::sleep(Duration::from_millis(restart_info.wait_time));
550            let result = Runner::spawn(
551                child_info.job_id,
552                &child_info.cmd_args,
553                restart_info,
554                sender.clone(),
555            );
556            match result {
557                Ok(child) => procs.lock().expect("lock").push(child),
558                Err(e) => error!("Error trying to respawn failed process: {e}"),
559            }
560        }
561    }
562}