Skip to main content

microsandbox_agentd/
agent.rs

1//! Main agent loop: serial I/O, session management, heartbeat.
2
3use std::collections::HashMap;
4use std::fs::OpenOptions;
5use std::os::fd::AsRawFd;
6use std::{env, ptr};
7
8use chrono::Utc;
9use tokio::io::unix::AsyncFd;
10use tokio::sync::mpsc;
11use tokio::time::{self, Duration};
12
13use microsandbox_protocol::HANDOFF_POWEROFF_TIMEOUT;
14use microsandbox_protocol::codec::{self, MAX_FRAME_SIZE};
15use microsandbox_protocol::core::{Ready, RelayClientDisconnected};
16use microsandbox_protocol::exec::{
17    ExecExited, ExecFailed, ExecFailureKind, ExecRequest, ExecResize, ExecSignal, ExecStarted,
18    ExecStderr, ExecStdin, ExecStdinError, ExecStdout,
19};
20use microsandbox_protocol::fs::{FsData, FsRequest};
21use microsandbox_protocol::message::{Message, MessageType};
22
23use crate::config::AgentdConfig;
24use crate::error::{AgentdError, AgentdResult};
25use crate::fs::{FsReadSession, FsState, FsStreamSession, FsWriteSession};
26use crate::serial::AGENT_PORT_NAME;
27use crate::session::{ExecSession, SessionOutput};
28use crate::{clock, fs, heartbeat, serial};
29
30//--------------------------------------------------------------------------------------------------
31// Constants
32//--------------------------------------------------------------------------------------------------
33
/// Heartbeat interval in seconds.
///
/// Keep this short so small idle timeouts (for example `--idle-timeout 1`)
/// can be enforced without multi-second scheduling drift.
const HEARTBEAT_INTERVAL_SECS: u64 = 1;

/// Size in bytes of the scratch buffer passed to each read of the serial port.
const SERIAL_READ_BUF_SIZE: usize = 64 * 1024;

