microsandbox_utils/runtime/supervisor.rs

1use nix::{
2    fcntl::{fcntl, FcntlArg, OFlag},
3    pty::openpty,
4    unistd::Pid,
5};
6use std::{
7    os::unix::io::{AsRawFd, FromRawFd, IntoRawFd},
8    path::PathBuf,
9    process::Stdio,
10};
11use tokio::{
12    fs::{create_dir_all, File},
13    io::unix::AsyncFd,
14    process::Command,
15    signal::unix::{signal, SignalKind},
16};
17
18use crate::{
19    path::SUPERVISOR_LOG_FILENAME, term, ChildIo, MicrosandboxUtilsResult, ProcessMonitor,
20    RotatingLog,
21};
22
23//--------------------------------------------------------------------------------------------------
24// Types
25//--------------------------------------------------------------------------------------------------
26
27/// A supervisor that manages a child process and its logging.
28pub struct Supervisor<M>
29where
30    M: ProcessMonitor + Send,
31{
32    /// Path to the child executable
33    child_exe: PathBuf,
34
35    /// Arguments to pass to the child executable
36    child_args: Vec<String>,
37
38    /// The managed child process ID
39    child_pid: Option<u32>,
40
41    /// Environment variables for the child process
42    child_envs: Vec<(String, String)>,
43
44    /// Path to the supervisor's log directory
45    log_dir: PathBuf,
46
47    /// The metrics monitor
48    process_monitor: M,
49}
50
51//--------------------------------------------------------------------------------------------------
52// Methods
53//--------------------------------------------------------------------------------------------------
54
55impl<M> Supervisor<M>
56where
57    M: ProcessMonitor + Send,
58{
59    /// Creates a new supervisor instance.
60    ///
61    /// ## Arguments
62    ///
63    /// * `child_exe` - Path to the child executable
64    /// * `child_args` - Arguments to pass to the child executable
65    /// * `log_dir` - Path to the supervisor's log directory
66    /// * `process_monitor` - The process monitor to use
67    /// * `child_envs` - Environment variables for the child process
68    pub fn new(
69        child_exe: impl Into<PathBuf>,
70        child_args: impl IntoIterator<Item = impl Into<String>>,
71        child_envs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
72        log_dir: impl Into<PathBuf>,
73        process_monitor: M,
74    ) -> Self {
75        Self {
76            child_exe: child_exe.into(),
77            child_args: child_args.into_iter().map(Into::into).collect(),
78            child_envs: child_envs
79                .into_iter()
80                .map(|(k, v)| (k.into(), v.into()))
81                .collect(),
82            child_pid: None,
83            log_dir: log_dir.into(),
84            process_monitor,
85        }
86    }
87
88    /// Starts the supervisor and the child process.
89    ///
90    /// This method:
91    /// 1. Creates the log directory if it doesn't exist
92    /// 2. Starts the child process with appropriate IO (TTY or pipes)
93    /// 3. Passes the IO to the process monitor
94    pub async fn start(&mut self) -> MicrosandboxUtilsResult<()> {
95        // Create log directory if it doesn't exist
96        create_dir_all(&self.log_dir).await?;
97
98        // Setup supervisor's rotating log
99        let _supervisor_log = RotatingLog::new(self.log_dir.join(SUPERVISOR_LOG_FILENAME)).await?;
100
101        // Check if we're running in an interactive terminal
102        let (mut child, child_io) = if term::is_interactive_terminal() {
103            tracing::info!("running in an interactive terminal");
104            // Create a new pseudo terminal and set master to non-blocking mode
105            let pty = openpty(None, None)?;
106            let master_fd = pty.master.as_raw_fd();
107            {
108                let flags = OFlag::from_bits_truncate(fcntl(master_fd, FcntlArg::F_GETFL)?);
109                let new_flags = flags | OFlag::O_NONBLOCK;
110                fcntl(master_fd, FcntlArg::F_SETFL(new_flags))?;
111            }
112
113            // Clone the slave for stdin, stdout, and stderr
114            let slave_in = pty.slave.try_clone()?;
115            let slave_out = pty.slave.try_clone()?;
116            let slave_err = pty.slave;
117
118            // Start child process with PTY
119            let mut command = Command::new(&self.child_exe);
120            command
121                .args(&self.child_args)
122                .envs(self.child_envs.iter().map(|(k, v)| (k, v)))
123                .stdin(Stdio::from(slave_in))
124                .stdout(Stdio::from(slave_out))
125                .stderr(Stdio::from(slave_err));
126
127            // Set up child's session and controlling terminal
128            unsafe {
129                command.pre_exec(|| {
130                    nix::unistd::setsid()
131                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
132                    if libc::ioctl(libc::STDIN_FILENO, libc::TIOCSCTTY as _, 1 as libc::c_long) < 0
133                    {
134                        return Err(std::io::Error::last_os_error());
135                    }
136                    Ok(())
137                });
138            }
139
140            let child = command.spawn()?;
141
142            // Set up master file handles for asynchronous I/O
143            let master_fd_owned = pty.master;
144            let master_write_fd = nix::unistd::dup(master_fd_owned.as_raw_fd())?;
145            let master_read_file =
146                unsafe { std::fs::File::from_raw_fd(master_fd_owned.into_raw_fd()) };
147            let master_write_file = unsafe { std::fs::File::from_raw_fd(master_write_fd) };
148
149            let master_read = AsyncFd::new(master_read_file)?;
150            let master_write = File::from_std(master_write_file);
151
152            // Create the TTY ChildIO
153            let child_io = ChildIo::TTY {
154                master_read,
155                master_write,
156            };
157
158            (child, child_io)
159        } else {
160            tracing::info!("running in a non-interactive terminal");
161            // Start child process with pipes
162            let mut child = Command::new(&self.child_exe)
163                .args(&self.child_args)
164                .envs(self.child_envs.iter().map(|(k, v)| (k, v)))
165                .stdin(Stdio::piped())
166                .stdout(Stdio::piped())
167                .stderr(Stdio::piped())
168                .spawn()?;
169
170            // Take ownership of child's stdin/stdout/stderr
171            let stdin = child.stdin.take();
172            let stdout = child.stdout.take();
173            let stderr = child.stderr.take();
174
175            // Create the Piped ChildIO enum
176            let child_io = ChildIo::Piped {
177                stdin,
178                stdout,
179                stderr,
180            };
181
182            (child, child_io)
183        };
184
185        let child_pid = child.id().expect("failed to get child process id");
186        self.child_pid = Some(child_pid);
187
188        // Start monitoring
189        self.process_monitor.start(child_pid, child_io).await?;
190
191        // Setup signal handlers
192        let mut sigterm = signal(SignalKind::terminate())?;
193        let mut sigint = signal(SignalKind::interrupt())?;
194
195        // Wait for either child process to exit or signal to be received
196        tokio::select! {
197            status = child.wait() => {
198                // Stop process monitoring
199                self.process_monitor.stop().await?;
200
201                tracing::info!("child process {} exited", child_pid);
202
203                if status.is_ok() {
204                    if let Ok(status) = status {
205                        if status.success() {
206                            tracing::info!(
207                                "child process {} exited successfully",
208                                child_pid
209                            );
210                        } else {
211                            tracing::error!(
212                                "child process {} exited with status: {:?}",
213                                child_pid,
214                                status
215                            );
216                        }
217                    }
218                } else {
219                    tracing::error!(
220                        "failed to wait for child process {}: {:?}",
221                        child_pid,
222                        status
223                    );
224                }
225            }
226            _ = sigterm.recv() => {
227                // Stop process monitoring
228                self.process_monitor.stop().await?;
229
230                tracing::info!("received SIGTERM signal");
231
232                if let Some(pid) = self.child_pid.take() {
233                    if let Err(e) = nix::sys::signal::kill(Pid::from_raw(pid as i32), nix::sys::signal::Signal::SIGTERM) {
234                        tracing::error!(
235                            "failed to send SIGTERM to process {}: {}",
236                            pid,
237                            e
238                        );
239                    }
240                }
241
242                // Wait for child to exit after sending signal
243                if let Err(e) = child.wait().await {
244                    tracing::error!(
245                        "error waiting for child after SIGTERM: {}",
246                        e
247                    );
248                }
249            }
250            _ = sigint.recv() => {
251                // Stop process monitoring
252                self.process_monitor.stop().await?;
253
254                tracing::info!("received SIGINT signal");
255
256                if let Some(pid) = self.child_pid.take() {
257                    if let Err(e) = nix::sys::signal::kill(Pid::from_raw(pid as i32), nix::sys::signal::Signal::SIGTERM) {
258                        tracing::error!(
259                            "failed to send SIGTERM to process {}: {}",
260                            pid,
261                            e
262                        );
263                    }
264                }
265
266                // Wait for child to exit after sending signal
267                if let Err(e) = child.wait().await {
268                    tracing::error!(
269                        "error waiting for child after SIGINT: {}",
270                        e
271                    );
272                }
273            }
274        }
275
276        self.child_pid = None;
277
278        Ok(())
279    }
280}