1use crate::{
2 CliCommand, ExecCommand, IpcClientError, IpcStream, Justfile, JustfileError, Message,
3 ProcStatus, Runner,
4};
5use chrono::{DateTime, Local, TimeZone};
6use job_scheduler_ng::{self as job_scheduler, JobScheduler};
7use log::{error, info};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use std::str::FromStr;
11use std::sync::{mpsc, Arc, Mutex};
12use std::thread;
13use std::time::Duration;
14use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
15use thiserror::Error;
16
/// Identifier the dispatcher assigns to each registered job (allocated sequentially from 1).
pub type JobId = u32;
/// Operating-system process id of a spawned child process.
pub type Pid = u32;
19
/// Central state of the job daemon: registered jobs, their spawned processes
/// and the cron scheduler driving scheduled jobs.
pub struct Dispatcher<'a> {
    /// All registered jobs, keyed (and therefore ordered) by id.
    jobs: BTreeMap<JobId, JobInfo>,
    /// Last id handed out; the next job gets `last_job_id + 1`.
    last_job_id: JobId,
    /// Scheduler handle of every cron job, needed to unschedule it on `stop`.
    cronjobs: HashMap<JobId, job_scheduler::Uuid>,
    /// Every process spawned so far (finished ones included), shared with the
    /// child-watcher thread and with scheduled cron closures.
    procs: Arc<Mutex<Vec<Runner>>>,
    /// Cron scheduler, shared with the thread that ticks it.
    scheduler: Arc<Mutex<JobScheduler<'a>>>,
    /// sysinfo snapshot used to collect per-process resource statistics.
    system: System,
    /// Sending half of the pid channel handed to every spawned `Runner`; the
    /// child-watcher thread owns the receiving half.
    channel: mpsc::Sender<Pid>,
}
30
/// Description of a registered job: what kind it is, what it runs and how it
/// is restarted.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct JobInfo {
    /// Kind of job (one-off shell command, service or cron job).
    pub job_type: JobType,
    /// Command line to execute; for services this is `["just", <recipe>]`.
    pub args: Vec<String>,
    /// `Some("just")` for services, `None` for shell and cron jobs.
    pub entrypoint: Option<String>,
    /// Restart policy applied when a process of this job exits.
    pub restart: RestartInfo,
}
39
/// Kind of a registered job.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum JobType {
    /// A command started directly (the `run` command).
    Shell,
    /// A justfile recipe started as a service; holds the recipe name.
    Service(String),
    /// A command run on a schedule; holds the cron expression.
    Cron(String),
}
46
/// How and when the processes of a job are restarted after they exit.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RestartInfo {
    /// Whether an exited process is respawned.
    pub policy: Restart,
    /// Delay in milliseconds before a respawn.
    pub wait_time: u64,
}
53
/// Restart policy for processes that were not stopped by the user.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum Restart {
    /// Respawn after every exit.
    Always,
    /// Respawn only when the process ended in an error state with a positive exit code.
    OnFailure,
    /// Never respawn.
    Never,
}
61
/// Parameters needed to spawn a process for a job, borrowing the job's
/// arguments from the dispatcher's job table.
struct JobSpawnInfo<'a> {
    /// Job the process belongs to.
    job_id: JobId,
    /// Command line of the job.
    args: &'a [String],
    /// Restart policy copied from the job.
    restart_info: RestartInfo,
}
67
/// A registered job together with its id, as reported by the `jobs` command.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Job {
    /// Dispatcher-assigned job id.
    pub id: JobId,
    /// The job's description.
    pub info: JobInfo,
}
73
/// Errors produced while executing dispatcher commands.
#[derive(Error, Debug)]
pub enum DispatcherError {
    /// Command-line parsing error reported by `clap`.
    #[error(transparent)]
    CliArgsError(#[from] clap::Error),
    /// I/O error while starting the `shell-composed` daemon.
    #[error("Failed to start `shell-composed`: {0}")]
    DispatcherSpawnError(std::io::Error),
    /// Could not connect to the `shell-composed` daemon (presumably after a timeout, per the name).
    #[error("Connection to `shell-composed` failed")]
    DispatcherSpawnTimeoutError,
    /// I/O error while spawning a child process.
    #[error("Failed to spawn process: {0}")]
    ProcSpawnError(std::io::Error),
    /// Terminating a child process failed (returned by `stop`).
    #[error("Failed to terminate child process: {0}")]
    KillError(std::io::Error),
    /// No job is registered under the given id.
    #[error("Job {0} not found")]
    JobNotFoundError(JobId),
    /// No service job with the given recipe name is registered.
    #[error("Service `{0}` not found")]
    ServiceNotFoundError(String),
    /// A freshly spawned process had already exited with this error code.
    #[error("Process exit code: {0}")]
    ProcExitError(i32),
    /// A command with no program to run was given.
    #[error("Empty command")]
    EmptyProcCommandError,
    /// The justfile could not be read or parsed.
    #[error(transparent)]
    JustfileError(#[from] JustfileError),
    /// A message of an unexpected type was received over IPC.
    #[error("Communication protocol error")]
    UnexpectedMessageError,
    /// IPC stream failure.
    #[error(transparent)]
    IpcClientError(#[from] IpcClientError),
    /// Invalid cron expression.
    #[error("Cron error: {0}")]
    CronError(#[from] cron::error::Error),
}
103
104impl Default for RestartInfo {
105 fn default() -> Self {
106 RestartInfo {
107 policy: Restart::OnFailure,
108 wait_time: 50,
109 }
110 }
111}
112
113impl JobInfo {
114 pub fn new_shell_job(args: Vec<String>) -> Self {
115 JobInfo {
116 job_type: JobType::Shell,
117 args,
118 entrypoint: None,
119 restart: RestartInfo {
120 policy: Restart::Never,
121 ..Default::default()
122 },
123 }
124 }
125 pub fn new_cron_job(cron: String, args: Vec<String>) -> Self {
126 JobInfo {
127 job_type: JobType::Cron(cron),
128 args,
129 entrypoint: None,
130 restart: RestartInfo {
131 policy: Restart::Never,
132 ..Default::default()
133 },
134 }
135 }
136 pub fn new_service(service: String) -> Self {
137 JobInfo {
138 job_type: JobType::Service(service.clone()),
139 args: vec!["just".to_string(), service], entrypoint: Some("just".to_string()),
141 restart: RestartInfo::default(),
142 }
143 }
144}
145
146impl Dispatcher<'_> {
147 pub fn create() -> Dispatcher<'static> {
148 let procs = Arc::new(Mutex::new(Vec::new()));
149 let scheduler = Arc::new(Mutex::new(JobScheduler::new()));
150
151 let scheduler_spawn = scheduler.clone();
152 let _handle = thread::spawn(move || cron_scheduler(scheduler_spawn));
153
154 let (send, recv) = mpsc::channel();
155 let send_spawn = send.clone();
156 let procs_spawn = procs.clone();
157 let _watcher = thread::spawn(move || child_watcher(procs_spawn, send_spawn, recv));
158
159 let system = System::new_with_specifics(
160 RefreshKind::new().with_processes(ProcessRefreshKind::new()),
161 );
162
163 Dispatcher {
164 jobs: BTreeMap::new(),
165 last_job_id: 0,
166 cronjobs: HashMap::new(),
167 procs,
168 scheduler,
169 system,
170 channel: send,
171 }
172 }
173 pub fn exec_command(&mut self, cmd: ExecCommand) -> Message {
174 info!("Executing `{cmd:?}`");
175 let res = match cmd {
176 ExecCommand::Run { args } => self.run(&args),
177 ExecCommand::Runat { at, args } => self.run_at(&at, &args),
178 ExecCommand::Start { service } => self.start(&service),
179 ExecCommand::Up { group } => self.up(&group),
180 };
181 match res {
182 Err(e) => {
183 error!("{e}");
184 Message::Err(format!("{e}"))
185 }
186 Ok(job_ids) => Message::JobsStarted(job_ids),
187 }
188 }
189 pub fn cli_command(&mut self, cmd: CliCommand, stream: &mut IpcStream) {
190 info!("Executing `{cmd:?}`");
191 let res = match cmd {
192 CliCommand::Stop { job_id } => self.stop(job_id),
193 CliCommand::Down { group } => self.down(&group),
194 CliCommand::Ps => self.ps(stream),
195 CliCommand::Jobs => self.jobs(stream),
196 CliCommand::Logs { job_or_service } => self.log(job_or_service, stream),
197 CliCommand::Exit => std::process::exit(0),
198 };
199 if let Err(e) = &res {
200 error!("{e}");
201 }
202 let _ = stream.send_message(&res.into());
203 }
204 fn add_job(&mut self, job: JobInfo) -> JobId {
205 self.last_job_id += 1;
206 self.jobs.insert(self.last_job_id, job);
207 self.last_job_id
208 }
209 fn spawn_info(&self, job_id: JobId) -> Result<JobSpawnInfo<'_>, DispatcherError> {
210 let job = self
211 .jobs
212 .get(&job_id)
213 .ok_or(DispatcherError::JobNotFoundError(job_id))?;
214 Ok(JobSpawnInfo {
215 job_id,
216 args: &job.args,
217 restart_info: job.restart.clone(),
218 })
219 }
220 fn find_job(&self, service: &str) -> Option<JobId> {
222 self.jobs
223 .iter()
224 .find(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if name == service))
225 .map(|(id, _info)| *id)
226 }
227 fn run(&mut self, args: &[String]) -> Result<Vec<JobId>, DispatcherError> {
228 let job_info = JobInfo::new_shell_job(args.to_vec());
229 let job_id = self.add_job(job_info);
230 self.spawn_job(job_id)?;
231 Ok(vec![job_id])
232 }
233 fn spawn_job(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
234 let job = self.spawn_info(job_id)?;
235 let child = Runner::spawn(job.job_id, job.args, job.restart_info, self.channel.clone())?;
236 self.procs.lock().expect("lock").push(child);
237 thread::sleep(Duration::from_millis(10));
239 if let Some(child) = self.procs.lock().expect("lock").last() {
240 return match child.info.state {
241 ProcStatus::ExitErr(code) => Err(DispatcherError::ProcExitError(code)),
242 _ => Ok(()),
244 };
245 }
246 Ok(())
247 }
248 fn stop(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
250 if let Some(uuid) = self.cronjobs.remove(&job_id) {
251 info!("Removing cron job {job_id}");
252 self.scheduler.lock().expect("lock").remove(uuid);
253 }
254 for child in self
255 .procs
256 .lock()
257 .expect("lock")
258 .iter_mut()
259 .filter(|child| child.info.job_id == job_id)
260 {
261 if child.is_running() {
262 child.user_terminated = true;
263 child.terminate().map_err(DispatcherError::KillError)?;
264 }
265 }
266 if self.jobs.remove(&job_id).is_some() {
267 Ok(())
268 } else {
269 Err(DispatcherError::JobNotFoundError(job_id))
270 }
271 }
272 fn run_at(&mut self, cron: &str, args: &[String]) -> Result<Vec<JobId>, DispatcherError> {
274 let job_info = JobInfo::new_cron_job(cron.to_string(), args.to_vec());
275 let restart_info = job_info.restart.clone();
276 let job_id = self.add_job(job_info);
277 let job_args = args.to_vec();
278 let procs = self.procs.clone();
279 let channel = self.channel.clone();
280 let uuid = self
281 .scheduler
282 .lock()
283 .expect("lock")
284 .add(job_scheduler::Job::new(cron.parse()?, move || {
285 let child = Runner::spawn(job_id, &job_args, restart_info.clone(), channel.clone())
286 .unwrap();
287 procs.lock().expect("lock").push(child);
288 }));
289 self.cronjobs.insert(job_id, uuid);
290 Ok(vec![job_id])
291 }
292 fn start(&mut self, service: &str) -> Result<Vec<JobId>, DispatcherError> {
294 let job_id = self
296 .find_job(service)
297 .unwrap_or_else(|| self.add_job(JobInfo::new_service(service.to_string())));
298 let running = self
300 .procs
301 .lock()
302 .expect("lock")
303 .iter_mut()
304 .any(|child| child.info.job_id == job_id && child.is_running());
305 if running {
306 Ok(vec![])
307 } else {
308 self.spawn_job(job_id)?;
309 Ok(vec![job_id])
310 }
311 }
312 fn up(&mut self, group: &str) -> Result<Vec<JobId>, DispatcherError> {
314 let mut job_ids = Vec::new();
315 let justfile = Justfile::parse()?;
316 let recipes = justfile.group_recipes(group);
317 for service in recipes {
318 let ids = self.start(&service)?;
319 job_ids.extend(ids);
320 }
321 Ok(job_ids)
322 }
323 fn down(&mut self, group: &str) -> Result<(), DispatcherError> {
325 let mut job_ids = Vec::new();
326 let justfile = Justfile::parse()?;
327 let recipes = justfile.group_recipes(group);
328 for service in recipes {
329 self.jobs
330 .iter()
331 .filter(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if *name == service))
332 .for_each(|(id, _info)| job_ids.push(*id));
333 }
334 for job_id in job_ids {
335 self.stop(job_id)?;
336 }
337 Ok(())
338 }
339 fn ps(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
341 let ts = Local::now();
345 self.system.refresh_processes_specifics(
346 ProcessesToUpdate::All,
347 true,
348 ProcessRefreshKind::new().with_cpu(),
349 );
350 let pids: Vec<sysinfo::Pid> = self
352 .procs
353 .lock()
354 .expect("lock")
355 .iter()
356 .flat_map(|proc| {
357 let parent_pid = sysinfo::Pid::from(proc.info.pid as usize);
358 self.system
359 .processes()
360 .iter()
361 .filter(move |(_pid, process)| {
362 process.parent().unwrap_or(0.into()) == parent_pid
363 })
364 .map(|(pid, _process)| *pid)
365 .chain([parent_pid])
366 })
367 .collect();
368 std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); let duration = (Local::now() - ts).num_milliseconds();
370 fn per_second(value: u64, ms: i64) -> u64 {
371 (value as f64 * 1000.0 / ms as f64) as u64
372 }
373 self.system.refresh_processes_specifics(
374 ProcessesToUpdate::Some(&pids),
375 true,
376 ProcessRefreshKind::new()
377 .with_cpu()
378 .with_disk_usage()
379 .with_memory(),
380 );
381
382 let mut proc_infos = Vec::new();
383 for child in &mut self.procs.lock().expect("lock").iter_mut().rev() {
384 let parent_pid = sysinfo::Pid::from(child.info.pid as usize);
385 let main_pid = if child.info.program() == "just" {
388 self.system
389 .processes()
390 .iter()
391 .find(|(_pid, process)| {
392 process.parent().unwrap_or(0.into()) == parent_pid
393 && process.name() != "ctrl-c"
394 })
395 .map(|(pid, _process)| *pid)
396 .unwrap_or(parent_pid)
397 } else {
398 parent_pid
399 };
400 if let Some(process) = self.system.process(main_pid) {
401 child.info.cpu = process.cpu_usage();
402 child.info.memory = process.memory();
403 child.info.virtual_memory = process.virtual_memory();
404 let disk = process.disk_usage();
405 child.info.total_written_bytes = disk.total_written_bytes;
406 child.info.written_bytes = per_second(disk.written_bytes, duration);
407 child.info.total_read_bytes = disk.total_read_bytes;
408 child.info.read_bytes = per_second(disk.read_bytes, duration);
409 } else {
410 child.info.cpu = 0.0;
411 child.info.memory = 0;
412 child.info.virtual_memory = 0;
413 child.info.written_bytes = 0;
414 child.info.read_bytes = 0;
415 }
416 let info = child.update_proc_state();
417 proc_infos.push(info.clone());
418 }
419 stream.send_message(&Message::PsInfo(proc_infos))?;
420 Ok(())
421 }
422 fn jobs(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
424 let mut job_infos = Vec::new();
425 for (id, info) in self.jobs.iter().rev() {
426 job_infos.push(Job {
427 id: *id,
428 info: info.clone(),
429 });
430 }
431 stream.send_message(&Message::JobInfo(job_infos))?;
432 Ok(())
433 }
434 fn log(
436 &mut self,
437 job_or_service: Option<String>,
438 stream: &mut IpcStream,
439 ) -> Result<(), DispatcherError> {
440 let mut job_id_filter = None;
441 if let Some(job_or_service) = job_or_service {
442 if let Ok(job_id) = JobId::from_str(&job_or_service) {
443 if self.jobs.contains_key(&job_id) {
444 job_id_filter = Some(job_id);
445 } else {
446 return Err(DispatcherError::JobNotFoundError(job_id));
447 }
448 } else {
449 job_id_filter = Some(
450 self.find_job(&job_or_service)
451 .ok_or(DispatcherError::ServiceNotFoundError(job_or_service))?,
452 );
453 }
454 }
455
456 let mut last_seen_ts: HashMap<Pid, DateTime<Local>> = HashMap::new();
457 'logwait: loop {
458 let mut log_lines = Vec::new();
460 for child in self.procs.lock().expect("lock").iter_mut() {
461 if let Ok(output) = child.output.lock() {
462 let last_seen = last_seen_ts
463 .entry(child.proc.id())
464 .or_insert(Local.timestamp_millis_opt(0).single().expect("ts"));
465 for entry in output.lines_since(last_seen) {
466 if let Some(job_id) = job_id_filter {
467 if entry.job_id != job_id {
468 continue;
469 }
470 }
471 log_lines.push(entry.clone());
472 }
473 }
474 }
475
476 if log_lines.is_empty() {
477 stream.alive()?;
479 } else {
480 log_lines.sort_by_key(|entry| entry.ts);
481 for entry in log_lines {
482 if stream.send_message(&Message::LogLine(entry)).is_err() {
483 info!("Aborting log command (stream error)");
484 break 'logwait;
485 }
486 }
487 }
488 thread::sleep(Duration::from_millis(100));
490 }
491 Ok(())
492 }
493}
494
495fn cron_scheduler(scheduler: Arc<Mutex<JobScheduler<'static>>>) {
496 loop {
497 let wait_time = if let Ok(mut scheduler) = scheduler.lock() {
498 scheduler.tick();
499 scheduler.time_till_next_job()
500 } else {
501 Duration::from_millis(50)
502 };
503 std::thread::sleep(wait_time);
504 }
505}
506
507fn child_watcher(
510 procs: Arc<Mutex<Vec<Runner>>>,
511 sender: mpsc::Sender<Pid>,
512 recv: mpsc::Receiver<Pid>,
513) {
514 loop {
515 let pid = recv.recv().expect("recv");
517 let ts = Local::now();
518 let mut respawn_child = None;
519 if let Some(child) = procs
520 .lock()
521 .expect("lock")
522 .iter_mut()
523 .find(|p| p.info.pid == pid)
524 {
525 let exit_code = child.proc.wait().ok().and_then(|st| st.code());
527 let _ = child.update_proc_state();
528 child.info.end = Some(ts);
529 if let Some(code) = exit_code {
530 info!(target: &format!("{pid}"), "Process terminated with exit code {code}");
531 } else {
532 info!(target: &format!("{pid}"), "Process terminated");
533 }
534 let respawn = !child.user_terminated
535 && match child.restart_info.policy {
536 Restart::Always => true,
537 Restart::OnFailure => {
538 matches!(child.info.state, ProcStatus::ExitErr(code) if code > 0)
539 }
540 Restart::Never => false,
541 };
542 if respawn {
543 respawn_child = Some((child.info.clone(), child.restart_info.clone()));
544 }
545 } else {
546 info!(target: &format!("{pid}"), "(Unknown) process terminated");
547 }
548 if let Some((child_info, restart_info)) = respawn_child {
549 thread::sleep(Duration::from_millis(restart_info.wait_time));
550 let result = Runner::spawn(
551 child_info.job_id,
552 &child_info.cmd_args,
553 restart_info,
554 sender.clone(),
555 );
556 match result {
557 Ok(child) => procs.lock().expect("lock").push(child),
558 Err(e) => error!("Error trying to respawn failed process: {e}"),
559 }
560 }
561 }
562}