mod handle;
mod managed;
mod tracker;
use std::collections::VecDeque;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use tokio::process::Command as TokioCommand;
use tracing::warn;
use wasmtime::component::Resource;
use crate::engine::wasm::bindings::astrid::process::host::{
self as process, EnvVar, ErrorCode, ExitInfo, ProcessHandle, ProcessResult, SpawnRequest,
};
use crate::engine::wasm::host::util;
use crate::engine::wasm::host_state::HostState;
use managed::{ManagedProcess, attach_pipes, configure_piped, prepare_sandboxed_command};
pub use tracker::ProcessTracker;
pub use managed::ManagedProcess as PublicManagedProcess;
pub(crate) const MAX_BACKGROUND_PROCESSES: usize = 8;
fn audit_process<T, E: std::fmt::Debug>(
state: &HostState,
op: &'static str,
cmd: &str,
result: &Result<T, E>,
) {
let capsule_id = state.capsule_id.as_str();
let principal = state.effective_principal();
match result {
Ok(_) => tracing::debug!(
target: "astrid.audit.process",
%capsule_id,
%principal,
fn = op,
cmd,
"audit",
),
Err(e) => tracing::debug!(
target: "astrid.audit.process",
%capsule_id,
%principal,
fn = op,
cmd,
error = ?e,
"audit",
),
}
}
fn extract_call_id(state: &HostState) -> Option<String> {
state.caller_context.as_ref().and_then(|msg| {
if let astrid_events::ipc::IpcPayload::ToolExecuteRequest { call_id, .. } = &msg.payload {
Some(call_id.clone())
} else {
None
}
})
}
fn env_summary(env: &[EnvVar]) -> String {
env.iter()
.map(|e| e.key.as_str())
.collect::<Vec<_>>()
.join(",")
}
impl process::Host for HostState {
fn spawn(&mut self, request: SpawnRequest) -> Result<ProcessResult, ErrorCode> {
let workspace_root = self.workspace_root.clone();
let security = self.security.clone();
let capsule_id = self.capsule_id.as_str().to_owned();
let handle = self.runtime_handle.clone();
let semaphore = self.host_semaphore.clone();
let cancel_token = self.cancel_token.clone();
let process_tracker = self.process_tracker.clone();
let call_id = extract_call_id(self);
let cmd_for_audit = request.cmd.clone();
let _env_for_audit = env_summary(&request.env);
if let Some(sec) = security {
let cmd = request.cmd.to_string();
let check = util::bounded_block_on(&handle, &semaphore, async move {
sec.check_host_process(&capsule_id, &cmd).await
});
if check.is_err() {
let result: Result<ProcessResult, ErrorCode> = Err(ErrorCode::CapabilityDenied);
audit_process(self, "astrid:process/host.spawn", &cmd_for_audit, &result);
return result;
}
} else {
let result: Result<ProcessResult, ErrorCode> = Err(ErrorCode::CapabilityDenied);
audit_process(self, "astrid:process/host.spawn", &cmd_for_audit, &result);
return result;
}
let mut sandboxed_cmd =
prepare_sandboxed_command(&request.cmd, &request.args, &workspace_root)
.map_err(|_| ErrorCode::InvalidInput)?;
sandboxed_cmd.stdout(Stdio::piped());
sandboxed_cmd.stderr(Stdio::piped());
let child = sandboxed_cmd
.spawn()
.map_err(|e| ErrorCode::Unknown(format!("spawn failed: {e}")))?;
let pid = child.id();
process_tracker.register(pid, call_id);
let output_result =
util::bounded_block_on_cancellable(&handle, &semaphore, &cancel_token, async move {
tokio::task::spawn_blocking(move || child.wait_with_output())
.await
.map_err(std::io::Error::other)
.and_then(|r| r)
});
let result: Result<ProcessResult, ErrorCode> = match output_result {
Some(Ok(output)) => {
process_tracker.unregister(pid);
Ok(ProcessResult {
stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
exit: ExitInfo {
exit_code: output.status.code(),
signal: None,
},
})
},
Some(Err(e)) => {
process_tracker.unregister(pid);
Err(ErrorCode::Unknown(format!("exec failed: {e}")))
},
None => {
warn!(capsule_id = %self.capsule_id, pid, "process cancelled");
#[cfg(unix)]
if let Ok(raw) = i32::try_from(pid) {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(raw),
nix::sys::signal::Signal::SIGKILL,
);
}
process_tracker.unregister(pid);
Err(ErrorCode::Cancelled)
},
};
audit_process(self, "astrid:process/host.spawn", &cmd_for_audit, &result);
result
}
fn spawn_background(
&mut self,
request: SpawnRequest,
) -> Result<Resource<ProcessHandle>, ErrorCode> {
let principal = self.effective_principal();
let profile_cap = usize::try_from(self.effective_profile().quotas.max_background_processes)
.unwrap_or(MAX_BACKGROUND_PROCESSES);
let effective_cap = profile_cap.min(MAX_BACKGROUND_PROCESSES);
let by_principal = self
.process_count_by_principal
.get(&principal)
.copied()
.unwrap_or(0);
if by_principal >= effective_cap || self.process_count_total >= MAX_BACKGROUND_PROCESSES {
return Err(ErrorCode::Quota);
}
let workspace_root = self.workspace_root.clone();
let security = self.security.clone();
let capsule_id = self.capsule_id.as_str().to_owned();
let handle = self.runtime_handle.clone();
let semaphore = self.host_semaphore.clone();
let cmd_for_audit = request.cmd.clone();
if let Some(sec) = security {
let cmd = request.cmd.to_string();
let check = util::bounded_block_on(&handle, &semaphore, async move {
sec.check_host_process(&capsule_id, &cmd).await
});
if check.is_err() {
return Err(ErrorCode::CapabilityDenied);
}
} else {
return Err(ErrorCode::CapabilityDenied);
}
if self.cancel_token.is_cancelled() {
return Err(ErrorCode::Cancelled);
}
let mut sandboxed_cmd =
prepare_sandboxed_command(&request.cmd, &request.args, &workspace_root)
.map_err(|_| ErrorCode::InvalidInput)?;
configure_piped(&mut sandboxed_cmd);
let mut tokio_cmd = TokioCommand::from(sandboxed_cmd);
tokio_cmd.kill_on_drop(true);
let command_str = format!("{} {}", request.cmd, request.args.join(" "));
let child = tokio_cmd
.spawn()
.map_err(|e| ErrorCode::Unknown(format!("spawn-background failed: {e}")))?;
let stdout_buf: Arc<Mutex<VecDeque<u8>>> = Arc::new(Mutex::new(VecDeque::new()));
let stderr_buf: Arc<Mutex<VecDeque<u8>>> = Arc::new(Mutex::new(VecDeque::new()));
let mut managed = ManagedProcess {
child: Some(child),
stdout_buf: Arc::clone(&stdout_buf),
stderr_buf: Arc::clone(&stderr_buf),
command: command_str,
creator: principal.clone(),
};
let pid = managed
.child
.as_ref()
.and_then(tokio::process::Child::id)
.unwrap_or(0);
attach_pipes(&mut managed, &handle);
self.process_tracker.register(pid, None);
let res = self
.resource_table
.push(managed)
.map_err(|e| ErrorCode::Unknown(format!("resource table: {e}")))?;
self.process_count_total += 1;
*self
.process_count_by_principal
.entry(principal)
.or_insert(0) += 1;
let result: Result<Resource<ProcessHandle>, ErrorCode> = Ok(Resource::new_own(res.rep()));
audit_process(
self,
"astrid:process/host.spawn-background",
&cmd_for_audit,
&result,
);
result
}
}