1use std::sync::Arc;
2
3pub use subprocess_impl::SbProcess::{self as Process};
4
/// Identifies a spawned process by a numeric id together with the command
/// line it was started from.
///
/// The command is stored as an `Arc<str>`, so cloning a `ProcessId` shares the
/// command text instead of copying it.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct ProcessId {
    /// Numeric identifier of the process.
    id: u32,
    /// Command line associated with the process.
    command: Arc<str>,
}

impl ProcessId {
    /// Builds an identifier from a numeric `id` and the owned `command` text.
    pub fn new(id: u32, command: String) -> Self {
        let command: Arc<str> = Arc::from(command);
        Self { id, command }
    }

    /// Returns the command line this identifier was created with.
    pub fn command(&self) -> &str {
        &*self.command
    }
}

/// Renders the identifier as `[<id>]: <command>`.
impl std::fmt::Display for ProcessId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { id, command } = self;
        write!(f, "[{}]: {}", id, command)
    }
}
28
/// Signal that can be requested when stopping a child process.
///
/// The enum carries no data, so it is `Copy` and comparable; callers no longer
/// need to clone it or pattern-match just to test which signal was requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessSignal {
    /// Interrupt signal (`SIGINT`).
    SIGINT,
    /// Termination signal (`SIGTERM`).
    SIGTERM,
    /// Kill signal (`SIGKILL`).
    SIGKILL,
}
35
/// How a child's stdout and stderr are wired up when it is spawned.
///
/// The variant descriptions follow what `SbProcess::spawn` in this file does
/// with each mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStdio {
    /// Both stdout and stderr are piped so they can be forwarded by the parent.
    Inherit,
    /// Neither stream is redirected.
    Raw,
    /// stdout is piped; stderr is left unredirected.
    StderrOnly,
}

