microsandbox_utils/runtime/
supervisor.rs1use nix::{
2 fcntl::{fcntl, FcntlArg, OFlag},
3 pty::openpty,
4 unistd::Pid,
5};
6use std::{
7 os::unix::io::{AsRawFd, FromRawFd, IntoRawFd},
8 path::PathBuf,
9 process::Stdio,
10};
11use tokio::{
12 fs::{create_dir_all, File},
13 io::unix::AsyncFd,
14 process::Command,
15 signal::unix::{signal, SignalKind},
16};
17
18use crate::{
19 path::SUPERVISOR_LOG_FILENAME, term, ChildIo, MicrosandboxUtilsResult, ProcessMonitor,
20 RotatingLog,
21};
22
23pub struct Supervisor<M>
29where
30 M: ProcessMonitor + Send,
31{
32 child_exe: PathBuf,
34
35 child_args: Vec<String>,
37
38 child_pid: Option<u32>,
40
41 child_envs: Vec<(String, String)>,
43
44 log_dir: PathBuf,
46
47 process_monitor: M,
49}
50
51impl<M> Supervisor<M>
56where
57 M: ProcessMonitor + Send,
58{
59 pub fn new(
69 child_exe: impl Into<PathBuf>,
70 child_args: impl IntoIterator<Item = impl Into<String>>,
71 child_envs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
72 log_dir: impl Into<PathBuf>,
73 process_monitor: M,
74 ) -> Self {
75 Self {
76 child_exe: child_exe.into(),
77 child_args: child_args.into_iter().map(Into::into).collect(),
78 child_envs: child_envs
79 .into_iter()
80 .map(|(k, v)| (k.into(), v.into()))
81 .collect(),
82 child_pid: None,
83 log_dir: log_dir.into(),
84 process_monitor,
85 }
86 }
87
88 pub async fn start(&mut self) -> MicrosandboxUtilsResult<()> {
95 create_dir_all(&self.log_dir).await?;
97
98 let _supervisor_log = RotatingLog::new(self.log_dir.join(SUPERVISOR_LOG_FILENAME)).await?;
100
101 let (mut child, child_io) = if term::is_interactive_terminal() {
103 tracing::info!("running in an interactive terminal");
104 let pty = openpty(None, None)?;
106 let master_fd = pty.master.as_raw_fd();
107 {
108 let flags = OFlag::from_bits_truncate(fcntl(master_fd, FcntlArg::F_GETFL)?);
109 let new_flags = flags | OFlag::O_NONBLOCK;
110 fcntl(master_fd, FcntlArg::F_SETFL(new_flags))?;
111 }
112
113 let slave_in = pty.slave.try_clone()?;
115 let slave_out = pty.slave.try_clone()?;
116 let slave_err = pty.slave;
117
118 let mut command = Command::new(&self.child_exe);
120 command
121 .args(&self.child_args)
122 .envs(self.child_envs.iter().map(|(k, v)| (k, v)))
123 .stdin(Stdio::from(slave_in))
124 .stdout(Stdio::from(slave_out))
125 .stderr(Stdio::from(slave_err));
126
127 unsafe {
129 command.pre_exec(|| {
130 nix::unistd::setsid()
131 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
132 if libc::ioctl(libc::STDIN_FILENO, libc::TIOCSCTTY as _, 1 as libc::c_long) < 0
133 {
134 return Err(std::io::Error::last_os_error());
135 }
136 Ok(())
137 });
138 }
139
140 let child = command.spawn()?;
141
142 let master_fd_owned = pty.master;
144 let master_write_fd = nix::unistd::dup(master_fd_owned.as_raw_fd())?;
145 let master_read_file =
146 unsafe { std::fs::File::from_raw_fd(master_fd_owned.into_raw_fd()) };
147 let master_write_file = unsafe { std::fs::File::from_raw_fd(master_write_fd) };
148
149 let master_read = AsyncFd::new(master_read_file)?;
150 let master_write = File::from_std(master_write_file);
151
152 let child_io = ChildIo::TTY {
154 master_read,
155 master_write,
156 };
157
158 (child, child_io)
159 } else {
160 tracing::info!("running in a non-interactive terminal");
161 let mut child = Command::new(&self.child_exe)
163 .args(&self.child_args)
164 .envs(self.child_envs.iter().map(|(k, v)| (k, v)))
165 .stdin(Stdio::piped())
166 .stdout(Stdio::piped())
167 .stderr(Stdio::piped())
168 .spawn()?;
169
170 let stdin = child.stdin.take();
172 let stdout = child.stdout.take();
173 let stderr = child.stderr.take();
174
175 let child_io = ChildIo::Piped {
177 stdin,
178 stdout,
179 stderr,
180 };
181
182 (child, child_io)
183 };
184
185 let child_pid = child.id().expect("failed to get child process id");
186 self.child_pid = Some(child_pid);
187
188 self.process_monitor.start(child_pid, child_io).await?;
190
191 let mut sigterm = signal(SignalKind::terminate())?;
193 let mut sigint = signal(SignalKind::interrupt())?;
194
195 tokio::select! {
197 status = child.wait() => {
198 self.process_monitor.stop().await?;
200
201 tracing::info!("child process {} exited", child_pid);
202
203 if status.is_ok() {
204 if let Ok(status) = status {
205 if status.success() {
206 tracing::info!(
207 "child process {} exited successfully",
208 child_pid
209 );
210 } else {
211 tracing::error!(
212 "child process {} exited with status: {:?}",
213 child_pid,
214 status
215 );
216 }
217 }
218 } else {
219 tracing::error!(
220 "failed to wait for child process {}: {:?}",
221 child_pid,
222 status
223 );
224 }
225 }
226 _ = sigterm.recv() => {
227 self.process_monitor.stop().await?;
229
230 tracing::info!("received SIGTERM signal");
231
232 if let Some(pid) = self.child_pid.take() {
233 if let Err(e) = nix::sys::signal::kill(Pid::from_raw(pid as i32), nix::sys::signal::Signal::SIGTERM) {
234 tracing::error!(
235 "failed to send SIGTERM to process {}: {}",
236 pid,
237 e
238 );
239 }
240 }
241
242 if let Err(e) = child.wait().await {
244 tracing::error!(
245 "error waiting for child after SIGTERM: {}",
246 e
247 );
248 }
249 }
250 _ = sigint.recv() => {
251 self.process_monitor.stop().await?;
253
254 tracing::info!("received SIGINT signal");
255
256 if let Some(pid) = self.child_pid.take() {
257 if let Err(e) = nix::sys::signal::kill(Pid::from_raw(pid as i32), nix::sys::signal::Signal::SIGTERM) {
258 tracing::error!(
259 "failed to send SIGTERM to process {}: {}",
260 pid,
261 e
262 );
263 }
264 }
265
266 if let Err(e) = child.wait().await {
268 tracing::error!(
269 "error waiting for child after SIGINT: {}",
270 e
271 );
272 }
273 }
274 }
275
276 self.child_pid = None;
277
278 Ok(())
279 }
280}