use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::process::Child;
use tokio::sync::{Mutex, RwLock};
use tokio::time::Instant;
use crate::{
AgentSpec, ArtifactId, ExecutionContext, ExecutionHandle, ExecutionMetrics, ExecutionResult,
ResourceLimits, Run, RunError, RunId, RunStatus, RuntimeAdapter, RuntimeKind, StatusResult,
SwarmFile,
};
/// Configuration for [`LocalRuntime`].
#[derive(Debug, Clone)]
pub struct LocalConfig {
/// Working directory given to every context the runtime creates, and the base used
/// to resolve relative spec paths when a context carries no working directory.
pub base_dir: PathBuf,
/// Limits attached (cloned) to every context the runtime creates.
/// NOTE(review): nothing in this module reads these back or applies them to the
/// spawned processes — confirm whether enforcement happens elsewhere.
pub default_limits: ResourceLimits,
/// NOTE(review): not read anywhere in this module, so setting it has no visible
/// effect here — confirm the intended semantics.
pub isolate_processes: bool,
}
impl Default for LocalConfig {
    /// Roots the runtime at the process's current directory (or `.` if that cannot
    /// be determined), with default resource limits and isolation turned off.
    fn default() -> Self {
        let base_dir = match std::env::current_dir() {
            Ok(cwd) => cwd,
            Err(_) => PathBuf::from("."),
        };
        Self {
            base_dir,
            default_limits: ResourceLimits::default(),
            isolate_processes: false,
        }
    }
}
/// Bookkeeping for one run started by [`LocalRuntime`]; stored in
/// `active_executions` under the run id string.
struct ActiveExecution {
/// Id of the run this entry belongs to.
run_id: RunId,
/// Last recorded status: `Running` for a background run until `wait()` or
/// `cancel()` records an outcome; `Completed`/`Failed` from the exit status
/// otherwise; `Cancelled` after a successful `cancel()`.
status: RunStatus,
/// Instant taken just before the process was spawned.
started_at: Instant,
/// Always created empty in this module; nothing here appends to it.
artifacts: Vec<ArtifactId>,
/// `Some(child)` for a background run until `wait()` or `cancel()` takes it out;
/// always `None` for a foreground run (the child was consumed by `execute`).
child: Arc<Mutex<Option<Child>>>,
/// OS process id reported at spawn time, or 0 when `Child::id` returned `None`.
pid: u32,
/// Exit status once the process has been collected; `cancel()` stores the result
/// of a post-kill wait and leaves `None` if that wait fails.
exit_status: Option<std::process::ExitStatus>,
/// Result of `super::parse_worker_output` on the process's stdout.
captured_output: Option<serde_json::Value>,
/// Stderr after `super::truncate_stderr`, or `None` when it is blank.
captured_stderr: Option<String>,
}
/// Runtime adapter that runs agents as child processes on the local machine.
pub struct LocalRuntime {
/// Configuration supplied at construction.
config: LocalConfig,
/// Executions started by this runtime, keyed by run id string.
/// NOTE(review): nothing in this module removes entries, so the map grows with
/// every run for the lifetime of the runtime.
active_executions: Arc<RwLock<HashMap<String, ActiveExecution>>>,
}
impl LocalRuntime {
    /// Creates a runtime using [`LocalConfig::default`].
    pub fn new() -> Self {
        Self::with_config(LocalConfig::default())
    }

    /// Creates a runtime from an explicit configuration, with no executions recorded.
    pub fn with_config(config: LocalConfig) -> Self {
        let active_executions = Arc::new(RwLock::new(HashMap::new()));
        Self {
            config,
            active_executions,
        }
    }

    /// Returns the configured base directory.
    pub fn base_dir(&self) -> &PathBuf {
        &self.config.base_dir
    }

    /// Builds an execution context named `local-swarm-<swarm id>`.
    ///
    /// The context uses the configured base directory as its working directory and a
    /// copy of the configured default limits. This never returns an error.
    pub async fn create_swarm_context(
        &self,
        swarm: &SwarmFile,
    ) -> Result<ExecutionContext, RunError> {
        let context_id = format!("local-swarm-{}", swarm.id.as_str());
        let working_dir = self.config.base_dir.clone();
        let limits = self.config.default_limits.clone();
        Ok(ExecutionContext::new(context_id, RuntimeKind::Local)
            .with_working_dir(working_dir)
            .with_limits(limits))
    }

