Skip to main content

microsandbox_agentd/
session.rs

1//! Exec session management: spawning processes with PTY or pipe I/O.
2
3use std::{
4    ffi::CString,
5    os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd},
6    process::Stdio,
7};
8
9use nix::{
10    pty::openpty,
11    sys::signal::{Signal, kill},
12    unistd::Pid,
13};
14use tokio::{
15    io::{AsyncReadExt, unix::AsyncFd},
16    process::{Child, Command},
17    sync::mpsc,
18};
19
20use microsandbox_protocol::exec::ExecRequest;
21
22use crate::error::{AgentdError, AgentdResult};
23
24//--------------------------------------------------------------------------------------------------
25// Types
26//--------------------------------------------------------------------------------------------------
27
28/// An active exec session handle for sending input to a running process.
29///
30/// Output reading is handled by a background task that sends events
31/// via the `mpsc` channel provided at spawn time.
32pub struct ExecSession {
33    /// The PID of the spawned process.
34    pid: i32,
35
36    /// The PTY master fd (only for PTY mode, used for writing and resize).
37    pty_master: Option<OwnedFd>,
38
39    /// The child's stdin (only for pipe mode).
40    stdin: Option<tokio::process::ChildStdin>,
41}
42
/// Events produced by a session that the agent loop forwards to the host.
pub enum SessionOutput {
    /// Bytes read from stdout (or from the PTY master in PTY mode).
    Stdout(Vec<u8>),

    /// Bytes read from stderr (pipe mode only).
    Stderr(Vec<u8>),

    /// The process exited; the payload is its exit code.
    Exited(i32),

