virtual_terminal/
lib.rs

1#![deny(missing_docs)]
2#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
3use libc::TIOCSCTTY;
4use serde::{Deserialize, Serialize};
5use std::collections::BTreeMap;
6use std::ffi::{OsStr, OsString};
7use std::os::fd::AsRawFd as _;
8use std::path::{Path, PathBuf};
9use std::time::Duration;
10use tokio::fs::File;
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12
13#[derive(Debug, Serialize)]
14#[serde(rename_all = "lowercase")]
15/// Output from the command
16pub enum Output {
17    /// The process id of the command, sent right after the command is started
18    Pid(u32),
19    /// Data from the command's stdout/stderr
20    Stdout(Vec<u8>),
21    /// Error messages
22    Error(String),
23    /// The command has terminated (with an optional exit code)
24    Terminated(Option<i32>),
25}
26
27#[derive(Debug, Deserialize, Eq, PartialEq)]
28/// Input to the command
29pub enum Input {
30    /// Data to be sent to the command's stdin
31    Data(Vec<u8>),
32    /// Resize the virtual terminal
33    Resize((usize, usize)),
34    /// Terminate the command
35    Terminate,
36}
37
38const BUF_SIZE: usize = 8192;
39
40fn set_term_size(
41    fd: i32,
42    term_size: (usize, usize),
43) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
44    let ws = libc::winsize {
45        ws_row: u16::try_from(term_size.1)?,
46        ws_col: u16::try_from(term_size.0)?,
47        ws_xpixel: 0,
48        ws_ypixel: 0,
49    };
50
51    if unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &ws) } != 0 {
52        return Err("ioctl".into());
53    }
54
55    Ok(())
56}
57
58/// A command to be run in a virtual terminal
59pub struct Command {
60    pid: Option<u32>,
61    program: OsString,
62    args: Vec<OsString>,
63    env: BTreeMap<OsString, OsString>,
64    current_dir: Option<PathBuf>,
65    in_tx: async_channel::Sender<Input>,
66    in_rx: async_channel::Receiver<Input>,
67    out_tx: async_channel::Sender<Output>,
68    out_rx: async_channel::Receiver<Output>,
69    terminal_id: String,
70    terminal_size: (usize, usize),
71    shutdown_timeout: Option<Duration>,
72}
73
74impl Command {
75    /// Create a new command
76    pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
77        let (in_tx, in_rx) = async_channel::bounded(BUF_SIZE);
78        let (out_tx, out_rx) = async_channel::bounded(BUF_SIZE);
79        Self {
80            pid: None,
81            program: program.as_ref().to_os_string(),
82            args: <_>::default(),
83            env: <_>::default(),
84            current_dir: None,
85            in_tx,
86            in_rx,
87            out_tx,
88            out_rx,
89            terminal_id: "screen-256color".to_string(),
90            terminal_size: (80, 24),
91            shutdown_timeout: None,
92        }
93    }
94    /// Get the sender for sending input to the command
95    pub fn in_tx(&self) -> async_channel::Sender<Input> {
96        self.in_tx.clone()
97    }
98    /// Get the output sender, useful e.g. to manually send empty output frames to the terminal handler
99    pub fn out_tx(&self) -> async_channel::Sender<Output> {
100        self.out_tx.clone()
101    }
102    /// Get the receiver for receiving output from the command
103    pub fn out_rx(&self) -> async_channel::Receiver<Output> {
104        self.out_rx.clone()
105    }
106    /// Set the terminal id
107    pub fn terminal_id<S: Into<String>>(mut self, terminal_id: S) -> Self {
108        self.terminal_id = terminal_id.into();
109        self
110    }
111    /// Set the terminal size
112    pub fn terminal_size(mut self, terminal_size: (usize, usize)) -> Self {
113        self.terminal_size = terminal_size;
114        self
115    }
116    /// Set the shutdown timeout to collect the command output
117    pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
118        self.shutdown_timeout = Some(timeout);
119        self
120    }
121    /// Set the program arguments
122    pub fn args<I, S>(mut self, args: I) -> Self
123    where
124        I: IntoIterator<Item = S>,
125        S: AsRef<OsStr>,
126    {
127        self.args = args
128            .into_iter()
129            .map(|s| s.as_ref().to_os_string())
130            .collect();
131        self
132    }
133    /// Add a program argument
134    pub fn arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
135        self.args.push(arg.as_ref().to_os_string());
136        self
137    }
138    /// Set the environment variables
139    pub fn envs<I, K, V>(mut self, env: I) -> Self
140    where
141        I: IntoIterator<Item = (K, V)>,
142        K: AsRef<OsStr>,
143        V: AsRef<OsStr>,
144    {
145        self.env = env
146            .into_iter()
147            .map(|(k, v)| (k.as_ref().to_os_string(), v.as_ref().to_os_string()))
148            .collect();
149        self
150    }
151    /// Add an environment variable
152    pub fn env<K: AsRef<OsStr>, V: AsRef<OsStr>>(mut self, key: K, value: V) -> Self {
153        self.env
154            .insert(key.as_ref().to_os_string(), value.as_ref().to_os_string());
155        self
156    }
157    /// Set the working directory
158    pub fn current_dir<P: AsRef<Path>>(mut self, current_dir: P) -> Self {
159        self.current_dir = Some(current_dir.as_ref().to_path_buf());
160        self
161    }
162    /// Run the command
163    pub async fn run(self) {
164        let out_tx = self.out_tx.clone();
165        match self.run_subprocess().await {
166            Ok(v) => {
167                out_tx.send(Output::Terminated(v)).await.ok();
168            }
169            Err(e) => {
170                out_tx.send(Output::Error(e.to_string())).await.ok();
171            }
172        }
173    }
174
175    #[allow(clippy::too_many_lines)]
176    async fn run_subprocess(
177        mut self,
178    ) -> Result<Option<i32>, Box<dyn std::error::Error + Send + Sync + 'static>> {
179        let win_size = rustix::termios::Winsize {
180            ws_col: self.terminal_size.0.try_into()?,
181            ws_row: self.terminal_size.1.try_into()?,
182            ws_xpixel: 0,
183            ws_ypixel: 0,
184        };
185        let pty = rustix_openpty::openpty(None, Some(&win_size))?;
186        let (master, slave) = (pty.controller, pty.user);
187
188        let master_fd = master.as_raw_fd();
189        let slave_fd = slave.as_raw_fd();
190
191        if let Ok(mut termios) = rustix::termios::tcgetattr(&master) {
192            // Set character encoding to UTF-8.
193            termios
194                .input_modes
195                .set(rustix::termios::InputModes::IUTF8, true);
196            let _ = rustix::termios::tcsetattr(
197                &master,
198                rustix::termios::OptionalActions::Now,
199                &termios,
200            );
201        }
202
203        let mut builder = tokio::process::Command::new(&self.program);
204
205        if let Some(ref current_dir) = self.current_dir {
206            builder.current_dir(current_dir);
207        }
208
209        builder
210            .args(&self.args)
211            .envs(&self.env)
212            .env("COLUMNS", self.terminal_size.0.to_string())
213            .env("LINES", self.terminal_size.1.to_string())
214            .env("TERM", &self.terminal_id);
215
216        builder.stdin(slave.try_clone()?);
217        builder.stderr(slave.try_clone()?);
218        builder.stdout(slave);
219
220        unsafe {
221            builder.pre_exec(move || {
222                let err = libc::setsid();
223                if err == -1 {
224                    return Err(std::io::Error::new(
225                        std::io::ErrorKind::Other,
226                        "Failed to set session id",
227                    ));
228                }
229
230                let res = libc::ioctl(slave_fd, TIOCSCTTY as _, 0);
231                if res == -1 {
232                    return Err(std::io::Error::new(
233                        std::io::ErrorKind::Other,
234                        format!("Failed to set controlling terminal: {}", res),
235                    ));
236                }
237
238                libc::close(slave_fd);
239                libc::close(master_fd);
240
241                libc::signal(libc::SIGCHLD, libc::SIG_DFL);
242                libc::signal(libc::SIGHUP, libc::SIG_DFL);
243                libc::signal(libc::SIGINT, libc::SIG_DFL);
244                libc::signal(libc::SIGQUIT, libc::SIG_DFL);
245                libc::signal(libc::SIGTERM, libc::SIG_DFL);
246                libc::signal(libc::SIGALRM, libc::SIG_DFL);
247
248                Ok(())
249            });
250        }
251
252        let mut child = builder.spawn()?;
253
254        let pid = child.id().ok_or("unable to get child pid")?;
255
256        self.out_tx.send(Output::Pid(pid)).await?;
257        self.pid = Some(pid);
258
259        let mut stdout = File::from_std(std::fs::File::from(master));
260
261        let mut stdin = stdout.try_clone().await?;
262
263        let tx_stdout = self.out_tx.clone();
264
265        let fut_out = tokio::spawn(async move {
266            let mut buf = [0u8; BUF_SIZE];
267            while let Ok(b) = stdout.read(&mut buf).await {
268                if b == 0 {
269                    break;
270                }
271                if tx_stdout
272                    .send(Output::Stdout(buf[..b].to_vec()))
273                    .await
274                    .is_err()
275                {
276                    break;
277                }
278            }
279        });
280
281        let shutdown_timeout = self.shutdown_timeout;
282
283        let fut_in = tokio::spawn(async move {
284            while let Ok(input) = self.in_rx.recv().await {
285                let mut data = match input {
286                    Input::Data(d) => d,
287                    Input::Resize(size) => {
288                        set_term_size(stdin.as_raw_fd(), size).ok();
289                        bmart::process::kill_pstree_with_signal(
290                            pid,
291                            bmart::process::Signal::SIGWINCH,
292                            true,
293                        );
294                        continue;
295                    }
296                    Input::Terminate => {
297                        break;
298                    }
299                };
300                // TODO: remove this input hack
301                if data == [0x0a] {
302                    data[0] = 0x0d;
303                }
304                if stdin.write_all(&data).await.is_err() {
305                    break;
306                }
307            }
308        });
309
310        let result = child.wait().await?;
311
312        // TODO: fix this
313        if let Some(t) = shutdown_timeout {
314            for _ in 0..10 {
315                if fut_out.is_finished() {
316                    break;
317                }
318                tokio::time::sleep(t / 10).await;
319            }
320        }
321
322        fut_out.abort();
323        fut_in.abort();
324
325        let exit_code = result.code();
326
327        Ok(exit_code)
328    }
329}
330
331impl Drop for Command {
332    fn drop(&mut self) {
333        if let Some(pid) = self.pid {
334            tokio::spawn(bmart::process::kill_pstree(
335                pid,
336                Some(Duration::from_secs(1)),
337                true,
338            ));
339        }
340    }
341}