microsandbox_core/runtime/
monitor.rs

1use std::{
2    io::{Read, Write},
3    os::fd::BorrowedFd,
4    path::{Path, PathBuf},
5};
6
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use microsandbox_utils::{
10    ChildIo, MicrosandboxUtilsError, MicrosandboxUtilsResult, ProcessMonitor, RotatingLog,
11    LOG_SUFFIX,
12};
13use sqlx::{Pool, Sqlite};
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15
16use crate::{management::db, vm::Rootfs, MicrosandboxResult};
17
18//--------------------------------------------------------------------------------------------------
19// Constants
20//--------------------------------------------------------------------------------------------------
21
/// Status string recorded for a sandbox whose MicroVM has been started.
///
/// Passed to `db::save_or_update_sandbox` by `MicroVmMonitor::start`.
pub const SANDBOX_STATUS_RUNNING: &str = "RUNNING";

/// Status string recorded for a sandbox whose MicroVM has been stopped.
///
/// Passed to `db::update_sandbox_status` by `MicroVmMonitor::stop`.
pub const SANDBOX_STATUS_STOPPED: &str = "STOPPED";
27
28//--------------------------------------------------------------------------------------------------
29// Types
30//--------------------------------------------------------------------------------------------------
31
32/// A process monitor for MicroVMs
33pub struct MicroVmMonitor {
34    /// The database for tracking sandbox metrics and metadata
35    sandbox_db: Pool<Sqlite>,
36
37    /// The name of the sandbox
38    sandbox_name: String,
39
40    /// The config file for the sandbox
41    config_file: String,
42
43    /// The last modified timestamp of the config file
44    config_last_modified: DateTime<Utc>,
45
46    /// The supervisor PID
47    supervisor_pid: u32,
48
49    /// The MicroVM log path
50    log_path: Option<PathBuf>,
51
52    /// The log directory
53    log_dir: PathBuf,
54
55    /// The root filesystem
56    rootfs: Rootfs,
57
58    /// original terminal settings for STDIN (set in TTY mode)
59    original_term: Option<nix::sys::termios::Termios>,
60
61    /// Whether to forward output to stdout/stderr
62    forward_output: bool,
63}
64
65//--------------------------------------------------------------------------------------------------
66// Methods
67//--------------------------------------------------------------------------------------------------
68
69impl MicroVmMonitor {
70    /// Create a new MicroVM monitor
71    pub async fn new(
72        supervisor_pid: u32,
73        sandbox_db_path: impl AsRef<Path>,
74        sandbox_name: String,
75        config_file: String,
76        config_last_modified: DateTime<Utc>,
77        log_dir: impl Into<PathBuf>,
78        rootfs: Rootfs,
79        forward_output: bool,
80    ) -> MicrosandboxResult<Self> {
81        Ok(Self {
82            supervisor_pid,
83            sandbox_db: db::get_pool(sandbox_db_path.as_ref()).await?,
84            sandbox_name,
85            config_file,
86            config_last_modified,
87            log_path: None,
88            log_dir: log_dir.into(),
89            rootfs,
90            original_term: None,
91            forward_output,
92        })
93    }
94
95    fn restore_terminal_settings(&mut self) {
96        if let Some(original_term) = self.original_term.take() {
97            if let Err(e) = nix::sys::termios::tcsetattr(
98                unsafe { BorrowedFd::borrow_raw(libc::STDIN_FILENO) },
99                nix::sys::termios::SetArg::TCSANOW,
100                &original_term,
101            ) {
102                tracing::warn!(error = %e, "failed to restore terminal settings in restore_terminal_settings");
103            }
104        }
105    }
106
107    /// Generate a hierarchical log path with the format: <log_dir>/<config_file>/<sandbox_name>.<LOG_SUFFIX>
108    /// This creates a directory structure that namespaces logs by config file and sandbox name.
109    fn generate_log_path(&self) -> PathBuf {
110        // Create a directory for the config file
111        let config_dir = self.log_dir.join(&self.config_file);
112        // Place the log file inside that directory with the sandbox name
113        config_dir.join(format!("{}.{}", self.sandbox_name, LOG_SUFFIX))
114    }
115}
116
117//--------------------------------------------------------------------------------------------------
118// Trait Implementations
119//--------------------------------------------------------------------------------------------------
120
121#[async_trait]
122impl ProcessMonitor for MicroVmMonitor {
123    async fn start(&mut self, pid: u32, child_io: ChildIo) -> MicrosandboxUtilsResult<()> {
124        // Generate the log path with directory-level separation
125        let log_path = self.generate_log_path();
126
127        // Ensure the parent directory exists
128        if let Some(parent) = log_path.parent() {
129            tokio::fs::create_dir_all(parent).await?;
130        }
131
132        let microvm_log =
133            std::sync::Arc::new(tokio::sync::Mutex::new(RotatingLog::new(&log_path).await?));
134        let microvm_pid = pid;
135
136        self.log_path = Some(log_path);
137
138        // Get rootfs paths
139        let rootfs_paths = match &self.rootfs {
140            Rootfs::Native(path) => format!("native:{}", path.to_string_lossy().into_owned()),
141            Rootfs::Overlayfs(paths) => format!(
142                "overlayfs:{}",
143                paths
144                    .iter()
145                    .map(|p| p.to_string_lossy().into_owned())
146                    .collect::<Vec<String>>()
147                    .join(":")
148            ),
149        };
150
151        // Insert sandbox entry into database
152        db::save_or_update_sandbox(
153            &self.sandbox_db,
154            &self.sandbox_name,
155            &self.config_file,
156            &self.config_last_modified,
157            SANDBOX_STATUS_RUNNING,
158            self.supervisor_pid,
159            microvm_pid,
160            &rootfs_paths,
161            None,
162            None,
163        )
164        .await
165        .map_err(MicrosandboxUtilsError::custom)?;
166
167        match child_io {
168            ChildIo::Piped {
169                stdin,
170                stdout,
171                stderr,
172            } => {
173                // Handle stdout logging
174                if let Some(mut stdout) = stdout {
175                    let log = microvm_log.clone();
176                    let forward_output = self.forward_output;
177                    tokio::spawn(async move {
178                        let mut buf = [0u8; 8192]; // NOTE(appcypher): Using 8192 as buffer size because ChatGPT recommended it lol
179                        while let Ok(n) = stdout.read(&mut buf).await {
180                            if n == 0 {
181                                break;
182                            }
183                            // Write to log file
184                            let mut log_guard = log.lock().await;
185                            if let Err(e) = log_guard.write_all(&buf[..n]).await {
186                                tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to write to microvm stdout log");
187                            }
188                            if let Err(e) = log_guard.flush().await {
189                                tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to flush microvm stdout log");
190                            }
191
192                            // Also forward to parent's stdout if enabled
193                            if forward_output {
194                                print!("{}", String::from_utf8_lossy(&buf[..n]));
195                                // Flush stdout in case data is buffered
196                                if let Err(e) = std::io::stdout().flush() {
197                                    tracing::warn!(error = %e, "failed to flush parent stdout");
198                                }
199                            }
200                        }
201                    });
202                }
203
204                // Handle stderr logging
205                if let Some(mut stderr) = stderr {
206                    let log = microvm_log.clone();
207                    let forward_output = self.forward_output;
208                    tokio::spawn(async move {
209                        let mut buf = [0u8; 8192]; // NOTE(appcypher): Using 8192 as buffer size because ChatGPT recommended it lol
210                        while let Ok(n) = stderr.read(&mut buf).await {
211                            if n == 0 {
212                                break;
213                            }
214                            // Write to log file
215                            let mut log_guard = log.lock().await;
216                            if let Err(e) = log_guard.write_all(&buf[..n]).await {
217                                tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to write to microvm stderr log");
218                            }
219                            if let Err(e) = log_guard.flush().await {
220                                tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to flush microvm stderr log");
221                            }
222
223                            // Also forward to parent's stderr if enabled
224                            if forward_output {
225                                eprint!("{}", String::from_utf8_lossy(&buf[..n]));
226                                // Flush stderr in case data is buffered
227                                if let Err(e) = std::io::stderr().flush() {
228                                    tracing::warn!(error = %e, "failed to flush parent stderr");
229                                }
230                            }
231                        }
232                    });
233                }
234
235                // Handle stdin streaming from parent to child
236                if let Some(mut child_stdin) = stdin {
237                    tokio::spawn(async move {
238                        let mut parent_stdin = tokio::io::stdin();
239                        if let Err(e) = tokio::io::copy(&mut parent_stdin, &mut child_stdin).await {
240                            tracing::warn!(error = %e, "failed to copy parent stdin to child stdin");
241                        }
242                    });
243                }
244            }
245            ChildIo::TTY {
246                master_read,
247                mut master_write,
248            } => {
249                // Handle TTY I/O
250                // Put terminal in raw mode
251                let term = nix::sys::termios::tcgetattr(unsafe {
252                    BorrowedFd::borrow_raw(libc::STDIN_FILENO)
253                })?;
254                self.original_term = Some(term.clone());
255                let mut raw_term = term.clone();
256                nix::sys::termios::cfmakeraw(&mut raw_term);
257                nix::sys::termios::tcsetattr(
258                    unsafe { BorrowedFd::borrow_raw(libc::STDIN_FILENO) },
259                    nix::sys::termios::SetArg::TCSANOW,
260                    &raw_term,
261                )?;
262
263                // Spawn async task to read from the master
264                let log = microvm_log.clone();
265                let forward_output = self.forward_output;
266                tokio::spawn(async move {
267                    let mut buf = [0u8; 1024];
268                    loop {
269                        let mut read_guard = match master_read.readable().await {
270                            Ok(guard) => guard,
271                            Err(e) => {
272                                tracing::warn!(error = %e, "error waiting for master fd to become readable");
273                                break;
274                            }
275                        };
276
277                        match read_guard.try_io(|inner| inner.get_ref().read(&mut buf)) {
278                            Ok(Ok(0)) => break, // EOF reached.
279                            Ok(Ok(n)) => {
280                                // Write to log file
281                                let mut log_guard = log.lock().await;
282                                if let Err(e) = log_guard.write_all(&buf[..n]).await {
283                                    tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to write to microvm tty log");
284                                }
285                                if let Err(e) = log_guard.flush().await {
286                                    tracing::error!(microvm_pid = microvm_pid, error = %e, "failed to flush microvm tty log");
287                                }
288
289                                // Print the output from the child process if enabled
290                                if forward_output {
291                                    print!("{}", String::from_utf8_lossy(&buf[..n]));
292                                    // flush stdout in case data is buffered
293                                    std::io::stdout().flush().ok();
294                                }
295                            }
296                            Ok(Err(e)) => {
297                                tracing::warn!(error = %e, "error reading from master fd");
298                                break;
299                            }
300                            Err(_) => continue,
301                        }
302                    }
303                });
304
305                // Spawn async task to copy parent's stdin to the master
306                tokio::spawn(async move {
307                    let mut stdin = tokio::io::stdin();
308                    if let Err(e) = tokio::io::copy(&mut stdin, &mut master_write).await {
309                        tracing::warn!(error = %e, "error copying stdin to master fd");
310                    }
311                });
312            }
313        }
314
315        Ok(())
316    }
317
318    async fn stop(&mut self) -> MicrosandboxUtilsResult<()> {
319        // Restore terminal settings if they were modified
320        self.restore_terminal_settings();
321
322        // Update sandbox status to stopped
323        db::update_sandbox_status(
324            &self.sandbox_db,
325            &self.sandbox_name,
326            &self.config_file,
327            SANDBOX_STATUS_STOPPED,
328        )
329        .await
330        .map_err(MicrosandboxUtilsError::custom)?;
331
332        // Reset the log path
333        self.log_path = None;
334
335        Ok(())
336    }
337}
338
339impl Drop for MicroVmMonitor {
340    fn drop(&mut self) {
341        self.restore_terminal_settings();
342    }
343}