    /// Already-encoded frame bytes, written as-is to the serial output buffer.
    ///
    /// Used by filesystem streaming operations that encode their own
    /// `FsData`/`FsResponse` messages.
    Raw(Vec<u8>),
}
60
61//--------------------------------------------------------------------------------------------------
62// Methods
63//--------------------------------------------------------------------------------------------------
64
65impl ExecSession {
66    /// Spawns a new exec session.
67    ///
68    /// If `req.tty` is true, uses a PTY. Otherwise, uses piped stdin/stdout/stderr.
69    /// A background task is spawned to read output and send events via `tx`.
70    pub fn spawn(
71        id: u32,
72        req: &ExecRequest,
73        tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
74    ) -> AgentdResult<Self> {
75        if req.tty {
76            Self::spawn_pty(id, req, tx)
77        } else {
78            Self::spawn_pipe(id, req, tx)
79        }
80    }
81
82    /// Returns the PID of the spawned process (as u32 for the protocol).
83    pub fn pid(&self) -> u32 {
84        self.pid as u32
85    }
86
87    /// Writes data to the process's stdin (or PTY master).
88    pub async fn write_stdin(&self, data: &[u8]) -> AgentdResult<()> {
89        if let Some(ref master) = self.pty_master {
90            blocking_write_fd(master.as_raw_fd(), data).await
91        } else if let Some(ref stdin) = self.stdin {
92            blocking_write_fd(stdin.as_raw_fd(), data).await
93        } else {
94            Ok(())
95        }
96    }
97
98    /// Resizes the PTY (only applicable for TTY sessions).
99    pub fn resize(&self, rows: u16, cols: u16) -> AgentdResult<()> {
100        if let Some(ref master) = self.pty_master {
101            let ws = libc::winsize {
102                ws_row: rows,
103                ws_col: cols,
104                ws_xpixel: 0,
105                ws_ypixel: 0,
106            };
107            let ret = unsafe { libc::ioctl(master.as_raw_fd(), libc::TIOCSWINSZ, &ws) };
108            if ret < 0 {
109                return Err(std::io::Error::last_os_error().into());
110            }
111        }
112        Ok(())
113    }
114
115    /// Sends a signal to the spawned process.
116    pub fn send_signal(&self, signal: i32) -> AgentdResult<()> {
117        let sig = Signal::try_from(signal)
118            .map_err(|e| AgentdError::ExecSession(format!("invalid signal {signal}: {e}")))?;
119        kill(Pid::from_raw(self.pid), sig)?;
120        Ok(())
121    }
122
123    /// Closes the process's stdin.
124    ///
125    /// For pipe mode, drops the `ChildStdin` handle which closes the fd.
126    /// For PTY mode, this is a no-op (the PTY master stays open for output).
127    pub fn close_stdin(&mut self) {
128        self.stdin.take();
129    }
130}
131
132impl ExecSession {
133    /// Spawns a process with a PTY.
134    fn spawn_pty(
135        id: u32,
136        req: &ExecRequest,
137        tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
138    ) -> AgentdResult<Self> {
139        let pty = openpty(None, None)?;
140
141        // Set initial window size.
142        let ws = libc::winsize {
143            ws_row: req.rows,
144            ws_col: req.cols,
145            ws_xpixel: 0,
146            ws_ypixel: 0,
147        };
148        let ret = unsafe { libc::ioctl(pty.master.as_raw_fd(), libc::TIOCSWINSZ, &ws) };
149        if ret < 0 {
150            return Err(std::io::Error::last_os_error().into());
151        }
152
153        let slave_fd = pty.slave.as_raw_fd();
154
155        // Pre-build all strings before fork to avoid allocating in the child.
156        let c_cmd = CString::new(req.cmd.as_str())
157            .map_err(|e| AgentdError::ExecSession(format!("invalid command: {e}")))?;
158        let mut c_args: Vec<CString> = vec![c_cmd.clone()];
159        for arg in &req.args {
160            c_args.push(
161                CString::new(arg.as_str())
162                    .map_err(|e| AgentdError::ExecSession(format!("invalid arg: {e}")))?,
163            );
164        }
165
166        // Build argv pointer array (null-terminated).
167        let argv_ptrs: Vec<*const libc::c_char> = c_args
168            .iter()
169            .map(|s| s.as_ptr())
170            .chain(std::iter::once(std::ptr::null()))
171            .collect();
172
173        // Pre-parse environment variables into CStrings.
174        let c_env: Vec<(CString, CString)> = req
175            .env
176            .iter()
177            .filter_map(|var| {
178                let (key, val) = var.split_once('=')?;
179                let k = CString::new(key).ok()?;
180                let v = CString::new(val).ok()?;
181                Some((k, v))
182            })
183            .collect();
184
185        // Pre-build cwd CString.
186        let c_cwd = req
187            .cwd
188            .as_ref()
189            .map(|dir| CString::new(dir.as_str()))
190            .transpose()
191            .map_err(|e| AgentdError::ExecSession(format!("invalid cwd: {e}")))?;
192
193        // Pre-parse rlimits before fork (no allocations in child).
194        let parsed_rlimits = parse_rlimits(req);
195
196        // Fork.
197        let pid = unsafe { libc::fork() };
198        if pid < 0 {
199            return Err(std::io::Error::last_os_error().into());
200        }
201
202        #[allow(unreachable_code)]
203        if pid == 0 {
204            // Child process — only async-signal-safe operations from here.
205            drop(pty.master);
206
207            // Create new session.
208            if unsafe { libc::setsid() } < 0 {
209                unsafe { libc::_exit(1) };
210            }
211
212            // Set controlling terminal.
213            #[cfg(target_os = "linux")]
214            let tiocsctty = libc::TIOCSCTTY;
215            #[cfg(target_os = "macos")]
216            let tiocsctty: libc::c_ulong = libc::TIOCSCTTY.into();
217
218            if unsafe { libc::ioctl(slave_fd, tiocsctty, 0) } < 0 {
219                unsafe { libc::_exit(1) };
220            }
221
222            // Dup slave to stdin/stdout/stderr.
223            unsafe {
224                if libc::dup2(slave_fd, 0) < 0 {
225                    libc::_exit(1);
226                }
227                if libc::dup2(slave_fd, 1) < 0 {
228                    libc::_exit(1);
229                }
230                if libc::dup2(slave_fd, 2) < 0 {
231                    libc::_exit(1);
232                }
233                if slave_fd > 2 {
234                    libc::close(slave_fd);
235                }
236            }
237
238            // Set environment variables using pre-built CStrings.
239            for (key, val) in &c_env {
240                unsafe {
241                    libc::setenv(key.as_ptr(), val.as_ptr(), 1);
242                }
243            }
244
245            // Set working directory.
246            if let Some(ref dir) = c_cwd {
247                unsafe {
248                    libc::chdir(dir.as_ptr());
249                }
250            }
251
252            // Apply resource limits.
253            for (resource, limit) in &parsed_rlimits {
254                if unsafe { libc::setrlimit(*resource as _, limit) } != 0 {
255                    unsafe { libc::_exit(1) };
256                }
257            }
258
259            // execvp — on success this never returns.
260            unsafe {
261                libc::execvp(argv_ptrs[0], argv_ptrs.as_ptr());
262            }
263
264            // If execvp returns, it failed.
265            unsafe { libc::_exit(127) };
266        }
267
268        // Parent process.
269        drop(pty.slave);
270
271        // Dup the master fd for the reader task.
272        let reader_fd = unsafe { libc::dup(pty.master.as_raw_fd()) };
273        if reader_fd < 0 {
274            return Err(std::io::Error::last_os_error().into());
275        }
276        let reader_fd = unsafe { OwnedFd::from_raw_fd(reader_fd) };
277
278        // Spawn background reader task.
279        tokio::spawn(pty_reader_task(id, pid, reader_fd, tx));
280
281        Ok(Self {
282            pid,
283            pty_master: Some(pty.master),
284            stdin: None,
285        })
286    }
287
288    /// Spawns a process with piped stdio.
289    fn spawn_pipe(
290        id: u32,
291        req: &ExecRequest,
292        tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
293    ) -> AgentdResult<Self> {
294        let mut cmd = Command::new(&req.cmd);
295        cmd.args(&req.args)
296            .stdin(Stdio::piped())
297            .stdout(Stdio::piped())
298            .stderr(Stdio::piped());
299
300        for var in &req.env {
301            if let Some((key, val)) = var.split_once('=') {
302                cmd.env(key, val);
303            }
304        }
305
306        if let Some(ref dir) = req.cwd {
307            cmd.current_dir(dir);
308        }
309
310        // Apply resource limits in the child before exec.
311        let parsed_rlimits = parse_rlimits(req);
312        if !parsed_rlimits.is_empty() {
313            unsafe {
314                cmd.pre_exec(move || {
315                    for (resource, limit) in &parsed_rlimits {
316                        if libc::setrlimit(*resource as _, limit) != 0 {
317                            return Err(std::io::Error::last_os_error());
318                        }
319                    }
320                    Ok(())
321                });
322            }
323        }
324
325        let mut child = cmd.spawn()?;
326        let pid = child.id().unwrap_or(0) as i32;
327        let stdin = child.stdin.take();
328        let stdout = child.stdout.take();
329        let stderr = child.stderr.take();
330
331        // Spawn background reader task.
332        tokio::spawn(pipe_reader_task(id, child, stdout, stderr, tx));
333
334        Ok(Self {
335            pid,
336            pty_master: None,
337            stdin,
338        })
339    }
340}
341
342//--------------------------------------------------------------------------------------------------
343// Functions
344//--------------------------------------------------------------------------------------------------
345
346/// Parses a resource limit name into the corresponding `RLIMIT_*` constant.
347///
348/// Uses raw constants for Linux-specific limits that aren't in libc's cross-platform API.
349fn parse_rlimit_resource(name: &str) -> Option<libc::c_int> {
350    // Linux x86_64 RLIMIT_* values for resources not exposed by libc on all platforms.
351    const RLIMIT_LOCKS: libc::c_int = 10;
352    const RLIMIT_SIGPENDING: libc::c_int = 11;
353    const RLIMIT_MSGQUEUE: libc::c_int = 12;
354    const RLIMIT_NICE: libc::c_int = 13;
355    const RLIMIT_RTPRIO: libc::c_int = 14;
356    const RLIMIT_RTTIME: libc::c_int = 15;
357
358    match name {
359        "cpu" => Some(libc::RLIMIT_CPU as _),
360        "fsize" => Some(libc::RLIMIT_FSIZE as _),
361        "data" => Some(libc::RLIMIT_DATA as _),
362        "stack" => Some(libc::RLIMIT_STACK as _),
363        "core" => Some(libc::RLIMIT_CORE as _),
364        "rss" => Some(libc::RLIMIT_RSS as _),
365        "nproc" => Some(libc::RLIMIT_NPROC as _),
366        "nofile" => Some(libc::RLIMIT_NOFILE as _),
367        "memlock" => Some(libc::RLIMIT_MEMLOCK as _),
368        "as" => Some(libc::RLIMIT_AS as _),
369        "locks" => Some(RLIMIT_LOCKS),
370        "sigpending" => Some(RLIMIT_SIGPENDING),
371        "msgqueue" => Some(RLIMIT_MSGQUEUE),
372        "nice" => Some(RLIMIT_NICE),
373        "rtprio" => Some(RLIMIT_RTPRIO),
374        "rttime" => Some(RLIMIT_RTTIME),
375        _ => None,
376    }
377}
378
379/// Pre-parses rlimits from the exec request into `(resource_id, rlimit)` tuples
380/// that can be applied in the child process via `setrlimit()`.
381fn parse_rlimits(req: &ExecRequest) -> Vec<(libc::c_int, libc::rlimit)> {
382    req.rlimits
383        .iter()
384        .filter_map(|rl| {
385            let resource = parse_rlimit_resource(&rl.resource)?;
386            Some((
387                resource,
388                libc::rlimit {
389                    rlim_cur: rl.soft,
390                    rlim_max: rl.hard,
391                },
392            ))
393        })
394        .collect()
395}
396
397/// Writes data to a raw fd using a blocking task, handling short writes.
398async fn blocking_write_fd(fd: RawFd, data: &[u8]) -> AgentdResult<()> {
399    let data = data.to_vec();
400    tokio::task::spawn_blocking(move || {
401        let mut written = 0;
402        while written < data.len() {
403            let ptr = unsafe { data.as_ptr().add(written) as *const libc::c_void };
404            let ret = unsafe { libc::write(fd, ptr, data.len() - written) };
405            if ret < 0 {
406                return Err(AgentdError::Io(std::io::Error::last_os_error()));
407            }
408            written += ret as usize;
409        }
410        Ok(())
411    })
412    .await
413    .map_err(|e| AgentdError::ExecSession(format!("stdin write join error: {e}")))?
414}
415
416/// Background task that reads from a PTY master fd and sends output events.
417async fn pty_reader_task(
418    id: u32,
419    pid: i32,
420    master_fd: OwnedFd,
421    tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
422) {
423    // Set non-blocking for async I/O.
424    let raw = master_fd.as_raw_fd();
425    let flags = unsafe { libc::fcntl(raw, libc::F_GETFL) };
426    if flags >= 0 {
427        unsafe { libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) };
428    }
429
430    let Ok(async_fd) = AsyncFd::new(master_fd) else {
431        let code = wait_for_pid(pid).await;
432        let _ = tx.send((id, SessionOutput::Exited(code)));
433        return;
434    };
435
436    loop {
437        let Ok(mut guard) = async_fd.readable().await else {
438            break;
439        };
440
441        let fd = async_fd.as_raw_fd();
442        let mut buf = [0u8; 4096];
443        let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
444
445        if n > 0 {
446            let _ = tx.send((id, SessionOutput::Stdout(buf[..n as usize].to_vec())));
447            guard.clear_ready();
448        } else if n == 0 {
449            break;
450        } else {
451            let err = std::io::Error::last_os_error();
452            if err.raw_os_error() == Some(libc::EAGAIN)
453                || err.raw_os_error() == Some(libc::EWOULDBLOCK)
454            {
455                guard.clear_ready();
456                continue;
457            }
458            // EIO or other error — PTY slave closed.
459            break;
460        }
461    }
462
463    let code = wait_for_pid(pid).await;
464    let _ = tx.send((id, SessionOutput::Exited(code)));
465}
466
467/// Background task that reads from piped stdout/stderr and sends output events.
468async fn pipe_reader_task(
469    id: u32,
470    mut child: Child,
471    stdout: Option<tokio::process::ChildStdout>,
472    stderr: Option<tokio::process::ChildStderr>,
473    tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
474) {
475    let mut stdout = stdout;
476    let mut stderr = stderr;
477    let mut stdout_eof = stdout.is_none();
478    let mut stderr_eof = stderr.is_none();
479
480    while !stdout_eof || !stderr_eof {
481        let mut stdout_buf = [0u8; 4096];
482        let mut stderr_buf = [0u8; 4096];
483
484        tokio::select! {
485            result = async {
486                match stdout.as_mut() {
487                    Some(out) => out.read(&mut stdout_buf).await,
488                    None => std::future::pending().await,
489                }
490            }, if !stdout_eof => {
491                match result {
492                    Ok(0) | Err(_) => {
493                        stdout = None;
494                        stdout_eof = true;
495                    }
496                    Ok(n) => {
497                        let _ = tx.send((id, SessionOutput::Stdout(stdout_buf[..n].to_vec())));
498                    }
499                }
500            }
501            result = async {
502                match stderr.as_mut() {
503                    Some(err) => err.read(&mut stderr_buf).await,
504                    None => std::future::pending().await,
505                }
506            }, if !stderr_eof => {
507                match result {
508                    Ok(0) | Err(_) => {
509                        stderr = None;
510                        stderr_eof = true;
511                    }
512                    Ok(n) => {
513                        let _ = tx.send((id, SessionOutput::Stderr(stderr_buf[..n].to_vec())));
514                    }
515                }
516            }
517        }
518    }
519
520    // Both streams are done — wait for process exit.
521    let code = match child.wait().await {
522        Ok(status) => status.code().unwrap_or(-1),
523        Err(_) => -1,
524    };
525
526    let _ = tx.send((id, SessionOutput::Exited(code)));
527}
528
529/// Waits for a process to exit by PID and returns the exit code.
530async fn wait_for_pid(pid: i32) -> i32 {
531    tokio::task::spawn_blocking(move || {
532        let mut status: i32 = 0;
533        unsafe {
534            libc::waitpid(pid, &mut status, 0);
535        }
536        if libc::WIFEXITED(status) {
537            libc::WEXITSTATUS(status)
538        } else {
539            -1
540        }
541    })
542    .await
543    .unwrap_or(-1)
544}