1#![deny(missing_docs)]
2#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
3use libc::TIOCSCTTY;
4use serde::{Deserialize, Serialize};
5use std::collections::BTreeMap;
6use std::ffi::{OsStr, OsString};
7use std::os::fd::AsRawFd as _;
8use std::path::{Path, PathBuf};
9use std::time::Duration;
10use tokio::fs::File;
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12
13#[derive(Debug, Serialize)]
14#[serde(rename_all = "lowercase")]
15pub enum Output {
17 Pid(u32),
19 Stdout(Vec<u8>),
21 Error(String),
23 Terminated(Option<i32>),
25}
26
27#[derive(Debug, Deserialize, Eq, PartialEq)]
28pub enum Input {
30 Data(Vec<u8>),
32 Resize((usize, usize)),
34 Terminate,
36}
37
38const BUF_SIZE: usize = 8192;
39
40fn set_term_size(
41 fd: i32,
42 term_size: (usize, usize),
43) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
44 let ws = libc::winsize {
45 ws_row: u16::try_from(term_size.1)?,
46 ws_col: u16::try_from(term_size.0)?,
47 ws_xpixel: 0,
48 ws_ypixel: 0,
49 };
50
51 if unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &ws) } != 0 {
52 return Err("ioctl".into());
53 }
54
55 Ok(())
56}
57
58pub struct Command {
60 pid: Option<u32>,
61 program: OsString,
62 args: Vec<OsString>,
63 env: BTreeMap<OsString, OsString>,
64 current_dir: Option<PathBuf>,
65 in_tx: async_channel::Sender<Input>,
66 in_rx: async_channel::Receiver<Input>,
67 out_tx: async_channel::Sender<Output>,
68 out_rx: async_channel::Receiver<Output>,
69 terminal_id: String,
70 terminal_size: (usize, usize),
71 shutdown_timeout: Option<Duration>,
72}
73
74impl Command {
75 pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
77 let (in_tx, in_rx) = async_channel::bounded(BUF_SIZE);
78 let (out_tx, out_rx) = async_channel::bounded(BUF_SIZE);
79 Self {
80 pid: None,
81 program: program.as_ref().to_os_string(),
82 args: <_>::default(),
83 env: <_>::default(),
84 current_dir: None,
85 in_tx,
86 in_rx,
87 out_tx,
88 out_rx,
89 terminal_id: "screen-256color".to_string(),
90 terminal_size: (80, 24),
91 shutdown_timeout: None,
92 }
93 }
94 pub fn in_tx(&self) -> async_channel::Sender<Input> {
96 self.in_tx.clone()
97 }
98 pub fn out_tx(&self) -> async_channel::Sender<Output> {
100 self.out_tx.clone()
101 }
102 pub fn out_rx(&self) -> async_channel::Receiver<Output> {
104 self.out_rx.clone()
105 }
106 pub fn terminal_id<S: Into<String>>(mut self, terminal_id: S) -> Self {
108 self.terminal_id = terminal_id.into();
109 self
110 }
111 pub fn terminal_size(mut self, terminal_size: (usize, usize)) -> Self {
113 self.terminal_size = terminal_size;
114 self
115 }
116 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
118 self.shutdown_timeout = Some(timeout);
119 self
120 }
121 pub fn args<I, S>(mut self, args: I) -> Self
123 where
124 I: IntoIterator<Item = S>,
125 S: AsRef<OsStr>,
126 {
127 self.args = args
128 .into_iter()
129 .map(|s| s.as_ref().to_os_string())
130 .collect();
131 self
132 }
133 pub fn arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
135 self.args.push(arg.as_ref().to_os_string());
136 self
137 }
138 pub fn envs<I, K, V>(mut self, env: I) -> Self
140 where
141 I: IntoIterator<Item = (K, V)>,
142 K: AsRef<OsStr>,
143 V: AsRef<OsStr>,
144 {
145 self.env = env
146 .into_iter()
147 .map(|(k, v)| (k.as_ref().to_os_string(), v.as_ref().to_os_string()))
148 .collect();
149 self
150 }
151 pub fn env<K: AsRef<OsStr>, V: AsRef<OsStr>>(mut self, key: K, value: V) -> Self {
153 self.env
154 .insert(key.as_ref().to_os_string(), value.as_ref().to_os_string());
155 self
156 }
157 pub fn current_dir<P: AsRef<Path>>(mut self, current_dir: P) -> Self {
159 self.current_dir = Some(current_dir.as_ref().to_path_buf());
160 self
161 }
162 pub async fn run(self) {
164 let out_tx = self.out_tx.clone();
165 match self.run_subprocess().await {
166 Ok(v) => {
167 out_tx.send(Output::Terminated(v)).await.ok();
168 }
169 Err(e) => {
170 out_tx.send(Output::Error(e.to_string())).await.ok();
171 }
172 }
173 }
174
175 #[allow(clippy::too_many_lines)]
176 async fn run_subprocess(
177 mut self,
178 ) -> Result<Option<i32>, Box<dyn std::error::Error + Send + Sync + 'static>> {
179 let win_size = rustix::termios::Winsize {
180 ws_col: self.terminal_size.0.try_into()?,
181 ws_row: self.terminal_size.1.try_into()?,
182 ws_xpixel: 0,
183 ws_ypixel: 0,
184 };
185 let pty = rustix_openpty::openpty(None, Some(&win_size))?;
186 let (master, slave) = (pty.controller, pty.user);
187
188 let master_fd = master.as_raw_fd();
189 let slave_fd = slave.as_raw_fd();
190
191 if let Ok(mut termios) = rustix::termios::tcgetattr(&master) {
192 termios
194 .input_modes
195 .set(rustix::termios::InputModes::IUTF8, true);
196 let _ = rustix::termios::tcsetattr(
197 &master,
198 rustix::termios::OptionalActions::Now,
199 &termios,
200 );
201 }
202
203 let mut builder = tokio::process::Command::new(&self.program);
204
205 if let Some(ref current_dir) = self.current_dir {
206 builder.current_dir(current_dir);
207 }
208
209 builder
210 .args(&self.args)
211 .envs(&self.env)
212 .env("COLUMNS", self.terminal_size.0.to_string())
213 .env("LINES", self.terminal_size.1.to_string())
214 .env("TERM", &self.terminal_id);
215
216 builder.stdin(slave.try_clone()?);
217 builder.stderr(slave.try_clone()?);
218 builder.stdout(slave);
219
220 unsafe {
221 builder.pre_exec(move || {
222 let err = libc::setsid();
223 if err == -1 {
224 return Err(std::io::Error::new(
225 std::io::ErrorKind::Other,
226 "Failed to set session id",
227 ));
228 }
229
230 let res = libc::ioctl(slave_fd, TIOCSCTTY as _, 0);
231 if res == -1 {
232 return Err(std::io::Error::new(
233 std::io::ErrorKind::Other,
234 format!("Failed to set controlling terminal: {}", res),
235 ));
236 }
237
238 libc::close(slave_fd);
239 libc::close(master_fd);
240
241 libc::signal(libc::SIGCHLD, libc::SIG_DFL);
242 libc::signal(libc::SIGHUP, libc::SIG_DFL);
243 libc::signal(libc::SIGINT, libc::SIG_DFL);
244 libc::signal(libc::SIGQUIT, libc::SIG_DFL);
245 libc::signal(libc::SIGTERM, libc::SIG_DFL);
246 libc::signal(libc::SIGALRM, libc::SIG_DFL);
247
248 Ok(())
249 });
250 }
251
252 let mut child = builder.spawn()?;
253
254 let pid = child.id().ok_or("unable to get child pid")?;
255
256 self.out_tx.send(Output::Pid(pid)).await?;
257 self.pid = Some(pid);
258
259 let mut stdout = File::from_std(std::fs::File::from(master));
260
261 let mut stdin = stdout.try_clone().await?;
262
263 let tx_stdout = self.out_tx.clone();
264
265 let fut_out = tokio::spawn(async move {
266 let mut buf = [0u8; BUF_SIZE];
267 while let Ok(b) = stdout.read(&mut buf).await {
268 if b == 0 {
269 break;
270 }
271 if tx_stdout
272 .send(Output::Stdout(buf[..b].to_vec()))
273 .await
274 .is_err()
275 {
276 break;
277 }
278 }
279 });
280
281 let shutdown_timeout = self.shutdown_timeout;
282
283 let fut_in = tokio::spawn(async move {
284 while let Ok(input) = self.in_rx.recv().await {
285 let mut data = match input {
286 Input::Data(d) => d,
287 Input::Resize(size) => {
288 set_term_size(stdin.as_raw_fd(), size).ok();
289 bmart::process::kill_pstree_with_signal(
290 pid,
291 bmart::process::Signal::SIGWINCH,
292 true,
293 );
294 continue;
295 }
296 Input::Terminate => {
297 break;
298 }
299 };
300 if data == [0x0a] {
302 data[0] = 0x0d;
303 }
304 if stdin.write_all(&data).await.is_err() {
305 break;
306 }
307 }
308 });
309
310 let result = child.wait().await?;
311
312 if let Some(t) = shutdown_timeout {
314 for _ in 0..10 {
315 if fut_out.is_finished() {
316 break;
317 }
318 tokio::time::sleep(t / 10).await;
319 }
320 }
321
322 fut_out.abort();
323 fut_in.abort();
324
325 let exit_code = result.code();
326
327 Ok(exit_code)
328 }
329}
330
331impl Drop for Command {
332 fn drop(&mut self) {
333 if let Some(pid) = self.pid {
334 tokio::spawn(bmart::process::kill_pstree(
335 pid,
336 Some(Duration::from_secs(1)),
337 true,
338 ));
339 }
340 }
341}