1use std::{
2 io,
3 process::{Output, Stdio},
4 sync::{
5 atomic::{AtomicBool, AtomicUsize, Ordering},
6 Arc,
7 },
8 time::{Duration, Instant},
9};
10
11use console::Color;
12use tokio::{
13 io::{AsyncBufReadExt, BufReader},
14 process::{Child, ChildStderr, ChildStdout},
15 signal, task, time,
16};
17
18use crate::{Cmd, Dependency, Error, KillTimeout, Location, Result, SpawnOptions};
19
20pub struct Process<Loc> {
22 pub tag: &'static str,
24 pub cmd: Cmd<Loc>,
26 pub timeout: KillTimeout,
28}
29
30enum TeardownReason {
31 CtrlC,
32 ProcessFinished(io::Result<Output>),
33}
34
35enum CtrlCResult {
36 ProcessExited,
37 Timeout,
38}
39
40pub(crate) enum ExitResult {
41 Output(Output),
42 Interrupted,
43 Killed { pid: u32 },
44}
45
46impl<Loc> Process<Loc>
47where
48 Loc: Location,
49{
50 pub fn new(tag: &'static str, cmd: Cmd<Loc>, timeout: KillTimeout) -> Self {
52 Self { tag, cmd, timeout }
53 }
54
55 pub fn tag(&self) -> &'static str {
57 self.tag
58 }
59
60 pub fn cmd(&self) -> &Cmd<Loc> {
62 &self.cmd
63 }
64
65 pub fn timeout(&self) -> &KillTimeout {
67 &self.timeout
68 }
69
70 pub async fn spawn(&self, opts: SpawnOptions) -> io::Result<RunningProcess> {
73 self.cmd().spawn(opts)
74 }
75}
76
77#[macro_export]
99macro_rules! process {
100 {
101 tag: $tag:expr,
102 cmd: $cmd:expr,
103 timeout: $timeout:expr$(,)?
104 } => {
105 $crate::Process::new(
106 $tag,
107 $cmd,
108 $timeout,
109 )
110 };
111 {
112 tag: $tag:expr,
113 cmd: $cmd:expr$(,)?
114 } => {
115 $crate::Process::new(
116 $tag,
117 $cmd,
118 $crate::KillTimeout::default(),
119 )
120 };
121}
122
123pub struct RunningProcess {
125 pub(crate) process: Child,
126 pub(crate) timeout: KillTimeout,
127}
128
129impl RunningProcess {
130 pub fn as_child(&self) -> &Child {
132 &self.process
133 }
134
135 pub fn into_child(self) -> Child {
137 self.process
138 }
139
140 pub(crate) fn stdout(&mut self) -> Option<ChildStdout> {
141 self.process.stdout.take()
142 }
143
144 pub(crate) fn stderr(&mut self) -> Option<ChildStderr> {
145 self.process.stderr.take()
146 }
147
148 pub(crate) async fn wait(self) -> Result<ExitResult> {
149 let process = self.process;
150
151 let pid = match process.id() {
152 Some(pid) => pid,
153 None => return Err(Error::ProcessDoesNotExist),
154 };
155
156 let process_exited = Arc::new(AtomicBool::new(false));
157
158 let exit_reason = {
159 let process_exited = process_exited.clone();
160
161 let process_task = task::spawn(async move {
162 let res = process.wait_with_output().await;
163 process_exited.store(true, Ordering::SeqCst);
164 res
165 });
166
167 tokio::select! {
168 result =
169 process_task =>
170 TeardownReason::ProcessFinished(
171 result.unwrap_or_else(|err| Err(io::Error::new(io::ErrorKind::Other, err)))
172 ),
173 _ = signal::ctrl_c() => TeardownReason::CtrlC,
174 }
175 };
176
177 match exit_reason {
178 TeardownReason::ProcessFinished(result) => {
179 let output = result?;
180 if output.status.success() {
181 Ok(ExitResult::Output(output))
182 } else {
183 Err(output.into())
184 }
185 }
186 TeardownReason::CtrlC => {
187 let res = {
188 let process_exited = process_exited.clone();
189 let exit_checker = task::spawn(async move {
190 loop {
191 if process_exited.load(Ordering::SeqCst) {
192 break;
193 }
194 }
195 });
196 tokio::select! {
197 _ = exit_checker => CtrlCResult::ProcessExited,
198 _ = time::sleep(*self.timeout) => CtrlCResult::Timeout,
199 }
200 };
201
202 match res {
203 CtrlCResult::ProcessExited => Ok(ExitResult::Interrupted),
204 CtrlCResult::Timeout => match Self::kill(pid) {
205 Ok(()) => Ok(ExitResult::Killed { pid }),
206 Err(err) => Err(err),
207 },
208 }
209 }
210 }
211 }
212
213 #[cfg(unix)]
215 pub async fn stop(mut self) -> Result<()> {
216 use nix::{
217 sys::signal::{self, Signal},
218 unistd::Pid,
219 };
220
221 match self.process.id() {
222 None => Err(Error::ProcessDoesNotExist),
223 Some(pid) => match signal::kill(Pid::from_raw(pid as i32), Signal::SIGINT) {
224 Ok(()) => {
225 let process = &mut self.process;
226
227 let res = tokio::select! {
228 res = process.wait() => Some(res),
229 _ = time::sleep(*self.timeout) => None,
230 };
231
232 match res {
233 Some(Ok(_)) => Ok(()),
234 Some(Err(error)) => {
235 eprintln!("⚠️ IO error on SIGINT: {error}. Killing the process {pid}.");
236 Self::kill(pid)
237 }
238 None => {
239 eprintln!("⚠️ SIGINT timeout. Killing the process {pid}.");
240 Self::kill(pid)
241 }
242 }
243 }
244 Err(error) => {
245 eprintln!("⚠️ Failed to terminate the process {pid}. {error}. Killing it.");
246 Self::kill(pid)
247 }
248 },
249 }
250 }
251
252 #[cfg(unix)]
255 pub(crate) fn kill(pid: u32) -> Result<()> {
256 use nix::{
257 sys::signal::{self, Signal},
258 unistd::Pid,
259 };
260
261 signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL)
262 .map_err(|err| Error::Zombie { pid, err })
263 }
264
265 #[cfg(windows)]
266 pub(crate) fn kill(pid: u32) -> Result<()> {
267 use winapi::{
268 shared::{
269 minwindef::{BOOL, DWORD, FALSE, UINT},
270 ntdef::NULL,
271 },
272 um::{
273 errhandlingapi::GetLastError,
274 handleapi::CloseHandle,
275 processthreadsapi::{OpenProcess, TerminateProcess},
276 winnt::{HANDLE, PROCESS_TERMINATE},
277 },
278 };
279
280 const DESIRED_ACCESS: DWORD = PROCESS_TERMINATE;
282
283 const INHERIT_HANDLE: BOOL = FALSE;
284
285 const EXIT_CODE: UINT = 0;
290
291 unsafe fn get_error(pid: u32) -> Result<()> {
294 let err: DWORD = GetLastError();
296
297 Err(Error::Zombie { pid, err })
298 }
299
300 unsafe {
301 let handle: HANDLE = OpenProcess(DESIRED_ACCESS, INHERIT_HANDLE, pid);
303 if handle == NULL {
304 get_error(pid)?;
305 }
306
307 let terminate_result: BOOL = TerminateProcess(handle, EXIT_CODE);
309 if terminate_result == FALSE {
310 get_error(pid)?;
311 }
312
313 let close_result: BOOL = CloseHandle(handle);
315 if close_result == FALSE {
316 get_error(pid)?;
317 }
318 }
319
320 Ok(())
321 }
322}
323
324pub enum PoolEntry<Loc, Dep: ?Sized> {
328 Process(Process<Loc>),
330 ProcessWithDep {
332 process: Process<Loc>,
334 dependency: Box<Dep>,
336 },
337}
338
339impl<Loc> PoolEntry<Loc, dyn Dependency>
340where
341 Loc: Location + 'static,
342{
343 fn process(&self) -> &Process<Loc> {
344 match self {
345 Self::Process(process) => process,
346 Self::ProcessWithDep {
347 process,
348 dependency: _,
349 } => process,
350 }
351 }
352
353 fn take(self) -> (Process<Loc>, Option<Box<dyn Dependency>>) {
354 match self {
355 Self::Process(process) => (process, None),
356 Self::ProcessWithDep {
357 process,
358 dependency,
359 } => (process, Some(dependency)),
360 }
361 }
362}
363
364pub struct ProcessPool;
370
371impl ProcessPool {
372 pub async fn run<Loc>(pool: Vec<Process<Loc>>) -> Result<()>
374 where
375 Loc: Location + 'static,
376 {
377 let pool = pool.into_iter().map(|p| PoolEntry::Process(p)).collect();
378 ProcessPool::runner::<Loc>(pool).await
379 }
380
381 pub async fn run_with_deps<Loc>(pool: Vec<PoolEntry<Loc, dyn Dependency>>) -> Result<()>
385 where
386 Loc: Location + 'static,
387 {
388 ProcessPool::runner(pool).await
389 }
390
391 async fn runner<Loc>(pool: Vec<PoolEntry<Loc, dyn Dependency>>) -> Result<()>
392 where
393 Loc: Location + 'static,
394 {
395 let pool_size = pool.len();
396 let exited_processes = Arc::new(AtomicUsize::new(0));
397
398 let (tag_col_length, timeout) =
399 pool.iter()
400 .fold((0, Duration::default()), |(len, timeout), entry| {
401 let process = entry.process();
402 let len = {
403 let tag_len = process.tag().len();
404 if tag_len > len {
405 tag_len
406 } else {
407 len
408 }
409 };
410 let timeout = if *process.timeout > timeout {
411 *process.timeout
412 } else {
413 timeout
414 };
415 (len, timeout)
416 });
417
418 let colors = colors::make(pool_size as u8);
419 let processes: Vec<(PoolEntry<Loc, dyn Dependency>, Color)> =
420 pool.into_iter().zip(colors).collect();
421
422 let processes_list = processes.iter().fold(String::new(), |acc, (entry, color)| {
423 let process = entry.process();
424 let styled = console::style(process.tag().to_string()).fg(*color).bold();
425 if acc.is_empty() {
426 styled.to_string()
427 } else {
428 format!("{}, {}", acc, styled)
429 }
430 });
431
432 eprintln!("❯ {} {}", console::style("Running:").bold(), processes_list);
433
434 for (entry, color) in processes {
435 let exited_processes = exited_processes.clone();
436
437 task::spawn(async move {
438 let (process, dependency) = entry.take();
439 let tag = process.tag();
440 let cmd = process.cmd();
441 let timeout = process.timeout();
442 let colored_tag = console::style(tag.to_owned()).fg(color).bold();
443 let colored_tag_col = {
444 let len = tag.len();
445 let pad = " ".repeat(if len < tag_col_length {
446 tag_col_length - len + 2
447 } else {
448 2
449 });
450 console::style(format!(
451 "{tag}{pad}{pipe}",
452 tag = colored_tag,
453 pad = pad,
454 pipe = console::style("|").fg(color).bold()
455 ))
456 };
457
458 let dep_res = match dependency {
459 None => Ok(()),
460 Some(dependency) => {
461 let dep_tag = console::style(dependency.tag()).bold();
462
463 eprintln!(
464 "{col} {process} is waiting for its {dep} dependency...",
465 col = colored_tag_col,
466 dep = dep_tag,
467 process = colored_tag
468 );
469
470 let res = dependency.wait().await;
471 if let Err(error) = &res {
472 eprintln!(
473 "{col} ❗️ {dep} dependency of {process} errored: {error}\nNot executing {process}.",
474 col = colored_tag_col,
475 dep = dep_tag,
476 process = colored_tag,
477 error = error
478 );
479 }
480 res
481 }
482 };
483
484 if let Ok(()) = dep_res {
485 eprintln!(
486 "{tag} {headline}",
487 tag = colored_tag_col,
488 headline = crate::headline!(cmd),
489 );
490
491 let opts = SpawnOptions {
492 stdout: Stdio::piped(),
493 stderr: Stdio::piped(),
494 timeout: timeout.to_owned(),
495 };
496
497 let mut process = process.spawn(opts).await.unwrap_or_else(|err| {
498 panic!("Failed to spawn {} process. {}", colored_tag, err)
499 });
500
501 match process.stdout() {
502 None => eprintln!(
503 "{} Unable to read from {} stdout",
504 colored_tag_col, colored_tag
505 ),
506 Some(stdout) => {
507 let mut reader = BufReader::new(stdout).lines();
508 task::spawn({
509 let tag = colored_tag_col.clone();
510 async move {
511 while let Some(line) = reader.next_line().await.unwrap() {
512 eprintln!("{} {}", tag, line);
513 }
514 }
515 });
516 }
517 }
518
519 match process.stderr() {
520 None => eprintln!(
521 "{} Unable to read from {} stderr",
522 colored_tag_col, colored_tag
523 ),
524 Some(stderr) => {
525 let mut reader = BufReader::new(stderr).lines();
526 task::spawn({
527 let tag = colored_tag_col.clone();
528 async move {
529 while let Some(line) = reader.next_line().await.unwrap() {
530 eprintln!("{} {}", tag, line);
531 }
532 }
533 });
534 }
535 }
536
537 let res = process.wait().await;
538
539 match res {
540 Ok(ExitResult::Output(_)) => eprintln!(
541 "{} Process {} exited with code 0.",
542 colored_tag_col, colored_tag
543 ),
544 Ok(ExitResult::Interrupted) => eprintln!(
545 "{} Process {} successfully exited.",
546 colored_tag_col, colored_tag
547 ),
548 Ok(ExitResult::Killed { pid }) => eprintln!(
549 "{} Process {} with pid {pid} was killed due to timeout.",
550 colored_tag_col,
551 colored_tag,
552 ),
553 Err(Error::NonZeroExitCode { code, output: _ }) => eprintln!(
554 "{} Process {} exited with non-zero code: {}",
555 colored_tag_col,
556 colored_tag,
557 code.map(|x| format!("{}", x)).unwrap_or_else(|| "-".to_string())
558 ),
559 Err(Error::ProcessDoesNotExist) => eprintln!(
560 "{} ⚠️ Process {} does not exist.",
561 colored_tag_col, colored_tag
562 ),
563 Err(Error::Zombie { pid, err }) => eprintln!(
564 "{} ⚠️ Process {} with pid {} hanged and we were unable to kill it. Error: {}",
565 colored_tag_col, colored_tag, pid, err
566 ),
567 Err(Error::IoError(err)) => eprintln!(
568 "{} Process {} exited with error: {}",
569 colored_tag_col, colored_tag, err
570 ),
571 }
572 }
573
574 exited_processes.fetch_add(1, Ordering::Relaxed);
575 });
576 }
577
578 signal::ctrl_c().await.unwrap();
579 eprintln!(); let expire = Instant::now() + timeout;
582 while exited_processes.load(Ordering::Relaxed) < pool_size {
583 if Instant::now() > expire {
584 eprintln!("⚠️ Timeout. Exiting.");
585 break;
586 }
587 time::sleep(Duration::from_millis(500)).await;
588 }
589
590 Ok(())
591 }
592}
593
594mod colors {
595 use console::Color;
596 use rand::{seq::SliceRandom, thread_rng};
597
598 pub fn make(n: u8) -> Vec<Color> {
599 let mut primaries = vec![
601 Color::Green,
603 Color::Yellow,
604 Color::Blue,
605 Color::Magenta,
606 Color::Cyan,
607 ];
608 let secondaries = vec![
610 Color::Color256(24),
611 Color::Color256(172),
612 Color::Color256(142),
613 ];
614
615 if n <= primaries.len() as u8 {
617 shuffle(primaries, n)
618 }
619 else if n <= (primaries.len() + primaries.len()) as u8 {
621 primaries.extend(secondaries);
622 shuffle(primaries, n)
623 } else {
624 todo!()
626 }
627 }
628
629 fn shuffle<T>(mut items: Vec<T>, n: u8) -> Vec<T> {
630 items.truncate(n as usize);
631 items.shuffle(&mut thread_rng());
632 items
633 }
634}
635
636#[cfg(test)]
637mod tests {
638 use std::time::Duration;
639
640 use crate::{Cmd, Location, Process};
641
642 #[allow(dead_code)]
643 fn process_macro_with_timeout<Loc: Location>(cmd: Cmd<Loc>) -> Process<Loc> {
644 process! {
645 tag: "server",
646 cmd: cmd,
647 timeout: Duration::from_secs(20).into(),
648 }
649 }
650
651 #[allow(dead_code)]
652 fn process_macro_without_timeout<Loc: Location>(cmd: Cmd<Loc>) -> Process<Loc> {
653 process! {
654 tag: "server",
655 cmd: cmd,
656 }
657 }
658}