1use std::collections::HashMap;
4use std::fs::OpenOptions;
5use std::os::fd::AsRawFd;
6use std::{env, ptr};
7
8use chrono::Utc;
9use tokio::io::unix::AsyncFd;
10use tokio::sync::mpsc;
11use tokio::time::{self, Duration};
12
13use microsandbox_protocol::HANDOFF_POWEROFF_TIMEOUT;
14use microsandbox_protocol::codec::{self, MAX_FRAME_SIZE};
15use microsandbox_protocol::core::{Ready, RelayClientDisconnected};
16use microsandbox_protocol::exec::{
17 ExecExited, ExecFailed, ExecFailureKind, ExecRequest, ExecResize, ExecSignal, ExecStarted,
18 ExecStderr, ExecStdin, ExecStdinError, ExecStdout,
19};
20use microsandbox_protocol::fs::{FsData, FsRequest};
21use microsandbox_protocol::message::{Message, MessageType};
22
23use crate::config::AgentdConfig;
24use crate::error::{AgentdError, AgentdResult};
25use crate::fs::{FsReadSession, FsState, FsStreamSession, FsWriteSession};
26use crate::serial::AGENT_PORT_NAME;
27use crate::session::{ExecSession, SessionOutput};
28use crate::{clock, fs, heartbeat, serial};
29
/// Period, in seconds, of the heartbeat timer ticking in the main agent loop.
const HEARTBEAT_INTERVAL_SECS: u64 = 1;

