together_rs/
process.rs

1use std::sync::Arc;
2
3pub use subprocess_impl::SbProcess::{self as Process};
4
5#[derive(Debug, Clone, Hash, Eq, PartialEq)]
6pub struct ProcessId {
7    id: u32,
8    command: Arc<str>,
9}
10
11impl ProcessId {
12    pub fn new(id: u32, command: String) -> Self {
13        Self {
14            id,
15            command: command.into_boxed_str().into(),
16        }
17    }
18    pub fn command(&self) -> &str {
19        &self.command
20    }
21}
22
23impl std::fmt::Display for ProcessId {
24    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
25        write!(f, "[{}]: {}", self.id, self.command)
26    }
27}
28
29#[derive(Debug, Clone)]
30pub enum ProcessSignal {
31    SIGINT,
32    SIGTERM,
33    SIGKILL,
34}
35
36#[derive(Clone, Copy)]
37pub enum ProcessStdio {
38    Inherit,
39    Raw,
40    StderrOnly,
41}
42
43impl From<bool> for ProcessStdio {
44    fn from(b: bool) -> Self {
45        if b {
46            Self::Raw
47        } else {
48            Self::Inherit
49        }
50    }
51}
52
53mod subprocess_impl {
54    use std::{
55        io::BufRead,
56        sync::{Arc, RwLock},
57    };
58
59    use subprocess::{ExitStatus, Popen, PopenConfig};
60
61    use crate::{
62        errors::{TogetherInternalError, TogetherResult},
63        log, log_err,
64    };
65
66    use super::{ProcessId, ProcessSignal, ProcessStdio};
67
68    pub struct SbProcess {
69        popen: subprocess::Popen,
70        mute: Option<Arc<RwLock<bool>>>,
71    }
72
73    impl SbProcess {
74        pub fn spawn(
75            command: &str,
76            cwd: Option<&str>,
77            stdio: ProcessStdio,
78        ) -> TogetherResult<Self> {
79            let mut config = PopenConfig::default();
80            config.stdout = match stdio {
81                ProcessStdio::Raw => subprocess::Redirection::None,
82                _ => subprocess::Redirection::Pipe,
83            };
84            config.stderr = match stdio {
85                ProcessStdio::Raw | ProcessStdio::StderrOnly => subprocess::Redirection::None,
86                _ => subprocess::Redirection::Pipe,
87            };
88            config.cwd = cwd.map(|s| s.into());
89
90            #[cfg(unix)]
91            {
92                config.setpgid = true;
93            }
94
95            let mut argv = os::SHELL.to_vec();
96            argv.push(command);
97            let popen = Popen::create(&argv, config)?;
98            let mute = Arc::new(RwLock::new(false));
99
100            Ok(Self {
101                popen,
102                mute: Some(mute),
103            })
104        }
105
106        pub fn kill(&mut self, signal: Option<&ProcessSignal>) -> TogetherResult<()> {
107            fn check_err<T: Ord + Default>(num: T) -> std::io::Result<T> {
108                if num < T::default() {
109                    return Err(std::io::Error::last_os_error());
110                }
111                Ok(num)
112            }
113
114            #[cfg(windows)]
115            {
116                Ok(self.popen.terminate()?)
117            }
118            #[cfg(unix)]
119            {
120                self.popen.poll();
121                let pid = match self.popen.pid() {
122                    Some(pid) => pid as i32,
123                    _ => return Ok(()),
124                };
125                let signal = match signal {
126                    Some(ProcessSignal::SIGINT) => libc::SIGINT,
127                    Some(ProcessSignal::SIGTERM) => libc::SIGTERM,
128                    Some(ProcessSignal::SIGKILL) => libc::SIGKILL,
129                    None => libc::SIGTERM,
130                };
131                let _code = check_err(unsafe { libc::kill(-pid, signal) })?;
132                Ok(())
133            }
134        }
135
136        pub fn try_wait(&mut self) -> TogetherResult<Option<i32>> {
137            match self.popen.poll() {
138                Some(ExitStatus::Exited(code)) => Ok(Some(code as i32)),
139                Some(ExitStatus::Signaled(_)) => Ok(Some(1)),
140                Some(ExitStatus::Other(_)) | Some(ExitStatus::Undetermined) => {
141                    Err(TogetherInternalError::ProcessFailedToExit.into())
142                }
143                None => Ok(None),
144            }
145        }
146
147        pub fn forward_stdio(&mut self, id: &ProcessId) {
148            let stdout = self.popen.stdout.take().unwrap();
149            let stderr = self.popen.stderr.take().unwrap();
150            let id = id.clone();
151            let mute = self.mute.clone();
152            std::thread::spawn(move || {
153                let id = id.clone();
154                Self::forward_stdio_blocking(&id, stdout, stderr, mute)
155            });
156        }
157
158        fn forward_stdio_blocking(
159            id: &ProcessId,
160            stdout: std::fs::File,
161            stderr: std::fs::File,
162            mute: Option<Arc<RwLock<bool>>>,
163        ) {
164            let mut stdout = std::io::BufReader::new(stdout);
165            let mut stderr = std::io::BufReader::new(stderr);
166            let mut stdout_line = String::new();
167            let mut stderr_line = String::new();
168            loop {
169                let mut stdout_done = false;
170                let mut stderr_done = false;
171                let mut stdout_bytes = vec![];
172                let mut stderr_bytes = vec![];
173                let stdout_read = stdout.read_line(&mut stdout_line);
174                let stderr_read = stderr.read_line(&mut stderr_line);
175                match (stdout_read, stderr_read) {
176                    (Ok(0), Ok(0)) => {
177                        stdout_done = true;
178                        stderr_done = true;
179                    }
180                    (Ok(0), _) => {
181                        stdout_done = true;
182                    }
183                    (_, Ok(0)) => {
184                        stderr_done = true;
185                    }
186                    (Ok(_), Ok(_)) => {}
187                    (Err(e), _) => {
188                        log_err!("Failed to read stdout: {}", e);
189                        stdout_done = true;
190                    }
191                    (_, Err(e)) => {
192                        log_err!("Failed to read stderr: {}", e);
193                        stderr_done = true;
194                    }
195                }
196                if !stdout_done {
197                    stdout_bytes.extend(stdout_line.as_bytes());
198                    stdout_line.clear();
199                }
200                if !stderr_done {
201                    stderr_bytes.extend(stderr_line.as_bytes());
202                    stderr_line.clear();
203                }
204                if !stdout_bytes.is_empty() {
205                    while mute.as_ref().map_or(false, |m| *m.read().unwrap()) {
206                        log!("Skipping muted process {}", id.id);
207                        std::thread::sleep(std::time::Duration::from_millis(100));
208                    }
209                    print!("{}: {}", id.id, String::from_utf8_lossy(&stdout_bytes));
210                }
211                if !stderr_bytes.is_empty() {
212                    eprint!("{}: {}", id.id, String::from_utf8_lossy(&stderr_bytes));
213                }
214                if stdout_done && stderr_done {
215                    break;
216                }
217            }
218        }
219    }
220
221    #[cfg(unix)]
222    mod os {
223        pub const SHELL: [&str; 2] = ["sh", "-c"];
224    }
225
226    #[cfg(windows)]
227    mod os {
228        pub const SHELL: [&str; 2] = ["cmd.exe", "/c"];
229    }
230}