1use std::{
4 ffi::CString,
5 os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd},
6 process::Stdio,
7};
8
9use nix::{
10 pty::openpty,
11 sys::signal::{Signal, kill},
12 unistd::Pid,
13};
14use tokio::{
15 io::{AsyncReadExt, unix::AsyncFd},
16 process::{Child, Command},
17 sync::mpsc,
18};
19
20use microsandbox_protocol::exec::ExecRequest;
21
22use crate::error::{AgentdError, AgentdResult};
23
24pub struct ExecSession {
33 pid: i32,
35
36 pty_master: Option<OwnedFd>,
38
39 stdin: Option<tokio::process::ChildStdin>,
41}
42
/// A message produced by an exec session; it is sent on the channel paired with the
/// session id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionOutput {
    /// Bytes read from the command's stdout (pipe mode) or from the terminal (PTY mode,
    /// where stdout and stderr are merged).
    Stdout(Vec<u8>),

    /// Bytes read from the command's stderr (pipe mode only).
    Stderr(Vec<u8>),

    /// The command finished with this exit code; `-1` when no normal exit status is
    /// available (e.g. it was killed by a signal).
    Exited(i32),

    /// Raw bytes. NOTE(review): not produced anywhere in this module — confirm its
    /// intended use with the consumers of this type.
    Raw(Vec<u8>),
}
60
61impl ExecSession {
66 pub fn spawn(
71 id: u32,
72 req: &ExecRequest,
73 tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
74 ) -> AgentdResult<Self> {
75 if req.tty {
76 Self::spawn_pty(id, req, tx)
77 } else {
78 Self::spawn_pipe(id, req, tx)
79 }
80 }
81
82 pub fn pid(&self) -> u32 {
84 self.pid as u32
85 }
86
87 pub async fn write_stdin(&self, data: &[u8]) -> AgentdResult<()> {
89 if let Some(ref master) = self.pty_master {
90 blocking_write_fd(master.as_raw_fd(), data).await
91 } else if let Some(ref stdin) = self.stdin {
92 blocking_write_fd(stdin.as_raw_fd(), data).await
93 } else {
94 Ok(())
95 }
96 }
97
98 pub fn resize(&self, rows: u16, cols: u16) -> AgentdResult<()> {
100 if let Some(ref master) = self.pty_master {
101 let ws = libc::winsize {
102 ws_row: rows,
103 ws_col: cols,
104 ws_xpixel: 0,
105 ws_ypixel: 0,
106 };
107 let ret = unsafe { libc::ioctl(master.as_raw_fd(), libc::TIOCSWINSZ, &ws) };
108 if ret < 0 {
109 return Err(std::io::Error::last_os_error().into());
110 }
111 }
112 Ok(())
113 }
114
115 pub fn send_signal(&self, signal: i32) -> AgentdResult<()> {
117 let sig = Signal::try_from(signal)
118 .map_err(|e| AgentdError::ExecSession(format!("invalid signal {signal}: {e}")))?;
119 kill(Pid::from_raw(self.pid), sig)?;
120 Ok(())
121 }
122
123 pub fn close_stdin(&mut self) {
128 self.stdin.take();
129 }
130}
131
132impl ExecSession {
133 fn spawn_pty(
135 id: u32,
136 req: &ExecRequest,
137 tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
138 ) -> AgentdResult<Self> {
139 let pty = openpty(None, None)?;
140
141 let ws = libc::winsize {
143 ws_row: req.rows,
144 ws_col: req.cols,
145 ws_xpixel: 0,
146 ws_ypixel: 0,
147 };
148 let ret = unsafe { libc::ioctl(pty.master.as_raw_fd(), libc::TIOCSWINSZ, &ws) };
149 if ret < 0 {
150 return Err(std::io::Error::last_os_error().into());
151 }
152
153 let slave_fd = pty.slave.as_raw_fd();
154
155 let c_cmd = CString::new(req.cmd.as_str())
157 .map_err(|e| AgentdError::ExecSession(format!("invalid command: {e}")))?;
158 let mut c_args: Vec<CString> = vec![c_cmd.clone()];
159 for arg in &req.args {
160 c_args.push(
161 CString::new(arg.as_str())
162 .map_err(|e| AgentdError::ExecSession(format!("invalid arg: {e}")))?,
163 );
164 }
165
166 let argv_ptrs: Vec<*const libc::c_char> = c_args
168 .iter()
169 .map(|s| s.as_ptr())
170 .chain(std::iter::once(std::ptr::null()))
171 .collect();
172
173 let c_env: Vec<(CString, CString)> = req
175 .env
176 .iter()
177 .filter_map(|var| {
178 let (key, val) = var.split_once('=')?;
179 let k = CString::new(key).ok()?;
180 let v = CString::new(val).ok()?;
181 Some((k, v))
182 })
183 .collect();
184
185 let c_cwd = req
187 .cwd
188 .as_ref()
189 .map(|dir| CString::new(dir.as_str()))
190 .transpose()
191 .map_err(|e| AgentdError::ExecSession(format!("invalid cwd: {e}")))?;
192
193 let parsed_rlimits = parse_rlimits(req);
195
196 let pid = unsafe { libc::fork() };
198 if pid < 0 {
199 return Err(std::io::Error::last_os_error().into());
200 }
201
202 #[allow(unreachable_code)]
203 if pid == 0 {
204 drop(pty.master);
206
207 if unsafe { libc::setsid() } < 0 {
209 unsafe { libc::_exit(1) };
210 }
211
212 #[cfg(target_os = "linux")]
214 let tiocsctty = libc::TIOCSCTTY;
215 #[cfg(target_os = "macos")]
216 let tiocsctty: libc::c_ulong = libc::TIOCSCTTY.into();
217
218 if unsafe { libc::ioctl(slave_fd, tiocsctty, 0) } < 0 {
219 unsafe { libc::_exit(1) };
220 }
221
222 unsafe {
224 if libc::dup2(slave_fd, 0) < 0 {
225 libc::_exit(1);
226 }
227 if libc::dup2(slave_fd, 1) < 0 {
228 libc::_exit(1);
229 }
230 if libc::dup2(slave_fd, 2) < 0 {
231 libc::_exit(1);
232 }
233 if slave_fd > 2 {
234 libc::close(slave_fd);
235 }
236 }
237
238 for (key, val) in &c_env {
240 unsafe {
241 libc::setenv(key.as_ptr(), val.as_ptr(), 1);
242 }
243 }
244
245 if let Some(ref dir) = c_cwd {
247 unsafe {
248 libc::chdir(dir.as_ptr());
249 }
250 }
251
252 for (resource, limit) in &parsed_rlimits {
254 if unsafe { libc::setrlimit(*resource as _, limit) } != 0 {
255 unsafe { libc::_exit(1) };
256 }
257 }
258
259 unsafe {
261 libc::execvp(argv_ptrs[0], argv_ptrs.as_ptr());
262 }
263
264 unsafe { libc::_exit(127) };
266 }
267
268 drop(pty.slave);
270
271 let reader_fd = unsafe { libc::dup(pty.master.as_raw_fd()) };
273 if reader_fd < 0 {
274 return Err(std::io::Error::last_os_error().into());
275 }
276 let reader_fd = unsafe { OwnedFd::from_raw_fd(reader_fd) };
277
278 tokio::spawn(pty_reader_task(id, pid, reader_fd, tx));
280
281 Ok(Self {
282 pid,
283 pty_master: Some(pty.master),
284 stdin: None,
285 })
286 }
287
288 fn spawn_pipe(
290 id: u32,
291 req: &ExecRequest,
292 tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
293 ) -> AgentdResult<Self> {
294 let mut cmd = Command::new(&req.cmd);
295 cmd.args(&req.args)
296 .stdin(Stdio::piped())
297 .stdout(Stdio::piped())
298 .stderr(Stdio::piped());
299
300 for var in &req.env {
301 if let Some((key, val)) = var.split_once('=') {
302 cmd.env(key, val);
303 }
304 }
305
306 if let Some(ref dir) = req.cwd {
307 cmd.current_dir(dir);
308 }
309
310 let parsed_rlimits = parse_rlimits(req);
312 if !parsed_rlimits.is_empty() {
313 unsafe {
314 cmd.pre_exec(move || {
315 for (resource, limit) in &parsed_rlimits {
316 if libc::setrlimit(*resource as _, limit) != 0 {
317 return Err(std::io::Error::last_os_error());
318 }
319 }
320 Ok(())
321 });
322 }
323 }
324
325 let mut child = cmd.spawn()?;
326 let pid = child.id().unwrap_or(0) as i32;
327 let stdin = child.stdin.take();
328 let stdout = child.stdout.take();
329 let stderr = child.stderr.take();
330
331 tokio::spawn(pipe_reader_task(id, child, stdout, stderr, tx));
333
334 Ok(Self {
335 pid,
336 pty_master: None,
337 stdin,
338 })
339 }
340}
341
342fn parse_rlimit_resource(name: &str) -> Option<libc::c_int> {
350 const RLIMIT_LOCKS: libc::c_int = 10;
352 const RLIMIT_SIGPENDING: libc::c_int = 11;
353 const RLIMIT_MSGQUEUE: libc::c_int = 12;
354 const RLIMIT_NICE: libc::c_int = 13;
355 const RLIMIT_RTPRIO: libc::c_int = 14;
356 const RLIMIT_RTTIME: libc::c_int = 15;
357
358 match name {
359 "cpu" => Some(libc::RLIMIT_CPU as _),
360 "fsize" => Some(libc::RLIMIT_FSIZE as _),
361 "data" => Some(libc::RLIMIT_DATA as _),
362 "stack" => Some(libc::RLIMIT_STACK as _),
363 "core" => Some(libc::RLIMIT_CORE as _),
364 "rss" => Some(libc::RLIMIT_RSS as _),
365 "nproc" => Some(libc::RLIMIT_NPROC as _),
366 "nofile" => Some(libc::RLIMIT_NOFILE as _),
367 "memlock" => Some(libc::RLIMIT_MEMLOCK as _),
368 "as" => Some(libc::RLIMIT_AS as _),
369 "locks" => Some(RLIMIT_LOCKS),
370 "sigpending" => Some(RLIMIT_SIGPENDING),
371 "msgqueue" => Some(RLIMIT_MSGQUEUE),
372 "nice" => Some(RLIMIT_NICE),
373 "rtprio" => Some(RLIMIT_RTPRIO),
374 "rttime" => Some(RLIMIT_RTTIME),
375 _ => None,
376 }
377}
378
379fn parse_rlimits(req: &ExecRequest) -> Vec<(libc::c_int, libc::rlimit)> {
382 req.rlimits
383 .iter()
384 .filter_map(|rl| {
385 let resource = parse_rlimit_resource(&rl.resource)?;
386 Some((
387 resource,
388 libc::rlimit {
389 rlim_cur: rl.soft,
390 rlim_max: rl.hard,
391 },
392 ))
393 })
394 .collect()
395}
396
397async fn blocking_write_fd(fd: RawFd, data: &[u8]) -> AgentdResult<()> {
399 let data = data.to_vec();
400 tokio::task::spawn_blocking(move || {
401 let mut written = 0;
402 while written < data.len() {
403 let ptr = unsafe { data.as_ptr().add(written) as *const libc::c_void };
404 let ret = unsafe { libc::write(fd, ptr, data.len() - written) };
405 if ret < 0 {
406 return Err(AgentdError::Io(std::io::Error::last_os_error()));
407 }
408 written += ret as usize;
409 }
410 Ok(())
411 })
412 .await
413 .map_err(|e| AgentdError::ExecSession(format!("stdin write join error: {e}")))?
414}
415
416async fn pty_reader_task(
418 id: u32,
419 pid: i32,
420 master_fd: OwnedFd,
421 tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
422) {
423 let raw = master_fd.as_raw_fd();
425 let flags = unsafe { libc::fcntl(raw, libc::F_GETFL) };
426 if flags >= 0 {
427 unsafe { libc::fcntl(raw, libc::F_SETFL, flags | libc::O_NONBLOCK) };
428 }
429
430 let Ok(async_fd) = AsyncFd::new(master_fd) else {
431 let code = wait_for_pid(pid).await;
432 let _ = tx.send((id, SessionOutput::Exited(code)));
433 return;
434 };
435
436 loop {
437 let Ok(mut guard) = async_fd.readable().await else {
438 break;
439 };
440
441 let fd = async_fd.as_raw_fd();
442 let mut buf = [0u8; 4096];
443 let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
444
445 if n > 0 {
446 let _ = tx.send((id, SessionOutput::Stdout(buf[..n as usize].to_vec())));
447 guard.clear_ready();
448 } else if n == 0 {
449 break;
450 } else {
451 let err = std::io::Error::last_os_error();
452 if err.raw_os_error() == Some(libc::EAGAIN)
453 || err.raw_os_error() == Some(libc::EWOULDBLOCK)
454 {
455 guard.clear_ready();
456 continue;
457 }
458 break;
460 }
461 }
462
463 let code = wait_for_pid(pid).await;
464 let _ = tx.send((id, SessionOutput::Exited(code)));
465}
466
467async fn pipe_reader_task(
469 id: u32,
470 mut child: Child,
471 stdout: Option<tokio::process::ChildStdout>,
472 stderr: Option<tokio::process::ChildStderr>,
473 tx: mpsc::UnboundedSender<(u32, SessionOutput)>,
474) {
475 let mut stdout = stdout;
476 let mut stderr = stderr;
477 let mut stdout_eof = stdout.is_none();
478 let mut stderr_eof = stderr.is_none();
479
480 while !stdout_eof || !stderr_eof {
481 let mut stdout_buf = [0u8; 4096];
482 let mut stderr_buf = [0u8; 4096];
483
484 tokio::select! {
485 result = async {
486 match stdout.as_mut() {
487 Some(out) => out.read(&mut stdout_buf).await,
488 None => std::future::pending().await,
489 }
490 }, if !stdout_eof => {
491 match result {
492 Ok(0) | Err(_) => {
493 stdout = None;
494 stdout_eof = true;
495 }
496 Ok(n) => {
497 let _ = tx.send((id, SessionOutput::Stdout(stdout_buf[..n].to_vec())));
498 }
499 }
500 }
501 result = async {
502 match stderr.as_mut() {
503 Some(err) => err.read(&mut stderr_buf).await,
504 None => std::future::pending().await,
505 }
506 }, if !stderr_eof => {
507 match result {
508 Ok(0) | Err(_) => {
509 stderr = None;
510 stderr_eof = true;
511 }
512 Ok(n) => {
513 let _ = tx.send((id, SessionOutput::Stderr(stderr_buf[..n].to_vec())));
514 }
515 }
516 }
517 }
518 }
519
520 let code = match child.wait().await {
522 Ok(status) => status.code().unwrap_or(-1),
523 Err(_) => -1,
524 };
525
526 let _ = tx.send((id, SessionOutput::Exited(code)));
527}
528
529async fn wait_for_pid(pid: i32) -> i32 {
531 tokio::task::spawn_blocking(move || {
532 let mut status: i32 = 0;
533 unsafe {
534 libc::waitpid(pid, &mut status, 0);
535 }
536 if libc::WIFEXITED(status) {
537 libc::WEXITSTATUS(status)
538 } else {
539 -1
540 }
541 })
542 .await
543 .unwrap_or(-1)
544}