/// Size in bytes (64 KiB) of the scratch buffer each serial-port read fills.
const SERIAL_READ_BUF_SIZE: usize = 1 << 16;
42
/// Limit, in bytes, on undecoded serial input held in the input buffer;
/// exceeding it is treated as a fatal error by the agent loop.
/// NOTE(review): the extra 4 bytes presumably cover the frame's length
/// header on top of `MAX_FRAME_SIZE` — confirm against `codec`.
const MAX_INPUT_BUF_SIZE: usize = MAX_FRAME_SIZE as usize + 4;
45
/// Mutable bookkeeping owned by the agent's main loop. Every map is keyed by
/// the message id of the request that created the entry.
#[derive(Default)]
struct AgentState {
    /// Exec sessions started by `ExecRequest` and not yet reported as exited.
    sessions: HashMap<u32, ExecSession>,
    /// Streaming filesystem writes that are fed by subsequent `FsData` messages.
    write_sessions: HashMap<u32, FsWriteSession>,
    /// Streaming filesystem reads whose output is forwarded as raw frames.
    read_sessions: HashMap<u32, FsReadSession>,
    /// Filesystem state passed to `fs::handle_fs_request`.
    fs: FsState,
}
57
58pub async fn run(boot_time_ns: u64, init_time_ns: u64, config: &AgentdConfig) -> AgentdResult<()> {
70 let port_path = serial::find_serial_port(AGENT_PORT_NAME)?;
72
73 let port_file = OpenOptions::new().read(true).write(true).open(&port_path)?;
76
77 let port_fd = port_file.as_raw_fd();
79 set_nonblocking(port_fd)?;
80
81 let async_port = AsyncFd::new(port_file)?;
83
84 let mut read_buf = vec![0u8; SERIAL_READ_BUF_SIZE];
86 let mut serial_in_buf = Vec::new();
87 let mut serial_out_buf = Vec::new();
88
89 let mut state = AgentState::default();
90
91 let (session_tx, mut session_rx) = mpsc::unbounded_channel::<(u32, SessionOutput)>();
93
94 let mut last_activity = Utc::now();
96 let mut heartbeat_timer = time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS));
97
98 let ready_time_ns = clock::boottime_ns();
100 let ready_msg = Message::with_payload(
101 MessageType::Ready,
102 0,
103 &Ready {
104 boot_time_ns,
105 init_time_ns,
106 ready_time_ns,
107 },
108 )
109 .map_err(|e| AgentdError::ExecSession(format!("encode ready: {e}")))?;
110 codec::encode_to_buf(&ready_msg, &mut serial_out_buf)
111 .map_err(|e| AgentdError::ExecSession(format!("encode ready frame: {e}")))?;
112 flush_write_buf(&async_port, &mut serial_out_buf).await?;
113
114 'agent: loop {
116 tokio::select! {
117 result = async_port.readable() => {
119 let Ok(mut guard) = result else {
120 break;
121 };
122
123 loop {
124 match guard.try_io(|inner| read_from_fd(inner.get_ref().as_raw_fd(), &mut read_buf)) {
125 Ok(Ok(0)) => {
126 break 'agent;
128 }
129 Ok(Ok(n)) => {
130 serial_in_buf.extend_from_slice(&read_buf[..n]);
131 last_activity = Utc::now();
132
133 if serial_in_buf.len() > MAX_INPUT_BUF_SIZE {
135 return Err(AgentdError::ExecSession(
136 "serial input buffer exceeded maximum size".into(),
137 ));
138 }
139
140 while let Some(msg) = codec::try_decode_from_buf(&mut serial_in_buf)
142 .map_err(|e| AgentdError::ExecSession(format!("decode: {e}")))?
143 {
144 handle_message(
145 msg,
146 &mut state,
147 &session_tx,
148 &mut serial_out_buf,
149 config,
150 ).await?;
151 }
152
153 if !serial_out_buf.is_empty() {
155 flush_write_buf(&async_port, &mut serial_out_buf).await?;
156 }
157 }
158 Ok(Err(e)) if e.kind() == std::io::ErrorKind::Interrupted => continue,
159 Ok(Err(e)) => return Err(e.into()),
160 Err(_would_block) => break,
161 }
162 }
163 }
164
165 Some((id, output)) = session_rx.recv() => {
167 match output {
168 SessionOutput::Stdout(data) => {
169 let msg = Message::with_payload(MessageType::ExecStdout, id, &ExecStdout { data })
170 .map_err(|e| AgentdError::ExecSession(format!("encode stdout: {e}")))?;
171 codec::encode_to_buf(&msg, &mut serial_out_buf)
172 .map_err(|e| AgentdError::ExecSession(format!("encode stdout frame: {e}")))?;
173 }
174 SessionOutput::Stderr(data) => {
175 let msg = Message::with_payload(MessageType::ExecStderr, id, &ExecStderr { data })
176 .map_err(|e| AgentdError::ExecSession(format!("encode stderr: {e}")))?;
177 codec::encode_to_buf(&msg, &mut serial_out_buf)
178 .map_err(|e| AgentdError::ExecSession(format!("encode stderr frame: {e}")))?;
179 }
180 SessionOutput::Exited(code) => {
181 let msg = Message::with_payload(MessageType::ExecExited, id, &ExecExited { code })
182 .map_err(|e| AgentdError::ExecSession(format!("encode exited: {e}")))?;
183 codec::encode_to_buf(&msg, &mut serial_out_buf)
184 .map_err(|e| AgentdError::ExecSession(format!("encode exited frame: {e}")))?;
185 state.sessions.remove(&id);
186 }
187 SessionOutput::Raw(frame_bytes) => {
188 remove_completed_fs_read(&frame_bytes, &mut state.read_sessions);
189 serial_out_buf.extend_from_slice(&frame_bytes);
191 }
192 }
193
194 if !serial_out_buf.is_empty() {
195 flush_write_buf(&async_port, &mut serial_out_buf).await?;
196 }
197 }
198
199 _ = heartbeat_timer.tick() => {
201 if heartbeat::heartbeat_dir_exists() {
202 let _ = heartbeat::write_heartbeat(
203 state.sessions.len() as u32,
204 last_activity,
205 ).await;
206 }
207 }
208 }
209 }
210
211 Ok(())
212}
213
214async fn handle_message(
220 msg: Message,
221 state: &mut AgentState,
222 session_tx: &mpsc::UnboundedSender<(u32, SessionOutput)>,
223 out_buf: &mut Vec<u8>,
224 config: &AgentdConfig,
225) -> AgentdResult<()> {
226 match msg.t {
227 MessageType::ExecRequest => {
228 let mut req: ExecRequest = msg
229 .payload()
230 .map_err(|e| AgentdError::ExecSession(format!("decode exec request: {e}")))?;
231 prepend_scripts_to_path(&mut req);
232 match ExecSession::spawn(msg.id, &req, session_tx.clone(), config.user.as_deref()) {
233 Ok(session) => {
234 let reply = Message::with_payload(
235 MessageType::ExecStarted,
236 msg.id,
237 &ExecStarted { pid: session.pid() },
238 )
239 .map_err(|e| AgentdError::ExecSession(format!("encode started: {e}")))?;
240 codec::encode_to_buf(&reply, out_buf).map_err(|e| {
241 AgentdError::ExecSession(format!("encode started frame: {e}"))
242 })?;
243 state.sessions.insert(msg.id, session);
244 }
245 Err(e) => {
246 let payload = match &e {
252 AgentdError::ExecSpawnFailed(p) => p.clone(),
253 other => ExecFailed {
254 kind: ExecFailureKind::Other,
255 errno: None,
256 errno_name: None,
257 message: other.to_string(),
258 stage: None,
259 },
260 };
261 let reply = Message::with_payload(MessageType::ExecFailed, msg.id, &payload)
262 .map_err(|e| AgentdError::ExecSession(format!("encode failed: {e}")))?;
263 codec::encode_to_buf(&reply, out_buf).map_err(|e| {
264 AgentdError::ExecSession(format!("encode failed frame: {e}"))
265 })?;
266 eprintln!("failed to spawn exec session {}: {e}", msg.id);
267 }
268 }
269 }
270
271 MessageType::ExecStdin => {
272 let stdin: ExecStdin = msg
273 .payload()
274 .map_err(|e| AgentdError::ExecSession(format!("decode stdin: {e}")))?;
275 if let Some(session) = state.sessions.get_mut(&msg.id) {
276 if stdin.data.is_empty() {
277 session.close_stdin();
279 } else if let Err(e) = session.write_stdin(&stdin.data).await {
280 let payload = stdin_error_payload(&e);
281 eprintln!("stdin write error on session {}: {e}", msg.id);
282 let reply =
283 Message::with_payload(MessageType::ExecStdinError, msg.id, &payload)
284 .map_err(|e| {
285 AgentdError::ExecSession(format!("encode stdin error: {e}"))
286 })?;
287 codec::encode_to_buf(&reply, out_buf).map_err(|e| {
288 AgentdError::ExecSession(format!("encode stdin error frame: {e}"))
289 })?;
290 }
291 }
292 }
293
294 MessageType::ExecResize => {
295 let resize: ExecResize = msg
296 .payload()
297 .map_err(|e| AgentdError::ExecSession(format!("decode resize: {e}")))?;
298 if let Some(session) = state.sessions.get(&msg.id) {
299 let _ = session.resize(resize.rows, resize.cols);
300 }
301 }
302
303 MessageType::ExecSignal => {
304 let signal: ExecSignal = msg
305 .payload()
306 .map_err(|e| AgentdError::ExecSession(format!("decode signal: {e}")))?;
307 if let Some(session) = state.sessions.get(&msg.id) {
308 let _ = session.send_signal(signal.signal);
309 }
310 }
311
312 MessageType::FsRequest => {
313 let req: FsRequest = msg
314 .payload()
315 .map_err(|e| AgentdError::ExecSession(format!("decode fs request: {e}")))?;
316 match fs::handle_fs_request(msg.id, req, &mut state.fs, out_buf, session_tx).await {
317 Ok(Some(FsStreamSession::Read(rs))) => {
318 state.read_sessions.insert(msg.id, rs);
319 }
320 Ok(Some(FsStreamSession::Write(ws))) => {
321 state.write_sessions.insert(msg.id, ws);
322 }
323 Ok(None) => {}
324 Err(e) => {
325 eprintln!("fs request error for {}: {e}", msg.id);
326 }
327 }
328 }
329
330 MessageType::FsData => {
331 let data: FsData = msg
332 .payload()
333 .map_err(|e| AgentdError::ExecSession(format!("decode fs data: {e}")))?;
334 if let Some(session) = state.write_sessions.get_mut(&msg.id) {
335 match fs::handle_fs_data(msg.id, data, session, out_buf).await {
336 Ok(true) => {
337 state.write_sessions.remove(&msg.id);
339 }
340 Ok(false) => {}
341 Err(e) => {
342 eprintln!("fs data error for {}: {e}", msg.id);
343 state.write_sessions.remove(&msg.id);
344 }
345 }
346 } else {
347 let resp = microsandbox_protocol::fs::FsResponse {
349 ok: false,
350 error: Some(format!("unknown write session: {}", msg.id)),
351 data: None,
352 };
353 let reply = Message::with_payload(MessageType::FsResponse, msg.id, &resp)
354 .map_err(|e| AgentdError::ExecSession(format!("encode fs error: {e}")))?;
355 codec::encode_to_buf(&reply, out_buf)
356 .map_err(|e| AgentdError::ExecSession(format!("encode fs error frame: {e}")))?;
357 }
358 }
359
360 MessageType::RelayClientDisconnected => {
361 let disconnected: RelayClientDisconnected = msg
362 .payload()
363 .map_err(|e| AgentdError::ExecSession(format!("decode relay disconnect: {e}")))?;
364 state
365 .fs
366 .close_owner_range(disconnected.id_start, disconnected.id_end_exclusive);
367 abort_read_sessions_in_owner_range(
368 &mut state.read_sessions,
369 disconnected.id_start,
370 disconnected.id_end_exclusive,
371 );
372 state.write_sessions.retain(|_, session| {
373 let owner_id = session.owner_id();
374 owner_id < disconnected.id_start || owner_id >= disconnected.id_end_exclusive
375 });
376 }
377
378 MessageType::Shutdown => {
379 for (_, session) in state.sessions.drain() {
383 let _ = session.send_signal(15); }
385 state.write_sessions.clear();
386 state.fs.clear();
387
388 request_guest_poweroff()?;
389 return Err(AgentdError::Shutdown);
390 }
391
392 _ => {
393 }
395 }
396
397 Ok(())
398}
399
/// Base `PATH` for an exec request that sets no `PATH` of its own, used when
/// the agent's own `PATH` is unset or not valid Unicode.
const DEFAULT_GUEST_PATH: &str = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
406
407fn remove_completed_fs_read(frame_bytes: &[u8], read_sessions: &mut HashMap<u32, FsReadSession>) {
408 let mut buf = frame_bytes.to_vec();
409 let Ok(Some(msg)) = codec::try_decode_from_buf(&mut buf) else {
410 return;
411 };
412 if msg.t == MessageType::FsResponse {
413 read_sessions.remove(&msg.id);
414 }
415}
416
417fn abort_read_sessions_in_owner_range(
418 read_sessions: &mut HashMap<u32, FsReadSession>,
419 id_start: u32,
420 id_end_exclusive: u32,
421) {
422 let mut retained = HashMap::new();
423 for (id, session) in read_sessions.drain() {
424 let owner_id = session.owner_id();
425 if owner_id >= id_start && owner_id < id_end_exclusive {
426 session.abort();
427 } else {
428 retained.insert(id, session);
429 }
430 }
431 *read_sessions = retained;
432}
433
434fn stdin_error_payload(err: &AgentdError) -> ExecStdinError {
436 let io_err = match err {
437 AgentdError::Io(e) => Some(e),
438 _ => None,
439 };
440 let errno = io_err.and_then(|e| e.raw_os_error());
441 ExecStdinError {
442 errno,
443 errno_name: errno.and_then(errno_name),
444 message: err.to_string(),
445 }
446}
447
448fn errno_name(code: i32) -> Option<String> {
451 let name = match code {
452 libc::EPIPE => "EPIPE",
453 libc::EBADF => "EBADF",
454 libc::EINVAL => "EINVAL",
455 libc::EIO => "EIO",
456 libc::ENOSPC => "ENOSPC",
457 libc::EFBIG => "EFBIG",
458 _ => return None,
459 };
460 Some(name.to_string())
461}
462
463fn prepend_scripts_to_path(req: &mut microsandbox_protocol::exec::ExecRequest) {
464 let scripts = microsandbox_protocol::SCRIPTS_PATH;
465
466 if let Some(entry) = req.env.iter_mut().find(|e| e.starts_with("PATH=")) {
468 let existing = &entry["PATH=".len()..];
469 *entry = format!("PATH={scripts}:{existing}");
470 } else {
471 let inherited = env::var("PATH").unwrap_or_else(|_| DEFAULT_GUEST_PATH.to_string());
474 req.env.push(format!("PATH={scripts}:{inherited}"));
475 }
476}
477
478fn set_nonblocking(fd: i32) -> AgentdResult<()> {
480 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
481 if flags < 0 {
482 return Err(std::io::Error::last_os_error().into());
483 }
484 let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) };
485 if ret < 0 {
486 return Err(std::io::Error::last_os_error().into());
487 }
488 Ok(())
489}
490
491fn read_from_fd(fd: i32, buf: &mut [u8]) -> std::io::Result<usize> {
493 let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
494 if n < 0 {
495 Err(std::io::Error::last_os_error())
496 } else {
497 Ok(n as usize)
498 }
499}
500
501async fn flush_write_buf(fd: &AsyncFd<std::fs::File>, buf: &mut Vec<u8>) -> AgentdResult<()> {
503 while !buf.is_empty() {
504 let mut guard = fd.writable().await?;
505 match guard.try_io(|inner| write_to_fd(inner.get_ref().as_raw_fd(), buf)) {
506 Ok(Ok(n)) => {
507 buf.drain(..n);
508 }
509 Ok(Err(e)) if e.kind() == std::io::ErrorKind::Interrupted => continue,
510 Ok(Err(e)) => return Err(e.into()),
511 Err(_would_block) => continue,
512 }
513 }
514 Ok(())
515}
516
517fn write_to_fd(fd: i32, buf: &[u8]) -> std::io::Result<usize> {
519 let n = unsafe { libc::write(fd, buf.as_ptr() as *const libc::c_void, buf.len()) };
520 if n < 0 {
521 Err(std::io::Error::last_os_error())
522 } else {
523 Ok(n as usize)
524 }
525}
526
/// Syncs filesystem buffers and requests that the guest power off.
///
/// When agentd is PID 1, it powers the guest off itself:
///
/// 1. It remounts `/` read-only. This is best-effort; a failure is ignored.
/// 2. It syncs again.
/// 3. It calls `reboot(RB_POWER_OFF)`.
///
/// Otherwise it signals the running init through `crate::handoff`:
///
/// 1. If `signal_init_shutdown` succeeds, the calling thread sleeps for
///    `HANDOFF_POWEROFF_TIMEOUT`. This is presumably to give init time to
///    power off — confirm.
/// 2. `signal_init_term` is then sent unconditionally, and its result is
///    ignored.
///
/// NOTE(review): `std::thread::sleep` blocks the OS thread it runs on. This
/// function is called from the async `handle_message`, so that runtime thread
/// stalls for the whole timeout. This looks acceptable during shutdown, but
/// confirm.
///
/// # Errors
///
/// Returns the OS error only when running as PID 1 and `reboot` reports
/// failure.
fn request_guest_poweroff() -> AgentdResult<()> {
    // SAFETY: sync(2) takes no arguments and touches no Rust-managed memory.
    unsafe {
        libc::sync();
    }

    if crate::handoff::is_pid_1() {
        // Best-effort: a failed read-only remount does not prevent power-off.
        let _ = remount_root_readonly();
        // SAFETY: as above.
        unsafe {
            libc::sync();
        }
        // SAFETY: reboot(2) with a command constant takes no pointers.
        let ret = unsafe { libc::reboot(libc::RB_POWER_OFF) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error().into());
        }
        return Ok(());
    }

    // Not PID 1: delegate power-off to the init process via the handoff module.
    if crate::handoff::signal_init_shutdown().is_ok() {
        std::thread::sleep(HANDOFF_POWEROFF_TIMEOUT);
    }

    // Fallback signal; its result is intentionally ignored.
    let _ = crate::handoff::signal_init_term();
    Ok(())
}
561
562fn remount_root_readonly() -> AgentdResult<()> {
563 let target = std::ffi::CString::new("/").expect("static path contains no NUL");
564 let ret = unsafe {
565 libc::mount(
566 ptr::null(),
567 target.as_ptr(),
568 ptr::null(),
569 (libc::MS_REMOUNT | libc::MS_RDONLY) as libc::c_ulong,
570 ptr::null(),
571 )
572 };
573
574 if ret != 0 {
575 return Err(std::io::Error::last_os_error().into());
576 }
577
578 Ok(())
579}