use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::RwLock;
use tokio::time::Instant;
use crate::{
AgentSpec, ArtifactId, ExecutionContext, ExecutionHandle, ExecutionMetrics, ExecutionResult,
ResourceLimits, Run, RunError, RunId, RunStatus, RuntimeAdapter, RuntimeKind, StatusResult,
};
/// Settings that control how the compose runtime drives `docker compose`.
#[derive(Debug, Clone)]
pub struct ComposeConfig {
    /// Path of the compose file.
    /// NOTE(review): presumably meant for `docker compose -f` — confirm against the call sites.
    pub compose_file: Option<String>,
    /// Project name used when the agent spec has no `project_name` entry.
    pub project_name: Option<String>,
    /// Path of an env file.
    /// NOTE(review): no code visible in this file reads this field — confirm intended use.
    pub env_file: Option<String>,
    /// When `true`, `--build` is added to `docker compose up`.
    pub build: bool,
    /// When `true`, a foreground run is followed by `docker compose down -v`.
    pub auto_remove: bool,
}

impl Default for ComposeConfig {
    /// Defaults: no compose file, project name or env file; no image build;
    /// automatic teardown enabled.
    fn default() -> Self {
        Self {
            auto_remove: true,
            build: false,
            compose_file: None,
            env_file: None,
            project_name: None,
        }
    }
}
/// Bookkeeping record for one compose stack started by this runtime.
// `dead_code` is allowed because not every field is read back: in the code
// visible in this file `services` is only ever written.
#[allow(dead_code)]
struct ActiveStack {
/// Identifier of the run that started the stack.
run_id: RunId,
/// Last status recorded for the run.
status: RunStatus,
/// Moment the `docker compose up` invocation was started; used for elapsed time.
started_at: Instant,
/// Artifacts attached to the run (created empty by the code in this file).
artifacts: Vec<ArtifactId>,
/// Compose project name (`-p`) the stack was started under.
project_name: String,
/// Service names reported by `docker compose config --services`.
services: Vec<String>,
}
/// Runtime adapter that runs agents as `docker compose` projects.
pub struct ComposeRuntime {
/// Runtime-wide settings supplied at construction.
config: ComposeConfig,
/// Stacks started by this runtime, keyed by the run id string.
active_stacks: Arc<RwLock<HashMap<String, ActiveStack>>>,
}
impl ComposeRuntime {
pub fn new() -> Self {
ComposeRuntime::with_config(ComposeConfig::default())
}
pub fn with_config(config: ComposeConfig) -> Self {
ComposeRuntime {
config,
active_stacks: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Assembles `docker compose [-f FILE] [-p NAME] up [--build] [-d]`.
///
/// The command is only built here; spawning it is left to the caller.
fn build_up_command(
    compose_file: Option<&str>,
    project_name: Option<&str>,
    build: bool,
    detached: bool,
) -> tokio::process::Command {
    let mut command = tokio::process::Command::new("docker");
    command.arg("compose");
    // Project-level options must precede the `up` subcommand.
    for (flag, value) in [("-f", compose_file), ("-p", project_name)] {
        if let Some(value) = value {
            command.arg(flag).arg(value);
        }
    }
    command.arg("up");
    // Subcommand switches follow `up`, in a fixed order.
    for (enabled, switch) in [(build, "--build"), (detached, "-d")] {
        if enabled {
            command.arg(switch);
        }
    }
    command
}
/// Assembles `docker compose [-f FILE] [-p NAME] down [-v]`.
///
/// The command is only built here; spawning it is left to the caller.
fn build_down_command(
    compose_file: Option<&str>,
    project_name: Option<&str>,
    remove_volumes: bool,
) -> tokio::process::Command {
    let mut command = tokio::process::Command::new("docker");
    command.arg("compose");
    // Project-level options must precede the `down` subcommand.
    for (flag, value) in [("-f", compose_file), ("-p", project_name)] {
        if let Some(value) = value {
            command.arg(flag).arg(value);
        }
    }
    command.arg("down");
    if remove_volumes {
        command.arg("-v");
    }
    command
}
/// Reports whether `docker compose -p <project_name> ps -q` prints anything
/// other than whitespace.
///
/// A failure to run the command is reported as `false`; the exit status is not
/// inspected.
async fn is_stack_running(project_name: &str) -> bool {
    let probe = tokio::process::Command::new("docker")
        .args(["compose", "-p", project_name, "ps", "-q"])
        .output()
        .await;
    probe
        .map(|result| {
            let listing = String::from_utf8_lossy(&result.stdout);
            !listing.trim().is_empty()
        })
        .unwrap_or(false)
}
/// Returns the lines printed by `docker compose -p <project_name> config --services`.
///
/// A failure to run the command yields an empty list; the exit status is not
/// inspected.
///
/// NOTE(review): no `-f` option is passed, so a custom compose file used for
/// `up` is not forwarded here — confirm whether that is intended.
async fn get_services(project_name: &str) -> Vec<String> {
    let listing = tokio::process::Command::new("docker")
        .args(["compose", "-p", project_name, "config", "--services"])
        .output()
        .await;
    if let Ok(result) = listing {
        let text = String::from_utf8_lossy(&result.stdout);
        return text.lines().map(str::to_owned).collect();
    }
    Vec::new()
}
/// Resolves the agent spec a run refers to.
///
/// Relative spec paths are joined onto the context's working directory (or `.`
/// when none is set). If the resolved file exists it is parsed as YAML;
/// otherwise a Docker-kind spec named after the file stem (or `agent`) is
/// synthesized.
///
/// # Errors
///
/// Returns `RunError::InvalidConfig` for swarm and A2A targets, and whatever
/// `AgentSpec::from_yaml_file` reports for an existing file.
fn load_spec_from_run(&self, run: &Run, ctx: &ExecutionContext) -> Result<AgentSpec, RunError> {
    // Only agent targets are runnable here; reject the others up front.
    let spec_path = match &run.target {
        crate::RunTarget::Agent { spec_path } => spec_path,
        crate::RunTarget::Swarm { .. } => {
            return Err(RunError::InvalidConfig {
                message: "Swarm execution requires worker loading".into(),
            });
        }
        crate::RunTarget::A2AAgent { .. } => {
            return Err(RunError::InvalidConfig {
                message: "A2A targets should use A2ARuntime, not ComposeRuntime".into(),
            });
        }
    };

    let resolved = if spec_path.is_absolute() {
        spec_path.clone()
    } else {
        match ctx.working_dir.as_ref() {
            Some(dir) => dir.join(spec_path),
            None => std::path::PathBuf::from(".").join(spec_path),
        }
    };

    if !resolved.exists() {
        // No file on disk: fall back to a bare spec named after the path.
        let stem = resolved
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("agent");
        return Ok(AgentSpec::new(stem, RuntimeKind::Docker));
    }
    AgentSpec::from_yaml_file(&resolved)
}
}
impl Default for ComposeRuntime {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl RuntimeAdapter for ComposeRuntime {
/// Identifies this adapter as a `RuntimeKind::Docker` runtime.
fn kind(&self) -> RuntimeKind {
RuntimeKind::Docker
}
/// Builds an execution context named `compose-<spec id>` with default
/// resource limits. Nothing is started here.
async fn create(&self, spec: &AgentSpec) -> Result<ExecutionContext, RunError> {
    let context_id = format!("compose-{}", spec.id.as_str());
    let limits = ResourceLimits::default();
    Ok(ExecutionContext::new(context_id, RuntimeKind::Docker).with_limits(limits))
}
/// Runs the compose stack in the foreground and records the outcome.
///
/// `docker compose up` is invoked without `-d` and awaited, so this returns
/// only once that process has exited. The run is recorded as `Completed` when
/// the process exits successfully and `Failed` otherwise; in both cases a
/// handle is returned so the outcome can be queried through `status`.
///
/// The compose file and project name are taken from the spec's runtime
/// config first and from the runtime-wide [`ComposeConfig`] second; the
/// project name finally defaults to `bzzz-compose`.
///
/// When `auto_remove` is set the stack is torn down with `down -v` afterwards
/// on a best-effort basis.
///
/// # Errors
///
/// Propagates spec-loading errors and returns `RunError::StartupFailed` when
/// the `docker` process cannot be run at all.
async fn execute(
    &self,
    ctx: &ExecutionContext,
    run: &Run,
) -> Result<ExecutionHandle, RunError> {
    let spec = self.load_spec_from_run(run, ctx)?;
    // Spec value wins; the runtime-wide configuration is the fallback, mirroring
    // how the project name is resolved below.
    let compose_file = spec
        .runtime
        .config
        .get("compose_file")
        .map(|s| s.as_str())
        .or(self.config.compose_file.as_deref());
    let project_name = spec
        .runtime
        .config
        .get("project_name")
        .map(|s| s.as_str())
        .or(self.config.project_name.as_deref())
        .unwrap_or("bzzz-compose");
    let started_at = Instant::now();
    let mut cmd = Self::build_up_command(
        compose_file,
        Some(project_name),
        self.config.build,
        false, // foreground: wait for the stack to exit
    );
    let output = cmd.output().await.map_err(|e| RunError::StartupFailed {
        message: format!("Failed to run docker compose up: {}", e),
    })?;
    let status = if output.status.success() {
        RunStatus::Completed
    } else {
        RunStatus::Failed
    };
    let services = Self::get_services(project_name).await;
    let handle = ExecutionHandle::new(
        run.id.clone(),
        RuntimeKind::Docker,
        format!("compose:{}", project_name),
    );
    let stack = ActiveStack {
        run_id: run.id.clone(),
        status,
        started_at,
        artifacts: Vec::new(),
        project_name: project_name.to_string(),
        services,
    };
    {
        // Keep the write lock scoped so it is released before the teardown below.
        let mut stacks = self.active_stacks.write().await;
        stacks.insert(run.id.as_str().to_string(), stack);
    }
    if self.config.auto_remove {
        // Best-effort cleanup: a failed teardown must not mask the run outcome.
        let mut cmd = Self::build_down_command(compose_file, Some(project_name), true);
        let _ = cmd.output().await;
    }
    Ok(handle)
}
/// Starts the compose stack detached (`up -d`) and records it as running.
///
/// The compose file and project name are taken from the spec's runtime
/// config first and from the runtime-wide [`ComposeConfig`] second; the
/// project name finally defaults to `bzzz-compose`.
///
/// # Errors
///
/// Propagates spec-loading errors. Returns `RunError::StartupFailed` when the
/// `docker` process cannot be run or when `docker compose up` exits with a
/// failure status (the message then carries its stderr).
async fn execute_background(
    &self,
    ctx: &ExecutionContext,
    run: &Run,
) -> Result<ExecutionHandle, RunError> {
    let spec = self.load_spec_from_run(run, ctx)?;
    // Spec value wins; the runtime-wide configuration is the fallback, mirroring
    // how the project name is resolved below.
    let compose_file = spec
        .runtime
        .config
        .get("compose_file")
        .map(|s| s.as_str())
        .or(self.config.compose_file.as_deref());
    let project_name = spec
        .runtime
        .config
        .get("project_name")
        .map(|s| s.as_str())
        .or(self.config.project_name.as_deref())
        .unwrap_or("bzzz-compose");
    let started_at = Instant::now();
    let mut cmd = Self::build_up_command(
        compose_file,
        Some(project_name),
        self.config.build,
        true, // detached: return as soon as the stack is up
    );
    let output = cmd.output().await.map_err(|e| RunError::StartupFailed {
        message: format!("Failed to run docker compose up: {}", e),
    })?;
    if !output.status.success() {
        return Err(RunError::StartupFailed {
            message: format!(
                "docker compose up failed: {}",
                String::from_utf8_lossy(&output.stderr)
            ),
        });
    }
    let services = Self::get_services(project_name).await;
    let handle = ExecutionHandle::new(
        run.id.clone(),
        RuntimeKind::Docker,
        format!("compose:{}", project_name),
    );
    let stack = ActiveStack {
        run_id: run.id.clone(),
        status: RunStatus::Running,
        started_at,
        artifacts: Vec::new(),
        project_name: project_name.to_string(),
        services,
    };
    {
        let mut stacks = self.active_stacks.write().await;
        stacks.insert(run.id.as_str().to_string(), stack);
    }
    Ok(handle)
}
/// Reports the current status of the stack behind `handle`.
///
/// The stack is `Running` while `docker compose ps -q` lists containers for
/// its project. Once nothing is listed, the recorded status is returned,
/// except that a stack still recorded as `Running` is reported as `Completed`
/// (the same conclusion `wait` draws when the containers are gone).
///
/// # Errors
///
/// Returns a not-found error when no stack is recorded for the handle's run id.
async fn status(&self, handle: &ExecutionHandle) -> Result<StatusResult, RunError> {
    // Snapshot what is needed and release the read lock before spawning the
    // docker probe, so writers are not blocked for the duration of a subprocess.
    let (run_id, recorded, started_at, artifacts, project_name) = {
        let stacks = self.active_stacks.read().await;
        let stack = stacks
            .get(handle.run_id.as_str())
            .ok_or_else(|| RunError::not_found("stack", handle.run_id.as_str()))?;
        (
            stack.run_id.clone(),
            stack.status,
            stack.started_at,
            stack.artifacts.clone(),
            stack.project_name.clone(),
        )
    };
    let actual_status = if Self::is_stack_running(&project_name).await {
        RunStatus::Running
    } else if matches!(recorded, RunStatus::Running) {
        // Recorded as running but no containers remain: the stack has finished.
        RunStatus::Completed
    } else {
        recorded
    };
    Ok(StatusResult {
        run_id,
        status: actual_status,
        current_step: None,
        progress: 0,
        elapsed_ms: started_at.elapsed().as_millis() as u64,
        artifacts,
    })
}
/// No-op: returns `Ok(())` without using the context or touching any recorded stack.
async fn destroy(&self, _ctx: &ExecutionContext) -> Result<(), RunError> {
Ok(())
}
/// Tears down the stack behind `handle` with `docker compose -p <project> down -v`
/// and records the run as `Cancelled`.
///
/// The compose file is not stored with the stack, so the project is addressed
/// by name only.
///
/// # Errors
///
/// Returns a not-found error when no stack is recorded for the handle's run
/// id, and `RunError::RuntimeError` when the `docker` process cannot be run or
/// exits with a failure status. In the failure cases the recorded status is
/// left untouched.
async fn cancel(&self, handle: &ExecutionHandle) -> Result<(), RunError> {
    let project_name = {
        let stacks = self.active_stacks.read().await;
        let stack = stacks
            .get(handle.run_id.as_str())
            .ok_or_else(|| RunError::not_found("stack", handle.run_id.as_str()))?;
        stack.project_name.clone()
    };
    let mut cmd = Self::build_down_command(None, Some(project_name.as_str()), true);
    let output = cmd.output().await.map_err(|e| RunError::RuntimeError {
        message: format!("Failed to stop compose stack: {}", e),
    })?;
    if !output.status.success() {
        // Do not claim the run was cancelled if the teardown itself failed.
        return Err(RunError::RuntimeError {
            message: format!(
                "docker compose down failed: {}",
                String::from_utf8_lossy(&output.stderr)
            ),
        });
    }
    {
        let mut stacks = self.active_stacks.write().await;
        if let Some(s) = stacks.get_mut(handle.run_id.as_str()) {
            s.status = RunStatus::Cancelled;
        }
    }
    Ok(())
}
/// Blocks until the stack behind `handle` has no running containers, polling
/// once per second, and returns the final result.
///
/// A stack still recorded as `Running` is promoted to `Completed`; a status
/// already recorded as terminal (for example `Cancelled` or `Failed`) is
/// preserved and returned as-is. There is no timeout.
///
/// # Errors
///
/// Returns a not-found error when no stack is recorded for the handle's run id.
async fn wait(&self, handle: &ExecutionHandle) -> Result<ExecutionResult, RunError> {
    let (project_name, started_at) = {
        let stacks = self.active_stacks.read().await;
        let stack = stacks
            .get(handle.run_id.as_str())
            .ok_or_else(|| RunError::not_found("stack", handle.run_id.as_str()))?;
        (stack.project_name.clone(), stack.started_at)
    };
    while Self::is_stack_running(&project_name).await {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
    let final_status = {
        let mut stacks = self.active_stacks.write().await;
        match stacks.get_mut(handle.run_id.as_str()) {
            Some(stack) => {
                // Only a still-running record becomes Completed; a concurrent
                // cancel or an earlier failure must not be overwritten.
                if matches!(stack.status, RunStatus::Running) {
                    stack.status = RunStatus::Completed;
                }
                stack.status
            }
            None => RunStatus::Completed,
        }
    };
    Ok(ExecutionResult {
        run_id: handle.run_id.clone(),
        status: final_status,
        artifacts: Vec::new(),
        error: None,
        metrics: ExecutionMetrics {
            wall_time_ms: started_at.elapsed().as_millis() as u64,
            ..Default::default()
        },
        output: None,
    })
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built runtime identifies itself as a Docker-kind adapter.
    #[test]
    fn test_compose_runtime_creation() {
        let kind = ComposeRuntime::new().kind();
        assert_eq!(kind, RuntimeKind::Docker);
    }

    /// A custom configuration is stored on the runtime unchanged.
    #[test]
    fn test_compose_config() {
        let runtime = ComposeRuntime::with_config(ComposeConfig {
            auto_remove: false,
            build: true,
            compose_file: Some("docker-compose.prod.yaml".into()),
            env_file: None,
            project_name: Some("my-app".into()),
        });
        assert_eq!(runtime.config.project_name.as_deref(), Some("my-app"));
    }

    /// `create` yields a `compose-`-prefixed, Docker-kind context.
    #[tokio::test]
    async fn test_compose_runtime_create() {
        let spec = AgentSpec::new("test-compose", RuntimeKind::Docker);
        let ctx = ComposeRuntime::new().create(&spec).await.unwrap();
        assert_eq!(ctx.runtime_kind, RuntimeKind::Docker);
        assert!(ctx.id.starts_with("compose-"));
    }
}