    /// Turns the spec's `runtime.config` into a ready-to-spawn command.
    ///
    /// The `command` entry (default `echo done`) is split on whitespace into a program
    /// and its arguments; shell quoting is not interpreted. Every other entry except
    /// `image` is exported as an environment variable. stdout and stderr are piped,
    /// and foreground children (`is_background == false`) are killed if their handle
    /// is dropped.
    ///
    /// Returns `None` when the command string contains no program at all.
    fn build_command(
        spec: &AgentSpec,
        working_dir: &Option<PathBuf>,
        is_background: bool,
    ) -> Option<tokio::process::Command> {
        let config = &spec.runtime.config;
        let command_line = match config.get("command") {
            Some(command) => command.as_str(),
            None => "echo done",
        };
        let mut words = command_line.split_whitespace();
        let program = words.next()?;

        let mut cmd = tokio::process::Command::new(program);
        cmd.args(words)
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .kill_on_drop(!is_background);
        if let Some(dir) = working_dir.as_ref() {
            cmd.current_dir(dir);
        }
        cmd.envs(
            config
                .iter()
                .filter(|(key, _)| *key != "command" && *key != "image"),
        );
        Some(cmd)
    }

    /// Reports whether `ps -p <pid>` exits successfully.
    ///
    /// A failure to run `ps` counts as "not running". This blocks the calling thread
    /// while `ps` runs. `ps` typically also lists zombie (exited but unreaped)
    /// processes, so those count as running.
    #[cfg(unix)]
    fn is_process_running(pid: u32) -> bool {
        std::process::Command::new("ps")
            .arg("-p")
            .arg(pid.to_string())
            .output()
            .map(|out| out.status.success())
            .unwrap_or(false)
    }

    /// Non-unix fallback: no probe is implemented, so every pid is reported as running.
    #[cfg(not(unix))]
    fn is_process_running(_pid: u32) -> bool {
        true
    }

    /// Resolves the agent spec a run targets.
    ///
    /// A relative spec path is resolved against the context's working directory, or
    /// against the configured base directory when the context has none. When nothing
    /// exists at the resolved path, a fresh local spec named after the file stem (or
    /// `agent`) is returned instead of an error.
    ///
    /// # Errors
    /// Propagates `AgentSpec::from_yaml_file` failures for an existing file. Returns
    /// `RunError::InvalidConfig` for swarm and A2A targets, which this runtime cannot run.
    fn load_spec_from_run(&self, run: &Run, ctx: &ExecutionContext) -> Result<AgentSpec, RunError> {
        let spec_path = match &run.target {
            crate::RunTarget::Agent { spec_path } => spec_path,
            crate::RunTarget::Swarm { .. } => {
                return Err(RunError::InvalidConfig {
                    message: "Swarm execution requires worker loading".into(),
                });
            }
            crate::RunTarget::A2AAgent { .. } => {
                return Err(RunError::InvalidConfig {
                    message: "A2A targets must use A2ARuntime, not LocalRuntime".into(),
                });
            }
        };

        let full_path = if spec_path.is_absolute() {
            spec_path.clone()
        } else {
            ctx.working_dir
                .as_ref()
                .unwrap_or(&self.config.base_dir)
                .join(spec_path)
        };

        if full_path.exists() {
            return AgentSpec::from_yaml_file(&full_path);
        }
        // NOTE(review): a missing spec file silently yields a default spec (presumably
        // without a `command`, so build_command would run `echo done`); a mistyped
        // path therefore still "succeeds". Confirm this fallback is intended.
        let name = full_path
            .file_stem()
            .and_then(|stem| stem.to_str())
            .unwrap_or("agent");
        Ok(AgentSpec::new(name, RuntimeKind::Local))
    }
}
impl Default for LocalRuntime {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl RuntimeAdapter for LocalRuntime {
    fn kind(&self) -> RuntimeKind {
        RuntimeKind::Local
    }

