mod handle;
mod managed;
mod persistent;
mod tracker;
use std::collections::VecDeque;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use tokio::process::Command as TokioCommand;
use tracing::warn;
use wasmtime::component::Resource;
use crate::engine::wasm::bindings::astrid::process::host::{
self as process, EnvVar, ErrorCode, ExitInfo, LogChunk, LogCursor, LogStream, ProcessHandle,
ProcessInfo, ProcessResult, ProcessSignal, ReadLogsResult, SpawnRequest,
};
use crate::engine::wasm::host::util;
use crate::engine::wasm::host_state::HostState;
use managed::{ManagedProcess, attach_pipes, configure_piped, prepare_sandboxed_command};
pub use persistent::PersistentProcessRegistry;
pub use tracker::ProcessTracker;
pub use managed::ManagedProcess as PublicManagedProcess;
/// Hard upper bound on background processes: used both as the global limit
/// on `process_count_total` and as a ceiling on the per-profile
/// `max_background_processes` quota.
pub(crate) const MAX_BACKGROUND_PROCESSES: usize = 8;
/// Largest initial stdin payload (4 MiB) accepted by `spawn_persistent`;
/// larger payloads are rejected with `ErrorCode::TooLarge`.
const MAX_SPAWN_STDIN_BYTES: usize = 4 * 1024 * 1024;
/// Emits a debug-level event on the `astrid.audit.process` target describing
/// the outcome of a command-keyed host call.
///
/// The event carries the capsule id, the effective principal, the operation
/// name (`fn`) and `cmd`; a failed `result` additionally records the error's
/// `Debug` rendering under `error`. The success value itself is never logged.
fn audit_process<T, E: std::fmt::Debug>(
    state: &HostState,
    op: &'static str,
    cmd: &str,
    result: &Result<T, E>,
) {
    let capsule_id = state.capsule_id.as_str();
    let principal = state.effective_principal();
    if let Err(err) = result {
        tracing::debug!(
            target: "astrid.audit.process",
            %capsule_id,
            %principal,
            fn = op,
            cmd,
            error = ?err,
            "audit",
        );
    } else {
        tracing::debug!(
            target: "astrid.audit.process",
            %capsule_id,
            %principal,
            fn = op,
            cmd,
            "audit",
        );
    }
}
/// Emits a debug-level event on the `astrid.audit.process` target describing
/// the outcome of an id-keyed host call.
///
/// The raw process id is not logged: the `id` field holds the first 16 hex
/// characters of the BLAKE3 hash of the id. A failed `result` additionally
/// records the error's `Debug` rendering under `error`.
fn audit_process_id<T, E: std::fmt::Debug>(
    state: &HostState,
    op: &'static str,
    id: &str,
    result: &Result<T, E>,
) {
    let digest = blake3::hash(id.as_bytes()).to_hex();
    // Shadow the raw id so only the truncated digest can reach the log.
    let id = &digest[..16];
    let capsule_id = state.capsule_id.as_str();
    let principal = state.effective_principal();
    if let Err(err) = result {
        tracing::debug!(
            target: "astrid.audit.process",
            %capsule_id,
            %principal,
            fn = op,
            id,
            error = ?err,
            "audit",
        );
    } else {
        tracing::debug!(
            target: "astrid.audit.process",
            %capsule_id,
            %principal,
            fn = op,
            id,
            "audit",
        );
    }
}
/// Returns the `call_id` of the caller context when its payload is a
/// `ToolExecuteRequest`; `None` when there is no caller context or the
/// payload is any other variant.
fn extract_call_id(state: &HostState) -> Option<String> {
    let msg = state.caller_context.as_ref()?;
    match &msg.payload {
        astrid_events::ipc::IpcPayload::ToolExecuteRequest { call_id, .. } => {
            Some(call_id.clone())
        },
        _ => None,
    }
}
/// Builds a comma-separated list of the environment variable keys in `env`.
/// Only keys are included; values are never part of the summary.
fn env_summary(env: &[EnvVar]) -> String {
    let mut summary = String::new();
    for (index, var) in env.iter().enumerate() {
        if index > 0 {
            summary.push(',');
        }
        summary.push_str(var.key.as_str());
    }
    summary
}
/// Returns the principal carried by the caller context, if any.
///
/// Yields `None` when there is no caller context, when the context has no
/// principal, or when `PrincipalId::new` rejects the carried value.
fn authenticated_principal(state: &HostState) -> Option<astrid_core::principal::PrincipalId> {
    let msg = state.caller_context.as_ref()?;
    let raw = msg.principal.as_deref()?;
    astrid_core::principal::PrincipalId::new(raw).ok()
}
/// Prepares the sandboxed command for `request` and spawns it as a tokio
/// child with `kill_on_drop` enabled.
///
/// Stdin is a pipe when `want_stdin` is true and the null device otherwise;
/// the remaining stdio setup is delegated to `configure_piped`.
///
/// # Errors
///
/// * `ErrorCode::InvalidInput` when the sandboxed command cannot be prepared.
/// * `ErrorCode::Unknown` when the OS-level spawn fails.
fn build_persistent_child(
    request: &SpawnRequest,
    workspace_root: &std::path::Path,
    want_stdin: bool,
) -> Result<tokio::process::Child, ErrorCode> {
    let mut sandboxed = match prepare_sandboxed_command(&request.cmd, &request.args, workspace_root)
    {
        Ok(cmd) => cmd,
        Err(_) => return Err(ErrorCode::InvalidInput),
    };
    configure_piped(&mut sandboxed);
    let stdin_mode = if want_stdin {
        Stdio::piped()
    } else {
        Stdio::null()
    };
    sandboxed.stdin(stdin_mode);

    let mut tokio_cmd = TokioCommand::from(sandboxed);
    tokio_cmd.kill_on_drop(true);
    match tokio_cmd.spawn() {
        Ok(child) => Ok(child),
        Err(e) => Err(ErrorCode::Unknown(format!("spawn-persistent failed: {e}"))),
    }
}
impl process::Host for HostState {
fn spawn(&mut self, request: SpawnRequest) -> Result<ProcessResult, ErrorCode> {
let workspace_root = self.workspace_root.clone();
let security = self.security.clone();
let capsule_id = self.capsule_id.as_str().to_owned();
let handle = self.runtime_handle.clone();
let semaphore = self.blocking_semaphore.clone();
let cancel_token = self.cancel_token.clone();
let process_tracker = self.process_tracker.clone();
let call_id = extract_call_id(self);
let cmd_for_audit = request.cmd.clone();
let _env_for_audit = env_summary(&request.env);
if let Some(sec) = security {
let cmd = request.cmd.to_string();
let check = util::bounded_block_on(&handle, &semaphore, async move {
sec.check_host_process(&capsule_id, &cmd).await
});
if check.is_err() {
let result: Result<ProcessResult, ErrorCode> = Err(ErrorCode::CapabilityDenied);
audit_process(self, "astrid:process/host.spawn", &cmd_for_audit, &result);
return result;
}
} else {
let result: Result<ProcessResult, ErrorCode> = Err(ErrorCode::CapabilityDenied);
audit_process(self, "astrid:process/host.spawn", &cmd_for_audit, &result);
return result;
}
let mut sandboxed_cmd =
prepare_sandboxed_command(&request.cmd, &request.args, &workspace_root)
.map_err(|_| ErrorCode::InvalidInput)?;
sandboxed_cmd.stdout(Stdio::piped());
sandboxed_cmd.stderr(Stdio::piped());
let child = sandboxed_cmd
.spawn()
.map_err(|e| ErrorCode::Unknown(format!("spawn failed: {e}")))?;
let pid = child.id();
process_tracker.register(pid, call_id);
let output_result =
util::bounded_block_on_cancellable(&handle, &semaphore, &cancel_token, async move {
tokio::task::spawn_blocking(move || child.wait_with_output())
.await
.map_err(std::io::Error::other)
.and_then(|r| r)
});
let result: Result<ProcessResult, ErrorCode> = match output_result {
Some(Ok(output)) => {
process_tracker.unregister(pid);
Ok(ProcessResult {
stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
exit: ExitInfo {
exit_code: output.status.code(),
signal: None,
},
})
},
Some(Err(e)) => {
process_tracker.unregister(pid);
Err(ErrorCode::Unknown(format!("exec failed: {e}")))
},
None => {
warn!(capsule_id = %self.capsule_id, pid, "process cancelled");
#[cfg(unix)]
if let Ok(raw) = i32::try_from(pid) {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(raw),
nix::sys::signal::Signal::SIGKILL,
);
}
process_tracker.unregister(pid);
Err(ErrorCode::Cancelled)
},
};
audit_process(self, "astrid:process/host.spawn", &cmd_for_audit, &result);
result
}
fn spawn_background(
&mut self,
request: SpawnRequest,
) -> Result<Resource<ProcessHandle>, ErrorCode> {
let principal = self.effective_principal();
let profile_cap = usize::try_from(self.effective_profile().quotas.max_background_processes)
.unwrap_or(MAX_BACKGROUND_PROCESSES);
let effective_cap = profile_cap.min(MAX_BACKGROUND_PROCESSES);
let by_principal = self
.process_count_by_principal
.get(&principal)
.copied()
.unwrap_or(0);
let persistent_live = self.persistent_processes.live_count(&principal);
if by_principal + persistent_live >= effective_cap
|| self.process_count_total >= MAX_BACKGROUND_PROCESSES
{
return Err(ErrorCode::Quota);
}
let workspace_root = self.workspace_root.clone();
let security = self.security.clone();
let capsule_id = self.capsule_id.as_str().to_owned();
let handle = self.runtime_handle.clone();
let semaphore = self.blocking_semaphore.clone();
let cmd_for_audit = request.cmd.clone();
if let Some(sec) = security {
let cmd = request.cmd.to_string();
let check = util::bounded_block_on(&handle, &semaphore, async move {
sec.check_host_process(&capsule_id, &cmd).await
});
if check.is_err() {
return Err(ErrorCode::CapabilityDenied);
}
} else {
return Err(ErrorCode::CapabilityDenied);
}
if self.cancel_token.is_cancelled() {
return Err(ErrorCode::Cancelled);
}
let mut sandboxed_cmd =
prepare_sandboxed_command(&request.cmd, &request.args, &workspace_root)
.map_err(|_| ErrorCode::InvalidInput)?;
configure_piped(&mut sandboxed_cmd);
let mut tokio_cmd = TokioCommand::from(sandboxed_cmd);
tokio_cmd.kill_on_drop(true);
let command_str = format!("{} {}", request.cmd, request.args.join(" "));
let child = tokio_cmd
.spawn()
.map_err(|e| ErrorCode::Unknown(format!("spawn-background failed: {e}")))?;
let stdout_buf: Arc<Mutex<VecDeque<u8>>> = Arc::new(Mutex::new(VecDeque::new()));
let stderr_buf: Arc<Mutex<VecDeque<u8>>> = Arc::new(Mutex::new(VecDeque::new()));
let mut managed = ManagedProcess {
child: Some(child),
stdout_buf: Arc::clone(&stdout_buf),
stderr_buf: Arc::clone(&stderr_buf),
command: command_str,
creator: principal.clone(),
};
let pid = managed
.child
.as_ref()
.and_then(tokio::process::Child::id)
.unwrap_or(0);
attach_pipes(&mut managed, &handle);
self.process_tracker.register(pid, None);
let res = self
.resource_table
.push(managed)
.map_err(|e| ErrorCode::Unknown(format!("resource table: {e}")))?;
self.process_count_total += 1;
*self
.process_count_by_principal
.entry(principal)
.or_insert(0) += 1;
let result: Result<Resource<ProcessHandle>, ErrorCode> = Ok(Resource::new_own(res.rep()));
audit_process(
self,
"astrid:process/host.spawn-background",
&cmd_for_audit,
&result,
);
result
}
fn spawn_persistent(&mut self, request: SpawnRequest) -> Result<String, ErrorCode> {
let cmd_for_audit = request.cmd.clone();
let handle = self.runtime_handle.clone();
let semaphore = self.blocking_semaphore.clone();
let Some(sec) = self.security.clone() else {
let result: Result<String, ErrorCode> = Err(ErrorCode::CapabilityDenied);
audit_process(
self,
"astrid:process/host.spawn-persistent",
&cmd_for_audit,
&result,
);
return result;
};
{
let cmd = request.cmd.to_string();
let cid = self.capsule_id.as_str().to_owned();
let check = util::bounded_block_on(&handle, &semaphore, async move {
sec.check_host_process(&cid, &cmd).await
});
if check.is_err() {
let result: Result<String, ErrorCode> = Err(ErrorCode::CapabilityDenied);
audit_process(
self,
"astrid:process/host.spawn-persistent",
&cmd_for_audit,
&result,
);
return result;
}
}
if !self
.capability_names
.iter()
.any(|c| c == "allow_persistent")
{
let result: Result<String, ErrorCode> = Err(ErrorCode::CapabilityDenied);
audit_process(
self,
"astrid:process/host.spawn-persistent",
&cmd_for_audit,
&result,
);
return result;
}
let Some(principal) = authenticated_principal(self) else {
return Err(ErrorCode::PersistUnsupported);
};
if request.idle_timeout_ms == Some(0) {
return Err(ErrorCode::InvalidInput);
}
if self.cancel_token.is_cancelled() {
return Err(ErrorCode::Cancelled);
}
let capsule_id_arc: Arc<str> = Arc::from(self.capsule_id.as_str());
let workspace_root = self.workspace_root.clone();
let concurrent_cap =
usize::try_from(self.effective_profile().quotas.max_background_processes)
.unwrap_or(MAX_BACKGROUND_PROCESSES)
.min(MAX_BACKGROUND_PROCESSES);
let ephemeral_used = self
.process_count_by_principal
.get(&principal)
.copied()
.unwrap_or(0);
let effective_cap = concurrent_cap.saturating_sub(ephemeral_used);
if request
.stdin
.as_ref()
.is_some_and(|s| s.len() > MAX_SPAWN_STDIN_BYTES)
{
let result: Result<String, ErrorCode> = Err(ErrorCode::TooLarge);
audit_process(
self,
"astrid:process/host.spawn-persistent",
&cmd_for_audit,
&result,
);
return result;
}
let want_stdin = request.keep_stdin_open.unwrap_or(false) || request.stdin.is_some();
let mut child = match build_persistent_child(&request, &workspace_root, want_stdin) {
Ok(c) => c,
Err(e) => {
let result: Result<String, ErrorCode> = Err(e);
audit_process(
self,
"astrid:process/host.spawn-persistent",
&cmd_for_audit,
&result,
);
return result;
},
};
let Some(os_pid) = child.id().filter(|&p| p != 0) else {
let result: Result<String, ErrorCode> = Err(ErrorCode::Unknown(
"spawn-persistent: child has no usable pid".to_string(),
));
audit_process(
self,
"astrid:process/host.spawn-persistent",
&cmd_for_audit,
&result,
);
return result;
};
let (Some(stdout), Some(stderr)) = (child.stdout.take(), child.stderr.take()) else {
return Err(ErrorCode::Unknown(
"spawn-persistent: missing stdio pipes".to_string(),
));
};
let mut stdin = child.stdin.take();
if let (Some(prelude), Some(pipe)) = (request.stdin.clone(), stdin.take()) {
let (pipe, write_res) = util::bounded_block_on(&handle, &semaphore, async move {
use tokio::io::AsyncWriteExt as _;
let mut pipe = pipe;
let r = pipe.write_all(&prelude).await;
(pipe, r)
});
if write_res.is_err() {
let result: Result<String, ErrorCode> = Err(ErrorCode::Unknown(
"spawn-persistent: stdin prelude write failed".to_string(),
));
audit_process(
self,
"astrid:process/host.spawn-persistent",
&cmd_for_audit,
&result,
);
return result;
}
stdin = Some(pipe);
}
let stdin_for_registry = if request.keep_stdin_open.unwrap_or(false) {
stdin
} else {
None
};
let command = format!("{} {}", request.cmd, request.args.join(" "));
let result = self.persistent_processes.spawn(persistent::SpawnParams {
creator: principal,
capsule_id: capsule_id_arc,
command,
os_pid,
child,
stdout,
stderr,
stdin: stdin_for_registry,
concurrent_cap: effective_cap,
label: request.label.clone(),
overflow: request.overflow,
log_ring_bytes: request.log_ring_bytes,
max_lifetime_ms: request.max_lifetime_ms,
idle_timeout_ms: request.idle_timeout_ms,
exit_retention_ms: request.exit_retention_ms,
});
audit_process(
self,
"astrid:process/host.spawn-persistent",
&cmd_for_audit,
&result,
);
result
}
/// Not implemented yet: always audits and returns an `ErrorCode::Unknown`
/// that points callers at the id-keyed operations.
fn attach(&mut self, id: String) -> Result<Resource<ProcessHandle>, ErrorCode> {
    let message = "attach: resource-handle materialisation pending — use the id-keyed ops";
    let outcome: Result<Resource<ProcessHandle>, ErrorCode> =
        Err(ErrorCode::Unknown(message.to_string()));
    audit_process_id(self, "astrid:process/host.attach", &id, &outcome);
    outcome
}
/// Returns the registry's listing for the effective principal and this
/// capsule, optionally narrowed by `label_filter`.
///
/// The audit event records the filter, or `*` when none was supplied.
fn list_processes(
    &mut self,
    label_filter: Option<String>,
) -> Result<Vec<ProcessInfo>, ErrorCode> {
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let filter = label_filter.as_deref();
    let listing = self.persistent_processes.list(&caller, &capsule, filter);
    let outcome: Result<Vec<ProcessInfo>, ErrorCode> = Ok(listing);
    audit_process(
        self,
        "astrid:process/host.list-processes",
        filter.unwrap_or("*"),
        &outcome,
    );
    outcome
}
/// Asks the persistent-process registry for the status of `id`, passing the
/// effective principal and this capsule's id, and audits the outcome.
fn status(&mut self, id: String) -> Result<ProcessInfo, ErrorCode> {
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let outcome = self.persistent_processes.status(&id, &caller, &capsule);
    audit_process_id(self, "astrid:process/host.status", &id, &outcome);
    outcome
}
/// Asks the registry for the status of each id in `ids` in one call and
/// always returns `Ok` with whatever the registry reports.
///
/// The audit event records only how many ids were requested.
fn status_many(&mut self, ids: Vec<String>) -> Result<Vec<ProcessInfo>, ErrorCode> {
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let infos = self.persistent_processes.status_many(&ids, &caller, &capsule);
    let outcome: Result<Vec<ProcessInfo>, ErrorCode> = Ok(infos);
    let summary = format!("{} ids", ids.len());
    audit_process(self, "astrid:process/host.status-many", &summary, &outcome);
    outcome
}
/// Reads the logs of persistent process `id` through the registry, passing
/// the effective principal and this capsule's id, and audits the outcome.
fn read_logs(&mut self, id: String) -> Result<ReadLogsResult, ErrorCode> {
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let outcome = self.persistent_processes.read_logs(&id, &caller, &capsule);
    audit_process_id(self, "astrid:process/host.read-logs", &id, &outcome);
    outcome
}
/// Reads a chunk of `which_stream` for persistent process `id` through the
/// registry, forwarding `cursor` and `max_bytes` unchanged, and audits the
/// outcome.
fn read_since(
    &mut self,
    id: String,
    which_stream: LogStream,
    cursor: LogCursor,
    max_bytes: u32,
) -> Result<LogChunk, ErrorCode> {
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let outcome = self
        .persistent_processes
        .read_since(&id, &caller, &capsule, which_stream, &cursor, max_bytes);
    audit_process_id(self, "astrid:process/host.read-since", &id, &outcome);
    outcome
}
/// Writes `data` to the stdin of persistent process `id` through the
/// registry and returns the registry's result.
///
/// The async registry call is driven to completion on the runtime handle,
/// bounded by the I/O semaphore; the outcome is audited afterwards.
fn write_stdin(&mut self, id: String, data: Vec<u8>) -> Result<u32, ErrorCode> {
    // The id moves into the future, so keep a copy for the audit event.
    let audit_id = id.clone();
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let runtime = self.runtime_handle.clone();
    let permits = self.io_semaphore.clone();
    let registry = self.persistent_processes.clone();
    let outcome = util::bounded_block_on(&runtime, &permits, async move {
        registry.write_stdin(&id, &caller, &capsule, &data).await
    });
    audit_process_id(self, "astrid:process/host.write-stdin", &audit_id, &outcome);
    outcome
}
/// Asks the registry to close the stdin of persistent process `id`, passing
/// the effective principal and this capsule's id, and audits the outcome.
fn close_stdin(&mut self, id: String) -> Result<(), ErrorCode> {
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let outcome = self.persistent_processes.close_stdin(&id, &caller, &capsule);
    audit_process_id(self, "astrid:process/host.close-stdin", &id, &outcome);
    outcome
}
/// Asks the registry to deliver `sig` to persistent process `id`, passing
/// the effective principal and this capsule's id, and audits the outcome.
fn signal(&mut self, id: String, sig: ProcessSignal) -> Result<(), ErrorCode> {
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let outcome = self.persistent_processes.signal(&id, &caller, &capsule, sig);
    audit_process_id(self, "astrid:process/host.signal", &id, &outcome);
    outcome
}
/// Waits for persistent process `id` through the registry, handing it a
/// timeout of `timeout_ms` milliseconds.
///
/// The registry call runs under the cancellable blocking helper; when that
/// helper yields no result the call reports `ErrorCode::Cancelled`. The
/// outcome is audited either way.
fn wait(&mut self, id: String, timeout_ms: u64) -> Result<ExitInfo, ErrorCode> {
    // The id moves into the future, so keep a copy for the audit event.
    let audit_id = id.clone();
    let limit = std::time::Duration::from_millis(timeout_ms);
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let runtime = self.runtime_handle.clone();
    let permits = self.blocking_semaphore.clone();
    let cancel = self.cancel_token.clone();
    let registry = self.persistent_processes.clone();
    let waited = util::bounded_block_on_cancellable(&runtime, &permits, &cancel, async move {
        registry.wait(&id, &caller, &capsule, limit).await
    });
    let outcome = match waited {
        Some(finished) => finished,
        None => Err(ErrorCode::Cancelled),
    };
    audit_process_id(self, "astrid:process/host.wait", &audit_id, &outcome);
    outcome
}
/// Stops persistent process `id` through the registry, forwarding the
/// optional grace period (`grace_ms`, in milliseconds).
///
/// The registry call runs under the cancellable blocking helper; when that
/// helper yields no result the call reports `ErrorCode::Cancelled`. The
/// outcome is audited either way.
fn stop(&mut self, id: String, grace_ms: Option<u64>) -> Result<ExitInfo, ErrorCode> {
    // The id moves into the future, so keep a copy for the audit event.
    let audit_id = id.clone();
    let grace_period = grace_ms.map(std::time::Duration::from_millis);
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let runtime = self.runtime_handle.clone();
    let permits = self.blocking_semaphore.clone();
    let cancel = self.cancel_token.clone();
    let registry = self.persistent_processes.clone();
    let stopped = util::bounded_block_on_cancellable(&runtime, &permits, &cancel, async move {
        registry.stop(&id, &caller, &capsule, grace_period).await
    });
    let outcome = match stopped {
        Some(finished) => finished,
        None => Err(ErrorCode::Cancelled),
    };
    audit_process_id(self, "astrid:process/host.stop", &audit_id, &outcome);
    outcome
}
/// Asks the registry to release persistent process `id`, passing the
/// effective principal and this capsule's id, and audits the outcome.
fn release_process(&mut self, id: String) -> Result<(), ErrorCode> {
    let caller = self.effective_principal();
    let capsule = self.capsule_id.as_str().to_owned();
    let outcome = self.persistent_processes.release(&id, &caller, &capsule);
    audit_process_id(self, "astrid:process/host.release-process", &id, &outcome);
    outcome
}
/// Not implemented yet: always audits and returns an `ErrorCode::Unknown`
/// stating that host lifecycle events are deferred. `_suffix` is ignored.
fn watch(&mut self, id: String, _suffix: Option<String>) -> Result<(), ErrorCode> {
    let message = "watch: host lifecycle events deferred (publish-authority — RFC host_abi)";
    let outcome: Result<(), ErrorCode> = Err(ErrorCode::Unknown(message.to_string()));
    audit_process_id(self, "astrid:process/host.watch", &id, &outcome);
    outcome
}
/// No-op: audits the call and always returns `Ok(())`.
fn unwatch(&mut self, id: String) -> Result<(), ErrorCode> {
    let outcome: Result<(), ErrorCode> = Ok(());
    audit_process_id(self, "astrid:process/host.unwatch", &id, &outcome);
    outcome
}
}