/// `true` selects [`ProcessStdio::Raw`], `false` selects
/// [`ProcessStdio::Inherit`].
impl From<bool> for ProcessStdio {
    fn from(raw: bool) -> Self {
        match raw {
            true => Self::Raw,
            false => Self::Inherit,
        }
    }
}
52
53mod subprocess_impl {
54 use std::{
55 io::BufRead,
56 sync::{Arc, RwLock},
57 };
58
59 use subprocess::{ExitStatus, Popen, PopenConfig};
60
61 use crate::{
62 errors::{TogetherInternalError, TogetherResult},
63 log, log_err,
64 };
65
66 use super::{ProcessId, ProcessSignal, ProcessStdio};
67
    /// A child process launched through the `subprocess` crate.
    pub struct SbProcess {
        /// Handle to the running (or finished) child.
        popen: subprocess::Popen,
        /// Shared flag read by the output-forwarding thread: while it holds
        /// `true`, forwarding of stdout lines is paused. `spawn` initialises it
        /// to `Some(false)`.
        mute: Option<Arc<RwLock<bool>>>,
    }
72
73 impl SbProcess {
74 pub fn spawn(
75 command: &str,
76 cwd: Option<&str>,
77 stdio: ProcessStdio,
78 ) -> TogetherResult<Self> {
79 let mut config = PopenConfig::default();
80 config.stdout = match stdio {
81 ProcessStdio::Raw => subprocess::Redirection::None,
82 _ => subprocess::Redirection::Pipe,
83 };
84 config.stderr = match stdio {
85 ProcessStdio::Raw | ProcessStdio::StderrOnly => subprocess::Redirection::None,
86 _ => subprocess::Redirection::Pipe,
87 };
88 config.cwd = cwd.map(|s| s.into());
89
90 #[cfg(unix)]
91 {
92 config.setpgid = true;
93 }
94
95 let mut argv = os::SHELL.to_vec();
96 argv.push(command);
97 let popen = Popen::create(&argv, config)?;
98 let mute = Arc::new(RwLock::new(false));
99
100 Ok(Self {
101 popen,
102 mute: Some(mute),
103 })
104 }
105
106 pub fn kill(&mut self, signal: Option<&ProcessSignal>) -> TogetherResult<()> {
107 fn check_err<T: Ord + Default>(num: T) -> std::io::Result<T> {
108 if num < T::default() {
109 return Err(std::io::Error::last_os_error());
110 }
111 Ok(num)
112 }
113
114 #[cfg(windows)]
115 {
116 Ok(self.popen.terminate()?)
117 }
118 #[cfg(unix)]
119 {
120 self.popen.poll();
121 let pid = match self.popen.pid() {
122 Some(pid) => pid as i32,
123 _ => return Ok(()),
124 };
125 let signal = match signal {
126 Some(ProcessSignal::SIGINT) => libc::SIGINT,
127 Some(ProcessSignal::SIGTERM) => libc::SIGTERM,
128 Some(ProcessSignal::SIGKILL) => libc::SIGKILL,
129 None => libc::SIGTERM,
130 };
131 let _code = check_err(unsafe { libc::kill(-pid, signal) })?;
132 Ok(())
133 }
134 }
135
136 pub fn try_wait(&mut self) -> TogetherResult<Option<i32>> {
137 match self.popen.poll() {
138 Some(ExitStatus::Exited(code)) => Ok(Some(code as i32)),
139 Some(ExitStatus::Signaled(_)) => Ok(Some(1)),
140 Some(ExitStatus::Other(_)) | Some(ExitStatus::Undetermined) => {
141 Err(TogetherInternalError::ProcessFailedToExit.into())
142 }
143 None => Ok(None),
144 }
145 }
146
147 pub fn forward_stdio(&mut self, id: &ProcessId) {
148 let stdout = self.popen.stdout.take().unwrap();
149 let stderr = self.popen.stderr.take().unwrap();
150 let id = id.clone();
151 let mute = self.mute.clone();
152 std::thread::spawn(move || {
153 let id = id.clone();
154 Self::forward_stdio_blocking(&id, stdout, stderr, mute)
155 });
156 }
157
158 fn forward_stdio_blocking(
159 id: &ProcessId,
160 stdout: std::fs::File,
161 stderr: std::fs::File,
162 mute: Option<Arc<RwLock<bool>>>,
163 ) {
164 let mut stdout = std::io::BufReader::new(stdout);
165 let mut stderr = std::io::BufReader::new(stderr);
166 let mut stdout_line = String::new();
167 let mut stderr_line = String::new();
168 loop {
169 let mut stdout_done = false;
170 let mut stderr_done = false;
171 let mut stdout_bytes = vec![];
172 let mut stderr_bytes = vec![];
173 let stdout_read = stdout.read_line(&mut stdout_line);
174 let stderr_read = stderr.read_line(&mut stderr_line);
175 match (stdout_read, stderr_read) {
176 (Ok(0), Ok(0)) => {
177 stdout_done = true;
178 stderr_done = true;
179 }
180 (Ok(0), _) => {
181 stdout_done = true;
182 }
183 (_, Ok(0)) => {
184 stderr_done = true;
185 }
186 (Ok(_), Ok(_)) => {}
187 (Err(e), _) => {
188 log_err!("Failed to read stdout: {}", e);
189 stdout_done = true;
190 }
191 (_, Err(e)) => {
192 log_err!("Failed to read stderr: {}", e);
193 stderr_done = true;
194 }
195 }
196 if !stdout_done {
197 stdout_bytes.extend(stdout_line.as_bytes());
198 stdout_line.clear();
199 }
200 if !stderr_done {
201 stderr_bytes.extend(stderr_line.as_bytes());
202 stderr_line.clear();
203 }
204 if !stdout_bytes.is_empty() {
205 while mute.as_ref().map_or(false, |m| *m.read().unwrap()) {
206 log!("Skipping muted process {}", id.id);
207 std::thread::sleep(std::time::Duration::from_millis(100));
208 }
209 print!("{}: {}", id.id, String::from_utf8_lossy(&stdout_bytes));
210 }
211 if !stderr_bytes.is_empty() {
212 eprint!("{}: {}", id.id, String::from_utf8_lossy(&stderr_bytes));
213 }
214 if stdout_done && stderr_done {
215 break;
216 }
217 }
218 }
219 }
220
    // Unix: shell program and flag that precede the command string in the
    // argument vector built by `SbProcess::spawn`.
    #[cfg(unix)]
    mod os {
        pub const SHELL: [&str; 2] = ["sh", "-c"];
    }

    // Windows: shell program and flag that precede the command string in the
    // argument vector built by `SbProcess::spawn`.
    #[cfg(windows)]
    mod os {
        pub const SHELL: [&str; 2] = ["cmd.exe", "/c"];
    }
230}