    /// Creates a context named `local-<spec id>` using the configured base directory
    /// and default limits. Nothing is spawned here.
    async fn create(&self, spec: &AgentSpec) -> Result<ExecutionContext, RunError> {
        let ctx = ExecutionContext::new(format!("local-{}", spec.id.as_str()), RuntimeKind::Local)
            .with_working_dir(self.config.base_dir.clone())
            .with_limits(self.config.default_limits.clone());
        Ok(ctx)
    }

    /// Runs the agent in the foreground: spawns it, waits for it to exit, and records
    /// its exit status and captured output before returning a `pid:<pid>` handle.
    ///
    /// # Errors
    /// Spec/command/spawn errors from `spawn_run_process`, or
    /// `RunError::ExecutionFailed` if waiting for the process fails.
    async fn execute(
        &self,
        ctx: &ExecutionContext,
        run: &Run,
    ) -> Result<ExecutionHandle, RunError> {
        let (child, started_at) = self.spawn_run_process(ctx, run, false)?;
        let pid = child.id().unwrap_or(0);
        let output = child
            .wait_with_output()
            .await
            .map_err(|e| RunError::ExecutionFailed {
                message: format!("Failed to wait for process: {}", e),
            })?;
        let exit_status = output.status;
        let (captured_output, captured_stderr) = capture_process_output(&output);
        // NOTE(review): the run's own duration is not stored (ActiveExecution has no
        // field for it), so a later wait() reports time since `started_at`, which keeps
        // growing after the process has exited.
        let execution = ActiveExecution {
            run_id: run.id.clone(),
            status: status_for_exit(exit_status),
            started_at,
            artifacts: Vec::new(),
            // The child was consumed above; nothing is left to wait on or cancel.
            child: Arc::new(Mutex::new(None)),
            pid,
            exit_status: Some(exit_status),
            captured_output,
            captured_stderr,
        };
        self.active_executions
            .write()
            .await
            .insert(run.id.as_str().to_string(), execution);
        Ok(ExecutionHandle::new(
            run.id.clone(),
            RuntimeKind::Local,
            format!("pid:{}", pid),
        ))
    }

    /// Spawns the agent and returns immediately with a `pid:<pid>` handle; the run is
    /// recorded as `Running` and its child is kept for `wait()`/`cancel()`.
    ///
    /// NOTE(review): stdout/stderr are piped but only drained by `wait()`; a child that
    /// writes more than the OS pipe buffer can block until `wait()` is called.
    ///
    /// # Errors
    /// Spec/command/spawn errors from `spawn_run_process`.
    async fn execute_background(
        &self,
        ctx: &ExecutionContext,
        run: &Run,
    ) -> Result<ExecutionHandle, RunError> {
        let (child, started_at) = self.spawn_run_process(ctx, run, true)?;
        let pid = child.id().unwrap_or(0);
        let execution = ActiveExecution {
            run_id: run.id.clone(),
            status: RunStatus::Running,
            started_at,
            artifacts: Vec::new(),
            child: Arc::new(Mutex::new(Some(child))),
            pid,
            exit_status: None,
            captured_output: None,
            captured_stderr: None,
        };
        self.active_executions
            .write()
            .await
            .insert(run.id.as_str().to_string(), execution);
        Ok(ExecutionHandle::new(
            run.id.clone(),
            RuntimeKind::Local,
            format!("pid:{}", pid),
        ))
    }

