use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, watch};
use agent_os_sidecar::protocol::{
CloseStdinRequest, EventPayload, ExecuteRequest, KillProcessRequest, OwnershipScope,
ProcessSnapshotStatus, RejectedResponse, RequestPayload, ResponsePayload, StreamChannel,
WriteStdinRequest,
};
use crate::agent_os::{AgentOs, ProcessEntry};
use crate::command_line::resolve_exec_command;
use crate::error::ClientError;
use crate::stream::{ByteStream, Subscription};
/// Capacity given to each per-process `broadcast` channel (one for stdout, one for stderr)
/// created by `AgentOs::spawn`.
const PROCESS_STREAM_CAPACITY: usize = 1024;
/// Base value for synthetic pids.
///
/// NOTE(review): not referenced by any code visible in this file; presumably the initial value
/// of `synthetic_pid_counter` — confirm where that counter is constructed.
pub(crate) const SYNTHETIC_PID_BASE: u64 = 1_000_000;
/// Timing-mitigation mode carried by [`ExecOptions::timing_mitigation`].
///
/// Serialized with lowercase variant names; the default is [`TimingMitigation::Off`].
///
/// NOTE(review): no code visible in this file reads this value, so its runtime effect has to be
/// confirmed against whatever consumes `ExecOptions`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TimingMitigation {
/// Default variant.
#[default]
Off,
/// Non-default variant; semantics are defined by the consumer (not visible here).
Freeze,
}
/// Data to feed to a process's stdin.
///
/// Both variants are turned into a byte vector by `stdin_to_bytes` before being sent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StdinInput {
/// Text input; sent as the string's UTF-8 bytes.
Text(String),
/// Raw bytes; sent unchanged.
Bytes(Vec<u8>),
}
/// Boxed callback invoked with each chunk of process output.
///
/// It is `Send` so it can be moved into a spawned task (see `install_output_callback`).
pub type OutputCallback = Box<dyn FnMut(&[u8]) + Send>;
/// Options accepted by `AgentOs::exec` (and, through [`SpawnOptions::base`], by `AgentOs::spawn`).
///
/// NOTE(review): `capture_stdio`, `file_path`, `cpu_time_limit_ms` and `timing_mitigation` are not
/// read by any code visible in this file.
#[derive(Default)]
pub struct ExecOptions {
/// Environment variables forwarded in the Execute request.
pub env: BTreeMap<String, String>,
/// Working directory forwarded in the Execute request.
pub cwd: Option<String>,
/// Data `exec` writes to the process's stdin before closing it.
pub stdin: Option<StdinInput>,
/// Timeout in milliseconds used by `exec`; non-finite or negative values are ignored.
pub timeout: Option<f64>,
/// Callback invoked with every stdout chunk.
pub on_stdout: Option<OutputCallback>,
/// Callback invoked with every stderr chunk.
pub on_stderr: Option<OutputCallback>,
/// Not read in this file.
pub capture_stdio: Option<bool>,
/// Not read in this file.
pub file_path: Option<String>,
/// Not read in this file; the name suggests milliseconds — confirm with the consumer.
pub cpu_time_limit_ms: Option<f64>,
/// Not read in this file.
pub timing_mitigation: Option<TimingMitigation>,
}
/// Outcome of `AgentOs::exec`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecResult {
/// Exit code taken from the process-exited event.
pub exit_code: i32,
/// Everything received on stdout, decoded as UTF-8 with lossy replacement.
pub stdout: String,
/// Everything received on stderr, decoded as UTF-8 with lossy replacement.
pub stderr: String,
}
/// Stdio mode carried by [`SpawnOptions::stdio`].
///
/// Serialized with lowercase variant names; the default is [`SpawnStdio::Pipe`].
///
/// NOTE(review): not read by any code visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SpawnStdio {
/// Default variant.
#[default]
Pipe,
/// Non-default variant; semantics are defined by the consumer (not visible here).
Inherit,
}
/// Options accepted by `AgentOs::spawn`.
///
/// NOTE(review): in the code visible in this file only `base.env`, `base.cwd`, `base.on_stdout`
/// and `base.on_stderr` are read on the spawn path; the remaining fields are carried but unused here.
#[derive(Default)]
pub struct SpawnOptions {
/// Options shared with `exec`.
pub base: ExecOptions,
/// Not read in this file.
pub stdio: Option<SpawnStdio>,
/// Not read in this file.
pub stdin_fd: Option<i32>,
/// Not read in this file.
pub stdout_fd: Option<i32>,
/// Not read in this file.
pub stderr_fd: Option<i32>,
/// Not read in this file.
pub stream_stdin: Option<bool>,
}
/// Client-side summary of a process started through `AgentOs::spawn`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SpawnedProcessInfo {
/// Pid under which the process is registered in the client's process table.
pub pid: u32,
/// Command as passed to `spawn`.
pub command: String,
/// Arguments as passed to `spawn`.
pub args: Vec<String>,
/// `true` while no exit code has been recorded.
pub running: bool,
/// Recorded exit code, if any; serialized as `exitCode`.
#[serde(rename = "exitCode")]
pub exit_code: Option<i32>,
}
/// Handle returned by `AgentOs::spawn`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SpawnHandle {
/// Pid to pass to the other per-process methods (`wait_process`, `kill_process`, ...).
pub pid: u32,
}
/// Coarse process state reported in [`ProcessInfo`]; serialized with lowercase variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ProcessStatus {
/// The process has not been observed to exit.
Running,
/// The process has exited.
Exited,
}
/// One row of the process listing produced by `AgentOs::all_processes`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProcessInfo {
/// Pid shown to the caller: the locally assigned pid when the snapshot pid maps to a tracked
/// process, otherwise the pid reported by the snapshot.
pub pid: u32,
/// Parent pid, translated the same way as `pid`.
pub ppid: u32,
/// Process-group id, translated the same way as `pid`.
pub pgid: u32,
/// Session id, translated the same way as `pid`.
pub sid: u32,
/// Driver name copied from the snapshot; empty for tracked processes missing from the snapshot.
pub driver: String,
/// Command line program.
pub command: String,
/// Command arguments.
pub args: Vec<String>,
/// Working directory copied from the snapshot; empty for tracked processes missing from it.
pub cwd: String,
/// Whether the process is considered running or exited.
pub status: ProcessStatus,
/// Exit code, if known; serialized as `exitCode`.
#[serde(rename = "exitCode")]
pub exit_code: Option<i32>,
/// Milliseconds since the Unix epoch at which this client first observed the process;
/// serialized as `startTime`.
#[serde(rename = "startTime")]
pub start_time: f64,
/// Milliseconds since the Unix epoch at which this client first observed the exit;
/// serialized as `exitTime`.
#[serde(rename = "exitTime")]
pub exit_time: Option<f64>,
}
/// Node of the process forest returned by `AgentOs::process_tree`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProcessTreeNode {
/// The process itself; its fields are flattened into this node when (de)serialized.
#[serde(flatten)]
pub info: ProcessInfo,
/// Processes whose `ppid` equals this node's `pid`.
pub children: Vec<ProcessTreeNode>,
}
impl AgentOs {
/// Runs `command` to completion and returns its exit code together with the captured output.
///
/// The command string is resolved by `resolve_exec_command`; `options.env` and `options.cwd`
/// are forwarded in the Execute request. If `options.stdin` is set it is written to the
/// process, and stdin is then always closed. Failures of those two stdin requests are ignored.
///
/// `options.timeout` is interpreted as milliseconds. Non-finite or negative values are
/// ignored, and so are values too large to be represented as a deadline (previously such
/// values panicked while building the `Duration`/`Instant`). Once the deadline passes a
/// `SIGKILL` is requested a single time and the call keeps waiting for the exit event.
///
/// `on_stdout` / `on_stderr` are invoked for each chunk, which is also accumulated and
/// returned lossily decoded as UTF-8.
///
/// # Errors
/// Fails when the command cannot be resolved, when the Execute request fails or is rejected,
/// or when the event stream closes before the process reports an exit.
pub async fn exec(&self, command: &str, mut options: ExecOptions) -> Result<ExecResult> {
    let process_id = self.next_process_id();
    // The subscription is taken before the Execute request is sent — presumably so that no
    // event emitted by the new process can be missed.
    let mut events = self.transport().subscribe_events();
    let (resolved_command, resolved_args) = resolve_exec_command(command)?;
    let started = self
        .send_execute(
            &process_id,
            Some(resolved_command),
            resolved_args,
            options.env.clone(),
            options.cwd.clone(),
        )
        .await
        .context("exec: Execute request failed")?;
    debug_assert_eq!(started.process_id, process_id);

    // Best-effort stdin delivery: errors are deliberately ignored.
    if let Some(stdin) = options.stdin.take() {
        let chunk = stdin_to_bytes(stdin);
        let ownership = self.vm_scope();
        let _ = self
            .transport()
            .request(
                ownership,
                RequestPayload::WriteStdin(WriteStdinRequest {
                    process_id: process_id.clone(),
                    chunk,
                }),
            )
            .await;
    }
    {
        let ownership = self.vm_scope();
        let _ = self
            .transport()
            .request(
                ownership,
                RequestPayload::CloseStdin(CloseStdinRequest {
                    process_id: process_id.clone(),
                }),
            )
            .await;
    }

    let mut on_stdout = options.on_stdout.take();
    let mut on_stderr = options.on_stderr.take();

    // Checked conversions: a huge (but finite) timeout must not panic, it simply yields no
    // deadline at all.
    let timeout_deadline = options
        .timeout
        .filter(|ms| ms.is_finite() && *ms >= 0.0)
        .and_then(|ms| std::time::Duration::try_from_secs_f64(ms / 1000.0).ok())
        .and_then(|timeout| tokio::time::Instant::now().checked_add(timeout));
    let mut killed_for_timeout = false;
    let mut stdout = Vec::<u8>::new();
    let mut stderr = Vec::<u8>::new();

    let exit_code = loop {
        let recv = events.recv();
        let frame = match timeout_deadline {
            Some(deadline) => {
                tokio::select! {
                    result = recv => result,
                    // Disabled after the first firing so the kill is requested only once.
                    _ = tokio::time::sleep_until(deadline), if !killed_for_timeout => {
                        killed_for_timeout = true;
                        self.kill_wire_process(&process_id, "SIGKILL");
                        continue;
                    }
                }
            }
            None => recv.await,
        };
        let (_, payload) = match frame {
            Ok(frame) => frame,
            // Events dropped because this receiver lagged are skipped.
            Err(broadcast::error::RecvError::Lagged(_)) => continue,
            Err(broadcast::error::RecvError::Closed) => {
                return Err(ClientError::Sidecar(
                    "exec: event stream closed before process exit".to_owned(),
                )
                .into());
            }
        };
        match payload {
            EventPayload::ProcessOutput(output) if output.process_id == process_id => {
                match output.channel {
                    StreamChannel::Stdout => {
                        if let Some(cb) = on_stdout.as_mut() {
                            cb(&output.chunk);
                        }
                        stdout.extend_from_slice(&output.chunk);
                    }
                    StreamChannel::Stderr => {
                        if let Some(cb) = on_stderr.as_mut() {
                            cb(&output.chunk);
                        }
                        stderr.extend_from_slice(&output.chunk);
                    }
                }
            }
            EventPayload::ProcessExited(exited) if exited.process_id == process_id => {
                break exited.exit_code;
            }
            // Events for other processes and unrelated event kinds are ignored.
            EventPayload::ProcessOutput(_)
            | EventPayload::ProcessExited(_)
            | EventPayload::VmLifecycle(_)
            | EventPayload::Structured(_) => {}
        }
    };

    Ok(ExecResult {
        exit_code,
        stdout: String::from_utf8_lossy(&stdout).into_owned(),
        stderr: String::from_utf8_lossy(&stderr).into_owned(),
    })
}
/// Starts `command` in the background and returns a handle carrying its client-side pid.
///
/// A pid is allocated from `synthetic_pid_counter`, a `ProcessEntry` with fresh stdout/stderr
/// broadcast channels and exit/kernel-pid watch channels is registered under it, and a Tokio
/// task running `run_spawn` is started to send the Execute request and forward events.
/// `options.base.on_stdout` / `on_stderr`, when present, are attached to the corresponding
/// channel before the task starts.
///
/// Because this calls `tokio::spawn`, it must be invoked from within a Tokio runtime.
///
/// # Errors
/// Fails when the pid counter no longer fits in a `u32` (the previous `as u32` cast silently
/// wrapped and could alias an existing pid).
pub fn spawn(
    &self,
    command: &str,
    args: Vec<String>,
    mut options: SpawnOptions,
) -> Result<SpawnHandle> {
    let raw_pid = self
        .inner()
        .synthetic_pid_counter
        .fetch_add(1, Ordering::SeqCst);
    let pid = u32::try_from(raw_pid)
        .context("spawn: synthetic pid counter exceeded the u32 pid range")?;
    let process_id = format!("proc-{pid}-{}", uuid::Uuid::new_v4());

    // Only the senders are kept; consumers attach later through `subscribe()`.
    let (stdout_tx, _) = broadcast::channel::<Vec<u8>>(PROCESS_STREAM_CAPACITY);
    let (stderr_tx, _) = broadcast::channel::<Vec<u8>>(PROCESS_STREAM_CAPACITY);
    let (exit_tx, _) = watch::channel::<Option<i32>>(None);
    let (kernel_pid_tx, _) = watch::channel::<Option<u32>>(None);

    if let Some(cb) = options.base.on_stdout.take() {
        install_output_callback(stdout_tx.clone(), cb);
    }
    if let Some(cb) = options.base.on_stderr.take() {
        install_output_callback(stderr_tx.clone(), cb);
    }

    let entry = ProcessEntry {
        command: command.to_owned(),
        args: args.clone(),
        stdout_tx: stdout_tx.clone(),
        stderr_tx: stderr_tx.clone(),
        exit_tx: exit_tx.clone(),
        process_id: process_id.clone(),
        kernel_pid: kernel_pid_tx.clone(),
    };
    // The result of the insertion is intentionally ignored, as before.
    let _ = self.inner().processes.insert(pid, entry);

    // Subscribe before the task starts so it owns a receiver created prior to the Execute request.
    let events = self.transport().subscribe_events();
    let this = self.clone();
    let command = command.to_owned();
    tokio::spawn(async move {
        this.run_spawn(
            pid,
            process_id,
            command,
            args,
            options,
            events,
            stdout_tx,
            stderr_tx,
            exit_tx,
            kernel_pid_tx,
        )
        .await;
    });
    Ok(SpawnHandle { pid })
}
/// Queues `data` to be written to the stdin of the tracked process `pid`.
///
/// The request is sent from a detached Tokio task and its outcome is discarded; `Ok(())` only
/// means the pid was known.
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
pub fn write_process_stdin(
    &self,
    pid: u32,
    data: StdinInput,
) -> std::result::Result<(), ClientError> {
    let request = RequestPayload::WriteStdin(WriteStdinRequest {
        process_id: self.lookup_process_id(pid)?,
        chunk: stdin_to_bytes(data),
    });
    let client = self.clone();
    tokio::spawn(async move {
        let scope = client.vm_scope();
        // Fire-and-forget: the response (or error) is dropped.
        let _ = client.transport().request(scope, request).await;
    });
    Ok(())
}
/// Queues a request to close the stdin of the tracked process `pid`.
///
/// The request is sent from a detached Tokio task and its outcome is discarded.
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
pub fn close_process_stdin(&self, pid: u32) -> std::result::Result<(), ClientError> {
    let request = RequestPayload::CloseStdin(CloseStdinRequest {
        process_id: self.lookup_process_id(pid)?,
    });
    let client = self.clone();
    tokio::spawn(async move {
        let scope = client.vm_scope();
        // Fire-and-forget: the response (or error) is dropped.
        let _ = client.transport().request(scope, request).await;
    });
    Ok(())
}
/// Subscribes to the stdout channel of the tracked process `pid`.
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
pub fn on_process_stdout(&self, pid: u32) -> std::result::Result<ByteStream, ClientError> {
    let subscribed = self
        .inner()
        .processes
        .read(&pid, |_, entry| entry.stdout_tx.subscribe());
    match subscribed {
        Some(receiver) => Ok(ByteStream::new(receiver)),
        None => Err(ClientError::ProcessNotFound(pid)),
    }
}
/// Subscribes to the stderr channel of the tracked process `pid`.
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
pub fn on_process_stderr(&self, pid: u32) -> std::result::Result<ByteStream, ClientError> {
    let subscribed = self
        .inner()
        .processes
        .read(&pid, |_, entry| entry.stderr_tx.subscribe());
    match subscribed {
        Some(receiver) => Ok(ByteStream::new(receiver)),
        None => Err(ClientError::ProcessNotFound(pid)),
    }
}
/// Registers `handler` to be called once with the exit code of the tracked process `pid`.
///
/// If an exit code is already recorded the handler runs synchronously and a no-op
/// subscription is returned. Otherwise a Tokio task waits for the exit code; the returned
/// `Subscription` wraps a closure that aborts that task.
///
/// The exit code is copied out of the watch channel before `handler` is invoked, so user
/// code never runs while the channel's read guard is held (previously the guard created by
/// `rx.borrow()` in the `if let` scrutinee stayed alive for the whole handler call).
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
pub fn on_process_exit(
    &self,
    pid: u32,
    handler: impl FnOnce(i32) + Send + 'static,
) -> std::result::Result<Subscription, ClientError> {
    let mut rx = self
        .inner()
        .processes
        .read(&pid, |_, entry| entry.exit_tx.subscribe())
        .ok_or(ClientError::ProcessNotFound(pid))?;

    // The guard returned by `borrow()` is dropped at the end of this statement.
    let already_exited = *rx.borrow();
    if let Some(code) = already_exited {
        handler(code);
        return Ok(Subscription::noop());
    }

    let task = tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            let observed = *rx.borrow();
            if let Some(code) = observed {
                handler(code);
                return;
            }
        }
    });
    Ok(Subscription::new(move || task.abort()))
}
/// Waits until the tracked process `pid` has an exit code and returns it.
///
/// Returns immediately when the exit code is already recorded.
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked, and
/// `ClientError::Sidecar` when the exit channel closes without ever carrying a code.
pub async fn wait_process(&self, pid: u32) -> std::result::Result<i32, ClientError> {
    let subscribed = self
        .inner()
        .processes
        .read(&pid, |_, entry| entry.exit_tx.subscribe());
    let mut exit_rx = match subscribed {
        Some(receiver) => receiver,
        None => return Err(ClientError::ProcessNotFound(pid)),
    };
    loop {
        let current = *exit_rx.borrow();
        if let Some(code) = current {
            return Ok(code);
        }
        // An error here means the sender is gone and no further value can arrive.
        if exit_rx.changed().await.is_err() {
            break;
        }
    }
    Err(ClientError::Sidecar(format!(
        "wait_process: exit channel closed before process {pid} reported an exit code"
    )))
}
/// Lists every process tracked by this client, in the iteration order of the process table.
pub fn list_processes(&self) -> Vec<SpawnedProcessInfo> {
    let mut listing = Vec::new();
    self.inner().processes.scan(|pid, entry| {
        let exit_code = *entry.exit_tx.borrow();
        let running = exit_code.is_none();
        let info = SpawnedProcessInfo {
            pid: *pid,
            command: entry.command.clone(),
            args: entry.args.clone(),
            running,
            exit_code,
        };
        listing.push(info);
    });
    listing
}
/// Returns a merged view of the sidecar's process snapshot and the processes tracked locally
/// by this client, sorted by pid.
///
/// Snapshot rows have their pid/ppid/pgid/sid translated to the locally assigned pid whenever
/// the snapshot pid matches a recorded kernel pid. Tracked processes whose pid did not appear
/// in the snapshot are appended with placeholder values.
///
/// # Errors
/// Fails when the snapshot request fails, is rejected (`ClientError::Kernel`), or yields an
/// unexpected response (`ClientError::Sidecar`).
pub async fn all_processes(&self) -> Result<Vec<ProcessInfo>> {
let ownership = self.vm_scope();
let response = self
.transport()
.request(
ownership,
RequestPayload::GetProcessSnapshot(Default::default()),
)
.await
.context("all_processes: GetProcessSnapshot request failed")?;
let snapshot = match response {
ResponsePayload::ProcessSnapshot(snapshot) => snapshot,
ResponsePayload::Rejected(RejectedResponse { code, message }) => {
return Err(ClientError::Kernel { code, message }.into());
}
other => {
return Err(ClientError::Sidecar(format!(
"all_processes: unexpected response {other:?}"
))
.into());
}
};
// Point-in-time copy of one locally tracked process.
// NOTE(review): `display_pid` is stored but never read in this function — confirm whether
// snapshot rows matched by process id were meant to use it.
struct Tracked {
display_pid: u32,
exit_code: Option<i32>,
command: String,
args: Vec<String>,
}
// Index the local process table by wire process id and by kernel pid (when one is recorded).
let mut tracked_by_process_id: BTreeMap<String, Tracked> = BTreeMap::new();
let mut display_pid_by_kernel_pid: BTreeMap<u32, u32> = BTreeMap::new();
self.inner().processes.scan(|display_pid, entry| {
let exit_code = *entry.exit_tx.borrow();
if let Some(kernel_pid) = *entry.kernel_pid.borrow() {
display_pid_by_kernel_pid.insert(kernel_pid, *display_pid);
}
tracked_by_process_id.insert(
entry.process_id.clone(),
Tracked {
display_pid: *display_pid,
exit_code,
command: entry.command.clone(),
args: entry.args.clone(),
},
);
});
// A single timestamp is used for every first observation made during this call.
let now_ms = epoch_ms_now();
let mut seen_display_pids: std::collections::BTreeSet<u32> = std::collections::BTreeSet::new();
let mut out: Vec<ProcessInfo> = Vec::new();
for entry in snapshot.processes {
let tracked = tracked_by_process_id.get(&entry.process_id);
// Translate each id through the kernel-pid map, falling back to the raw snapshot value.
let display_pid = display_pid_by_kernel_pid
.get(&entry.pid)
.copied()
.unwrap_or(entry.pid);
let display_ppid = display_pid_by_kernel_pid
.get(&entry.ppid)
.copied()
.unwrap_or(entry.ppid);
let display_pgid = display_pid_by_kernel_pid
.get(&entry.pgid)
.copied()
.unwrap_or(entry.pgid);
let display_sid = display_pid_by_kernel_pid
.get(&entry.sid)
.copied()
.unwrap_or(entry.sid);
// Start time is the first moment this client saw the `process_id:pid` pair.
let process_key = format!("{}:{}", entry.process_id, entry.pid);
let start_time = self.observed_start_time(&process_key, now_ms);
// A locally recorded exit code wins; a tracked process without one is reported as running
// (with whatever exit code the snapshot carries); untracked rows follow the snapshot status.
let (status, exit_code) = match tracked {
Some(t) => match t.exit_code {
Some(code) => (ProcessStatus::Exited, Some(code)),
None => (ProcessStatus::Running, entry.exit_code),
},
None => {
let status = match entry.status {
ProcessSnapshotStatus::Running | ProcessSnapshotStatus::Stopped => {
ProcessStatus::Running
}
ProcessSnapshotStatus::Exited => ProcessStatus::Exited,
};
(status, entry.exit_code)
}
};
// Exit times are only produced for tracked processes that have exited.
let exit_time = match (tracked, status) {
(Some(_), ProcessStatus::Exited) => {
Some(self.observed_exit_time(&entry.process_id, now_ms))
}
_ => None,
};
// Prefer the command line given to `spawn` over the one reported by the snapshot.
let (command, args) = match tracked {
Some(t) => (t.command.clone(), t.args.clone()),
None => (entry.command, entry.args),
};
seen_display_pids.insert(display_pid);
out.push(ProcessInfo {
pid: display_pid,
ppid: display_ppid,
pgid: display_pgid,
sid: display_sid,
driver: entry.driver,
command,
args,
cwd: entry.cwd,
status,
exit_code,
start_time,
exit_time,
});
}
// Append tracked processes whose display pid was not produced by any snapshot row.
// NOTE(review): the start-time key here uses the display pid, whereas the loop above uses the
// snapshot pid, so the same process can get two different first-seen timestamps.
self.inner().processes.scan(|display_pid, entry| {
if seen_display_pids.contains(display_pid) {
return;
}
let exit_code = *entry.exit_tx.borrow();
let process_key = format!("{}:{}", entry.process_id, display_pid);
let start_time = self.observed_start_time(&process_key, now_ms);
let (status, exit_time) = match exit_code {
Some(_) => (
ProcessStatus::Exited,
Some(self.observed_exit_time(&entry.process_id, now_ms)),
),
None => (ProcessStatus::Running, None),
};
// No kernel data is available for these rows, so ids and paths are placeholders.
out.push(ProcessInfo {
pid: *display_pid,
ppid: 0,
pgid: *display_pid,
sid: *display_pid,
driver: String::new(),
command: entry.command.clone(),
args: entry.args.clone(),
cwd: String::new(),
status,
exit_code,
start_time,
exit_time,
});
});
out.sort_by_key(|info| info.pid);
Ok(out)
}
/// Returns the start time remembered for `process_key`, recording `now_ms` on first sight.
///
/// After a failed or racing insertion the stored value is re-read, so whichever value is in
/// the table is what gets returned; `now_ms` is the fallback if the key is missing again.
fn observed_start_time(&self, process_key: &str, now_ms: f64) -> f64 {
    let remembered = self
        .inner()
        .observed_process_start_times
        .read(process_key, |_, value| *value);
    match remembered {
        Some(first_seen) => first_seen,
        None => {
            let _ = self
                .inner()
                .observed_process_start_times
                .insert(process_key.to_owned(), now_ms);
            self.inner()
                .observed_process_start_times
                .read(process_key, |_, value| *value)
                .unwrap_or(now_ms)
        }
    }
}
/// Returns the exit time remembered for `process_id`, recording `now_ms` on first sight.
///
/// After a failed or racing insertion the stored value is re-read, so whichever value is in
/// the table is what gets returned; `now_ms` is the fallback if the key is missing again.
fn observed_exit_time(&self, process_id: &str, now_ms: f64) -> f64 {
    let remembered = self
        .inner()
        .observed_process_exit_times
        .read(process_id, |_, value| *value);
    match remembered {
        Some(first_seen) => first_seen,
        None => {
            let _ = self
                .inner()
                .observed_process_exit_times
                .insert(process_id.to_owned(), now_ms);
            self.inner()
                .observed_process_exit_times
                .read(process_id, |_, value| *value)
                .unwrap_or(now_ms)
        }
    }
}
pub async fn process_tree(&self) -> Result<Vec<ProcessTreeNode>> {
let processes = self.all_processes().await?;
Ok(build_process_forest(processes))
}
/// Returns the client-side summary of the tracked process `pid`.
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
pub fn get_process(&self, pid: u32) -> std::result::Result<SpawnedProcessInfo, ClientError> {
    let found = self.inner().processes.read(&pid, |key, entry| {
        let exit_code = *entry.exit_tx.borrow();
        let running = exit_code.is_none();
        SpawnedProcessInfo {
            pid: *key,
            command: entry.command.clone(),
            args: entry.args.clone(),
            running,
            exit_code,
        }
    });
    match found {
        Some(info) => Ok(info),
        None => Err(ClientError::ProcessNotFound(pid)),
    }
}
/// Requests `SIGTERM` for the tracked process `pid` (see `signal_process`).
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
pub fn stop_process(&self, pid: u32) -> std::result::Result<(), ClientError> {
    let signal = "SIGTERM";
    self.signal_process(pid, signal)
}
/// Requests `SIGKILL` for the tracked process `pid` (see `signal_process`).
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
pub fn kill_process(&self, pid: u32) -> std::result::Result<(), ClientError> {
    let signal = "SIGKILL";
    self.signal_process(pid, signal)
}
/// Builds the VM-level ownership scope from this client's connection, session and VM ids.
fn vm_scope(&self) -> OwnershipScope {
    let connection = self.connection_id();
    let session = self.wire_session_id();
    let vm = self.vm_id();
    OwnershipScope::vm(connection, session, vm)
}
/// Produces a fresh wire process id of the form `proc-<counter>-<uuid v4>`.
fn next_process_id(&self) -> String {
    let sequence = self.inner().process_counter.fetch_add(1, Ordering::SeqCst);
    let unique = uuid::Uuid::new_v4();
    format!("proc-{sequence}-{unique}")
}
/// Resolves the wire process id registered for the tracked process `pid`.
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
fn lookup_process_id(&self, pid: u32) -> std::result::Result<String, ClientError> {
    let wire_id = self
        .inner()
        .processes
        .read(&pid, |_, entry| entry.process_id.clone());
    match wire_id {
        Some(id) => Ok(id),
        None => Err(ClientError::ProcessNotFound(pid)),
    }
}
/// Sends an Execute request for `process_id` and returns the process-started response.
///
/// `runtime`, `entrypoint` and `wasm_permission_tier` are always sent as `None`.
///
/// # Errors
/// Propagates transport errors, maps a rejected response to `ClientError::Kernel`, and any
/// other response to `ClientError::Sidecar`.
async fn send_execute(
    &self,
    process_id: &str,
    command: Option<String>,
    args: Vec<String>,
    env: BTreeMap<String, String>,
    cwd: Option<String>,
) -> std::result::Result<agent_os_sidecar::protocol::ProcessStartedResponse, ClientError> {
    let scope = self.vm_scope();
    let request = RequestPayload::Execute(ExecuteRequest {
        process_id: process_id.to_owned(),
        command,
        runtime: None,
        entrypoint: None,
        args,
        env,
        cwd,
        wasm_permission_tier: None,
    });
    let started = match self.transport().request(scope, request).await? {
        ResponsePayload::ProcessStarted(started) => started,
        ResponsePayload::Rejected(RejectedResponse { code, message }) => {
            return Err(ClientError::Kernel { code, message });
        }
        other => {
            return Err(ClientError::Sidecar(format!(
                "Execute: unexpected response {other:?}"
            )));
        }
    };
    Ok(started)
}
/// Sends `signal` to the wire process `process_id` from a detached Tokio task.
///
/// The outcome of the request is discarded.
fn kill_wire_process(&self, process_id: &str, signal: &str) {
    let request = RequestPayload::KillProcess(KillProcessRequest {
        process_id: process_id.to_owned(),
        signal: signal.to_owned(),
    });
    let client = self.clone();
    tokio::spawn(async move {
        let scope = client.vm_scope();
        // Fire-and-forget: the response (or error) is dropped.
        let _ = client.transport().request(scope, request).await;
    });
}
/// Sends `signal` to the tracked process `pid` unless it already has an exit code.
///
/// The kill request is issued from a detached Tokio task and its outcome is discarded.
///
/// # Errors
/// Returns `ClientError::ProcessNotFound` when `pid` is not tracked.
fn signal_process(&self, pid: u32, signal: &str) -> std::result::Result<(), ClientError> {
    let lookup = self.inner().processes.read(&pid, |_, entry| {
        (entry.process_id.clone(), entry.exit_tx.borrow().is_some())
    });
    let (process_id, has_exited) = match lookup {
        Some(found) => found,
        None => return Err(ClientError::ProcessNotFound(pid)),
    };
    if !has_exited {
        let request = RequestPayload::KillProcess(KillProcessRequest {
            process_id,
            signal: signal.to_owned(),
        });
        let client = self.clone();
        tokio::spawn(async move {
            let scope = client.vm_scope();
            // Fire-and-forget: the response (or error) is dropped.
            let _ = client.transport().request(scope, request).await;
        });
    }
    Ok(())
}
/// Background driver for one spawned process.
///
/// Sends the Execute request, records the kernel pid if the response carries one, then
/// forwards matching output events to `stdout_tx` / `stderr_tx` until the process exits.
///
/// Exit handling:
/// - Execute failure: the error text is sent on stderr, logged, and exit code `1` is recorded.
/// - Event stream closed: exit code `0` is recorded.
/// - Process-exited event: the reported exit code is recorded.
///
/// The watch channels are updated with `send_replace` rather than `send`: `watch::Sender::send`
/// fails *and discards the value* when no receiver exists, and `spawn` drops the initial
/// receivers, so with `send` the kernel pid was never stored and an exit code was lost
/// whenever nobody happened to be subscribed at that moment.
// The arguments are the individual channel endpoints created by `spawn`; they are passed
// separately instead of being bundled, hence the lint exemption.
#[allow(clippy::too_many_arguments)]
async fn run_spawn(
    self,
    pid: u32,
    process_id: String,
    command: String,
    args: Vec<String>,
    options: SpawnOptions,
    mut events: broadcast::Receiver<(OwnershipScope, EventPayload)>,
    stdout_tx: broadcast::Sender<Vec<u8>>,
    stderr_tx: broadcast::Sender<Vec<u8>>,
    exit_tx: watch::Sender<Option<i32>>,
    kernel_pid_tx: watch::Sender<Option<u32>>,
) {
    match self
        .send_execute(
            &process_id,
            Some(command),
            args,
            options.base.env.clone(),
            options.base.cwd.clone(),
        )
        .await
    {
        Ok(started) => {
            if let Some(kernel_pid) = started.pid {
                // Stored unconditionally so later `borrow()` calls on the sender see it.
                kernel_pid_tx.send_replace(Some(kernel_pid));
            }
        }
        Err(error) => {
            let message = format!("{error}\n");
            // Output is best-effort: dropped when nobody is subscribed to stderr.
            let _ = stderr_tx.send(message.into_bytes());
            tracing::error!(?error, pid, %process_id, "spawn: Execute request failed");
            exit_tx.send_replace(Some(1));
            return;
        }
    }

    loop {
        let (_, payload) = match events.recv().await {
            Ok(frame) => frame,
            // Events dropped because this receiver lagged are skipped.
            Err(broadcast::error::RecvError::Lagged(_)) => continue,
            Err(broadcast::error::RecvError::Closed) => {
                exit_tx.send_replace(Some(0));
                break;
            }
        };
        match payload {
            EventPayload::ProcessOutput(output) if output.process_id == process_id => {
                let bytes = output.chunk;
                match output.channel {
                    StreamChannel::Stdout => {
                        let _ = stdout_tx.send(bytes);
                    }
                    StreamChannel::Stderr => {
                        let _ = stderr_tx.send(bytes);
                    }
                }
            }
            EventPayload::ProcessExited(exited) if exited.process_id == process_id => {
                exit_tx.send_replace(Some(exited.exit_code));
                break;
            }
            // Events for other processes and unrelated event kinds are ignored.
            EventPayload::ProcessOutput(_)
            | EventPayload::ProcessExited(_)
            | EventPayload::VmLifecycle(_)
            | EventPayload::Structured(_) => {}
        }
    }
}
}
/// Arranges a flat process list into a forest keyed by parent pid.
///
/// A process becomes a root when its `ppid` is not the `pid` of any listed process, or when
/// it names itself as parent. Children keep the order they have in `processes`.
///
/// Processes that are unreachable from any root — i.e. members of a parent cycle — are not
/// dropped: each still-unvisited process is emitted as an additional root (after the regular
/// roots, in input order), with the cycle broken at that point. Every input process therefore
/// appears exactly once in the result.
fn build_process_forest(processes: Vec<ProcessInfo>) -> Vec<ProcessTreeNode> {
    use std::collections::{BTreeMap, BTreeSet};

    /// Builds the subtree rooted at `index`, marking every visited index in `seen`.
    fn build_node(
        index: usize,
        processes: &[ProcessInfo],
        children_of: &BTreeMap<u32, Vec<usize>>,
        seen: &mut BTreeSet<usize>,
    ) -> ProcessTreeNode {
        seen.insert(index);
        let info = processes[index].clone();
        let mut children = Vec::new();
        if let Some(child_indices) = children_of.get(&info.pid) {
            for &child_index in child_indices {
                // Checked at visit time so a node can never be attached twice.
                if !seen.contains(&child_index) {
                    children.push(build_node(child_index, processes, children_of, seen));
                }
            }
        }
        ProcessTreeNode { info, children }
    }

    let pids: BTreeSet<u32> = processes.iter().map(|process| process.pid).collect();
    let mut children_of: BTreeMap<u32, Vec<usize>> = BTreeMap::new();
    let mut roots: Vec<usize> = Vec::new();
    for (index, process) in processes.iter().enumerate() {
        // A process listing itself as parent can never be reached as a child, so it is a root.
        let has_listed_parent = process.ppid != process.pid && pids.contains(&process.ppid);
        if has_listed_parent {
            children_of.entry(process.ppid).or_default().push(index);
        } else {
            roots.push(index);
        }
    }

    let mut seen = BTreeSet::new();
    let mut forest = Vec::with_capacity(roots.len());
    for index in roots {
        if !seen.contains(&index) {
            forest.push(build_node(index, &processes, &children_of, &mut seen));
        }
    }
    // Anything still unvisited sits on a parent cycle; surface it instead of losing it.
    for index in 0..processes.len() {
        if !seen.contains(&index) {
            forest.push(build_node(index, &processes, &children_of, &mut seen));
        }
    }
    forest
}
/// Converts stdin input into the byte vector sent over the wire.
///
/// Text becomes its UTF-8 bytes; raw bytes are returned unchanged.
fn stdin_to_bytes(input: StdinInput) -> Vec<u8> {
    let text = match input {
        StdinInput::Bytes(raw) => return raw,
        StdinInput::Text(text) => text,
    };
    String::into_bytes(text)
}
pub(crate) fn install_output_callback(tx: broadcast::Sender<Vec<u8>>, mut callback: OutputCallback) {
let mut rx = tx.subscribe();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(chunk) => callback(&chunk),
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
}
/// Current wall-clock time as fractional milliseconds since the Unix epoch.
///
/// Returns `0.0` when the system clock reads earlier than the epoch.
fn epoch_ms_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64() * 1000.0,
        Err(_) => 0.0,
    }
}