/// Maximum allowed size of the pending serial input buffer: the frame size
/// limit (`MAX_FRAME_SIZE`) plus 4 bytes for the length prefix.
const MAX_INPUT_BUF_SIZE: usize = MAX_FRAME_SIZE as usize + 4;
45
46//--------------------------------------------------------------------------------------------------
47// Types
48//--------------------------------------------------------------------------------------------------
49
/// Mutable state owned by the agent loop and shared with `handle_message`.
#[derive(Default)]
struct AgentState {
    /// Exec sessions, keyed by the id of the `ExecRequest` message that
    /// spawned them. An entry is removed when the session reports `Exited`.
    sessions: HashMap<u32, ExecSession>,
    /// Streaming filesystem write sessions, keyed by the id of the
    /// originating `FsRequest` message; fed by subsequent `FsData` messages.
    write_sessions: HashMap<u32, FsWriteSession>,
    /// Streaming filesystem read sessions, keyed by the id of the
    /// originating `FsRequest` message.
    read_sessions: HashMap<u32, FsReadSession>,
    /// Filesystem handler state passed to `fs::handle_fs_request`.
    fs: FsState,
}
57
58//--------------------------------------------------------------------------------------------------
59// Functions
60//--------------------------------------------------------------------------------------------------
61
62/// Runs the main agent loop.
63///
64/// Discovers the virtio serial port, sends `core.ready` with boot timing data,
65/// then enters the main select loop handling serial I/O, process output, and heartbeat.
66///
67/// - `boot_time_ns`: `CLOCK_BOOTTIME` at `main()` start (kernel boot duration).
68/// - `init_time_ns`: nanoseconds spent in `init::init()`.
69pub async fn run(boot_time_ns: u64, init_time_ns: u64, config: &AgentdConfig) -> AgentdResult<()> {
70    // Discover serial port.
71    let port_path = serial::find_serial_port(AGENT_PORT_NAME)?;
72
73    // Open the port once with read+write. Virtio-console multiport devices
74    // only allow a single open; a second open returns EBUSY.
75    let port_file = OpenOptions::new().read(true).write(true).open(&port_path)?;
76
77    // Set non-blocking for async I/O.
78    let port_fd = port_file.as_raw_fd();
79    set_nonblocking(port_fd)?;
80
81    // A single AsyncFd tracks both readable and writable readiness.
82    let async_port = AsyncFd::new(port_file)?;
83
84    // Buffer for serial reads.
85    let mut read_buf = vec![0u8; SERIAL_READ_BUF_SIZE];
86    let mut serial_in_buf = Vec::new();
87    let mut serial_out_buf = Vec::new();
88
89    let mut state = AgentState::default();
90
91    // Channel for session output events.
92    let (session_tx, mut session_rx) = mpsc::unbounded_channel::<(u32, SessionOutput)>();
93
94    // Heartbeat state.
95    let mut last_activity = Utc::now();
96    let mut heartbeat_timer = time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
97
98    // Send core.ready with boot timing data.
99    let ready_time_ns = clock::boottime_ns();
100    let ready_msg = Message::with_payload(
101        MessageType::Ready,
102        0,
103        &Ready {
104            boot_time_ns,
105            init_time_ns,
106            ready_time_ns,
107        },
108    )
109    .map_err(|e| AgentdError::ExecSession(format!("encode ready: {e}")))?;
110    codec::encode_to_buf(&ready_msg, &mut serial_out_buf)
111        .map_err(|e| AgentdError::ExecSession(format!("encode ready frame: {e}")))?;
112    flush_write_buf(&async_port, &mut serial_out_buf).await?;
113
114    // Main loop.
115    'agent: loop {
116        tokio::select! {
117            // Read from serial port.
118            result = async_port.readable() => {
119                let Ok(mut guard) = result else {
120                    break;
121                };
122
123                loop {
124                    match guard.try_io(|inner| read_from_fd(inner.get_ref().as_raw_fd(), &mut read_buf)) {
125                        Ok(Ok(0)) => {
126                            // EOF on serial — host disconnected.
127                            break 'agent;
128                        }
129                        Ok(Ok(n)) => {
130                            serial_in_buf.extend_from_slice(&read_buf[..n]);
131                            last_activity = Utc::now();
132
133                            // Guard against unbounded buffer growth.
134                            if serial_in_buf.len() > MAX_INPUT_BUF_SIZE {
135                                return Err(AgentdError::ExecSession(
136                                    "serial input buffer exceeded maximum size".into(),
137                                ));
138                            }
139
140                            // Try to parse complete messages.
141                            while let Some(msg) = codec::try_decode_from_buf(&mut serial_in_buf)
142                                .map_err(|e| AgentdError::ExecSession(format!("decode: {e}")))?
143                            {
144                                handle_message(
145                                    msg,
146                                    &mut state,
147                                    &session_tx,
148                                    &mut serial_out_buf,
149                                    config,
150                                ).await?;
151                            }
152
153                            // Flush any outgoing messages.
154                            if !serial_out_buf.is_empty() {
155                                flush_write_buf(&async_port, &mut serial_out_buf).await?;
156                            }
157                        }
158                        Ok(Err(e)) if e.kind() == std::io::ErrorKind::Interrupted => continue,
159                        Ok(Err(e)) => return Err(e.into()),
160                        Err(_would_block) => break,
161                    }
162                }
163            }
164
165            // Receive output events from session reader tasks.
166            Some((id, output)) = session_rx.recv() => {
167                match output {
168                    SessionOutput::Stdout(data) => {
169                        let msg = Message::with_payload(MessageType::ExecStdout, id, &ExecStdout { data })
170                            .map_err(|e| AgentdError::ExecSession(format!("encode stdout: {e}")))?;
171                        codec::encode_to_buf(&msg, &mut serial_out_buf)
172                            .map_err(|e| AgentdError::ExecSession(format!("encode stdout frame: {e}")))?;
173                    }
174                    SessionOutput::Stderr(data) => {
175                        let msg = Message::with_payload(MessageType::ExecStderr, id, &ExecStderr { data })
176                            .map_err(|e| AgentdError::ExecSession(format!("encode stderr: {e}")))?;
177                        codec::encode_to_buf(&msg, &mut serial_out_buf)
178                            .map_err(|e| AgentdError::ExecSession(format!("encode stderr frame: {e}")))?;
179                    }
180                    SessionOutput::Exited(code) => {
181                        let msg = Message::with_payload(MessageType::ExecExited, id, &ExecExited { code })
182                            .map_err(|e| AgentdError::ExecSession(format!("encode exited: {e}")))?;
183                        codec::encode_to_buf(&msg, &mut serial_out_buf)
184                            .map_err(|e| AgentdError::ExecSession(format!("encode exited frame: {e}")))?;
185                        state.sessions.remove(&id);
186                    }
187                    SessionOutput::Raw(frame_bytes) => {
188                        remove_completed_fs_read(&frame_bytes, &mut state.read_sessions);
189                        // Pre-encoded frame — write directly to output buffer.
190                        serial_out_buf.extend_from_slice(&frame_bytes);
191                    }
192                }
193
194                if !serial_out_buf.is_empty() {
195                    flush_write_buf(&async_port, &mut serial_out_buf).await?;
196                }
197            }
198
199            // Heartbeat tick.
200            _ = heartbeat_timer.tick() => {
201                if heartbeat::heartbeat_dir_exists() {
202                    let _ = heartbeat::write_heartbeat(
203                        state.sessions.len() as u32,
204                        last_activity,
205                    ).await;
206                }
207            }
208        }
209    }
210
211    Ok(())
212}
213
214//--------------------------------------------------------------------------------------------------
215// Functions: Helpers
216//--------------------------------------------------------------------------------------------------
217
218/// Handles a single incoming message from the host.
219async fn handle_message(
220    msg: Message,
221    state: &mut AgentState,
222    session_tx: &mpsc::UnboundedSender<(u32, SessionOutput)>,
223    out_buf: &mut Vec<u8>,
224    config: &AgentdConfig,
225) -> AgentdResult<()> {
226    match msg.t {
227        MessageType::ExecRequest => {
228            let mut req: ExecRequest = msg
229                .payload()
230                .map_err(|e| AgentdError::ExecSession(format!("decode exec request: {e}")))?;
231            prepend_scripts_to_path(&mut req);
232            match ExecSession::spawn(msg.id, &req, session_tx.clone(), config.user.as_deref()) {
233                Ok(session) => {
234                    let reply = Message::with_payload(
235                        MessageType::ExecStarted,
236                        msg.id,
237                        &ExecStarted { pid: session.pid() },
238                    )
239                    .map_err(|e| AgentdError::ExecSession(format!("encode started: {e}")))?;
240                    codec::encode_to_buf(&reply, out_buf).map_err(|e| {
241                        AgentdError::ExecSession(format!("encode started frame: {e}"))
242                    })?;
243                    state.sessions.insert(msg.id, session);
244                }
245                Err(e) => {
246                    // Send a typed `ExecFailed` so the host can render a
247                    // useful message + hint. `ExecSpawnFailed` already
248                    // carries the structured payload; other error
249                    // variants (free-form `ExecSession(_)` etc.) get
250                    // wrapped as `Other` with the message preserved.
251                    let payload = match &e {
252                        AgentdError::ExecSpawnFailed(p) => p.clone(),
253                        other => ExecFailed {
254                            kind: ExecFailureKind::Other,
255                            errno: None,
256                            errno_name: None,
257                            message: other.to_string(),
258                            stage: None,
259                        },
260                    };
261                    let reply = Message::with_payload(MessageType::ExecFailed, msg.id, &payload)
262                        .map_err(|e| AgentdError::ExecSession(format!("encode failed: {e}")))?;
263                    codec::encode_to_buf(&reply, out_buf).map_err(|e| {
264                        AgentdError::ExecSession(format!("encode failed frame: {e}"))
265                    })?;
266                    eprintln!("failed to spawn exec session {}: {e}", msg.id);
267                }
268            }
269        }
270
271        MessageType::ExecStdin => {
272            let stdin: ExecStdin = msg
273                .payload()
274                .map_err(|e| AgentdError::ExecSession(format!("decode stdin: {e}")))?;
275            if let Some(session) = state.sessions.get_mut(&msg.id) {
276                if stdin.data.is_empty() {
277                    // Empty data signals EOF — close stdin.
278                    session.close_stdin();
279                } else if let Err(e) = session.write_stdin(&stdin.data).await {
280                    let payload = stdin_error_payload(&e);
281                    eprintln!("stdin write error on session {}: {e}", msg.id);
282                    let reply =
283                        Message::with_payload(MessageType::ExecStdinError, msg.id, &payload)
284                            .map_err(|e| {
285                                AgentdError::ExecSession(format!("encode stdin error: {e}"))
286                            })?;
287                    codec::encode_to_buf(&reply, out_buf).map_err(|e| {
288                        AgentdError::ExecSession(format!("encode stdin error frame: {e}"))
289                    })?;
290                }
291            }
292        }
293
294        MessageType::ExecResize => {
295            let resize: ExecResize = msg
296                .payload()
297                .map_err(|e| AgentdError::ExecSession(format!("decode resize: {e}")))?;
298            if let Some(session) = state.sessions.get(&msg.id) {
299                let _ = session.resize(resize.rows, resize.cols);
300            }
301        }
302
303        MessageType::ExecSignal => {
304            let signal: ExecSignal = msg
305                .payload()
306                .map_err(|e| AgentdError::ExecSession(format!("decode signal: {e}")))?;
307            if let Some(session) = state.sessions.get(&msg.id) {
308                let _ = session.send_signal(signal.signal);
309            }
310        }
311
312        MessageType::FsRequest => {
313            let req: FsRequest = msg
314                .payload()
315                .map_err(|e| AgentdError::ExecSession(format!("decode fs request: {e}")))?;
316            match fs::handle_fs_request(msg.id, req, &mut state.fs, out_buf, session_tx).await {
317                Ok(Some(FsStreamSession::Read(rs))) => {
318                    state.read_sessions.insert(msg.id, rs);
319                }
320                Ok(Some(FsStreamSession::Write(ws))) => {
321                    state.write_sessions.insert(msg.id, ws);
322                }
323                Ok(None) => {}
324                Err(e) => {
325                    eprintln!("fs request error for {}: {e}", msg.id);
326                }
327            }
328        }
329
330        MessageType::FsData => {
331            let data: FsData = msg
332                .payload()
333                .map_err(|e| AgentdError::ExecSession(format!("decode fs data: {e}")))?;
334            if let Some(session) = state.write_sessions.get_mut(&msg.id) {
335                match fs::handle_fs_data(msg.id, data, session, out_buf).await {
336                    Ok(true) => {
337                        // Session complete — remove it.
338                        state.write_sessions.remove(&msg.id);
339                    }
340                    Ok(false) => {}
341                    Err(e) => {
342                        eprintln!("fs data error for {}: {e}", msg.id);
343                        state.write_sessions.remove(&msg.id);
344                    }
345                }
346            } else {
347                // No write session for this ID — send error response.
348                let resp = microsandbox_protocol::fs::FsResponse {
349                    ok: false,
350                    error: Some(format!("unknown write session: {}", msg.id)),
351                    data: None,
352                };
353                let reply = Message::with_payload(MessageType::FsResponse, msg.id, &resp)
354                    .map_err(|e| AgentdError::ExecSession(format!("encode fs error: {e}")))?;
355                codec::encode_to_buf(&reply, out_buf)
356                    .map_err(|e| AgentdError::ExecSession(format!("encode fs error frame: {e}")))?;
357            }
358        }
359
360        MessageType::RelayClientDisconnected => {
361            let disconnected: RelayClientDisconnected = msg
362                .payload()
363                .map_err(|e| AgentdError::ExecSession(format!("decode relay disconnect: {e}")))?;
364            state
365                .fs
366                .close_owner_range(disconnected.id_start, disconnected.id_end_exclusive);
367            abort_read_sessions_in_owner_range(
368                &mut state.read_sessions,
369                disconnected.id_start,
370                disconnected.id_end_exclusive,
371            );
372            state.write_sessions.retain(|_, session| {
373                let owner_id = session.owner_id();
374                owner_id < disconnected.id_start || owner_id >= disconnected.id_end_exclusive
375            });
376        }
377
378        MessageType::Shutdown => {
379            // Graceful shutdown — signal all sessions, then ask the guest
380            // kernel to power off so block-root filesystems can shut down
381            // cleanly instead of leaving ext4 journal recovery pending.
382            for (_, session) in state.sessions.drain() {
383                let _ = session.send_signal(15); // SIGTERM
384            }
385            state.write_sessions.clear();
386            state.fs.clear();
387
388            request_guest_poweroff()?;
389            return Err(AgentdError::Shutdown);
390        }
391
392        _ => {
393            // Ignore unknown or unexpected message types.
394        }
395    }
396
397    Ok(())
398}
399
/// Default PATH for the guest when no PATH is inherited.
const DEFAULT_GUEST_PATH: &str = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
406
407fn remove_completed_fs_read(frame_bytes: &[u8], read_sessions: &mut HashMap<u32, FsReadSession>) {
408    let mut buf = frame_bytes.to_vec();
409    let Ok(Some(msg)) = codec::try_decode_from_buf(&mut buf) else {
410        return;
411    };
412    if msg.t == MessageType::FsResponse {
413        read_sessions.remove(&msg.id);
414    }
415}
416
417fn abort_read_sessions_in_owner_range(
418    read_sessions: &mut HashMap<u32, FsReadSession>,
419    id_start: u32,
420    id_end_exclusive: u32,
421) {
422    let mut retained = HashMap::new();
423    for (id, session) in read_sessions.drain() {
424        let owner_id = session.owner_id();
425        if owner_id >= id_start && owner_id < id_end_exclusive {
426            session.abort();
427        } else {
428            retained.insert(id, session);
429        }
430    }
431    *read_sessions = retained;
432}
433
434/// Build an `ExecStdinError` payload from a failed `write_stdin` result.
435fn stdin_error_payload(err: &AgentdError) -> ExecStdinError {
436    let io_err = match err {
437        AgentdError::Io(e) => Some(e),
438        _ => None,
439    };
440    let errno = io_err.and_then(|e| e.raw_os_error());
441    ExecStdinError {
442        errno,
443        errno_name: errno.and_then(errno_name),
444        message: err.to_string(),
445    }
446}
447
448/// Map common errno values to their standard names. Returns `None` for
449/// codes we don't recognize; callers fall back to the numeric `errno`.
450fn errno_name(code: i32) -> Option<String> {
451    let name = match code {
452        libc::EPIPE => "EPIPE",
453        libc::EBADF => "EBADF",
454        libc::EINVAL => "EINVAL",
455        libc::EIO => "EIO",
456        libc::ENOSPC => "ENOSPC",
457        libc::EFBIG => "EFBIG",
458        _ => return None,
459    };
460    Some(name.to_string())
461}
462
463fn prepend_scripts_to_path(req: &mut microsandbox_protocol::exec::ExecRequest) {
464    let scripts = microsandbox_protocol::SCRIPTS_PATH;
465
466    // Check if the request already specifies PATH.
467    if let Some(entry) = req.env.iter_mut().find(|e| e.starts_with("PATH=")) {
468        let existing = &entry["PATH=".len()..];
469        *entry = format!("PATH={scripts}:{existing}");
470    } else {
471        // Inherit from agentd's process environment, falling back to a
472        // sensible default since PID 1 in a minimal guest may not have PATH.
473        let inherited = env::var("PATH").unwrap_or_else(|_| DEFAULT_GUEST_PATH.to_string());
474        req.env.push(format!("PATH={scripts}:{inherited}"));
475    }
476}
477
478/// Sets a file descriptor to non-blocking mode.
479fn set_nonblocking(fd: i32) -> AgentdResult<()> {
480    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
481    if flags < 0 {
482        return Err(std::io::Error::last_os_error().into());
483    }
484    let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) };
485    if ret < 0 {
486        return Err(std::io::Error::last_os_error().into());
487    }
488    Ok(())
489}
490
491/// Reads from a raw fd (non-blocking).
492fn read_from_fd(fd: i32, buf: &mut [u8]) -> std::io::Result<usize> {
493    let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
494    if n < 0 {
495        Err(std::io::Error::last_os_error())
496    } else {
497        Ok(n as usize)
498    }
499}
500
501/// Flushes the write buffer to the async fd.
502async fn flush_write_buf(fd: &AsyncFd<std::fs::File>, buf: &mut Vec<u8>) -> AgentdResult<()> {
503    while !buf.is_empty() {
504        let mut guard = fd.writable().await?;
505        match guard.try_io(|inner| write_to_fd(inner.get_ref().as_raw_fd(), buf)) {
506            Ok(Ok(n)) => {
507                buf.drain(..n);
508            }
509            Ok(Err(e)) if e.kind() == std::io::ErrorKind::Interrupted => continue,
510            Ok(Err(e)) => return Err(e.into()),
511            Err(_would_block) => continue,
512        }
513    }
514    Ok(())
515}
516
517/// Writes to a raw fd (non-blocking).
518fn write_to_fd(fd: i32, buf: &[u8]) -> std::io::Result<usize> {
519    let n = unsafe { libc::write(fd, buf.as_ptr() as *const libc::c_void, buf.len()) };
520    if n < 0 {
521        Err(std::io::Error::last_os_error())
522    } else {
523        Ok(n as usize)
524    }
525}
526
527fn request_guest_poweroff() -> AgentdResult<()> {
528    unsafe {
529        libc::sync();
530    }
531
532    if crate::handoff::is_pid_1() {
533        // PID 1 mode (no handoff): remount root RO and reboot.
534        let _ = remount_root_readonly();
535        unsafe {
536            libc::sync();
537        }
538        let ret = unsafe { libc::reboot(libc::RB_POWER_OFF) };
539        if ret != 0 {
540            return Err(std::io::Error::last_os_error().into());
541        }
542        return Ok(());
543    }
544
545    // Handoff mode: ask the new init (PID 1) to shut down.
546    // SIGRTMIN+4 is systemd's poweroff signal; sysvinit-derived inits
547    // typically default-handle it as a clean exit. Either way, PID 1
548    // exiting causes the kernel to panic the guest, which the VMM
549    // observes as a clean shutdown.
550    if crate::handoff::signal_init_shutdown().is_ok() {
551        std::thread::sleep(HANDOFF_POWEROFF_TIMEOUT);
552    }
553
554    // SIGTERM fallback for inits that didn't act on SIGRTMIN+4. If
555    // both are ignored, we return Ok and let the host's outer
556    // VMM-process kill be the backstop — the VM still dies, just
557    // less gracefully.
558    let _ = crate::handoff::signal_init_term();
559    Ok(())
560}
561
562fn remount_root_readonly() -> AgentdResult<()> {
563    let target = std::ffi::CString::new("/").expect("static path contains no NUL");
564    let ret = unsafe {
565        libc::mount(
566            ptr::null(),
567            target.as_ptr(),
568            ptr::null(),
569            (libc::MS_REMOUNT | libc::MS_RDONLY) as libc::c_ulong,
570            ptr::null(),
571        )
572    };
573
574    if ret != 0 {
575        return Err(std::io::Error::last_os_error().into());
576    }
577
578    Ok(())
579}