shell_compose/
dispatcher.rs

1use crate::{
2    CliCommand, ExecCommand, IpcClientError, IpcStream, Justfile, JustfileError, Message,
3    ProcStatus, Runner,
4};
5use chrono::{DateTime, Local, TimeZone};
6use job_scheduler_ng::{self as job_scheduler, JobScheduler};
7use log::{error, info};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use std::str::FromStr;
11use std::sync::{mpsc, Arc, Mutex};
12use std::thread;
13use std::time::Duration;
14use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
15use thiserror::Error;
16
/// Identifier of a job; the dispatcher hands these out in increasing order.
pub type JobId = u32;
/// Operating-system process id of a spawned child process.
pub type Pid = u32;
19
20pub struct Dispatcher<'a> {
21    jobs: BTreeMap<JobId, JobInfo>,
22    last_job_id: JobId,
23    cronjobs: HashMap<JobId, job_scheduler::Uuid>,
24    procs: Arc<Mutex<Vec<Runner>>>,
25    scheduler: Arc<Mutex<JobScheduler<'a>>>,
26    system: System,
27    /// Sender channel for Runner threads
28    channel: mpsc::Sender<Pid>,
29}
30
31#[derive(Clone, Serialize, Deserialize, Debug)]
32pub struct JobInfo {
33    pub job_type: JobType,
34    pub args: Vec<String>,
35    pub entrypoint: Option<String>,
36    pub restart: RestartInfo,
37}
38
39#[derive(Clone, Serialize, Deserialize, Debug)]
40pub enum JobType {
41    Shell,
42    Service(String),
43    Cron(String),
44}
45
46/// Restart policy
47#[derive(Clone, Serialize, Deserialize, Debug)]
48pub enum RestartPolicy {
49    Always,
50    OnFailure,
51    Never,
52}
53
54/// Runtime infos of job
55#[derive(Clone, Serialize, Deserialize, Debug)]
56pub struct RestartInfo {
57    pub policy: RestartPolicy,
58    /// Waiting time before restart in ms
59    pub wait_time: u64,
60    // stats: #Runs, #Success, #Restarts
61}
62
63/// Information for jobs command
64#[derive(Clone, Serialize, Deserialize, Debug)]
65pub struct Job {
66    pub id: JobId,
67    pub info: JobInfo,
68}
69
70#[derive(Error, Debug)]
71pub enum DispatcherError {
72    #[error(transparent)]
73    CliArgsError(#[from] clap::Error),
74    #[error("Failed to start `shell-composed`: {0}")]
75    DispatcherSpawnError(std::io::Error),
76    #[error("Connection to `shell-composed` failed")]
77    DispatcherSpawnTimeoutError,
78    #[error("Failed to spawn process: {0}")]
79    ProcSpawnError(std::io::Error),
80    #[error("Failed to terminate child process: {0}")]
81    KillError(std::io::Error),
82    #[error("Job {0} not found")]
83    JobNotFoundError(JobId),
84    #[error("Service `{0}` not found")]
85    ServiceNotFoundError(String),
86    #[error("Process exit code: {0}")]
87    ProcExitError(i32),
88    #[error("Empty command")]
89    EmptyProcCommandError,
90    #[error(transparent)]
91    JustfileError(#[from] JustfileError),
92    #[error("Communication protocol error")]
93    UnexpectedMessageError,
94    #[error(transparent)]
95    IpcClientError(#[from] IpcClientError),
96    #[error("Cron error: {0}")]
97    CronError(#[from] cron::error::Error),
98}
99
100impl Default for RestartInfo {
101    fn default() -> Self {
102        RestartInfo {
103            policy: RestartPolicy::OnFailure,
104            wait_time: 50,
105        }
106    }
107}
108
109impl JobInfo {
110    pub fn new_shell_job(args: Vec<String>) -> Self {
111        JobInfo {
112            job_type: JobType::Shell,
113            args,
114            entrypoint: None,
115            restart: RestartInfo {
116                policy: RestartPolicy::Never,
117                ..Default::default()
118            },
119        }
120    }
121    pub fn new_cron_job(cron: String, args: Vec<String>) -> Self {
122        JobInfo {
123            job_type: JobType::Cron(cron),
124            args,
125            entrypoint: None,
126            restart: RestartInfo {
127                policy: RestartPolicy::Never,
128                ..Default::default()
129            },
130        }
131    }
132    pub fn new_service(service: String) -> Self {
133        JobInfo {
134            job_type: JobType::Service(service.clone()),
135            args: vec!["just".to_string(), service], // TODO: exclude entrypoint
136            entrypoint: Some("just".to_string()),
137            restart: RestartInfo::default(),
138        }
139    }
140}
141
142impl Dispatcher<'_> {
143    pub fn create() -> Dispatcher<'static> {
144        let procs = Arc::new(Mutex::new(Vec::new()));
145        let scheduler = Arc::new(Mutex::new(JobScheduler::new()));
146
147        let scheduler_spawn = scheduler.clone();
148        let _handle = thread::spawn(move || cron_scheduler(scheduler_spawn));
149
150        let (send, recv) = mpsc::channel();
151        let send_spawn = send.clone();
152        let procs_spawn = procs.clone();
153        let _watcher = thread::spawn(move || child_watcher(procs_spawn, send_spawn, recv));
154
155        let system = System::new_with_specifics(
156            RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing()),
157        );
158
159        Dispatcher {
160            jobs: BTreeMap::new(),
161            last_job_id: 0,
162            cronjobs: HashMap::new(),
163            procs,
164            scheduler,
165            system,
166            channel: send,
167        }
168    }
169    pub fn exec_command(&mut self, cmd: ExecCommand) -> Message {
170        info!("Executing `{cmd:?}`");
171        let res = match cmd {
172            ExecCommand::Run { args } => self.run(&args),
173            ExecCommand::Runat { at, args } => self.run_at(&at, &args),
174            ExecCommand::Start { service } => self.start(&service),
175            ExecCommand::Up { group } => self.up(&group),
176        };
177        match res {
178            Err(e) => {
179                error!("{e}");
180                Message::Err(format!("{e}"))
181            }
182            Ok(job_ids) => Message::JobsStarted(job_ids),
183        }
184    }
185    pub fn cli_command(&mut self, cmd: CliCommand, stream: &mut IpcStream) {
186        info!("Executing `{cmd:?}`");
187        let res = match cmd {
188            CliCommand::Stop { job_id } => self.stop(job_id),
189            CliCommand::Down { group } => self.down(&group),
190            CliCommand::Ps => self.ps(stream),
191            CliCommand::Jobs => self.jobs(stream),
192            CliCommand::Logs { job_or_service } => self.log(job_or_service, stream),
193            CliCommand::Exit => std::process::exit(0),
194        };
195        if let Err(e) = &res {
196            error!("{e}");
197        }
198        let _ = stream.send_message(&res.into());
199    }
200    fn add_job(&mut self, job: JobInfo) -> JobId {
201        self.last_job_id += 1;
202        self.jobs.insert(self.last_job_id, job);
203        self.last_job_id
204    }
205    /// Find service job
206    fn find_job(&self, service: &str) -> Option<JobId> {
207        self.jobs
208            .iter()
209            .find(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if name == service))
210            .map(|(id, _info)| *id)
211    }
212    fn run(&mut self, args: &[String]) -> Result<Vec<JobId>, DispatcherError> {
213        let job_info = JobInfo::new_shell_job(args.to_vec());
214        let job_id = self.add_job(job_info);
215        self.spawn_job(job_id)?;
216        Ok(vec![job_id])
217    }
218    fn spawn_job(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
219        let job = self
220            .jobs
221            .get(&job_id)
222            .ok_or(DispatcherError::JobNotFoundError(job_id))?;
223        let child = Runner::spawn(job_id, &job.args, job.restart.clone(), self.channel.clone())?;
224        self.procs.lock().expect("lock").push(child);
225        // Wait for startup failure
226        thread::sleep(Duration::from_millis(10));
227        if let Some(child) = self.procs.lock().expect("lock").last() {
228            return match child.info.state {
229                ProcStatus::ExitErr(code) => Err(DispatcherError::ProcExitError(code)),
230                // ProcStatus::Unknown(e) => Err(DispatcherError::ProcSpawnError(e)),
231                _ => Ok(()),
232            };
233        }
234        Ok(())
235    }
236    /// Stop job
237    fn stop(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
238        if let Some(uuid) = self.cronjobs.remove(&job_id) {
239            info!("Removing cron job {job_id}");
240            self.scheduler.lock().expect("lock").remove(uuid);
241        }
242        for child in self
243            .procs
244            .lock()
245            .expect("lock")
246            .iter_mut()
247            .filter(|child| child.info.job_id == job_id)
248        {
249            if child.is_running() {
250                child.user_terminated = true;
251                child.terminate().map_err(DispatcherError::KillError)?;
252            }
253        }
254        if self.jobs.remove(&job_id).is_some() {
255            Ok(())
256        } else {
257            Err(DispatcherError::JobNotFoundError(job_id))
258        }
259    }
260    /// Add cron job
261    fn run_at(&mut self, cron: &str, args: &[String]) -> Result<Vec<JobId>, DispatcherError> {
262        let job_info = JobInfo::new_cron_job(cron.to_string(), args.to_vec());
263        let restart_info = job_info.restart.clone();
264        let job_id = self.add_job(job_info);
265        let job_args = args.to_vec();
266        let procs = self.procs.clone();
267        let channel = self.channel.clone();
268        let uuid = self
269            .scheduler
270            .lock()
271            .expect("lock")
272            .add(job_scheduler::Job::new(cron.parse()?, move || {
273                let child = Runner::spawn(job_id, &job_args, restart_info.clone(), channel.clone())
274                    .unwrap();
275                procs.lock().expect("lock").push(child);
276            }));
277        self.cronjobs.insert(job_id, uuid);
278        Ok(vec![job_id])
279    }
280    /// Start service (just recipe)
281    fn start(&mut self, service: &str) -> Result<Vec<JobId>, DispatcherError> {
282        // Find existing job or add new
283        let job_id = self
284            .find_job(service)
285            .unwrap_or_else(|| self.add_job(JobInfo::new_service(service.to_string())));
286        // Check for existing process for this service
287        let running = self
288            .procs
289            .lock()
290            .expect("lock")
291            .iter_mut()
292            .any(|child| child.info.job_id == job_id && child.is_running());
293        if running {
294            Ok(vec![])
295        } else {
296            self.spawn_job(job_id)?;
297            Ok(vec![job_id])
298        }
299    }
300    /// Start service group (all just repipes in group)
301    fn up(&mut self, group: &str) -> Result<Vec<JobId>, DispatcherError> {
302        let mut job_ids = Vec::new();
303        let justfile = Justfile::parse()?;
304        let recipes = justfile.group_recipes(group);
305        for service in recipes {
306            let ids = self.start(&service)?;
307            job_ids.extend(ids);
308        }
309        Ok(job_ids)
310    }
311    /// Stop service group
312    fn down(&mut self, group: &str) -> Result<(), DispatcherError> {
313        let mut job_ids = Vec::new();
314        let justfile = Justfile::parse()?;
315        let recipes = justfile.group_recipes(group);
316        for service in recipes {
317            self.jobs
318                .iter()
319                .filter(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if *name == service))
320                .for_each(|(id, _info)| job_ids.push(*id));
321        }
322        for job_id in job_ids {
323            self.stop(job_id)?;
324        }
325        Ok(())
326    }
327    /// Return info about running and finished processes
328    fn ps(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
329        // Update system info
330        // For accurate CPU usage, a process needs to be refreshed twice
331        // https://docs.rs/sysinfo/latest/i686-pc-windows-msvc/sysinfo/struct.Process.html#method.cpu_usage
332        let ts = Local::now();
333        self.system.refresh_processes_specifics(
334            ProcessesToUpdate::All,
335            true,
336            ProcessRefreshKind::nothing().with_cpu(),
337        );
338        // Collect pids and child pids
339        let pids: Vec<sysinfo::Pid> = self
340            .procs
341            .lock()
342            .expect("lock")
343            .iter()
344            .flat_map(|proc| {
345                let parent_pid = sysinfo::Pid::from(proc.info.pid as usize);
346                self.system
347                    .processes()
348                    .iter()
349                    .filter(move |(_pid, process)| {
350                        process.parent().unwrap_or(0.into()) == parent_pid
351                    })
352                    .map(|(pid, _process)| *pid)
353                    .chain([parent_pid])
354            })
355            .collect();
356        std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); // 200ms
357        let duration = (Local::now() - ts).num_milliseconds();
358        fn per_second(value: u64, ms: i64) -> u64 {
359            (value as f64 * 1000.0 / ms as f64) as u64
360        }
361        self.system.refresh_processes_specifics(
362            ProcessesToUpdate::Some(&pids),
363            true,
364            ProcessRefreshKind::nothing()
365                .with_cpu()
366                .with_disk_usage()
367                .with_memory(),
368        );
369
370        let mut proc_infos = Vec::new();
371        for child in &mut self.procs.lock().expect("lock").iter_mut().rev() {
372            let parent_pid = sysinfo::Pid::from(child.info.pid as usize);
373            // CPU usage has to be measured from just child processs
374            // For tasks spawning child processes, we should consider the whole process subtree!
375            let main_pid = if child.info.program() == "just" {
376                self.system
377                    .processes()
378                    .iter()
379                    .find(|(_pid, process)| {
380                        process.parent().unwrap_or(0.into()) == parent_pid
381                            && process.name() != "ctrl-c"
382                    })
383                    .map(|(pid, _process)| *pid)
384                    .unwrap_or(parent_pid)
385            } else {
386                parent_pid
387            };
388            if let Some(process) = self.system.process(main_pid) {
389                child.info.cpu = process.cpu_usage();
390                child.info.memory = process.memory();
391                child.info.virtual_memory = process.virtual_memory();
392                let disk = process.disk_usage();
393                child.info.total_written_bytes = disk.total_written_bytes;
394                child.info.written_bytes = per_second(disk.written_bytes, duration);
395                child.info.total_read_bytes = disk.total_read_bytes;
396                child.info.read_bytes = per_second(disk.read_bytes, duration);
397            } else {
398                child.info.cpu = 0.0;
399                child.info.memory = 0;
400                child.info.virtual_memory = 0;
401                child.info.written_bytes = 0;
402                child.info.read_bytes = 0;
403            }
404            let info = child.update_proc_state();
405            proc_infos.push(info.clone());
406        }
407        stream.send_message(&Message::PsInfo(proc_infos))?;
408        Ok(())
409    }
410    /// Return info about jobs
411    fn jobs(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
412        let mut job_infos = Vec::new();
413        for (id, info) in self.jobs.iter().rev() {
414            job_infos.push(Job {
415                id: *id,
416                info: info.clone(),
417            });
418        }
419        stream.send_message(&Message::JobInfo(job_infos))?;
420        Ok(())
421    }
422    /// Return log lines
423    fn log(
424        &mut self,
425        job_or_service: Option<String>,
426        stream: &mut IpcStream,
427    ) -> Result<(), DispatcherError> {
428        let mut job_id_filter = None;
429        if let Some(job_or_service) = job_or_service {
430            if let Ok(job_id) = JobId::from_str(&job_or_service) {
431                if self.jobs.contains_key(&job_id) {
432                    job_id_filter = Some(job_id);
433                } else {
434                    return Err(DispatcherError::JobNotFoundError(job_id));
435                }
436            } else {
437                job_id_filter = Some(
438                    self.find_job(&job_or_service)
439                        .ok_or(DispatcherError::ServiceNotFoundError(job_or_service))?,
440                );
441            }
442        }
443
444        let mut last_seen_ts: HashMap<Pid, DateTime<Local>> = HashMap::new();
445        'logwait: loop {
446            // Collect log entries from child proceses
447            let mut log_lines = Vec::new();
448            for child in self.procs.lock().expect("lock").iter_mut() {
449                if let Ok(output) = child.output.lock() {
450                    let last_seen = last_seen_ts
451                        .entry(child.proc.id())
452                        .or_insert(Local.timestamp_millis_opt(0).single().expect("ts"));
453                    for entry in output.lines_since(last_seen) {
454                        if let Some(job_id) = job_id_filter {
455                            if entry.job_id != job_id {
456                                continue;
457                            }
458                        }
459                        log_lines.push(entry.clone());
460                    }
461                }
462            }
463
464            if log_lines.is_empty() {
465                // Exit when client is disconnected
466                stream.alive()?;
467            } else {
468                log_lines.sort_by_key(|entry| entry.ts);
469                for entry in log_lines {
470                    if stream.send_message(&Message::LogLine(entry)).is_err() {
471                        info!("Aborting log command (stream error)");
472                        break 'logwait;
473                    }
474                }
475            }
476            // Wait for new output
477            thread::sleep(Duration::from_millis(100));
478        }
479        Ok(())
480    }
481}
482
483fn cron_scheduler(scheduler: Arc<Mutex<JobScheduler<'static>>>) {
484    loop {
485        let wait_time = if let Ok(mut scheduler) = scheduler.lock() {
486            scheduler.tick();
487            scheduler.time_till_next_job()
488        } else {
489            Duration::from_millis(50)
490        };
491        std::thread::sleep(wait_time);
492    }
493}
494
495// sender: Sender channel for Runner threads
496// recv: Watcher receiver channel
497fn child_watcher(
498    procs: Arc<Mutex<Vec<Runner>>>,
499    sender: mpsc::Sender<Pid>,
500    recv: mpsc::Receiver<Pid>,
501) {
502    loop {
503        // PID of terminated process sent from output_listener
504        let pid = recv.recv().expect("recv");
505        let ts = Local::now();
506        let respawn_child = if let Some(child) = procs
507            .lock()
508            .expect("lock")
509            .iter_mut()
510            .find(|p| p.info.pid == pid)
511        {
512            // https://doc.rust-lang.org/std/process/struct.Child.html#warning
513            let exit_code = child.proc.wait().ok().and_then(|st| st.code());
514            let _ = child.update_proc_state();
515            child.info.end = Some(ts);
516            if let Some(code) = exit_code {
517                info!(target: &format!("{pid}"), "Process terminated with exit code {code}");
518            } else {
519                info!(target: &format!("{pid}"), "Process terminated");
520            }
521            child.restart_infos()
522        } else {
523            info!(target: &format!("{pid}"), "(Unknown) process terminated");
524            None
525        };
526        if let Some(spawn_info) = respawn_child {
527            thread::sleep(Duration::from_millis(spawn_info.restart_info.wait_time));
528            let result = Runner::spawn(
529                spawn_info.job_id,
530                &spawn_info.args,
531                spawn_info.restart_info,
532                sender.clone(),
533            );
534            match result {
535                Ok(child) => procs.lock().expect("lock").push(child),
536                Err(e) => error!("Error trying to respawn failed process: {e}"),
537            }
538        }
539    }
540}