Skip to main content

tracel_xtask/utils/
process.rs

1use std::{
2    collections::HashMap,
3    io::{BufRead, BufReader},
4    path::Path,
5    process::{Command, ExitStatus, Stdio},
6    sync::mpsc,
7    thread,
8};
9
10use anyhow::{self, Context};
11use rand::Rng;
12use regex::Regex;
13
14use crate::group_info;
15use crate::{endgroup, group};
16
/// A custom error for failed subprocesses.
///
/// To get the `ExitStatus`, downcast the error at call sites.
#[derive(Debug)]
pub struct ProcessExitError {
    /// Caller-supplied description of what failed; printed first by `Display`.
    pub message: String,
    /// Exit status of the subprocess; printed in parentheses by `Display`.
    pub status: ExitStatus,
    /// Signal details parsed from the subprocess output, if any were found.
    pub signal: Option<ExitSignal>,
}
26
/// Signal information parsed from a log line of the form
/// `(signal: 11, SIGSEGV: invalid memory reference)`.
///
/// The type is plain data, so it also derives `Clone`, `PartialEq` and `Eq`
/// to let callers copy and compare signals (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExitSignal {
    /// Numeric signal code (e.g. `11`).
    pub code: u32,
    /// Symbolic signal name (e.g. `SIGSEGV`).
    pub name: String,
    /// Free-form description that follows the name (e.g. `invalid memory reference`).
    pub description: String,
}
33
34impl std::fmt::Display for ProcessExitError {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(f, "{} ({})", self.message, self.status)
37    }
38}
39
40impl std::fmt::Display for ExitSignal {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(
43            f,
44            "signal: {}, {}: {}",
45            self.code, self.name, self.description
46        )
47    }
48}
49
// Marks `ProcessExitError` as a standard error type; no trait method is overridden.
impl std::error::Error for ProcessExitError {}
51
52fn return_process_error(
53    error_msg: &str,
54    status: ExitStatus,
55    signal: Option<ExitSignal>,
56) -> anyhow::Result<()> {
57    Err(ProcessExitError {
58        message: error_msg.to_string(),
59        status,
60        signal,
61    }
62    .into())
63}
64
65fn extract_exit_signal(line: &str) -> Option<ExitSignal> {
66    // Matches: (signal: 11, SIGSEGV: invalid memory reference)
67    let re = Regex::new(r"\(signal:\s*(\d+),\s*(SIG[A-Z]+):\s*([^)]+)\)").ok()?;
68    let caps = re.captures(line)?;
69    let code = caps.get(1)?.as_str().parse::<u32>().ok()?;
70    let name = caps.get(2)?.as_str().to_string();
71    let description = caps.get(3)?.as_str().trim().to_string();
72
73    Some(ExitSignal {
74        code,
75        name,
76        description,
77    })
78}
79
80/// Run a process
81pub fn run_process(
82    name: &str,
83    args: &[&str],
84    envs: Option<HashMap<&str, &str>>,
85    path: Option<&Path>,
86    error_msg: &str,
87) -> anyhow::Result<()> {
88    let joined_args = args.join(" ");
89    group_info!("Command line: {} {}", name, &joined_args);
90    let mut command = Command::new(name);
91    if let Some(path) = path {
92        command.current_dir(path);
93    }
94    if let Some(envs) = envs {
95        command.envs(&envs);
96    }
97    let status = command.args(args).status().map_err(|e| {
98        let first = args.first().copied().unwrap_or("");
99        anyhow::anyhow!("Failed to execute {} {}: {}", name, first, e)
100    })?;
101    if !status.success() {
102        return return_process_error(error_msg, status, None);
103    }
104    Ok(())
105}
106
107pub fn run_process_capture_stdout(cmd: &mut Command, label: &str) -> anyhow::Result<String> {
108    let out = cmd
109        .stdout(Stdio::piped())
110        .stderr(Stdio::inherit())
111        .output()
112        .with_context(|| format!("running {label}"))?;
113    if !out.status.success() {
114        return Err(anyhow::anyhow!("{label} failed with status {}", out.status));
115    }
116    String::from_utf8(out.stdout).context("non-UTF8 output")
117}
118
119/// Run a process for workspace
120/// regexp must have one capture group if defined
121#[allow(clippy::too_many_arguments)]
122pub fn run_process_for_workspace<'a>(
123    name: &str,
124    args: &[&'a str],
125    excluded: &'a [String],
126    group_regexp: Option<&str>,
127    group_name: Option<&str>,
128    error_msg: &str,
129    ignore_log: Option<&str>,
130    ignore_msg: Option<&str>,
131) -> anyhow::Result<()> {
132    let group_rx: Option<Regex> = group_regexp.map(|r| Regex::new(r).unwrap());
133    // split the args between cargo args and binary args so that we can extend the cargo args
134    // and then append the binary args back.
135    let (cargo_args, binary_args) = split_vector(args, "--");
136    let mut cmd_args = cargo_args.to_owned();
137    excluded
138        .iter()
139        .for_each(|ex| cmd_args.extend(["--exclude", ex]));
140    cmd_args.extend(binary_args);
141    group_info!("Command line: cargo {}", cmd_args.join(" "));
142    // process
143    let mut child = Command::new(name)
144        .args(&cmd_args)
145        .stdout(Stdio::piped())
146        .stderr(Stdio::piped())
147        .spawn()
148        .map_err(|e| {
149            anyhow::anyhow!(format!(
150                "Failed to start {} {}: {}",
151                name,
152                cmd_args.first().unwrap(),
153                e
154            ))
155        })?;
156
157    // handle stdout and stderr in dedicated threads using a MPSC channel for synchronization
158    let (tx, rx) = mpsc::channel();
159    // stdout processing thread
160    if let Some(stdout) = child.stdout.take() {
161        let tx = tx.clone();
162        thread::spawn(move || {
163            let reader = BufReader::new(stdout);
164            for line in reader.lines().map_while(Result::ok) {
165                tx.send((line, false)).unwrap();
166            }
167        });
168    }
169    // stderr processing thread
170    if let Some(stderr) = child.stderr.take() {
171        let tx = tx.clone();
172        thread::spawn(move || {
173            let reader = BufReader::new(stderr);
174            for line in reader.lines().map_while(Result::ok) {
175                tx.send((line, true)).unwrap();
176            }
177        });
178    }
179    // Drop the sender once all the logs have been processed to close the channel
180    drop(tx);
181
182    // Process the stdout to inject log groups
183    let mut ignore_error = false;
184    let mut close_group = false;
185    let mut signal = None;
186    for (line, _is_stderr) in rx.iter() {
187        let mut skip_line = false;
188
189        if let Some(rx) = &group_rx {
190            let cleaned_line = standardize_slashes(&remove_ansi_codes(&line));
191            if let Some(caps) = rx.captures(&cleaned_line) {
192                let crate_name = &caps[1];
193                if close_group {
194                    endgroup!();
195                }
196                close_group = true;
197                group!("{}: {}", group_name.unwrap_or("Group"), crate_name);
198            }
199        }
200
201        if let Some(log) = ignore_log {
202            if line.contains(log) {
203                if let Some(msg) = ignore_msg {
204                    warn!("{msg}");
205                }
206                ignore_error = true;
207                skip_line = true;
208            }
209        }
210
211        if line.contains("(signal:") {
212            signal = extract_exit_signal(&line);
213        }
214
215        if !skip_line {
216            println!("{line}");
217        }
218    }
219
220    let status = child
221        .wait()
222        .expect("Should be able to wait for the process to finish.");
223
224    if status.success() || ignore_error {
225        if close_group {
226            endgroup!();
227        }
228        anyhow::Ok(())
229    } else {
230        return_process_error(error_msg, status, signal)
231    }
232}
233
234/// Run a process command for a package
235#[allow(clippy::too_many_arguments)]
236pub fn run_process_for_package(
237    name: &str,
238    package: &String,
239    args: &[&str],
240    excluded: &[String],
241    only: &[String],
242    error_msg: &str,
243    ignore_log: Option<&str>,
244    ignore_msg: Option<&str>,
245) -> anyhow::Result<()> {
246    if excluded.contains(package) || (!only.is_empty() && !only.contains(package)) {
247        group_info!("Skip '{}' because it has been excluded!", package);
248        return anyhow::Ok(());
249    }
250    let joined_args = args.join(" ");
251    group_info!("Command line: cargo {}", &joined_args);
252
253    let mut child = Command::new(name)
254        .args(args)
255        .stdout(Stdio::piped())
256        .stderr(Stdio::piped())
257        .spawn()
258        .map_err(|e| {
259            anyhow::anyhow!(format!(
260                "Failed to start {} {}: {}",
261                name,
262                args.first().unwrap(),
263                e
264            ))
265        })?;
266
267    // handle stdout and stderr in dedicated threads using a MPSC channel for synchronization
268    let (tx, rx) = mpsc::channel();
269    // stdout processing thread
270    if let Some(stdout) = child.stdout.take() {
271        let tx = tx.clone();
272        thread::spawn(move || {
273            let reader = BufReader::new(stdout);
274            for line in reader.lines().map_while(Result::ok) {
275                tx.send((line, false)).unwrap();
276            }
277        });
278    }
279    // stderr processing thread
280    if let Some(stderr) = child.stderr.take() {
281        let tx = tx.clone();
282        thread::spawn(move || {
283            let reader = BufReader::new(stderr);
284            for line in reader.lines().map_while(Result::ok) {
285                tx.send((line, true)).unwrap();
286            }
287        });
288    }
289    // Drop the sender once all the logs have been processed to close the channel
290    drop(tx);
291
292    // Process the stdout to inject log groups
293    let mut ignore_error = false;
294    let mut skip_line = false;
295    let mut signal = None;
296    for (line, is_stderr) in rx.iter() {
297        if let Some(log) = ignore_log {
298            if !is_stderr {
299                // skip the lines until a non stderr line is encountered
300                skip_line = false;
301            }
302            if line.contains(log) {
303                if let Some(msg) = ignore_msg {
304                    warn!("{msg}");
305                    ignore_error = true;
306                    skip_line = true;
307                }
308            }
309        }
310        if line.contains("(signal:") {
311            signal = extract_exit_signal(&line);
312        }
313
314        if !skip_line {
315            println!("{line}");
316        }
317    }
318
319    let status = child
320        .wait()
321        .expect("Should be able to wait for the process to finish.");
322
323    if status.success() || ignore_error {
324        anyhow::Ok(())
325    } else {
326        return_process_error(error_msg, status, signal)
327    }
328}
329
330/// Return a random port between 3000 and 9999
331pub fn random_port() -> u16 {
332    let mut rng = rand::rng();
333    rng.random_range(3000..=9999)
334}
335
336fn remove_ansi_codes(s: &str) -> String {
337    let re = Regex::new(r"\x1b\[[0-9;]*m").unwrap();
338    re.replace_all(s, "").to_string()
339}
340
/// Return a copy of `s` in which every backslash is replaced by a forward slash.
fn standardize_slashes(s: &str) -> String {
    s.chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
344
/// Split VEC at the first occurrence of SPLIT.
///
/// The left slice holds everything before SPLIT and the right slice starts
/// with SPLIT itself. When SPLIT is absent, left is the whole of VEC and right
/// is empty.
fn split_vector<T: PartialEq>(vec: &[T], split: T) -> (&[T], &[T]) {
    // Without a match, cut at the very end so that the right side is empty.
    let cut = vec
        .iter()
        .position(|item| *item == split)
        .unwrap_or(vec.len());
    vec.split_at(cut)
}
356
/// Unit tests for the pure helpers of this module.
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;

    #[rstest]
    fn test_random_port_in_range() {
        for _ in 0..10000 {
            let port = random_port();
            assert!(
                (3000..=9999).contains(&port),
                "Port should be between 3000 and 9999, got {port}"
            );
        }
    }

    #[rstest]
    #[case::simple_escape_code("\x1b[31mRed Text\x1b[0m", "Red Text")]
    #[case::complex_escape_code("\x1b[1;34mBold Blue Text\x1b[0m", "Bold Blue Text")]
    #[case::no_escape_code("No ANSI Codes", "No ANSI Codes")]
    fn test_remove_ansi_codes(#[case] input: &str, #[case] expected: &str) {
        let result = remove_ansi_codes(input);
        assert_eq!(
            result, expected,
            "Expected '{expected}', but got '{result}'"
        );
    }

    #[rstest]
    #[case::windows_path(r"C:\path\to\file", "C:/path/to/file")]
    #[case::network_path(r"\\network\share\file", "//network/share/file")]
    #[case::already_standard_path("/already/standard/path", "/already/standard/path")]
    fn test_standardize_slashes(#[case] input: &str, #[case] expected: &str) {
        let result = standardize_slashes(input);
        assert_eq!(
            result, expected,
            "Expected '{expected}', but got '{result}'"
        );
    }

    #[rstest]
    #[case::element_found(vec!["a", "b", "c", "d", "e", "f"], "d", vec!["a", "b", "c"], vec!["d", "e", "f"])]
    #[case::element_not_found(vec!["a", "b", "c", "d", "e", "f"], "z", vec!["a", "b", "c", "d", "e", "f"], vec![])]
    #[case::element_at_start(vec!["a", "b", "c", "d", "e", "f"], "a", vec![], vec!["a", "b", "c", "d", "e", "f"])]
    #[case::element_at_end(vec!["a", "b", "c", "d", "e", "f"], "f", vec!["a", "b", "c", "d", "e"], vec!["f"])]
    #[case::empty_vector(vec![], "x", vec![], vec![])]
    #[case::cargo_with_binary_args(vec!["cargo", "build", "--exclude", "crate", "--workpspace", "--", "--color", "always"], "--", vec!["cargo", "build", "--exclude", "crate", "--workpspace"], vec!["--", "--color", "always"])]
    #[case::cargo_without_binary_args(vec!["cargo", "build", "--exclude", "crate", "--workpspace"], "--", vec!["cargo", "build", "--exclude", "crate", "--workpspace"], vec![])]
    fn test_split_vector(
        #[case] vec: Vec<&str>,
        #[case] split_elem: &str,
        #[case] expected_left: Vec<&str>,
        #[case] expected_right: Vec<&str>,
    ) {
        let (left, right) = split_vector(&vec, split_elem);

        assert_eq!(left, &expected_left);
        assert_eq!(right, &expected_right);
    }

    // Fields are compared one by one so that the test does not depend on
    // `ExitSignal` implementing `PartialEq`.
    #[rstest]
    #[case::segfault(
        "process didn't exit successfully (signal: 11, SIGSEGV: invalid memory reference)",
        11,
        "SIGSEGV",
        "invalid memory reference"
    )]
    #[case::abort("(signal: 6, SIGABRT: process abort signal)", 6, "SIGABRT", "process abort signal")]
    #[case::extra_whitespace("(signal:  9,  SIGKILL:  kill )", 9, "SIGKILL", "kill")]
    fn test_extract_exit_signal(
        #[case] line: &str,
        #[case] expected_code: u32,
        #[case] expected_name: &str,
        #[case] expected_description: &str,
    ) {
        let signal = extract_exit_signal(line).expect("a signal should be extracted");

        assert_eq!(signal.code, expected_code);
        assert_eq!(signal.name, expected_name);
        assert_eq!(signal.description, expected_description);
    }

    #[rstest]
    #[case::no_signal("error: test failed, to rerun pass `--lib`")]
    #[case::lowercase_name("(signal: 11, sigsegv: invalid memory reference)")]
    #[case::missing_description("(signal: 11, SIGSEGV)")]
    #[case::code_overflow("(signal: 99999999999, SIGSEGV: invalid memory reference)")]
    fn test_extract_exit_signal_no_match(#[case] line: &str) {
        assert!(extract_exit_signal(line).is_none());
    }
}