1use crate::{
2 CliCommand, ExecCommand, IpcClientError, IpcStream, Justfile, JustfileError, Message,
3 ProcStatus, Runner,
4};
5use chrono::{DateTime, Local, TimeZone};
6use job_scheduler_ng::{self as job_scheduler, JobScheduler};
7use log::{error, info};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use std::str::FromStr;
11use std::sync::{mpsc, Arc, Mutex};
12use std::thread;
13use std::time::Duration;
14use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, RefreshKind, System};
15use thiserror::Error;
16
/// Identifier the dispatcher hands out for every registered job (see `Dispatcher::add_job`).
pub type JobId = u32;
/// Operating-system process id of a spawned child process.
pub type Pid = u32;
19
20pub struct Dispatcher<'a> {
21 jobs: BTreeMap<JobId, JobInfo>,
22 last_job_id: JobId,
23 cronjobs: HashMap<JobId, job_scheduler::Uuid>,
24 procs: Arc<Mutex<Vec<Runner>>>,
25 scheduler: Arc<Mutex<JobScheduler<'a>>>,
26 system: System,
27 channel: mpsc::Sender<Pid>,
29}
30
31#[derive(Clone, Serialize, Deserialize, Debug)]
32pub struct JobInfo {
33 pub job_type: JobType,
34 pub args: Vec<String>,
35 pub entrypoint: Option<String>,
36 pub restart: RestartInfo,
37}
38
39#[derive(Clone, Serialize, Deserialize, Debug)]
40pub enum JobType {
41 Shell,
42 Service(String),
43 Cron(String),
44}
45
46#[derive(Clone, Serialize, Deserialize, Debug)]
48pub enum RestartPolicy {
49 Always,
50 OnFailure,
51 Never,
52}
53
54#[derive(Clone, Serialize, Deserialize, Debug)]
56pub struct RestartInfo {
57 pub policy: RestartPolicy,
58 pub wait_time: u64,
60 }
62
63#[derive(Clone, Serialize, Deserialize, Debug)]
65pub struct Job {
66 pub id: JobId,
67 pub info: JobInfo,
68}
69
70#[derive(Error, Debug)]
71pub enum DispatcherError {
72 #[error(transparent)]
73 CliArgsError(#[from] clap::Error),
74 #[error("Failed to start `shell-composed`: {0}")]
75 DispatcherSpawnError(std::io::Error),
76 #[error("Connection to `shell-composed` failed")]
77 DispatcherSpawnTimeoutError,
78 #[error("Failed to spawn process: {0}")]
79 ProcSpawnError(std::io::Error),
80 #[error("Failed to terminate child process: {0}")]
81 KillError(std::io::Error),
82 #[error("Job {0} not found")]
83 JobNotFoundError(JobId),
84 #[error("Service `{0}` not found")]
85 ServiceNotFoundError(String),
86 #[error("Process exit code: {0}")]
87 ProcExitError(i32),
88 #[error("Empty command")]
89 EmptyProcCommandError,
90 #[error(transparent)]
91 JustfileError(#[from] JustfileError),
92 #[error("Communication protocol error")]
93 UnexpectedMessageError,
94 #[error(transparent)]
95 IpcClientError(#[from] IpcClientError),
96 #[error("Cron error: {0}")]
97 CronError(#[from] cron::error::Error),
98}
99
100impl Default for RestartInfo {
101 fn default() -> Self {
102 RestartInfo {
103 policy: RestartPolicy::OnFailure,
104 wait_time: 50,
105 }
106 }
107}
108
109impl JobInfo {
110 pub fn new_shell_job(args: Vec<String>) -> Self {
111 JobInfo {
112 job_type: JobType::Shell,
113 args,
114 entrypoint: None,
115 restart: RestartInfo {
116 policy: RestartPolicy::Never,
117 ..Default::default()
118 },
119 }
120 }
121 pub fn new_cron_job(cron: String, args: Vec<String>) -> Self {
122 JobInfo {
123 job_type: JobType::Cron(cron),
124 args,
125 entrypoint: None,
126 restart: RestartInfo {
127 policy: RestartPolicy::Never,
128 ..Default::default()
129 },
130 }
131 }
132 pub fn new_service(service: String) -> Self {
133 JobInfo {
134 job_type: JobType::Service(service.clone()),
135 args: vec!["just".to_string(), service], entrypoint: Some("just".to_string()),
137 restart: RestartInfo::default(),
138 }
139 }
140}
141
142impl Dispatcher<'_> {
143 pub fn create() -> Dispatcher<'static> {
144 let procs = Arc::new(Mutex::new(Vec::new()));
145 let scheduler = Arc::new(Mutex::new(JobScheduler::new()));
146
147 let scheduler_spawn = scheduler.clone();
148 let _handle = thread::spawn(move || cron_scheduler(scheduler_spawn));
149
150 let (send, recv) = mpsc::channel();
151 let send_spawn = send.clone();
152 let procs_spawn = procs.clone();
153 let _watcher = thread::spawn(move || child_watcher(procs_spawn, send_spawn, recv));
154
155 let system = System::new_with_specifics(
156 RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing()),
157 );
158
159 Dispatcher {
160 jobs: BTreeMap::new(),
161 last_job_id: 0,
162 cronjobs: HashMap::new(),
163 procs,
164 scheduler,
165 system,
166 channel: send,
167 }
168 }
169 pub fn exec_command(&mut self, cmd: ExecCommand) -> Message {
170 info!("Executing `{cmd:?}`");
171 let res = match cmd {
172 ExecCommand::Run { args } => self.run(&args),
173 ExecCommand::Runat { at, args } => self.run_at(&at, &args),
174 ExecCommand::Start { service } => self.start(&service),
175 ExecCommand::Up { group } => self.up(&group),
176 };
177 match res {
178 Err(e) => {
179 error!("{e}");
180 Message::Err(format!("{e}"))
181 }
182 Ok(job_ids) => Message::JobsStarted(job_ids),
183 }
184 }
185 pub fn cli_command(&mut self, cmd: CliCommand, stream: &mut IpcStream) {
186 info!("Executing `{cmd:?}`");
187 let res = match cmd {
188 CliCommand::Stop { job_id } => self.stop(job_id),
189 CliCommand::Down { group } => self.down(&group),
190 CliCommand::Ps => self.ps(stream),
191 CliCommand::Jobs => self.jobs(stream),
192 CliCommand::Logs { job_or_service } => self.log(job_or_service, stream),
193 CliCommand::Exit => std::process::exit(0),
194 };
195 if let Err(e) = &res {
196 error!("{e}");
197 }
198 let _ = stream.send_message(&res.into());
199 }
200 fn add_job(&mut self, job: JobInfo) -> JobId {
201 self.last_job_id += 1;
202 self.jobs.insert(self.last_job_id, job);
203 self.last_job_id
204 }
205 fn find_job(&self, service: &str) -> Option<JobId> {
207 self.jobs
208 .iter()
209 .find(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if name == service))
210 .map(|(id, _info)| *id)
211 }
212 fn run(&mut self, args: &[String]) -> Result<Vec<JobId>, DispatcherError> {
213 let job_info = JobInfo::new_shell_job(args.to_vec());
214 let job_id = self.add_job(job_info);
215 self.spawn_job(job_id)?;
216 Ok(vec![job_id])
217 }
218 fn spawn_job(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
219 let job = self
220 .jobs
221 .get(&job_id)
222 .ok_or(DispatcherError::JobNotFoundError(job_id))?;
223 let child = Runner::spawn(job_id, &job.args, job.restart.clone(), self.channel.clone())?;
224 self.procs.lock().expect("lock").push(child);
225 thread::sleep(Duration::from_millis(10));
227 if let Some(child) = self.procs.lock().expect("lock").last() {
228 return match child.info.state {
229 ProcStatus::ExitErr(code) => Err(DispatcherError::ProcExitError(code)),
230 _ => Ok(()),
232 };
233 }
234 Ok(())
235 }
236 fn stop(&mut self, job_id: JobId) -> Result<(), DispatcherError> {
238 if let Some(uuid) = self.cronjobs.remove(&job_id) {
239 info!("Removing cron job {job_id}");
240 self.scheduler.lock().expect("lock").remove(uuid);
241 }
242 for child in self
243 .procs
244 .lock()
245 .expect("lock")
246 .iter_mut()
247 .filter(|child| child.info.job_id == job_id)
248 {
249 if child.is_running() {
250 child.user_terminated = true;
251 child.terminate().map_err(DispatcherError::KillError)?;
252 }
253 }
254 if self.jobs.remove(&job_id).is_some() {
255 Ok(())
256 } else {
257 Err(DispatcherError::JobNotFoundError(job_id))
258 }
259 }
260 fn run_at(&mut self, cron: &str, args: &[String]) -> Result<Vec<JobId>, DispatcherError> {
262 let job_info = JobInfo::new_cron_job(cron.to_string(), args.to_vec());
263 let restart_info = job_info.restart.clone();
264 let job_id = self.add_job(job_info);
265 let job_args = args.to_vec();
266 let procs = self.procs.clone();
267 let channel = self.channel.clone();
268 let uuid = self
269 .scheduler
270 .lock()
271 .expect("lock")
272 .add(job_scheduler::Job::new(cron.parse()?, move || {
273 let child = Runner::spawn(job_id, &job_args, restart_info.clone(), channel.clone())
274 .unwrap();
275 procs.lock().expect("lock").push(child);
276 }));
277 self.cronjobs.insert(job_id, uuid);
278 Ok(vec![job_id])
279 }
280 fn start(&mut self, service: &str) -> Result<Vec<JobId>, DispatcherError> {
282 let job_id = self
284 .find_job(service)
285 .unwrap_or_else(|| self.add_job(JobInfo::new_service(service.to_string())));
286 let running = self
288 .procs
289 .lock()
290 .expect("lock")
291 .iter_mut()
292 .any(|child| child.info.job_id == job_id && child.is_running());
293 if running {
294 Ok(vec![])
295 } else {
296 self.spawn_job(job_id)?;
297 Ok(vec![job_id])
298 }
299 }
300 fn up(&mut self, group: &str) -> Result<Vec<JobId>, DispatcherError> {
302 let mut job_ids = Vec::new();
303 let justfile = Justfile::parse()?;
304 let recipes = justfile.group_recipes(group);
305 for service in recipes {
306 let ids = self.start(&service)?;
307 job_ids.extend(ids);
308 }
309 Ok(job_ids)
310 }
311 fn down(&mut self, group: &str) -> Result<(), DispatcherError> {
313 let mut job_ids = Vec::new();
314 let justfile = Justfile::parse()?;
315 let recipes = justfile.group_recipes(group);
316 for service in recipes {
317 self.jobs
318 .iter()
319 .filter(|(_id, info)| matches!(&info.job_type, JobType::Service(name) if *name == service))
320 .for_each(|(id, _info)| job_ids.push(*id));
321 }
322 for job_id in job_ids {
323 self.stop(job_id)?;
324 }
325 Ok(())
326 }
327 fn ps(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
329 let ts = Local::now();
333 self.system.refresh_processes_specifics(
334 ProcessesToUpdate::All,
335 true,
336 ProcessRefreshKind::nothing().with_cpu(),
337 );
338 let pids: Vec<sysinfo::Pid> = self
340 .procs
341 .lock()
342 .expect("lock")
343 .iter()
344 .flat_map(|proc| {
345 let parent_pid = sysinfo::Pid::from(proc.info.pid as usize);
346 self.system
347 .processes()
348 .iter()
349 .filter(move |(_pid, process)| {
350 process.parent().unwrap_or(0.into()) == parent_pid
351 })
352 .map(|(pid, _process)| *pid)
353 .chain([parent_pid])
354 })
355 .collect();
356 std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); let duration = (Local::now() - ts).num_milliseconds();
358 fn per_second(value: u64, ms: i64) -> u64 {
359 (value as f64 * 1000.0 / ms as f64) as u64
360 }
361 self.system.refresh_processes_specifics(
362 ProcessesToUpdate::Some(&pids),
363 true,
364 ProcessRefreshKind::nothing()
365 .with_cpu()
366 .with_disk_usage()
367 .with_memory(),
368 );
369
370 let mut proc_infos = Vec::new();
371 for child in &mut self.procs.lock().expect("lock").iter_mut().rev() {
372 let parent_pid = sysinfo::Pid::from(child.info.pid as usize);
373 let main_pid = if child.info.program() == "just" {
376 self.system
377 .processes()
378 .iter()
379 .find(|(_pid, process)| {
380 process.parent().unwrap_or(0.into()) == parent_pid
381 && process.name() != "ctrl-c"
382 })
383 .map(|(pid, _process)| *pid)
384 .unwrap_or(parent_pid)
385 } else {
386 parent_pid
387 };
388 if let Some(process) = self.system.process(main_pid) {
389 child.info.cpu = process.cpu_usage();
390 child.info.memory = process.memory();
391 child.info.virtual_memory = process.virtual_memory();
392 let disk = process.disk_usage();
393 child.info.total_written_bytes = disk.total_written_bytes;
394 child.info.written_bytes = per_second(disk.written_bytes, duration);
395 child.info.total_read_bytes = disk.total_read_bytes;
396 child.info.read_bytes = per_second(disk.read_bytes, duration);
397 } else {
398 child.info.cpu = 0.0;
399 child.info.memory = 0;
400 child.info.virtual_memory = 0;
401 child.info.written_bytes = 0;
402 child.info.read_bytes = 0;
403 }
404 let info = child.update_proc_state();
405 proc_infos.push(info.clone());
406 }
407 stream.send_message(&Message::PsInfo(proc_infos))?;
408 Ok(())
409 }
410 fn jobs(&mut self, stream: &mut IpcStream) -> Result<(), DispatcherError> {
412 let mut job_infos = Vec::new();
413 for (id, info) in self.jobs.iter().rev() {
414 job_infos.push(Job {
415 id: *id,
416 info: info.clone(),
417 });
418 }
419 stream.send_message(&Message::JobInfo(job_infos))?;
420 Ok(())
421 }
422 fn log(
424 &mut self,
425 job_or_service: Option<String>,
426 stream: &mut IpcStream,
427 ) -> Result<(), DispatcherError> {
428 let mut job_id_filter = None;
429 if let Some(job_or_service) = job_or_service {
430 if let Ok(job_id) = JobId::from_str(&job_or_service) {
431 if self.jobs.contains_key(&job_id) {
432 job_id_filter = Some(job_id);
433 } else {
434 return Err(DispatcherError::JobNotFoundError(job_id));
435 }
436 } else {
437 job_id_filter = Some(
438 self.find_job(&job_or_service)
439 .ok_or(DispatcherError::ServiceNotFoundError(job_or_service))?,
440 );
441 }
442 }
443
444 let mut last_seen_ts: HashMap<Pid, DateTime<Local>> = HashMap::new();
445 'logwait: loop {
446 let mut log_lines = Vec::new();
448 for child in self.procs.lock().expect("lock").iter_mut() {
449 if let Ok(output) = child.output.lock() {
450 let last_seen = last_seen_ts
451 .entry(child.proc.id())
452 .or_insert(Local.timestamp_millis_opt(0).single().expect("ts"));
453 for entry in output.lines_since(last_seen) {
454 if let Some(job_id) = job_id_filter {
455 if entry.job_id != job_id {
456 continue;
457 }
458 }
459 log_lines.push(entry.clone());
460 }
461 }
462 }
463
464 if log_lines.is_empty() {
465 stream.alive()?;
467 } else {
468 log_lines.sort_by_key(|entry| entry.ts);
469 for entry in log_lines {
470 if stream.send_message(&Message::LogLine(entry)).is_err() {
471 info!("Aborting log command (stream error)");
472 break 'logwait;
473 }
474 }
475 }
476 thread::sleep(Duration::from_millis(100));
478 }
479 Ok(())
480 }
481}
482
483fn cron_scheduler(scheduler: Arc<Mutex<JobScheduler<'static>>>) {
484 loop {
485 let wait_time = if let Ok(mut scheduler) = scheduler.lock() {
486 scheduler.tick();
487 scheduler.time_till_next_job()
488 } else {
489 Duration::from_millis(50)
490 };
491 std::thread::sleep(wait_time);
492 }
493}
494
495fn child_watcher(
498 procs: Arc<Mutex<Vec<Runner>>>,
499 sender: mpsc::Sender<Pid>,
500 recv: mpsc::Receiver<Pid>,
501) {
502 loop {
503 let pid = recv.recv().expect("recv");
505 let ts = Local::now();
506 let respawn_child = if let Some(child) = procs
507 .lock()
508 .expect("lock")
509 .iter_mut()
510 .find(|p| p.info.pid == pid)
511 {
512 let exit_code = child.proc.wait().ok().and_then(|st| st.code());
514 let _ = child.update_proc_state();
515 child.info.end = Some(ts);
516 if let Some(code) = exit_code {
517 info!(target: &format!("{pid}"), "Process terminated with exit code {code}");
518 } else {
519 info!(target: &format!("{pid}"), "Process terminated");
520 }
521 child.restart_infos()
522 } else {
523 info!(target: &format!("{pid}"), "(Unknown) process terminated");
524 None
525 };
526 if let Some(spawn_info) = respawn_child {
527 thread::sleep(Duration::from_millis(spawn_info.restart_info.wait_time));
528 let result = Runner::spawn(
529 spawn_info.job_id,
530 &spawn_info.args,
531 spawn_info.restart_info,
532 sender.clone(),
533 );
534 match result {
535 Ok(child) => procs.lock().expect("lock").push(child),
536 Err(e) => error!("Error trying to respawn failed process: {e}"),
537 }
538 }
539 }
540}