use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::RwLock;
use tokio::time::Instant;
use crate::{
AgentSpec, ArtifactId, ExecutionContext, ExecutionHandle, ExecutionMetrics, ExecutionResult,
ResourceLimits, Run, RunError, RunId, RunStatus, RuntimeAdapter, RuntimeKind, StatusResult,
};
/// Configuration for [`DockerRuntime`].
#[derive(Debug, Clone)]
pub struct DockerConfig {
/// Optional Docker host address.
/// NOTE(review): this field is not read anywhere in this file — confirm whether it is
/// meant to be forwarded to the `docker` CLI.
pub host: Option<String>,
/// Image used when an agent spec's runtime config has no `image` entry.
/// When this is `None`, `DockerRuntime::default_image` falls back to `alpine:latest`.
pub default_image: Option<String>,
/// Resource limits attached to every `ExecutionContext` produced by `create`.
pub default_limits: ResourceLimits,
/// Whether containers should have network access.
/// NOTE(review): not consulted when the `docker run` command is built in this file — confirm.
pub network_enabled: bool,
}
impl Default for DockerConfig {
    /// Returns the stock configuration: no explicit host, `alpine:latest` as the
    /// default image, default resource limits, and networking enabled.
    fn default() -> Self {
        let default_image = Some(String::from("alpine:latest"));
        let default_limits = ResourceLimits::default();
        Self {
            host: None,
            default_image,
            default_limits,
            network_enabled: true,
        }
    }
}
/// Bookkeeping record for one run tracked by [`DockerRuntime`], keyed by run id
/// in `DockerRuntime::active_containers`.
struct ActiveContainer {
/// Identifier of the run this record belongs to.
run_id: RunId,
/// Last status recorded by the adapter (set by `execute`, `execute_background`,
/// `cancel` and `wait`).
status: RunStatus,
/// Instant captured immediately before the `docker run` command was launched.
started_at: Instant,
/// Artifacts associated with the run; always created empty in this file.
artifacts: Vec<ArtifactId>,
/// Container ID printed by `docker run -d`; the empty string marks a foreground
/// run whose container ID was never captured.
container_id: String,
/// Exit code of the `docker run` process (foreground) or the code reported by
/// `docker wait` (background); `None` until known.
exit_code: Option<i32>,
/// Output produced by `parse_worker_output` from the container's stdout/stderr, if any.
captured_output: Option<serde_json::Value>,
}
/// Runtime adapter that executes agents by shelling out to the `docker` CLI.
pub struct DockerRuntime {
/// Settings supplied at construction time.
config: DockerConfig,
/// Records of every run started through this adapter, keyed by the run id string.
/// Entries are inserted by `execute`/`execute_background`; nothing in this file removes them.
active_containers: Arc<RwLock<HashMap<String, ActiveContainer>>>,
}
impl DockerRuntime {
pub fn new() -> Self {
DockerRuntime::with_config(DockerConfig::default())
}
pub fn with_config(config: DockerConfig) -> Self {
DockerRuntime {
config,
active_containers: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Returns the configured default image, or `alpine:latest` when none is configured.
pub fn default_image(&self) -> &str {
    match &self.config.default_image {
        Some(image) => image.as_str(),
        None => "alpine:latest",
    }
}
/// Builds the `docker run` invocation for an agent.
///
/// * `spec` – agent spec; its runtime config entry `command` is the shell command
///   executed inside the container (defaults to `echo done`).
/// * `image` – image to run.
/// * `is_background` – when `true` the container is started detached (`-d`) so that
///   `docker run` prints the container ID and returns; when `false` the command runs
///   attached and the container is removed automatically on exit (`--rm`).
/// * `input` – optional run input, passed to the container as JSON in the
///   `BZZZ_INPUT` environment variable.
///
/// Currently always returns `Some`; the `Option` is kept so callers' error handling
/// stays unchanged.
fn build_docker_command(
    spec: &AgentSpec,
    image: &str,
    is_background: bool,
    input: Option<&serde_json::Value>,
) -> Option<tokio::process::Command> {
    let command_str = spec
        .runtime
        .config
        .get("command")
        .map(|s| s.as_str())
        .unwrap_or("echo done");

    let mut cmd = tokio::process::Command::new("docker");
    cmd.arg("run");
    if is_background {
        cmd.arg("-d");
    } else {
        // A foreground run never records its container ID, so nothing could remove
        // the stopped container later. Let docker clean it up on exit instead of
        // leaking one stopped container per run.
        cmd.arg("--rm");
    }

    if let Some(input_val) = input {
        // If the input cannot be serialized the variable is simply not set.
        if let Ok(json_str) = serde_json::to_string(input_val) {
            cmd.arg("-e").arg(format!("BZZZ_INPUT={}", json_str));
        }
    }

    cmd.arg(image);
    cmd.arg("sh").arg("-c").arg(command_str);
    // Foreground: kill the attached `docker run` client if the awaiting future is
    // dropped. Background: the client exits on its own once the container is started.
    cmd.kill_on_drop(!is_background);
    Some(cmd)
}
/// Asks `docker inspect` whether the container's `.State.Running` flag is `true`.
///
/// Returns `false` when the `docker` process cannot be spawned or prints anything
/// other than `true`.
async fn is_container_running(container_id: &str) -> bool {
    let inspect = tokio::process::Command::new("docker")
        .args(["inspect", "--format", "{{.State.Running}}", container_id])
        .output()
        .await;
    inspect
        .map(|result| String::from_utf8_lossy(&result.stdout).trim() == "true")
        .unwrap_or(false)
}
/// Reads the container's `.State.ExitCode` via `docker inspect`.
///
/// Returns `None` when the `docker` process cannot be spawned or its stdout does
/// not parse as an integer.
async fn get_exit_code(container_id: &str) -> Option<i32> {
    let result = tokio::process::Command::new("docker")
        .args(["inspect", "--format", "{{.State.ExitCode}}", container_id])
        .output()
        .await
        .ok()?;
    let printed = String::from_utf8_lossy(&result.stdout);
    printed.trim().parse().ok()
}
/// Resolves the agent spec for a run.
///
/// Only `RunTarget::Agent` is supported. A relative `spec_path` is resolved against
/// `ctx.working_dir` (or `.` when unset). If the resolved file exists it is parsed
/// with `AgentSpec::from_yaml_file`; otherwise a fresh Docker spec named after the
/// file stem (or `agent`) is returned.
///
/// # Errors
///
/// Returns `RunError::InvalidConfig` for swarm and A2A targets, and whatever
/// `AgentSpec::from_yaml_file` returns when parsing fails.
fn load_spec_from_run(&self, run: &Run, ctx: &ExecutionContext) -> Result<AgentSpec, RunError> {
    let spec_path = match &run.target {
        crate::RunTarget::Agent { spec_path } => spec_path,
        crate::RunTarget::Swarm { swarmfile_path: _ } => {
            return Err(RunError::InvalidConfig {
                message: "Swarm execution requires worker loading".into(),
            });
        }
        crate::RunTarget::A2AAgent { .. } => {
            return Err(RunError::InvalidConfig {
                message: "A2A targets should use A2ARuntime, not DockerRuntime".into(),
            });
        }
    };

    let resolved = if spec_path.is_absolute() {
        spec_path.clone()
    } else {
        match ctx.working_dir.as_ref() {
            Some(dir) => dir.join(spec_path),
            None => std::path::PathBuf::from(".").join(spec_path),
        }
    };

    if !resolved.exists() {
        // No spec file on disk: synthesize a minimal spec named after the file.
        let fallback_name = resolved
            .file_stem()
            .and_then(|stem| stem.to_str())
            .unwrap_or("agent");
        return Ok(AgentSpec::new(fallback_name, RuntimeKind::Docker));
    }
    AgentSpec::from_yaml_file(&resolved)
}
}
impl Default for DockerRuntime {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl RuntimeAdapter for DockerRuntime {
/// Identifies this adapter as the Docker runtime.
fn kind(&self) -> RuntimeKind {
RuntimeKind::Docker
}
/// Creates an execution context for `spec`.
///
/// The context id is `docker-<spec id>` and it carries the runtime's configured
/// default resource limits. No container is started here.
async fn create(&self, spec: &AgentSpec) -> Result<ExecutionContext, RunError> {
    let context_id = format!("docker-{}", spec.id.as_str());
    let context = ExecutionContext::new(context_id, RuntimeKind::Docker);
    Ok(context.with_limits(self.config.default_limits.clone()))
}
/// Runs the agent in an attached (foreground) container and records the outcome.
///
/// The call resolves only after the `docker run` process has exited. The result is
/// stored in the run registry with an empty container ID, which is how `status`,
/// `cancel` and `wait` recognise a foreground run.
///
/// # Errors
///
/// * Errors from `load_spec_from_run`.
/// * `RunError::InvalidConfig` if the docker command cannot be built.
/// * `RunError::StartupFailed` if the `docker` process cannot be spawned.
///
/// A non-zero exit is not an error here; it is recorded as `RunStatus::Failed`.
async fn execute(
    &self,
    ctx: &ExecutionContext,
    run: &Run,
) -> Result<ExecutionHandle, RunError> {
    let spec = self.load_spec_from_run(run, ctx)?;

    // A per-agent `image` entry wins over the runtime-wide default.
    let image = match spec.runtime.config.get("image") {
        Some(configured) => configured.as_str(),
        None => self.default_image(),
    };

    let mut docker_run = Self::build_docker_command(&spec, image, false, run.input.as_ref())
        .ok_or_else(|| RunError::InvalidConfig {
            message: "Failed to build docker command".into(),
        })?;

    let started_at = Instant::now();
    let finished = docker_run
        .output()
        .await
        .map_err(|e| RunError::StartupFailed {
            message: format!("Failed to run docker container: {}", e),
        })?;

    let stdout_text = String::from_utf8_lossy(&finished.stdout);
    let stderr_text = String::from_utf8_lossy(&finished.stderr);
    let captured_output =
        super::parse_worker_output(&stdout_text, super::truncate_stderr(&stderr_text));

    let handle = ExecutionHandle::new(
        run.id.clone(),
        RuntimeKind::Docker,
        "foreground".to_string(),
    );

    let record = ActiveContainer {
        run_id: run.id.clone(),
        status: if finished.status.success() {
            RunStatus::Completed
        } else {
            RunStatus::Failed
        },
        started_at,
        artifacts: Vec::new(),
        // Foreground runs never learn their container ID.
        container_id: String::new(),
        exit_code: finished.status.code(),
        captured_output,
    };
    self.active_containers
        .write()
        .await
        .insert(run.id.as_str().to_string(), record);

    Ok(handle)
}
async fn execute_background(
&self,
ctx: &ExecutionContext,
run: &Run,
) -> Result<ExecutionHandle, RunError> {
let spec = self.load_spec_from_run(run, ctx)?;
let image = spec
.runtime
.config
.get("image")
.map(|s| s.as_str())
.unwrap_or_else(|| self.default_image());
let mut cmd = Self::build_docker_command(&spec, image, true, run.input.as_ref())
.ok_or_else(|| RunError::InvalidConfig {
message: "Failed to build docker command".into(),
})?;
let started_at = Instant::now();
let output = cmd.output().await.map_err(|e| RunError::StartupFailed {
message: format!("Failed to run docker container: {}", e),
})?;
let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
if container_id.is_empty() {
return Err(RunError::StartupFailed {
message: "Failed to get container ID from docker run".into(),
});
}
let handle = ExecutionHandle::new(
run.id.clone(),
RuntimeKind::Docker,
format!("container:{}", &container_id[..12.min(container_id.len())]),
);
let container = ActiveContainer {
run_id: run.id.clone(),
status: RunStatus::Running,
started_at,
artifacts: Vec::new(),
container_id: container_id.clone(),
exit_code: None,
captured_output: None,
};
{
let mut containers = self.active_containers.write().await;
containers.insert(run.id.as_str().to_string(), container);
}
Ok(handle)
}
/// Reports the current status of a run.
///
/// * Foreground runs (empty container ID) and runs whose recorded status is already
///   terminal (`Completed`, `Failed`, `Cancelled`) are answered from the registry.
///   This keeps a cancelled run reported as `Cancelled` instead of being
///   re-derived as `Failed` from the stopped container's exit code.
/// * Otherwise the container is inspected: running → `Running`; exit code 0 →
///   `Completed`; other exit code → `Failed`; exit code unavailable → the
///   recorded status.
///
/// `progress` is always 0 and `current_step` always `None`; `elapsed_ms` is the
/// time since the `docker run` command was launched.
///
/// # Errors
///
/// Returns `RunError::InvalidConfig` when the run is unknown to this adapter.
async fn status(&self, handle: &ExecutionHandle) -> Result<StatusResult, RunError> {
    // Copy what we need and release the lock before spawning `docker inspect`, so
    // writers are not blocked for the duration of the subprocess calls.
    let (run_id, started_at, artifacts, recorded_status, container_id) = {
        let containers = self.active_containers.read().await;
        let container =
            containers
                .get(handle.run_id.as_str())
                .ok_or_else(|| RunError::InvalidConfig {
                    message: format!("Container not found: {}", handle.run_id.as_str()),
                })?;
        (
            container.run_id.clone(),
            container.started_at,
            container.artifacts.clone(),
            container.status,
            container.container_id.clone(),
        )
    };

    let is_terminal = matches!(
        recorded_status,
        RunStatus::Completed | RunStatus::Failed | RunStatus::Cancelled
    );

    let actual_status = if container_id.is_empty() || is_terminal {
        recorded_status
    } else if Self::is_container_running(&container_id).await {
        RunStatus::Running
    } else {
        match Self::get_exit_code(&container_id).await {
            Some(0) => RunStatus::Completed,
            Some(_) => RunStatus::Failed,
            // Inspect failed (e.g. container already removed): keep what we know.
            None => recorded_status,
        }
    };

    Ok(StatusResult {
        run_id,
        status: actual_status,
        current_step: None,
        progress: 0,
        elapsed_ms: started_at.elapsed().as_millis() as u64,
        artifacts,
    })
}
/// Tears down an execution context. Currently a no-op that always succeeds;
/// the context argument is ignored.
async fn destroy(&self, _ctx: &ExecutionContext) -> Result<(), RunError> {
Ok(())
}
/// Cancels a background run by stopping its container with `docker stop`.
///
/// Nothing is done (and `Ok(())` is returned) when:
/// * the run is a foreground run (empty container ID), or
/// * the run's recorded status is already terminal (`Completed`, `Failed`,
///   `Cancelled`), so a finished result is never overwritten with `Cancelled`.
///
/// The run is marked `Cancelled` only after `docker stop` succeeded.
///
/// # Errors
///
/// * `RunError::InvalidConfig` when the run is unknown to this adapter.
/// * `RunError::RuntimeError` when the `docker` process cannot be spawned or
///   `docker stop` exits with a non-zero status (docker's stderr is included).
async fn cancel(&self, handle: &ExecutionHandle) -> Result<(), RunError> {
    let (container_id, recorded_status) = {
        let containers = self.active_containers.read().await;
        let container =
            containers
                .get(handle.run_id.as_str())
                .ok_or_else(|| RunError::InvalidConfig {
                    message: format!("Container not found: {}", handle.run_id.as_str()),
                })?;
        (container.container_id.clone(), container.status)
    };

    if container_id.is_empty() {
        return Ok(());
    }
    if matches!(
        recorded_status,
        RunStatus::Completed | RunStatus::Failed | RunStatus::Cancelled
    ) {
        // Already finished: there is nothing left to stop.
        return Ok(());
    }

    let output = tokio::process::Command::new("docker")
        .args(["stop", &container_id])
        .output()
        .await
        .map_err(|e| RunError::RuntimeError {
            message: format!("Failed to stop container: {}", e),
        })?;
    if !output.status.success() {
        // Do not claim the run was cancelled when docker could not stop it.
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(RunError::RuntimeError {
            message: format!("Failed to stop container: {}", stderr.trim()),
        });
    }

    {
        let mut containers = self.active_containers.write().await;
        if let Some(c) = containers.get_mut(handle.run_id.as_str()) {
            c.status = RunStatus::Cancelled;
        }
    }
    Ok(())
}
/// Waits for a run to finish and returns its result.
///
/// * Foreground runs (empty container ID) and background runs that were already
///   waited on (exit code recorded) are answered from the registry. This makes
///   repeated calls return the same result instead of re-running `docker wait`
///   against a container that has already been removed.
/// * Otherwise: `docker wait` blocks until the container exits, `docker logs`
///   collects its output, and `docker rm` removes it (best-effort). The outcome is
///   stored in the registry. A run previously marked `Cancelled` stays
///   `Cancelled`; otherwise exit code 0 maps to `Completed` and anything else to
///   `Failed`. An exit code that cannot be parsed is recorded as -1.
///
/// # Errors
///
/// * `RunError::InvalidConfig` when the run is unknown to this adapter.
/// * `RunError::ExecutionFailed` when the `docker wait` process cannot be spawned.
///
/// A failed run is not an `Err`; it is reported through `ExecutionResult::error`.
async fn wait(&self, handle: &ExecutionHandle) -> Result<ExecutionResult, RunError> {
    let (container_id, was_cancelled) = {
        let containers = self.active_containers.read().await;
        let container =
            containers
                .get(handle.run_id.as_str())
                .ok_or_else(|| RunError::InvalidConfig {
                    message: format!("Container not found: {}", handle.run_id.as_str()),
                })?;

        let already_finished =
            container.container_id.is_empty() || container.exit_code.is_some();
        if already_finished {
            return Ok(ExecutionResult {
                run_id: container.run_id.clone(),
                status: container.status,
                artifacts: container.artifacts.clone(),
                error: if container.status == RunStatus::Failed {
                    Some(RunError::ExecutionFailed {
                        message: format!(
                            "Container exited with code: {:?}",
                            container.exit_code
                        ),
                    })
                } else {
                    None
                },
                metrics: ExecutionMetrics {
                    wall_time_ms: container.started_at.elapsed().as_millis() as u64,
                    ..Default::default()
                },
                output: container.captured_output.clone(),
            });
        }

        (
            container.container_id.clone(),
            container.status == RunStatus::Cancelled,
        )
    };

    let wait_output = tokio::process::Command::new("docker")
        .args(["wait", &container_id])
        .output()
        .await
        .map_err(|e| RunError::ExecutionFailed {
            message: format!("Failed to wait for container: {}", e),
        })?;
    // `docker wait` prints the exit code on stdout; -1 marks "could not be determined".
    let exit_code: i32 = String::from_utf8_lossy(&wait_output.stdout)
        .trim()
        .parse()
        .unwrap_or(-1);

    let status = if was_cancelled {
        // Keep the cancellation visible instead of reporting the stop signal's
        // exit code as a failure.
        RunStatus::Cancelled
    } else if exit_code == 0 {
        RunStatus::Completed
    } else {
        RunStatus::Failed
    };

    // Plain `docker logs <id>`: the CLI has no `--stdout` flag. The container's
    // stdout and stderr arrive on the matching streams of the `docker logs` process.
    let captured_output = tokio::process::Command::new("docker")
        .args(["logs", &container_id])
        .output()
        .await
        .ok()
        .and_then(|logs| {
            let stderr_raw = String::from_utf8_lossy(&logs.stderr);
            super::parse_worker_output(
                &String::from_utf8_lossy(&logs.stdout),
                super::truncate_stderr(&stderr_raw),
            )
        });

    // Best-effort cleanup; a failure to remove the container does not affect the result.
    let _ = tokio::process::Command::new("docker")
        .args(["rm", &container_id])
        .output()
        .await;

    let mut containers = self.active_containers.write().await;
    let container =
        containers
            .get_mut(handle.run_id.as_str())
            .ok_or_else(|| RunError::InvalidConfig {
                message: format!("Container not found: {}", handle.run_id.as_str()),
            })?;
    container.status = status;
    container.exit_code = Some(exit_code);
    container.captured_output = captured_output.clone();

    Ok(ExecutionResult {
        run_id: container.run_id.clone(),
        status,
        artifacts: container.artifacts.clone(),
        error: if status == RunStatus::Failed {
            Some(RunError::ExecutionFailed {
                message: format!("Container exited with code: {}", exit_code),
            })
        } else {
            None
        },
        metrics: ExecutionMetrics {
            wall_time_ms: container.started_at.elapsed().as_millis() as u64,
            ..Default::default()
        },
        output: captured_output,
    })
}
}
// Unit tests for the Docker runtime adapter. Only `test_docker_real_execute`
// talks to a real Docker daemon, and it is ignored by default.
#[cfg(test)]
mod tests {
use super::*;
// A freshly constructed runtime reports the Docker runtime kind.
#[test]
fn test_docker_runtime_creation() {
let runtime = DockerRuntime::new();
assert_eq!(runtime.kind(), RuntimeKind::Docker);
}
// A custom configuration is stored as given; unspecified fields come from `Default`.
#[test]
fn test_docker_config() {
let config = DockerConfig {
host: Some("unix:///var/run/docker.sock".into()),
default_image: Some("bzzz/agent:latest".into()),
..Default::default()
};
let runtime = DockerRuntime::with_config(config);
assert_eq!(
runtime.config.host,
Some("unix:///var/run/docker.sock".into())
);
}
// `create` yields a context whose id carries the `docker-` prefix.
#[tokio::test]
async fn test_docker_runtime_create() {
let runtime = DockerRuntime::new();
let spec = AgentSpec::new("test-agent", RuntimeKind::Docker);
let ctx = runtime.create(&spec).await.unwrap();
assert!(ctx.id.starts_with("docker-"));
assert_eq!(ctx.runtime_kind, RuntimeKind::Docker);
}
// End-to-end foreground run against a real Docker daemon.
// NOTE(review): `spec` is only passed to `create`; `execute` re-resolves the spec
// from `agent.yaml`, so the `command`/`image` entries set here are not what the
// container runs unless that file exists and contains them — confirm intent.
#[tokio::test]
#[ignore = "Requires Docker to be installed and running"]
async fn test_docker_real_execute() {
let runtime = DockerRuntime::new();
let mut spec = AgentSpec::new("test-agent", RuntimeKind::Docker);
spec.runtime
.config
.insert("command".into(), "echo hello".into());
spec.runtime
.config
.insert("image".into(), "alpine:latest".into());
let ctx = runtime.create(&spec).await.unwrap();
let run = Run::new(
crate::RunTarget::Agent {
spec_path: std::path::PathBuf::from("agent.yaml"),
},
RuntimeKind::Docker,
);
let handle = runtime.execute(&ctx, &run).await.unwrap();
assert_eq!(handle.runtime_kind, RuntimeKind::Docker);
let status = runtime.status(&handle).await.unwrap();
assert_eq!(status.status, RunStatus::Completed);
}
}