shell_compose/
dispatcher.rs

1use crate::{
2    CliCommand, ExecCommand, IpcClientError, IpcStream, Justfile, JustfileError, Message,
3    ProcStatus, Runner,
4};
5use chrono::{DateTime, Local, TimeZone};
6use job_scheduler_ng::{self as job_scheduler, JobScheduler};
7use log::{error, info};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::sync::{mpsc, Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
16use thiserror::Error;
17
/// Identifier of a job, assigned incrementally by the dispatcher.
pub type JobId = u32;
/// Operating system process id.
pub type Pid = u32;
20
21pub struct Dispatcher<'a> {
22    jobs: BTreeMap<JobId, JobInfo>,
23    last_job_id: JobId,
24    cronjobs: HashMap<JobId, job_scheduler::Uuid>,
25    procs: Arc<Mutex<Vec<Runner>>>,
26    scheduler: Arc<Mutex<JobScheduler<'a>>>,
27    system: System,
28    /// Sender channel for Runner threads
29    channel: mpsc::Sender<Pid>,
30}
31
32#[derive(Clone, Serialize, Deserialize, Debug)]
33pub struct JobInfo {
34    pub job_type: JobType,
35    pub args: Vec<String>,
36    pub restart: RestartInfo,
37}
38
39#[derive(Clone, Serialize, Deserialize, Debug)]
40pub enum JobType {
41    /// Shell command
42    Shell,
43    /// Service with command string
44    Service(String),
45    /// Cron job with time spec
46    Cron(String),
47}
48
49/// Restart policy
50#[derive(clap::ValueEnum, Clone, Serialize, Deserialize, Debug)]
51pub enum RestartPolicy {
52    Always,
53    OnFailure,
54    Never,
55}
56
57/// Runtime infos of job
58#[derive(Clone, Serialize, Deserialize, Debug)]
59pub struct RestartInfo {
60    pub policy: RestartPolicy,
61    /// Waiting time before restart in ms
62    pub wait_time: u64,
63    // stats: #Runs, #Success, #Restarts
64}
65
66/// Information for jobs command
67#[derive(Clone, Serialize, Deserialize, Debug)]
68pub struct Job {
69    pub id: JobId,
70    pub info: JobInfo,
71}
72
73#[derive(Error, Debug)]
74pub enum DispatcherError {
75    #[error(transparent)]
76    CliArgsError(#[from] clap::Error),
77    #[error("Failed to start `shell-composed`: {0}")]
78    DispatcherSpawnError(std::io::Error),
79    #[error("Connection to `shell-composed` failed")]
80    DispatcherSpawnTimeoutError,
81    #[error("Failed to spawn `{0}`: {1}")]
82    ProcSpawnError(String, std::io::Error),
83    #[error("Failed to terminate child process: {0}")]
84    KillError(std::io::Error),
85    #[error("Job {0} not found")]
86    JobNotFoundError(JobId),
87    #[error("Service `{0}` not found")]
88    ServiceNotFoundError(String),
89    #[error("Process exit code: {0}")]
90    ProcExitError(i32),
91    #[error("Empty command")]
92    EmptyProcCommandError,
93    #[error(transparent)]
94    JustfileError(#[from] JustfileError),
95    #[error("Communication protocol error")]
96    UnexpectedMessageError,
97    #[error(transparent)]
98    IpcClientError(#[from] IpcClientError),
99    #[error("Cron error: {0}")]
100    CronError(#[from] cron::error::Error),
101}
102
103impl Default for RestartInfo {
104    fn default() -> Self {
105        RestartInfo {
106            policy: RestartPolicy::OnFailure,
107            wait_time: 50,
108        }
109    }
110}
111
112impl JobInfo {
113    pub fn new_shell_job(args: Vec<String>, restart: Option<RestartPolicy>) -> Self {
114        let policy = restart.unwrap_or(RestartPolicy::Never);
115        JobInfo {
116            job_type: JobType::Shell,
117            args,
118            restart: RestartInfo {
119                policy,
120                ..Default::default()
121            },
122        }
123    }
124    pub fn new_cron_job(cron: String, args: Vec<String>) -> Self {
125        JobInfo {
126            job_type: JobType::Cron(cron),
127            args,
128            restart: RestartInfo {
129                policy: RestartPolicy::Never,
130                ..Default::default()
131            },
132        }
133    }
134    pub fn service_command(service: &str, args: &[String]) -> String {
135        [service]
136            .into_iter()
137            .chain(args.iter().map(|arg| arg.as_str()))
138            .collect::<Vec<_>>()
139            .join(" ")
140    }
141    pub fn new_service(service: &str, svc_args: &[String], restart: Option<RestartPolicy>) -> Self {
142        let mut args = vec!["just".to_string(), service.to_string()];
143        args.extend(svc_args.to_vec());
144        let policy = restart.unwrap_or(RestartPolicy::OnFailure);
145        JobInfo {
146            job_type: JobType::Service(Self::service_command(service, svc_args)),
147            args,
148            restart: RestartInfo {
149                policy,
150                ..Default::default()
151            },
152        }
153    }
154}
155
156impl Dispatcher<'_> {
157    pub fn create() -> Dispatcher<'static> {
158        let procs = Arc::new(Mutex::new(Vec::new()));
159        let scheduler = Arc::new(Mutex::new(JobScheduler::new()));
160
161        let scheduler_spawn = scheduler.clone();
162        let _handle = thread::spawn(move || cron_scheduler(scheduler_spawn));
163
164        let (send, recv) = mpsc::channel();
165        let send_spawn = send.clone();
166        let procs_spawn = procs.clone();
167        let _watcher = thread::spawn(move || child_watcher(procs_spawn, send_spawn, recv));
168
169        let system = System::new_with_specifics(
170            RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing()),
171        );
172
173        Dispatcher {
174            jobs: BTreeMap::new(),
175            last_job_id: 0,
176            cronjobs: HashMap::new(),
177            procs,
178            scheduler,
179            system,
180            channel: send,
181        }
182    }
183    pub fn exec_command(&mut self, cmd: ExecCommand, cwd: PathBuf) -> Message {
184        info!("Executing `{cmd:?}`");
185        std::env::set_current_dir(&cwd).unwrap();
186        let res = match cmd {
187            ExecCommand::Run { args, restart } => self.run(&args, restart),
188            ExecCommand::Runat { at, args } => self.run_at(&at, cwd, &args),
189            ExecCommand::Start {
190                service,
191                args,
192                restart,
193            } => self.start(&service, &args, restart),
194            ExecCommand::Up { group } => self.up(&group),
195        };
196        match res {
197            Err(e) => {
198                error!("{e}");
199                Message::Err(format!("{e}"))
200            }
201            Ok(job_ids) => Message::JobsStarted(job_ids),
202        }
203    }
204    pub fn cli_command(&mut self, cmd: CliCommand, stream: &mut IpcStream) {
205        info!("Executing `{cmd:?}`");
206        let res = match cmd {
207            CliCommand::Stop { job_id } => self.stop(job_id),
208            CliCommand::Down { group } => self.down(&group),
209            CliCommand::Ps => self.ps(stream),
210            CliCommand::Jobs => self.jobs(stream),
211            CliCommand::Logs { job_or_service } => self.log(job_or_service, stream),
212            CliCommand::Exit => std::process::exit(0),
213        };
214        if let Err(e) = &res {
215            error!("{e}");
216        }
217        let _ = stream.send_message(&res.into());
218    }
219    fn add_job(&mut self, job: JobInfo) -> JobId {
220        self.last_job_id += 1;
221        self.jobs.insert(self.last_job_id, job);
222        self.last_job_id
223    }
224    /// Find service job
225    fn find_job(&self, service: &str, args: &[String]) -> Option<JobId> {
226        let cmd = JobInfo::service_command(service, args);
227        self.jobs
228            .iter()
229            .find(
230                |(_id, info)| matches!(&info.job_type, JobType::Service(command) if *command == cmd),
231            )
232            .map(|(id, _info)| *id)
233    }
234    fn run(
235        &mut self,
236        args: &[String],
237        policy: Option<RestartPolicy>,
238    ) -> Result<Vec<JobId>, DispatcherError> {
239        let job_info = JobInfo::new_shell_job(args.to_vec(), policy);
240        let job_id = self.add_job(job_info);
241        self.spawn_job(job_id)?;
242        Ok(vec![job_id])
243    }
244    fn spawn_job(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
245        let job = self
246            .jobs
247            .get(&job_id)
248            .ok_or(DispatcherError::JobNotFoundError(job_id))?;
249        let child = Runner::spawn(job_id, &job.args, job.restart.clone(), self.channel.clone())?;
250        self.procs.lock().expect("lock").push(child);
251        // Wait for startup failure
252        thread::sleep(Duration::from_millis(10));
253        if let Some(child) = self.procs.lock().expect("lock").last() {
254            return match child.info.state {
255                ProcStatus::ExitErr(code) => Err(DispatcherError::ProcExitError(code)),
256                // ProcStatus::Unknown(e) => Err(DispatcherError::ProcSpawnError(e)),
257                _ => Ok(()),
258            };
259        }
260        Ok(())
261    }
262    /// Stop job
263    fn stop(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
264        if let Some(uuid) = self.cronjobs.remove(&job_id) {
265            info!("Removing cron job {job_id}");
266            self.scheduler.lock().expect("lock").remove(uuid);
267        }
268        for child in self
269            .procs
270            .lock()
271            .expect("lock")
272            .iter_mut()
273            .filter(|child| child.info.job_id == job_id)
274        {
275            if child.is_running() {
276                child.user_terminated = true;
277                child.terminate().map_err(DispatcherError::KillError)?;
278            }
279        }
280        if self.jobs.remove(&job_id).is_some() {
281            Ok(())
282        } else {
283            Err(DispatcherError::JobNotFoundError(job_id))
284        }
285    }
286    /// Add cron job
287    fn run_at(
288        &mut self,
289        cron: &str,
290        cwd: PathBuf,
291        args: &[String],
292    ) -> Result<Vec<JobId>, DispatcherError> {
293        let job_info = JobInfo::new_cron_job(cron.to_string(), args.to_vec());
294        let restart_info = job_info.restart.clone();
295        let job_id = self.add_job(job_info);
296        let job_args = args.to_vec();
297        let procs = self.procs.clone();
298        let channel = self.channel.clone();
299        let uuid = self
300            .scheduler
301            .lock()
302            .expect("lock")
303            .add(job_scheduler::Job::new(cron.parse()?, move || {
304                std::env::set_current_dir(&cwd).unwrap();
305                let child = Runner::spawn(job_id, &job_args, restart_info.clone(), channel.clone())
306                    .unwrap();
307                procs.lock().expect("lock").push(child);
308            }));
309        self.cronjobs.insert(job_id, uuid);
310        Ok(vec![job_id])
311    }
312    /// Start service (just recipe)
313    fn start(
314        &mut self,
315        service: &str,
316        args: &[String],
317        policy: Option<RestartPolicy>,
318    ) -> Result<Vec<JobId>, DispatcherError> {
319        // Find existing job or add new
320        let job_id = self
321            .find_job(service, args)
322            .unwrap_or_else(|| self.add_job(JobInfo::new_service(service, args, policy)));
323        // Check for existing process for this service
324        let running = self
325            .procs
326            .lock()
327            .expect("lock")
328            .iter_mut()
329            .any(|child| child.info.job_id == job_id && child.is_running());
330        if running {
331            Ok(vec![])
332        } else {
333            self.spawn_job(job_id)?;
334            Ok(vec![job_id])
335        }
336    }
337    /// Start service group (all just repipes in group)
338    fn up(&mut self, group: &str) -> Result<Vec<JobId>, DispatcherError> {
339        let mut job_ids = Vec::new();
340        let justfile = Justfile::parse()?;
341        let recipes = justfile.group_recipes(group);
342        for service in recipes {
343            let ids = self.start(&service, &[], None)?;
344            job_ids.extend(ids);
345        }
346        Ok(job_ids)
347    }
348    /// Stop service group
349    fn down(&mut self, group: &str) -> Result<(), DispatcherError> {
350        let mut job_ids = Vec::new();
351        let justfile = Justfile::parse()?;
352        let recipes = justfile.group_recipes(group);
353        for service in recipes {
354            self.jobs
355                .iter()
356                .filter(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if *name == service))
357                .for_each(|(id, _info)| job_ids.push(*id));
358        }
359        for job_id in job_ids {
360            self.stop(job_id)?;
361        }
362        Ok(())
363    }
364    /// Return info about running and finished processes
365    fn ps(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
366        // Update system info
367        // For accurate CPU usage, a process needs to be refreshed twice
368        // https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.cpu_usage
369        let ts = Local::now();
370        self.system.refresh_processes_specifics(
371            ProcessesToUpdate::All,
372            true,
373            ProcessRefreshKind::nothing().with_cpu(),
374        );
375        // Collect pids and child pids
376        let pids: Vec<sysinfo::Pid> = self
377            .procs
378            .lock()
379            .expect("lock")
380            .iter()
381            .flat_map(|proc| {
382                let parent_pid = sysinfo::Pid::from(proc.info.pid as usize);
383                self.system
384                    .processes()
385                    .iter()
386                    .filter(move |(_pid, process)| {
387                        process.parent().unwrap_or(0.into()) == parent_pid
388                    })
389                    .map(|(pid, _process)| *pid)
390                    .chain([parent_pid])
391            })
392            .collect();
393        std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); // 200ms
394        let duration = (Local::now() - ts).num_milliseconds();
395        fn per_second(value: u64, ms: i64) -> u64 {
396            (value as f64 * 1000.0 / ms as f64) as u64
397        }
398        self.system.refresh_processes_specifics(
399            ProcessesToUpdate::Some(&pids),
400            true,
401            ProcessRefreshKind::nothing()
402                .with_cpu()
403                .with_disk_usage()
404                .with_memory(),
405        );
406
407        let mut proc_infos = Vec::new();
408        for child in &mut self.procs.lock().expect("lock").iter_mut().rev() {
409            let parent_pid = sysinfo::Pid::from(child.info.pid as usize);
410            // CPU usage has to be measured from just child processs
411            // For tasks spawning child processes, we should consider the whole process subtree!
412            let main_pid = if child.info.program() == "just" {
413                self.system
414                    .processes()
415                    .iter()
416                    .find(|(_pid, process)| {
417                        process.parent().unwrap_or(0.into()) == parent_pid
418                            && process.name() != "ctrl-c"
419                    })
420                    .map(|(pid, _process)| *pid)
421                    .unwrap_or(parent_pid)
422            } else {
423                parent_pid
424            };
425            if let Some(process) = self.system.process(main_pid) {
426                child.info.cpu = process.cpu_usage();
427                child.info.memory = process.memory();
428                child.info.virtual_memory = process.virtual_memory();
429                let disk = process.disk_usage();
430                child.info.total_written_bytes = disk.total_written_bytes;
431                child.info.written_bytes = per_second(disk.written_bytes, duration);
432                child.info.total_read_bytes = disk.total_read_bytes;
433                child.info.read_bytes = per_second(disk.read_bytes, duration);
434            } else {
435                child.info.cpu = 0.0;
436                child.info.memory = 0;
437                child.info.virtual_memory = 0;
438                child.info.written_bytes = 0;
439                child.info.read_bytes = 0;
440            }
441            let info = child.update_proc_state();
442            proc_infos.push(info.clone());
443        }
444        stream.send_message(&Message::PsInfo(proc_infos))?;
445        Ok(())
446    }
447    /// Return info about jobs
448    fn jobs(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
449        let mut job_infos = Vec::new();
450        for (id, info) in self.jobs.iter().rev() {
451            job_infos.push(Job {
452                id: *id,
453                info: info.clone(),
454            });
455        }
456        stream.send_message(&Message::JobInfo(job_infos))?;
457        Ok(())
458    }
459    /// Return log lines
460    fn log(
461        &mut self,
462        job_or_service: Option<String>,
463        stream: &mut IpcStream,
464    ) -> Result<(), DispatcherError> {
465        let mut job_id_filter = None;
466        if let Some(job_or_service) = job_or_service {
467            if let Ok(job_id) = JobId::from_str(&job_or_service) {
468                if self.jobs.contains_key(&job_id) {
469                    job_id_filter = Some(job_id);
470                } else {
471                    return Err(DispatcherError::JobNotFoundError(job_id));
472                }
473            } else {
474                job_id_filter = Some(
475                    self.find_job(&job_or_service, &[])
476                        .ok_or(DispatcherError::ServiceNotFoundError(job_or_service))?,
477                );
478            }
479        }
480
481        let mut last_seen_ts: HashMap<Pid, DateTime<Local>> = HashMap::new();
482        'logwait: loop {
483            // Collect log entries from child proceses
484            let mut log_lines = Vec::new();
485            for child in self.procs.lock().expect("lock").iter_mut() {
486                if let Ok(output) = child.output.lock() {
487                    let last_seen = last_seen_ts
488                        .entry(child.proc.id())
489                        .or_insert(Local.timestamp_millis_opt(0).single().expect("ts"));
490                    for entry in output.lines_since(last_seen) {
491                        if let Some(job_id) = job_id_filter {
492                            if entry.job_id != job_id {
493                                continue;
494                            }
495                        }
496                        log_lines.push(entry.clone());
497                    }
498                }
499            }
500
501            if log_lines.is_empty() {
502                // Exit when client is disconnected
503                stream.alive()?;
504            } else {
505                log_lines.sort_by_key(|entry| entry.ts);
506                for entry in log_lines {
507                    if stream.send_message(&Message::LogLine(entry)).is_err() {
508                        info!("Aborting log command (stream error)");
509                        break 'logwait;
510                    }
511                }
512            }
513            // Wait for new output
514            thread::sleep(Duration::from_millis(100));
515        }
516        Ok(())
517    }
518}
519
520fn cron_scheduler(scheduler: Arc<Mutex<JobScheduler<'static>>>) {
521    loop {
522        let wait_time = if let Ok(mut scheduler) = scheduler.lock() {
523            scheduler.tick();
524            scheduler.time_till_next_job()
525        } else {
526            Duration::from_millis(50)
527        };
528        std::thread::sleep(wait_time);
529    }
530}
531
532// sender: Sender channel for Runner threads
533// recv: Watcher receiver channel
534fn child_watcher(
535    procs: Arc<Mutex<Vec<Runner>>>,
536    sender: mpsc::Sender<Pid>,
537    recv: mpsc::Receiver<Pid>,
538) {
539    loop {
540        // PID of terminated process sent from output_listener
541        let pid = recv.recv().expect("recv");
542        let ts = Local::now();
543        let respawn_child = if let Some(child) = procs
544            .lock()
545            .expect("lock")
546            .iter_mut()
547            .find(|p| p.info.pid == pid)
548        {
549            // https://doc.rust-lang.org/std/process/struct.Child.html#warning
550            let exit_code = child.proc.wait().ok().and_then(|st| st.code());
551            let _ = child.update_proc_state();
552            child.info.end = Some(ts);
553            if let Some(code) = exit_code {
554                info!(target: &format!("{pid}"), "Process terminated with exit code {code}");
555            } else {
556                info!(target: &format!("{pid}"), "Process terminated");
557            }
558            child.restart_infos()
559        } else {
560            info!(target: &format!("{pid}"), "(Unknown) process terminated");
561            None
562        };
563        if let Some(spawn_info) = respawn_child {
564            thread::sleep(Duration::from_millis(spawn_info.restart_info.wait_time));
565            let result = Runner::spawn(
566                spawn_info.job_id,
567                &spawn_info.args,
568                spawn_info.restart_info,
569                sender.clone(),
570            );
571            match result {
572                Ok(child) => procs.lock().expect("lock").push(child),
573                Err(e) => error!("Error trying to respawn failed process: {e}"),
574            }
575        }
576    }
577}