microsandbox_core/runtime/
monitor.rs1use std::{
2 io::{Read, Write},
3 os::fd::BorrowedFd,
4 path::{Path, PathBuf},
5};
6
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use microsandbox_utils::{
10 ChildIo, MicrosandboxUtilsError, MicrosandboxUtilsResult, ProcessMonitor, RotatingLog,
11 LOG_SUFFIX,
12};
13use sqlx::{Pool, Sqlite};
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15
16use crate::{management::db, vm::Rootfs, MicrosandboxResult};
17
18pub const SANDBOX_STATUS_RUNNING: &str = "RUNNING";
24
25pub const SANDBOX_STATUS_STOPPED: &str = "STOPPED";
27
28pub struct MicroVmMonitor {
34 sandbox_db: Pool<Sqlite>,
36
37 sandbox_name: String,
39
40 config_file: String,
42
43 config_last_modified: DateTime<Utc>,
45
46 supervisor_pid: u32,
48
49 log_path: Option<PathBuf>,
51
52 log_dir: PathBuf,
54
55 rootfs: Rootfs,
57
58 original_term: Option<nix::sys::termios::Termios>,
60
61 forward_output: bool,
63}
64
65impl MicroVmMonitor {
70 pub async fn new(
72 supervisor_pid: u32,
73 sandbox_db_path: impl AsRef<Path>,
74 sandbox_name: String,
75 config_file: String,
76 config_last_modified: DateTime<Utc>,
77 log_dir: impl Into<PathBuf>,
78 rootfs: Rootfs,
79 forward_output: bool,
80 ) -> MicrosandboxResult<Self> {
81 Ok(Self {
82 supervisor_pid,
83 sandbox_db: db::get_pool(sandbox_db_path.as_ref()).await?,
84 sandbox_name,
85 config_file,
86 config_last_modified,
87 log_path: None,
88 log_dir: log_dir.into(),
89 rootfs,
90 original_term: None,
91 forward_output,
92 })
93 }
94
95 fn restore_terminal_settings(&mut self) {
96 if let Some(original_term) = self.original_term.take() {
97 if let Err(e) = nix::sys::termios::tcsetattr(
98 unsafe { BorrowedFd::borrow_raw(libc::STDIN_FILENO) },
99 nix::sys::termios::SetArg::TCSANOW,
100 &original_term,
101 ) {
102 tracing::warn!(error = %e, "failed to restore terminal settings in restore_terminal_settings");
103 }
104 }
105 }
106
107 fn generate_log_path(&self) -> PathBuf {
110 let config_dir = self.log_dir.join(&self.config_file);
112 config_dir.join(format!("{}.{}", self.sandbox_name, LOG_SUFFIX))
114 }
115}
116
117#[async_trait]
122impl ProcessMonitor for MicroVmMonitor {
123 async fn start(&mut self, pid: u32, child_io: ChildIo) -> MicrosandboxUtilsResult<()> {
124 let log_path = self.generate_log_path();
126
127 if let Some(parent) = log_path.parent() {
129 tokio::fs::create_dir_all(parent).await?;
130 }
131
132 let microvm_log =
133 std::sync::Arc::new(tokio::sync::Mutex::new(RotatingLog::new(&log_path).await?));
134 let microvm_pid = pid;
135
136 self.log_path = Some(log_path);
137
138 let rootfs_paths = match &self.rootfs {
140 Rootfs::Native(path) => format!("native:{}", path.to_string_lossy().into_owned()),
141 Rootfs::Overlayfs(paths) => format!(
142 "overlayfs:{}",
143 paths
144 .iter()
145 .map(|p| p.to_string_lossy().into_owned())
146 .collect::<Vec<String>>()
147 .join(":")
148 ),
149 };
150
151 db::save_or_update_sandbox(
153 &self.sandbox_db,
154 &self.sandbox_name,
155 &self.config_file,
156 &self.config_last_modified,
157 SANDBOX_STATUS_RUNNING,
158 self.supervisor_pid,
159 microvm_pid,
160 &rootfs_paths,
161 None,
162 None,
163 )
164 .await
165 .map_err(MicrosandboxUtilsError::custom)?;
166
167 match child_io {
168 ChildIo::Piped {
169 stdin,
170 stdout,
171 stderr,
172 } => {
173 if let Some(mut stdout) = stdout {
175 let log = microvm_log.clone();
176 let forward_output = self.forward_output;
177 tokio::spawn(async move {
178 let mut buf = [0u8; 8192]; while let Ok(n) = stdout.read(&mut buf).await {
180 if n == 0 {
181 break;
182 }
183 let mut log_guard = log.lock().await;
185 if let Err(e) = log_guard.write_all(&buf[..n]).await {
186 tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to write to microvm stdout log");
187 }
188 if let Err(e) = log_guard.flush().await {
189 tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to flush microvm stdout log");
190 }
191
192 if forward_output {
194 print!("{}", String::from_utf8_lossy(&buf[..n]));
195 if let Err(e) = std::io::stdout().flush() {
197 tracing::warn!(error = %e, "failed to flush parent stdout");
198 }
199 }
200 }
201 });
202 }
203
204 if let Some(mut stderr) = stderr {
206 let log = microvm_log.clone();
207 let forward_output = self.forward_output;
208 tokio::spawn(async move {
209 let mut buf = [0u8; 8192]; while let Ok(n) = stderr.read(&mut buf).await {
211 if n == 0 {
212 break;
213 }
214 let mut log_guard = log.lock().await;
216 if let Err(e) = log_guard.write_all(&buf[..n]).await {
217 tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to write to microvm stderr log");
218 }
219 if let Err(e) = log_guard.flush().await {
220 tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to flush microvm stderr log");
221 }
222
223 if forward_output {
225 eprint!("{}", String::from_utf8_lossy(&buf[..n]));
226 if let Err(e) = std::io::stderr().flush() {
228 tracing::warn!(error = %e, "failed to flush parent stderr");
229 }
230 }
231 }
232 });
233 }
234
235 if let Some(mut child_stdin) = stdin {
237 tokio::spawn(async move {
238 let mut parent_stdin = tokio::io::stdin();
239 if let Err(e) = tokio::io::copy(&mut parent_stdin, &mut child_stdin).await {
240 tracing::warn!(error = %e, "failed to copy parent stdin to child stdin");
241 }
242 });
243 }
244 }
245 ChildIo::TTY {
246 master_read,
247 mut master_write,
248 } => {
249 let term = nix::sys::termios::tcgetattr(unsafe {
252 BorrowedFd::borrow_raw(libc::STDIN_FILENO)
253 })?;
254 self.original_term = Some(term.clone());
255 let mut raw_term = term.clone();
256 nix::sys::termios::cfmakeraw(&mut raw_term);
257 nix::sys::termios::tcsetattr(
258 unsafe { BorrowedFd::borrow_raw(libc::STDIN_FILENO) },
259 nix::sys::termios::SetArg::TCSANOW,
260 &raw_term,
261 )?;
262
263 let log = microvm_log.clone();
265 let forward_output = self.forward_output;
266 tokio::spawn(async move {
267 let mut buf = [0u8; 1024];
268 loop {
269 let mut read_guard = match master_read.readable().await {
270 Ok(guard) => guard,
271 Err(e) => {
272 tracing::warn!(error = %e, "error waiting for master fd to become readable");
273 break;
274 }
275 };
276
277 match read_guard.try_io(|inner| inner.get_ref().read(&mut buf)) {
278 Ok(Ok(0)) => break, Ok(Ok(n)) => {
280 let mut log_guard = log.lock().await;
282 if let Err(e) = log_guard.write_all(&buf[..n]).await {
283 tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to write to microvm tty log");
284 }
285 if let Err(e) = log_guard.flush().await {
286 tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to flush microvm tty log");
287 }
288
289 if forward_output {
291 print!("{}", String::from_utf8_lossy(&buf[..n]));
292 std::io::stdout().flush().ok();
294 }
295 }
296 Ok(Err(e)) => {
297 tracing::warn!(error = %e, "error reading from master fd");
298 break;
299 }
300 Err(_) => continue,
301 }
302 }
303 });
304
305 tokio::spawn(async move {
307 let mut stdin = tokio::io::stdin();
308 if let Err(e) = tokio::io::copy(&mut stdin, &mut master_write).await {
309 tracing::warn!(error = %e, "error copying stdin to master fd");
310 }
311 });
312 }
313 }
314
315 Ok(())
316 }
317
318 async fn stop(&mut self) -> MicrosandboxUtilsResult<()> {
319 self.restore_terminal_settings();
321
322 db::update_sandbox_status(
324 &self.sandbox_db,
325 &self.sandbox_name,
326 &self.config_file,
327 SANDBOX_STATUS_STOPPED,
328 )
329 .await
330 .map_err(MicrosandboxUtilsError::custom)?;
331
332 self.log_path = None;
334
335 Ok(())
336 }
337}
338
339impl Drop for MicroVmMonitor {
340 fn drop(&mut self) {
341 self.restore_terminal_settings();
342 }
343}