    /// Reports the current status of a run without blocking on the process itself.
    ///
    /// A recorded final outcome (exit status or cancellation) is returned as-is. For a
    /// background child that has not been collected yet, the OS is polled with
    /// `try_wait`, so a process that has already exited reports `Completed`/`Failed`
    /// rather than `Running`. The stored entry is not modified; `wait()` finalizes it.
    ///
    /// # Errors
    /// `RunError::InvalidConfig` for an unknown run; `RunError::RuntimeError` if the
    /// child cannot be queried and the pid probe says it is not running.
    async fn status(&self, handle: &ExecutionHandle) -> Result<StatusResult, RunError> {
        let (run_id, recorded, started_at, artifacts, child_arc, pid, exit_status) = {
            let executions = self.active_executions.read().await;
            let execution = executions
                .get(handle.run_id.as_str())
                .ok_or_else(|| execution_not_found(handle))?;
            (
                execution.run_id.clone(),
                execution.status,
                execution.started_at,
                execution.artifacts.clone(),
                execution.child.clone(),
                execution.pid,
                execution.exit_status,
            )
        };

        let status = if exit_status.is_some() || recorded == RunStatus::Cancelled {
            // Final outcome already recorded by execute(), wait() or cancel().
            recorded
        } else {
            let mut child_guard = child_arc.lock().await;
            match child_guard.as_mut() {
                // `try_wait` caches the exit status inside the `Child`, so a later
                // wait() still receives it and still drains the pipes.
                Some(child) => match child.try_wait() {
                    Ok(Some(exit)) => status_for_exit(exit),
                    Ok(None) => RunStatus::Running,
                    // The child could not be queried; fall back to a pid probe
                    // (blocking, but only on this error path).
                    Err(_) if Self::is_process_running(pid) => RunStatus::Running,
                    Err(e) => {
                        return Err(RunError::RuntimeError {
                            message: format!("Failed to query process status: {}", e),
                        })
                    }
                },
                None => recorded,
            }
        };

        Ok(StatusResult {
            run_id,
            status,
            current_step: None,
            progress: 0,
            elapsed_ms: started_at.elapsed().as_millis() as u64,
            artifacts,
        })
    }

    /// No per-context resources are held, so this is a no-op; recorded executions
    /// are kept.
    async fn destroy(&self, _ctx: &ExecutionContext) -> Result<(), RunError> {
        Ok(())
    }

    /// Kills a still-running background process and records the run as `Cancelled`.
    ///
    /// This does nothing (and succeeds) when there is no child left to stop: a
    /// foreground run, a run already collected by `wait()`, an earlier cancel, or a
    /// process that has already exited on its own. The last case is left for `wait()`
    /// so the real outcome is reported.
    ///
    /// # Errors
    /// `RunError::InvalidConfig` for an unknown run; `RunError::RuntimeError` if the
    /// kill fails, in which case the child stays in place for a retry or `wait()`.
    async fn cancel(&self, handle: &ExecutionHandle) -> Result<(), RunError> {
        let key = handle.run_id.as_str();
        let child_arc = {
            let executions = self.active_executions.read().await;
            executions
                .get(key)
                .map(|execution| execution.child.clone())
                .ok_or_else(|| execution_not_found(handle))?
        };

        let mut child_guard = child_arc.lock().await;
        let child = match child_guard.as_mut() {
            Some(child) => child,
            None => return Ok(()),
        };
        if matches!(child.try_wait(), Ok(Some(_))) {
            // Finished on its own; killing an already-reaped tokio child would also fail.
            return Ok(());
        }
        child.kill().await.map_err(|e| RunError::RuntimeError {
            message: format!("Failed to kill process: {}", e),
        })?;
        // `kill` already waited for the process, so this only reads the cached status.
        let exit_status = child.wait().await.ok();
        *child_guard = None;
        drop(child_guard);

        let mut executions = self.active_executions.write().await;
        if let Some(exec) = executions.get_mut(key) {
            exec.status = RunStatus::Cancelled;
            exec.exit_status = exit_status;
        }
        Ok(())
    }

