1use crate::{
2 CliCommand, ExecCommand, IpcClientError, IpcStream, Justfile, JustfileError, Message,
3 ProcStatus, Runner,
4};
5use chrono::{DateTime, Local, TimeZone};
6use job_scheduler_ng::{self as job_scheduler, JobScheduler};
7use log::{error, info};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::sync::{mpsc, Arc, Mutex};
13use std::thread;
14use std::time::Duration;
15use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
16use thiserror::Error;
17
/// Identifier of a job registered with the [`Dispatcher`].
pub type JobId = u32;
/// Operating-system process id of a spawned child process.
pub type Pid = u32;
20
/// Job and process manager: keeps the registered jobs, the processes spawned for them and
/// the cron schedule.
pub struct Dispatcher<'a> {
    /// All registered jobs, keyed by id.
    jobs: BTreeMap<JobId, JobInfo>,
    /// Most recently allocated job id; new ids are allocated by incrementing it.
    last_job_id: JobId,
    /// Scheduler handles of cron jobs, used to unschedule them when the job is stopped.
    cronjobs: HashMap<JobId, job_scheduler::Uuid>,
    /// Every process spawned so far; shared with the child-watcher thread and cron closures.
    procs: Arc<Mutex<Vec<Runner>>>,
    /// Cron scheduler; shared with the scheduler thread that ticks it.
    scheduler: Arc<Mutex<JobScheduler<'a>>>,
    /// Process information source used by `ps`.
    system: System,
    /// Sending half of the channel on which PIDs of terminated children reach the watcher.
    channel: mpsc::Sender<Pid>,
}
31
/// Description of a registered job: what kind it is, what to run and how to restart it.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct JobInfo {
    /// Kind of job (shell command, `just` service or cron job).
    pub job_type: JobType,
    /// Command line that is spawned for the job (program followed by its arguments).
    pub args: Vec<String>,
    /// Restart behaviour applied when a process of this job terminates.
    pub restart: RestartInfo,
}
38
/// Kind of a job together with its kind-specific data.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum JobType {
    /// Command line run directly.
    Shell,
    /// `just` recipe; holds the recipe name joined with its arguments by single spaces.
    Service(String),
    /// Command run on a schedule; holds the cron expression.
    Cron(String),
}
48
/// When a terminated process of a job is restarted.
#[derive(clap::ValueEnum, Clone, Serialize, Deserialize, Debug)]
pub enum RestartPolicy {
    // Variants are described with plain `//` comments on purpose: with `clap::ValueEnum`,
    // `///` comments would become help text for the CLI's possible values.
    // Restart whenever the process terminates (presumably unless stopped by the user).
    Always,
    // Presumably: restart only after an unsuccessful exit — the decision is made by
    // `Runner::restart_infos`; TODO confirm.
    OnFailure,
    // Never restart.
    Never,
}
56
/// Restart settings of a job.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct RestartInfo {
    /// When to restart.
    pub policy: RestartPolicy,
    /// Pause before restarting, in milliseconds.
    pub wait_time: u64,
}
65
/// A registered job as reported to clients: its id plus its description.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Job {
    /// Job id.
    pub id: JobId,
    /// Job description.
    pub info: JobInfo,
}
72
/// Errors produced while dispatching commands and managing jobs.
#[derive(Error, Debug)]
pub enum DispatcherError {
    /// Invalid command-line arguments.
    #[error(transparent)]
    CliArgsError(#[from] clap::Error),
    /// The `shell-composed` process could not be started.
    #[error("Failed to start `shell-composed`: {0}")]
    DispatcherSpawnError(std::io::Error),
    /// Connecting to `shell-composed` failed (the variant name suggests a timeout).
    #[error("Connection to `shell-composed` failed")]
    DispatcherSpawnTimeoutError,
    /// A process could not be spawned; holds what was being spawned and the I/O error.
    #[error("Failed to spawn `{0}`: {1}")]
    ProcSpawnError(String, std::io::Error),
    /// Terminating a child process failed.
    #[error("Failed to terminate child process: {0}")]
    KillError(std::io::Error),
    /// No job is registered under the given id.
    #[error("Job {0} not found")]
    JobNotFoundError(JobId),
    /// No service job matches the given name.
    #[error("Service `{0}` not found")]
    ServiceNotFoundError(String),
    /// A freshly spawned process already exited with an error; holds its exit code.
    #[error("Process exit code: {0}")]
    ProcExitError(i32),
    /// The command to run was empty.
    #[error("Empty command")]
    EmptyProcCommandError,
    /// Reading or parsing the justfile failed.
    #[error(transparent)]
    JustfileError(#[from] JustfileError),
    /// A message of an unexpected kind was received.
    #[error("Communication protocol error")]
    UnexpectedMessageError,
    /// Communication with the IPC peer failed.
    #[error(transparent)]
    IpcClientError(#[from] IpcClientError),
    /// Invalid cron expression.
    #[error("Cron error: {0}")]
    CronError(#[from] cron::error::Error),
}
102
103impl Default for RestartInfo {
104 fn default() -> Self {
105 RestartInfo {
106 policy: RestartPolicy::OnFailure,
107 wait_time: 50,
108 }
109 }
110}
111
112impl JobInfo {
113 pub fn new_shell_job(args: Vec<String>, restart: Option<RestartPolicy>) -> Self {
114 let policy = restart.unwrap_or(RestartPolicy::Never);
115 JobInfo {
116 job_type: JobType::Shell,
117 args,
118 restart: RestartInfo {
119 policy,
120 ..Default::default()
121 },
122 }
123 }
124 pub fn new_cron_job(cron: String, args: Vec<String>) -> Self {
125 JobInfo {
126 job_type: JobType::Cron(cron),
127 args,
128 restart: RestartInfo {
129 policy: RestartPolicy::Never,
130 ..Default::default()
131 },
132 }
133 }
134 pub fn service_command(service: &str, args: &[String]) -> String {
135 [service]
136 .into_iter()
137 .chain(args.iter().map(|arg| arg.as_str()))
138 .collect::<Vec<_>>()
139 .join(" ")
140 }
141 pub fn new_service(service: &str, svc_args: &[String], restart: Option<RestartPolicy>) -> Self {
142 let mut args = vec!["just".to_string(), service.to_string()];
143 args.extend(svc_args.to_vec());
144 let policy = restart.unwrap_or(RestartPolicy::OnFailure);
145 JobInfo {
146 job_type: JobType::Service(Self::service_command(service, svc_args)),
147 args,
148 restart: RestartInfo {
149 policy,
150 ..Default::default()
151 },
152 }
153 }
154}
155
156impl Dispatcher<'_> {
157 pub fn create() -> Dispatcher<'static> {
158 let procs = Arc::new(Mutex::new(Vec::new()));
159 let scheduler = Arc::new(Mutex::new(JobScheduler::new()));
160
161 let scheduler_spawn = scheduler.clone();
162 let _handle = thread::spawn(move || cron_scheduler(scheduler_spawn));
163
164 let (send, recv) = mpsc::channel();
165 let send_spawn = send.clone();
166 let procs_spawn = procs.clone();
167 let _watcher = thread::spawn(move || child_watcher(procs_spawn, send_spawn, recv));
168
169 let system = System::new_with_specifics(
170 RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing()),
171 );
172
173 Dispatcher {
174 jobs: BTreeMap::new(),
175 last_job_id: 0,
176 cronjobs: HashMap::new(),
177 procs,
178 scheduler,
179 system,
180 channel: send,
181 }
182 }
183 pub fn exec_command(&mut self, cmd: ExecCommand, cwd: PathBuf) -> Message {
184 info!("Executing `{cmd:?}`");
185 std::env::set_current_dir(&cwd).unwrap();
186 let res = match cmd {
187 ExecCommand::Run { args, restart } => self.run(&args, restart),
188 ExecCommand::Runat { at, args } => self.run_at(&at, cwd, &args),
189 ExecCommand::Start {
190 service,
191 args,
192 restart,
193 } => self.start(&service, &args, restart),
194 ExecCommand::Up { group } => self.up(&group),
195 };
196 match res {
197 Err(e) => {
198 error!("{e}");
199 Message::Err(format!("{e}"))
200 }
201 Ok(job_ids) => Message::JobsStarted(job_ids),
202 }
203 }
204 pub fn cli_command(&mut self, cmd: CliCommand, stream: &mut IpcStream) {
205 info!("Executing `{cmd:?}`");
206 let res = match cmd {
207 CliCommand::Stop { job_id } => self.stop(job_id),
208 CliCommand::Down { group } => self.down(&group),
209 CliCommand::Ps => self.ps(stream),
210 CliCommand::Jobs => self.jobs(stream),
211 CliCommand::Logs { job_or_service } => self.log(job_or_service, stream),
212 CliCommand::Exit => std::process::exit(0),
213 };
214 if let Err(e) = &res {
215 error!("{e}");
216 }
217 let _ = stream.send_message(&res.into());
218 }
219 fn add_job(&mut self, job: JobInfo) -> JobId {
220 self.last_job_id += 1;
221 self.jobs.insert(self.last_job_id, job);
222 self.last_job_id
223 }
224 fn find_job(&self, service: &str, args: &[String]) -> Option<JobId> {
226 let cmd = JobInfo::service_command(service, args);
227 self.jobs
228 .iter()
229 .find(
230 |(_id, info)| matches!(&info.job_type, JobType::Service(command) if *command == cmd),
231 )
232 .map(|(id, _info)| *id)
233 }
234 fn run(
235 &mut self,
236 args: &[String],
237 policy: Option<RestartPolicy>,
238 ) -> Result<Vec<JobId>, DispatcherError> {
239 let job_info = JobInfo::new_shell_job(args.to_vec(), policy);
240 let job_id = self.add_job(job_info);
241 self.spawn_job(job_id)?;
242 Ok(vec![job_id])
243 }
244 fn spawn_job(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
245 let job = self
246 .jobs
247 .get(&job_id)
248 .ok_or(DispatcherError::JobNotFoundError(job_id))?;
249 let child = Runner::spawn(job_id, &job.args, job.restart.clone(), self.channel.clone())?;
250 self.procs.lock().expect("lock").push(child);
251 thread::sleep(Duration::from_millis(10));
253 if let Some(child) = self.procs.lock().expect("lock").last() {
254 return match child.info.state {
255 ProcStatus::ExitErr(code) => Err(DispatcherError::ProcExitError(code)),
256 _ => Ok(()),
258 };
259 }
260 Ok(())
261 }
262 fn stop(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
264 if let Some(uuid) = self.cronjobs.remove(&job_id) {
265 info!("Removing cron job {job_id}");
266 self.scheduler.lock().expect("lock").remove(uuid);
267 }
268 for child in self
269 .procs
270 .lock()
271 .expect("lock")
272 .iter_mut()
273 .filter(|child| child.info.job_id == job_id)
274 {
275 if child.is_running() {
276 child.user_terminated = true;
277 child.terminate().map_err(DispatcherError::KillError)?;
278 }
279 }
280 if self.jobs.remove(&job_id).is_some() {
281 Ok(())
282 } else {
283 Err(DispatcherError::JobNotFoundError(job_id))
284 }
285 }
286 fn run_at(
288 &mut self,
289 cron: &str,
290 cwd: PathBuf,
291 args: &[String],
292 ) -> Result<Vec<JobId>, DispatcherError> {
293 let job_info = JobInfo::new_cron_job(cron.to_string(), args.to_vec());
294 let restart_info = job_info.restart.clone();
295 let job_id = self.add_job(job_info);
296 let job_args = args.to_vec();
297 let procs = self.procs.clone();
298 let channel = self.channel.clone();
299 let uuid = self
300 .scheduler
301 .lock()
302 .expect("lock")
303 .add(job_scheduler::Job::new(cron.parse()?, move || {
304 std::env::set_current_dir(&cwd).unwrap();
305 let child = Runner::spawn(job_id, &job_args, restart_info.clone(), channel.clone())
306 .unwrap();
307 procs.lock().expect("lock").push(child);
308 }));
309 self.cronjobs.insert(job_id, uuid);
310 Ok(vec![job_id])
311 }
312 fn start(
314 &mut self,
315 service: &str,
316 args: &[String],
317 policy: Option<RestartPolicy>,
318 ) -> Result<Vec<JobId>, DispatcherError> {
319 let job_id = self
321 .find_job(service, args)
322 .unwrap_or_else(|| self.add_job(JobInfo::new_service(service, args, policy)));
323 let running = self
325 .procs
326 .lock()
327 .expect("lock")
328 .iter_mut()
329 .any(|child| child.info.job_id == job_id && child.is_running());
330 if running {
331 Ok(vec![])
332 } else {
333 self.spawn_job(job_id)?;
334 Ok(vec![job_id])
335 }
336 }
337 fn up(&mut self, group: &str) -> Result<Vec<JobId>, DispatcherError> {
339 let mut job_ids = Vec::new();
340 let justfile = Justfile::parse()?;
341 let recipes = justfile.group_recipes(group);
342 for service in recipes {
343 let ids = self.start(&service, &[], None)?;
344 job_ids.extend(ids);
345 }
346 Ok(job_ids)
347 }
348 fn down(&mut self, group: &str) -> Result<(), DispatcherError> {
350 let mut job_ids = Vec::new();
351 let justfile = Justfile::parse()?;
352 let recipes = justfile.group_recipes(group);
353 for service in recipes {
354 self.jobs
355 .iter()
356 .filter(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if *name == service))
357 .for_each(|(id, _info)| job_ids.push(*id));
358 }
359 for job_id in job_ids {
360 self.stop(job_id)?;
361 }
362 Ok(())
363 }
364 fn ps(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
366 let ts = Local::now();
370 self.system.refresh_processes_specifics(
371 ProcessesToUpdate::All,
372 true,
373 ProcessRefreshKind::nothing().with_cpu(),
374 );
375 let pids: Vec<sysinfo::Pid> = self
377 .procs
378 .lock()
379 .expect("lock")
380 .iter()
381 .flat_map(|proc| {
382 let parent_pid = sysinfo::Pid::from(proc.info.pid as usize);
383 self.system
384 .processes()
385 .iter()
386 .filter(move |(_pid, process)| {
387 process.parent().unwrap_or(0.into()) == parent_pid
388 })
389 .map(|(pid, _process)| *pid)
390 .chain([parent_pid])
391 })
392 .collect();
393 std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); let duration = (Local::now() - ts).num_milliseconds();
395 fn per_second(value: u64, ms: i64) -> u64 {
396 (value as f64 * 1000.0 / ms as f64) as u64
397 }
398 self.system.refresh_processes_specifics(
399 ProcessesToUpdate::Some(&pids),
400 true,
401 ProcessRefreshKind::nothing()
402 .with_cpu()
403 .with_disk_usage()
404 .with_memory(),
405 );
406
407 let mut proc_infos = Vec::new();
408 for child in &mut self.procs.lock().expect("lock").iter_mut().rev() {
409 let parent_pid = sysinfo::Pid::from(child.info.pid as usize);
410 let main_pid = if child.info.program() == "just" {
413 self.system
414 .processes()
415 .iter()
416 .find(|(_pid, process)| {
417 process.parent().unwrap_or(0.into()) == parent_pid
418 && process.name() != "ctrl-c"
419 })
420 .map(|(pid, _process)| *pid)
421 .unwrap_or(parent_pid)
422 } else {
423 parent_pid
424 };
425 if let Some(process) = self.system.process(main_pid) {
426 child.info.cpu = process.cpu_usage();
427 child.info.memory = process.memory();
428 child.info.virtual_memory = process.virtual_memory();
429 let disk = process.disk_usage();
430 child.info.total_written_bytes = disk.total_written_bytes;
431 child.info.written_bytes = per_second(disk.written_bytes, duration);
432 child.info.total_read_bytes = disk.total_read_bytes;
433 child.info.read_bytes = per_second(disk.read_bytes, duration);
434 } else {
435 child.info.cpu = 0.0;
436 child.info.memory = 0;
437 child.info.virtual_memory = 0;
438 child.info.written_bytes = 0;
439 child.info.read_bytes = 0;
440 }
441 let info = child.update_proc_state();
442 proc_infos.push(info.clone());
443 }
444 stream.send_message(&Message::PsInfo(proc_infos))?;
445 Ok(())
446 }
447 fn jobs(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
449 let mut job_infos = Vec::new();
450 for (id, info) in self.jobs.iter().rev() {
451 job_infos.push(Job {
452 id: *id,
453 info: info.clone(),
454 });
455 }
456 stream.send_message(&Message::JobInfo(job_infos))?;
457 Ok(())
458 }
459 fn log(
461 &mut self,
462 job_or_service: Option<String>,
463 stream: &mut IpcStream,
464 ) -> Result<(), DispatcherError> {
465 let mut job_id_filter = None;
466 if let Some(job_or_service) = job_or_service {
467 if let Ok(job_id) = JobId::from_str(&job_or_service) {
468 if self.jobs.contains_key(&job_id) {
469 job_id_filter = Some(job_id);
470 } else {
471 return Err(DispatcherError::JobNotFoundError(job_id));
472 }
473 } else {
474 job_id_filter = Some(
475 self.find_job(&job_or_service, &[])
476 .ok_or(DispatcherError::ServiceNotFoundError(job_or_service))?,
477 );
478 }
479 }
480
481 let mut last_seen_ts: HashMap<Pid, DateTime<Local>> = HashMap::new();
482 'logwait: loop {
483 let mut log_lines = Vec::new();
485 for child in self.procs.lock().expect("lock").iter_mut() {
486 if let Ok(output) = child.output.lock() {
487 let last_seen = last_seen_ts
488 .entry(child.proc.id())
489 .or_insert(Local.timestamp_millis_opt(0).single().expect("ts"));
490 for entry in output.lines_since(last_seen) {
491 if let Some(job_id) = job_id_filter {
492 if entry.job_id != job_id {
493 continue;
494 }
495 }
496 log_lines.push(entry.clone());
497 }
498 }
499 }
500
501 if log_lines.is_empty() {
502 stream.alive()?;
504 } else {
505 log_lines.sort_by_key(|entry| entry.ts);
506 for entry in log_lines {
507 if stream.send_message(&Message::LogLine(entry)).is_err() {
508 info!("Aborting log command (stream error)");
509 break 'logwait;
510 }
511 }
512 }
513 thread::sleep(Duration::from_millis(100));
515 }
516 Ok(())
517 }
518}
519
520fn cron_scheduler(scheduler: Arc<Mutex<JobScheduler<'static>>>) {
521 loop {
522 let wait_time = if let Ok(mut scheduler) = scheduler.lock() {
523 scheduler.tick();
524 scheduler.time_till_next_job()
525 } else {
526 Duration::from_millis(50)
527 };
528 std::thread::sleep(wait_time);
529 }
530}
531
532fn child_watcher(
535 procs: Arc<Mutex<Vec<Runner>>>,
536 sender: mpsc::Sender<Pid>,
537 recv: mpsc::Receiver<Pid>,
538) {
539 loop {
540 let pid = recv.recv().expect("recv");
542 let ts = Local::now();
543 let respawn_child = if let Some(child) = procs
544 .lock()
545 .expect("lock")
546 .iter_mut()
547 .find(|p| p.info.pid == pid)
548 {
549 let exit_code = child.proc.wait().ok().and_then(|st| st.code());
551 let _ = child.update_proc_state();
552 child.info.end = Some(ts);
553 if let Some(code) = exit_code {
554 info!(target: &format!("{pid}"), "Process terminated with exit code {code}");
555 } else {
556 info!(target: &format!("{pid}"), "Process terminated");
557 }
558 child.restart_infos()
559 } else {
560 info!(target: &format!("{pid}"), "(Unknown) process terminated");
561 None
562 };
563 if let Some(spawn_info) = respawn_child {
564 thread::sleep(Duration::from_millis(spawn_info.restart_info.wait_time));
565 let result = Runner::spawn(
566 spawn_info.job_id,
567 &spawn_info.args,
568 spawn_info.restart_info,
569 sender.clone(),
570 );
571 match result {
572 Ok(child) => procs.lock().expect("lock").push(child),
573 Err(e) => error!("Error trying to respawn failed process: {e}"),
574 }
575 }
576 }
577}