use wasmtime::component::Resource;
use wasmtime_wasi::p2::DynPollable;
use super::managed::{ManagedProcess, drain_buffer, kill_and_reap};
use crate::engine::wasm::bindings::astrid::process::host::{
ErrorCode, ExitInfo, HostProcessHandle, KillResult, ProcessHandle, ProcessResult,
ProcessSignal, ReadLogsResult,
};
use crate::engine::wasm::host_state::HostState;
impl HostProcessHandle for HostState {
fn read_logs(&mut self, self_: Resource<ProcessHandle>) -> Result<ReadLogsResult, ErrorCode> {
let proc = self
.resource_table
.get_mut::<ManagedProcess>(&Resource::new_borrow(self_.rep()))
.map_err(|_| ErrorCode::Closed)?;
let (running, exit_code) = if let Some(child) = proc.child.as_mut() {
match child.try_wait() {
Ok(Some(status)) => {
proc.child.take();
(false, status.code())
},
Ok(None) => (true, None),
Err(_) => {
proc.child.take();
(false, Some(-1))
},
}
} else {
(false, None)
};
let stdout = drain_buffer(&proc.stdout_buf);
let stderr = drain_buffer(&proc.stderr_buf);
Ok(ReadLogsResult {
stdout,
stderr,
running,
exit: if running {
None
} else {
Some(ExitInfo {
exit_code,
signal: None,
})
},
})
}
fn write_stdin(
&mut self,
_self_: Resource<ProcessHandle>,
_data: Vec<u8>,
) -> Result<u32, ErrorCode> {
Err(ErrorCode::Unknown(
"ProcessHandle.write-stdin: stdin pipe storage port pending".to_string(),
))
}
fn close_stdin(&mut self, _self_: Resource<ProcessHandle>) -> Result<(), ErrorCode> {
Err(ErrorCode::Unknown(
"ProcessHandle.close-stdin: stdin pipe storage port pending".to_string(),
))
}
fn signal(
&mut self,
self_: Resource<ProcessHandle>,
sig: ProcessSignal,
) -> Result<(), ErrorCode> {
#[cfg(unix)]
{
let proc = self
.resource_table
.get::<ManagedProcess>(&Resource::new_borrow(self_.rep()))
.map_err(|_| ErrorCode::Closed)?;
let pid = proc
.child
.as_ref()
.and_then(tokio::process::Child::id)
.ok_or(ErrorCode::Closed)?;
let nix_sig = match sig {
ProcessSignal::Term => nix::sys::signal::Signal::SIGTERM,
ProcessSignal::Hup => nix::sys::signal::Signal::SIGHUP,
ProcessSignal::Usr1 => nix::sys::signal::Signal::SIGUSR1,
ProcessSignal::Usr2 => nix::sys::signal::Signal::SIGUSR2,
ProcessSignal::Int => nix::sys::signal::Signal::SIGINT,
ProcessSignal::Stop => nix::sys::signal::Signal::SIGSTOP,
ProcessSignal::Cont => nix::sys::signal::Signal::SIGCONT,
};
let raw = i32::try_from(pid).map_err(|_| ErrorCode::InvalidInput)?;
nix::sys::signal::kill(nix::unistd::Pid::from_raw(raw), nix_sig)
.map_err(|e| ErrorCode::Unknown(format!("kill({sig:?}): {e}")))?;
Ok(())
}
#[cfg(not(unix))]
{
let _ = (self_, sig);
Err(ErrorCode::Unknown(
"ProcessHandle.signal: not supported on this platform".to_string(),
))
}
}
fn kill(&mut self, self_: Resource<ProcessHandle>) -> Result<KillResult, ErrorCode> {
let proc = self
.resource_table
.get_mut::<ManagedProcess>(&Resource::new_borrow(self_.rep()))
.map_err(|_| ErrorCode::Closed)?;
let (killed, exit_code) = match proc.child.take() {
Some(mut child) => {
let code = kill_and_reap(&mut child);
(true, code)
},
None => (false, None),
};
let stdout = drain_buffer(&proc.stdout_buf);
let stderr = drain_buffer(&proc.stderr_buf);
Ok(KillResult {
killed,
exit: Some(ExitInfo {
exit_code,
signal: None,
}),
stdout,
stderr,
})
}
fn wait(
&mut self,
self_: Resource<ProcessHandle>,
timeout_ms: Option<u64>,
) -> Result<ExitInfo, ErrorCode> {
let rt = self.runtime_handle.clone();
let sem = self.blocking_semaphore.clone();
let tok = self.cancel_token.clone();
let proc = self
.resource_table
.get_mut::<ManagedProcess>(&Resource::new_borrow(self_.rep()))
.map_err(|_| ErrorCode::Closed)?;
let child = match proc.child.as_mut() {
Some(c) => c,
None => return Err(ErrorCode::Closed),
};
let result = crate::engine::wasm::host::util::bounded_block_on_cancellable(
&rt,
&sem,
&tok,
async move {
match timeout_ms {
Some(ms) => match tokio::time::timeout(
std::time::Duration::from_millis(ms),
child.wait(),
)
.await
{
Ok(Ok(status)) => Ok(status.code()),
Ok(Err(e)) => Err(ErrorCode::Unknown(format!("wait: {e}"))),
Err(_) => Err(ErrorCode::WaitTimeout),
},
None => match child.wait().await {
Ok(status) => Ok(status.code()),
Err(e) => Err(ErrorCode::Unknown(format!("wait: {e}"))),
},
}
},
);
let succeeded = matches!(result, Some(Ok(_)));
if succeeded {
proc.child.take();
}
match result {
Some(Ok(code)) => Ok(ExitInfo {
exit_code: code,
signal: None,
}),
Some(Err(e)) => Err(e),
None => Err(ErrorCode::Cancelled),
}
}
fn wait_with_output(
&mut self,
_self_: Resource<ProcessHandle>,
_timeout_ms: Option<u64>,
) -> Result<ProcessResult, ErrorCode> {
Err(ErrorCode::Unknown(
"ProcessHandle.wait-with-output: atomic drain port pending".to_string(),
))
}
fn os_pid(&mut self, self_: Resource<ProcessHandle>) -> Result<u32, ErrorCode> {
let proc = self
.resource_table
.get::<ManagedProcess>(&Resource::new_borrow(self_.rep()))
.map_err(|_| ErrorCode::Closed)?;
proc.child
.as_ref()
.and_then(tokio::process::Child::id)
.ok_or(ErrorCode::Closed)
}
fn subscribe_exit(&mut self, _self_: Resource<ProcessHandle>) -> Resource<DynPollable> {
super::super::stubs::always_ready_pollable(&mut self.resource_table)
}
fn subscribe_logs(&mut self, _self_: Resource<ProcessHandle>) -> Resource<DynPollable> {
super::super::stubs::always_ready_pollable(&mut self.resource_table)
}
fn drop(&mut self, rep: Resource<ProcessHandle>) -> wasmtime::Result<()> {
if let Ok(managed) = self
.resource_table
.delete::<ManagedProcess>(Resource::new_own(rep.rep()))
{
if let Some(pid) = managed.child.as_ref().and_then(tokio::process::Child::id) {
self.process_tracker.unregister(pid);
}
self.process_count_total = self.process_count_total.saturating_sub(1);
if let Some(count) = self.process_count_by_principal.get_mut(&managed.creator) {
*count = count.saturating_sub(1);
if *count == 0 {
self.process_count_by_principal.remove(&managed.creator);
}
}
drop(managed);
}
Ok(())
}
}