    /// Waits for a run to finish and returns its outcome.
    ///
    /// For an uncollected background child this waits for exit (draining stdout and
    /// stderr), records the result, and returns it. Otherwise the recorded outcome is
    /// returned. Failed runs carry an `ExecutionFailed` error with the exit code and
    /// any stderr; cancelled runs carry a `Cancelled` error.
    ///
    /// # Errors
    /// `RunError::InvalidConfig` for an unknown run; `RunError::ExecutionFailed` if
    /// waiting for the process fails.
    async fn wait(&self, handle: &ExecutionHandle) -> Result<ExecutionResult, RunError> {
        let key = handle.run_id.as_str();
        let (run_id, started_at, artifacts, child_arc) = {
            let executions = self.active_executions.read().await;
            let execution = executions
                .get(key)
                .ok_or_else(|| execution_not_found(handle))?;
            (
                execution.run_id.clone(),
                execution.started_at,
                execution.artifacts.clone(),
                execution.child.clone(),
            )
        };

        // NOTE(review): the child lock is held until the process exits, so a concurrent
        // cancel() or status() on this run also waits until then. Making cancel
        // effective during wait() needs a cancellation signal stored with the child.
        let mut child_guard = child_arc.lock().await;
        if let Some(child) = child_guard.take() {
            let output = child
                .wait_with_output()
                .await
                .map_err(|e| RunError::ExecutionFailed {
                    message: format!("Failed to wait for process: {}", e),
                })?;
            let exit_status = output.status;
            let (captured_output, captured_stderr) = capture_process_output(&output);
            let elapsed = started_at.elapsed();
            let status = status_for_exit(exit_status);
            {
                let mut executions = self.active_executions.write().await;
                if let Some(exec) = executions.get_mut(key) {
                    exec.status = status;
                    exec.exit_status = Some(exit_status);
                    exec.captured_output = captured_output.clone();
                    exec.captured_stderr = captured_stderr.clone();
                }
            }
            return Ok(ExecutionResult {
                run_id,
                status,
                artifacts,
                error: execution_error(status, Some(exit_status), captured_stderr.as_deref()),
                metrics: wall_clock_metrics(elapsed),
                output: captured_output,
            });
        }
        drop(child_guard);

        // Already collected (foreground run, earlier wait(), or cancel()): report the
        // recorded outcome.
        let executions = self.active_executions.read().await;
        let execution = executions
            .get(key)
            .ok_or_else(|| execution_not_found(handle))?;
        Ok(ExecutionResult {
            run_id: execution.run_id.clone(),
            status: execution.status,
            artifacts: execution.artifacts.clone(),
            error: execution_error(
                execution.status,
                execution.exit_status,
                execution.captured_stderr.as_deref(),
            ),
            metrics: wall_clock_metrics(execution.started_at.elapsed()),
            output: execution.captured_output.clone(),
        })
    }
}

impl LocalRuntime {
    /// Loads the run's spec, builds its command, passes `run.input` as JSON in
    /// `BZZZ_INPUT`, and spawns the process.
    ///
    /// Returns the child together with the instant taken just before spawning.
    /// Input that fails to serialize is silently omitted.
    ///
    /// # Errors
    /// Whatever `load_spec_from_run` returns; `RunError::InvalidConfig` when
    /// `build_command` yields no command; `RunError::StartupFailed` when spawning fails.
    fn spawn_run_process(
        &self,
        ctx: &ExecutionContext,
        run: &Run,
        is_background: bool,
    ) -> Result<(Child, Instant), RunError> {
        let spec = self.load_spec_from_run(run, ctx)?;
        let mut cmd = Self::build_command(&spec, &ctx.working_dir, is_background).ok_or_else(
            || RunError::InvalidConfig {
                message: "No 'command' specified in AgentSpec runtime.config".into(),
            },
        )?;
        if let Some(input) = &run.input {
            if let Ok(json_str) = serde_json::to_string(input) {
                cmd.env("BZZZ_INPUT", &json_str);
            }
        }
        let started_at = Instant::now();
        let child = cmd.spawn().map_err(|e| RunError::StartupFailed {
            message: format!("Failed to spawn process: {}", e),
        })?;
        Ok((child, started_at))
    }
}

/// Maps a process exit status to the run status recorded for it.
fn status_for_exit(exit_status: std::process::ExitStatus) -> RunStatus {
    if exit_status.success() {
        RunStatus::Completed
    } else {
        RunStatus::Failed
    }
}

/// Converts a finished process's raw output into `(captured_output, captured_stderr)`.
///
/// stdout goes through `super::parse_worker_output` (which also receives the truncated
/// stderr); stderr is passed through `super::truncate_stderr` and dropped when blank.
fn capture_process_output(
    output: &std::process::Output,
) -> (Option<serde_json::Value>, Option<String>) {
    let stderr_raw = String::from_utf8_lossy(&output.stderr);
    let stderr_truncated = super::truncate_stderr(&stderr_raw).to_string();
    let stdout = String::from_utf8_lossy(&output.stdout);
    let captured_output = super::parse_worker_output(&stdout, &stderr_truncated);
    let captured_stderr = if stderr_truncated.trim().is_empty() {
        None
    } else {
        Some(stderr_truncated)
    };
    (captured_output, captured_stderr)
}

/// Builds the error reported alongside a run outcome.
///
/// `Failed` yields `ExecutionFailed` with the exit code (-1 if unknown, e.g. killed by
/// a signal) plus any captured stderr; `Cancelled` yields `Cancelled`; anything else
/// yields `None`.
fn execution_error(
    status: RunStatus,
    exit_status: Option<std::process::ExitStatus>,
    captured_stderr: Option<&str>,
) -> Option<RunError> {
    if status == RunStatus::Failed {
        let base_msg = format!(
            "Process exited with code: {}",
            exit_status.and_then(|s| s.code()).unwrap_or(-1)
        );
        let message = match captured_stderr {
            Some(stderr) => format!("{}\nstderr: {}", base_msg, stderr),
            None => base_msg,
        };
        Some(RunError::ExecutionFailed { message })
    } else if status == RunStatus::Cancelled {
        Some(RunError::Cancelled {
            reason: "Process was cancelled by user".into(),
        })
    } else {
        None
    }
}

/// Metrics carrying only wall-clock time; CPU, memory and retries are not measured
/// by this runtime and are reported as zero.
fn wall_clock_metrics(elapsed: std::time::Duration) -> ExecutionMetrics {
    ExecutionMetrics {
        wall_time_ms: elapsed.as_millis() as u64,
        cpu_time_ms: 0,
        peak_memory_bytes: 0,
        retries: 0,
        selection_trace: None,
    }
}

/// Error returned when a handle refers to a run this runtime has no record of.
fn execution_not_found(handle: &ExecutionHandle) -> RunError {
    RunError::InvalidConfig {
        message: format!("Execution not found: {}", handle.run_id.as_str()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{timeout, Duration};

    /// Scratch directory, runtime, context and run shared by the process-spawning tests.
    struct Fixture {
        dir: PathBuf,
        runtime: LocalRuntime,
        ctx: ExecutionContext,
        run: Run,
    }

    impl Fixture {
        /// Creates `<temp>/<dir_name>`, writes `<agent>.yaml` whose `command` is
        /// `command`, and prepares a runtime rooted at that directory plus a run
        /// targeting the spec file.
        async fn new(dir_name: &str, agent: &str, command: &str) -> Self {
            let dir = std::env::temp_dir().join(dir_name);
            std::fs::create_dir_all(&dir).unwrap();
            let spec = create_test_agent_spec(&dir, agent, command);
            let runtime = LocalRuntime::with_config(LocalConfig {
                base_dir: dir.clone(),
                default_limits: ResourceLimits::default(),
                isolate_processes: false,
            });
            let ctx = runtime.create(&spec).await.unwrap();
            let run = Run::new(
                crate::RunTarget::Agent {
                    spec_path: spec.spec_path.clone().unwrap(),
                },
                RuntimeKind::Local,
            );
            Fixture {
                dir,
                runtime,
                ctx,
                run,
            }
        }

        /// Best-effort removal of the scratch directory.
        fn cleanup(&self) {
            let _ = std::fs::remove_dir_all(&self.dir);
        }
    }

    /// Writes a local agent spec named `name` with the given `command` into `dir`.
    fn create_test_agent_spec(dir: &std::path::Path, name: &str, command: &str) -> AgentSpec {
        let mut spec = AgentSpec::new(name, RuntimeKind::Local);
        spec.runtime.config.insert("command".into(), command.into());
        spec.spec_path = Some(dir.join(format!("{}.yaml", name)));
        spec.to_yaml_file(&spec.spec_path.clone().unwrap()).unwrap();
        spec
    }

    #[test]
    fn test_local_runtime_creation() {
        assert_eq!(LocalRuntime::new().kind(), RuntimeKind::Local);
    }

    #[test]
    fn test_local_runtime_config() {
        let runtime = LocalRuntime::with_config(LocalConfig {
            base_dir: PathBuf::from("/tmp/test"),
            default_limits: ResourceLimits {
                max_cpu_cores: 2,
                max_memory_bytes: 1024 * 1024,
                max_time_seconds: 60,
            },
            isolate_processes: true,
        });
        assert_eq!(runtime.base_dir(), &PathBuf::from("/tmp/test"));
    }

    #[tokio::test]
    async fn test_local_runtime_create_context() {
        let spec = AgentSpec::new("test-agent", RuntimeKind::Local);
        let ctx = LocalRuntime::new().create(&spec).await.unwrap();
        assert!(ctx.id.starts_with("local-"));
        assert_eq!(ctx.runtime_kind, RuntimeKind::Local);
    }

    #[tokio::test]
    async fn test_real_execute_success() {
        let fx = Fixture::new("bzzz-test-real-execute", "echo-agent", "echo hello").await;
        let handle = fx.runtime.execute(&fx.ctx, &fx.run).await.unwrap();
        assert_eq!(handle.runtime_kind, RuntimeKind::Local);
        let status = fx.runtime.status(&handle).await.unwrap();
        assert_eq!(status.status, RunStatus::Completed);
        fx.cleanup();
    }

    #[tokio::test]
    async fn test_real_execute_failure() {
        let fx = Fixture::new("bzzz-test-real-execute-fail", "fail-agent", "false").await;
        let handle = fx.runtime.execute(&fx.ctx, &fx.run).await.unwrap();
        let status = fx.runtime.status(&handle).await.unwrap();
        assert_eq!(status.status, RunStatus::Failed);
        fx.cleanup();
    }

    #[tokio::test]
    async fn test_real_background_execute() {
        let fx = Fixture::new("bzzz-test-real-bg", "sleep-agent", "sleep 0.1").await;
        let handle = fx.runtime.execute_background(&fx.ctx, &fx.run).await.unwrap();
        assert_eq!(handle.runtime_kind, RuntimeKind::Local);
        let status = fx.runtime.status(&handle).await.unwrap();
        assert!(status.status == RunStatus::Running || status.status == RunStatus::Completed);
        let result = fx.runtime.wait(&handle).await.unwrap();
        assert_eq!(result.status, RunStatus::Completed);
        fx.cleanup();
    }

    #[tokio::test]
    async fn test_real_cancel() {
        let fx = Fixture::new("bzzz-test-real-cancel", "long-sleep-agent", "sleep 30").await;
        let handle = fx.runtime.execute_background(&fx.ctx, &fx.run).await.unwrap();
        let status = fx.runtime.status(&handle).await.unwrap();
        assert_eq!(status.status, RunStatus::Running);
        fx.runtime.cancel(&handle).await.unwrap();
        let result = timeout(Duration::from_secs(2), fx.runtime.wait(&handle))
            .await
            .expect("wait should complete quickly after cancel")
            .unwrap();
        assert_eq!(result.status, RunStatus::Cancelled);
        assert!(result.error.is_some());
        fx.cleanup();
    }

    #[tokio::test]
    async fn test_foreground_captures_json_stdout() {
        let fx = Fixture::new("bzzz-test-fg-json", "json-agent", r#"echo {"key":"value"}"#).await;
        let handle = fx.runtime.execute(&fx.ctx, &fx.run).await.unwrap();
        let result = fx.runtime.wait(&handle).await.unwrap();
        assert_eq!(result.status, RunStatus::Completed);
        assert!(result.output.is_some());
        fx.cleanup();
    }

    #[tokio::test]
    async fn test_background_captures_stdout() {
        let fx = Fixture::new("bzzz-test-bg-stdout", "plain-agent", "echo hello").await;
        let handle = fx.runtime.execute_background(&fx.ctx, &fx.run).await.unwrap();
        let result = fx.runtime.wait(&handle).await.unwrap();
        assert_eq!(result.status, RunStatus::Completed);
        assert!(result.output.is_some());
        assert_eq!(
            result.output.unwrap(),
            serde_json::json!({"stdout": "hello"})
        );
        fx.cleanup();
    }

    #[tokio::test]
    async fn test_real_wait_timeout() {
        let fx = Fixture::new("bzzz-test-real-wait", "quick-agent", "echo test").await;
        let handle = fx.runtime.execute_background(&fx.ctx, &fx.run).await.unwrap();
        let result = timeout(Duration::from_secs(5), fx.runtime.wait(&handle))
            .await
            .expect("wait should not timeout")
            .unwrap();
        assert_eq!(result.status, RunStatus::Completed);
        fx.cleanup();
    }
}