use super::command::{CleanupRequirements, CommandType, ExecutableCommand, ResourceRequirements};
use super::executor::ExecutionContextInternal;
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Instant;
use tokio::process::{Child, ChildStderr, ChildStdout, Command};
use tokio::sync::Mutex;
/// Newtype around a raw `u32` process identifier.
///
/// It is `Copy` and hashable, which lets it serve as the key of the
/// manager's cleanup registry.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ProcessId(pub u32);
/// Spawns commands as child processes and keeps a cleanup handler for each
/// process it has started.
pub struct ProcessManager {
    /// Asked to check a command's resource requirements before it is spawned.
    resource_monitor: Arc<super::executor::ResourceMonitor>,
    /// Asked to validate a command before it is spawned.
    security_context: Arc<SecurityContext>,
    /// Cleanup handlers keyed by process id; entries are added by `spawn` and
    /// removed by `cleanup_process`.
    cleanup_registry: Arc<Mutex<HashMap<ProcessId, CleanupHandler>>>,
}
impl Default for ProcessManager {
fn default() -> Self {
Self::new()
}
}
impl ProcessManager {
pub fn new() -> Self {
Self {
resource_monitor: Arc::new(super::executor::ResourceMonitor),
security_context: Arc::new(SecurityContext),
cleanup_registry: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn with_monitors(
resource_monitor: Arc<super::executor::ResourceMonitor>,
security_context: Arc<SecurityContext>,
) -> Self {
Self {
resource_monitor,
security_context,
cleanup_registry: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn spawn(
&self,
executable: ExecutableCommand,
context: &ExecutionContextInternal,
) -> Result<UnifiedProcess> {
self.security_context.validate_command(&executable).await?;
self.resource_monitor
.check_resources(executable.resource_requirements())
.await?;
let mut command = self.create_system_command(&executable, context)?;
command = self.apply_security_context(command, context)?;
command = self.apply_resource_limits(command, executable.resource_requirements())?;
let child = match command.spawn() {
Ok(child) => child,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
anyhow::bail!("Command not found: {}", executable.display());
}
Err(e) => {
return Err(e)
.with_context(|| format!("Failed to spawn command: {}", executable.display()));
}
};
let process = UnifiedProcess::new(child, executable.command_type);
let cleanup_handler =
CleanupHandler::new(process.id(), executable.cleanup_requirements().clone());
self.cleanup_registry
.lock()
.await
.insert(process.id(), cleanup_handler);
Ok(process)
}
fn create_system_command(
&self,
executable: &ExecutableCommand,
context: &ExecutionContextInternal,
) -> Result<Command> {
let mut command = Command::new(&executable.program);
command.args(&executable.args);
if let Some(ref working_dir) = executable.working_dir {
command.current_dir(working_dir);
} else {
command.current_dir(&context.request.context.working_dir);
}
for (key, value) in &executable.env {
command.env(key, value);
}
for (key, value) in &context.request.context.env_vars {
command.env(key, value);
}
match executable.command_type {
CommandType::Claude => {
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
command.stdin(Stdio::piped());
}
CommandType::Shell => {
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
command.stdin(Stdio::null());
}
CommandType::Test => {
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
command.stdin(Stdio::null());
}
CommandType::Handler => {
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
command.stdin(Stdio::null());
}
}
Ok(command)
}
fn apply_security_context(
&self,
mut command: Command,
_context: &ExecutionContextInternal,
) -> Result<Command> {
#[cfg(unix)]
{
command.process_group(0);
}
Ok(command)
}
fn apply_resource_limits(
&self,
command: Command,
_requirements: &ResourceRequirements,
) -> Result<Command> {
Ok(command)
}
pub async fn cleanup_process(&self, process_id: ProcessId) -> Result<()> {
if let Some(cleanup_handler) = self.cleanup_registry.lock().await.remove(&process_id) {
cleanup_handler.cleanup().await?;
}
Ok(())
}
}
/// A spawned child process together with the bookkeeping kept about it.
pub struct UnifiedProcess {
    /// Handle to the underlying child process.
    child: Child,
    /// Type of the command this process was spawned for.
    command_type: CommandType,
    /// Moment this wrapper was constructed; used to compute the run duration.
    started_at: Instant,
    /// Usage figures for the process; starts out at its default value.
    resource_usage: ResourceUsage,
}
impl UnifiedProcess {
    /// Wraps an already-spawned `child`, recording the current instant as its
    /// start time and starting with default resource usage.
    pub fn new(child: Child, command_type: CommandType) -> Self {
        Self {
            child,
            command_type,
            started_at: Instant::now(),
            resource_usage: ResourceUsage::default(),
        }
    }

    /// Waits for the child to exit and returns its exit status.
    ///
    /// On success the time elapsed since construction is stored in
    /// `resource_usage.duration`.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error from waiting on the child.
    pub async fn wait(&mut self) -> Result<std::process::ExitStatus> {
        let exit_status = self.child.wait().await?;
        self.resource_usage.duration = self.started_at.elapsed();
        Ok(exit_status)
    }

    /// Terminates the process.
    ///
    /// On Unix the child's process group is sent `SIGTERM`; after a 100 ms
    /// grace period the group is sent `SIGKILL` if the child has not exited.
    /// Signal delivery failures are ignored (best effort). Afterwards, unless
    /// the child has already been reaped, it is force-killed and awaited.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error from the final force-kill.
    pub async fn kill(&mut self) -> Result<()> {
        #[cfg(unix)]
        {
            use nix::sys::signal::{self, Signal};
            use nix::unistd::Pid;

            // Checked conversion instead of `as i32`: an id that is zero or
            // does not fit a positive i32 must never be negated into a signal
            // target, since it would address a different set of processes.
            let group = self
                .child
                .id()
                .and_then(|pid| i32::try_from(pid).ok())
                .filter(|pid| *pid > 0)
                .map(|pid| Pid::from_raw(-pid));

            if let Some(pgid) = group {
                let _ = signal::kill(pgid, Signal::SIGTERM);
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                if let Ok(None) = self.child.try_wait() {
                    let _ = signal::kill(pgid, Signal::SIGKILL);
                }
            }
        }

        // If the child has already exited and been reaped (for example it
        // honoured SIGTERM above), there is nothing left to kill. Depending on
        // the tokio version, killing a reaped child may be reported as an
        // error, which would turn a successful graceful stop into a failure.
        if matches!(self.child.try_wait(), Ok(Some(_))) {
            return Ok(());
        }

        self.child.kill().await?;
        Ok(())
    }

    /// Mutable access to the child's captured stdout slot, so callers can
    /// take the handle out of it.
    pub fn stdout(&mut self) -> &mut Option<ChildStdout> {
        &mut self.child.stdout
    }

    /// Mutable access to the child's captured stderr slot, so callers can
    /// take the handle out of it.
    pub fn stderr(&mut self) -> &mut Option<ChildStderr> {
        &mut self.child.stderr
    }

    /// Returns the child's process id, or `ProcessId(0)` when the child no
    /// longer reports one.
    pub fn id(&self) -> ProcessId {
        ProcessId(self.child.id().unwrap_or(0))
    }

    /// Returns the type of command this process was spawned for.
    pub fn command_type(&self) -> CommandType {
        self.command_type
    }

    /// Returns the usage figures recorded so far.
    pub fn resource_usage(&self) -> &ResourceUsage {
        &self.resource_usage
    }
}
/// Usage figures collected for a process.
///
/// The default value has a zero duration and no memory or CPU figures.
#[derive(Default, Debug)]
pub struct ResourceUsage {
    /// Run time of the process; `UnifiedProcess::wait` sets it once the
    /// child has exited.
    pub duration: std::time::Duration,
    /// Peak memory figure, if one was recorded.
    /// NOTE(review): nothing in this file populates it, and its unit is not
    /// visible here.
    pub peak_memory: Option<u64>,
    /// CPU usage figure, if one was recorded.
    /// NOTE(review): nothing in this file populates it, and its scale is not
    /// visible here.
    pub cpu_usage: Option<f32>,
}
/// Describes how to clean up after one spawned process.
pub struct CleanupHandler {
    /// Id of the process this handler cleans up after.
    pub process_id: ProcessId,
    /// Cleanup settings for that process; `cleanup` reads `cleanup_children`
    /// and `kill_timeout` from it.
    pub requirements: CleanupRequirements,
}
impl CleanupHandler {
    /// Creates a handler for `process_id` with the given cleanup requirements.
    pub fn new(process_id: ProcessId, requirements: CleanupRequirements) -> Self {
        Self {
            process_id,
            requirements,
        }
    }

    /// Terminates the process group led by `process_id` when
    /// `requirements.cleanup_children` is set.
    ///
    /// On Unix the group is sent `SIGTERM`, then, after
    /// `requirements.kill_timeout`, `SIGKILL`. Signalling is best effort and
    /// this method currently always returns `Ok(())`. On other platforms it
    /// does nothing.
    ///
    /// Nothing is signalled when the stored id is `0` or does not fit a
    /// positive `i32`, and the wait plus `SIGKILL` are skipped when `SIGTERM`
    /// could not be delivered at all.
    pub async fn cleanup(&self) -> Result<()> {
        #[cfg(unix)]
        {
            if self.requirements.cleanup_children {
                use nix::sys::signal::{kill, Signal};
                use nix::unistd::Pid;

                // `-(id as i32)` is dangerous for sentinel or out-of-range
                // ids: id 0 (the fallback `UnifiedProcess::id` returns)
                // negates to 0, which makes kill(2) signal the *caller's own*
                // process group, and ids above i32::MAX wrap around to an
                // unrelated target. Refuse to signal in those cases.
                let pgid = match i32::try_from(self.process_id.0) {
                    Ok(pid) if pid > 0 => Pid::from_raw(-pid),
                    _ => return Ok(()),
                };

                // If SIGTERM cannot be delivered (no such group, or no
                // permission) SIGKILL would fail the same way; waiting out the
                // timeout would only add delay and a window in which the group
                // id could be reused by unrelated processes.
                if kill(pgid, Signal::SIGTERM).is_err() {
                    return Ok(());
                }

                tokio::time::sleep(self.requirements.kill_timeout).await;
                let _ = kill(pgid, Signal::SIGKILL);
            }
        }
        Ok(())
    }
}
/// Stateless validator that screens commands before they are spawned.
pub struct SecurityContext;

impl SecurityContext {
    /// Checks `executable` against three rules and rejects it on the first
    /// violation.
    ///
    /// 1. Arguments containing `$` or a backtick are rejected unless the
    ///    command type is `Shell`.
    /// 2. A working directory that still contains a `..` component after the
    ///    canonicalization attempt is rejected.
    /// 3. A program whose file name is on the dangerous-command list only
    ///    logs a warning for `Shell` and `Handler` commands and is rejected
    ///    for every other command type.
    ///
    /// # Errors
    ///
    /// Returns an error describing the violated rule.
    pub async fn validate_command(&self, executable: &ExecutableCommand) -> Result<()> {
        // `$` already covers `$(`, so two character tests are sufficient.
        let has_shell_metachar = executable
            .args
            .iter()
            .any(|arg| arg.contains('$') || arg.contains('`'));
        if has_shell_metachar && executable.command_type != CommandType::Shell {
            anyhow::bail!("Potential command injection detected in arguments");
        }

        if let Some(ref working_dir) = executable.working_dir {
            // NOTE(review): this check runs on the canonicalized path and only
            // falls back to the raw path when canonicalization fails, so it
            // presumably never fires for directories that exist - confirm
            // whether the raw path was meant to be inspected instead.
            let canonical = working_dir
                .canonicalize()
                .unwrap_or_else(|_| working_dir.clone());
            if canonical
                .components()
                .any(|c| matches!(c, std::path::Component::ParentDir))
            {
                anyhow::bail!("Path traversal detected in working directory");
            }
        }

        const DANGEROUS_COMMANDS: [&str; 9] = [
            "rm", "dd", "mkfs", "format", "fdisk", "shutdown", "reboot", "kill", "pkill",
        ];

        // Compare the final path component rather than the whole string, so
        // that spelling the program as a path (e.g. `/bin/rm`) does not slip
        // past the list. Falls back to the full string when there is no
        // usable file name.
        let program_name = std::path::Path::new(&executable.program)
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or(executable.program.as_str());

        if DANGEROUS_COMMANDS.contains(&program_name) {
            match executable.command_type {
                CommandType::Shell | CommandType::Handler => {
                    tracing::warn!("Potentially dangerous command: {}", executable.program);
                }
                CommandType::Claude | CommandType::Test => {
                    anyhow::bail!("Dangerous command not allowed: {}", executable.program);
                }
            }
        }

        Ok(())
    }
}