use std::collections::HashMap;
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use bzzz_core::{FlowPattern, InlineWorker, RunStatus, SwarmFile, Worker};
use tokio::process::Command;
use crate::commands::output::{OutputFormat, RunOutput};
pub async fn execute(
role: String,
command: String,
env: Vec<String>,
timeout_secs: Option<u64>,
input: Option<String>,
output_format: OutputFormat,
) -> Result<()> {
if output_format == OutputFormat::Text {
println!("🚀 Spawning inline worker: {}", role);
println!(" Command: {}", command);
}
let env_map: HashMap<String, String> = env
.iter()
.filter_map(|e| {
let parts: Vec<&str> = e.splitn(2, '=').collect();
if parts.len() == 2 {
Some((parts[0].to_string(), parts[1].to_string()))
} else {
None
}
})
.collect();
let mut inline = InlineWorker::new(&command);
for (key, value) in &env_map {
inline = inline.with_env(key, value);
}
let swarm = SwarmFile::new(
&role,
FlowPattern::Sequence {
steps: vec![role.clone()],
},
)
.with_worker(Worker::new_inline(&role, &command));
swarm.validate().context("Inline worker validation failed")?;
let input_value: Option<serde_json::Value> = match input {
Some(s) => Some(
serde_json::from_str(&s)
.context("Failed to parse --input as JSON")?,
),
None => None,
};
let start_time = Instant::now();
let result = execute_inline_command(&command, &env_map, input_value, timeout_secs).await?;
let duration_ms = start_time.elapsed().as_millis() as u64;
if output_format == OutputFormat::Json {
let out = RunOutput {
run_id: format!("spawn-{}", role),
status: format!("{:?}", result.status),
output: result.output,
error: result.error.map(|e| format!("{:?}", e)),
duration_ms,
artifacts_count: 0,
};
println!("{}", serde_json::to_string(&out)?);
} else {
println!(" Status: {:?}", result.status);
if let Some(error) = result.error {
println!(" Error: {:?}", error);
}
println!(" Duration: {}ms", duration_ms);
if let Some(output) = result.output {
println!(" Output:");
println!("{}", serde_json::to_string_pretty(&output)?);
}
}
Ok(())
}
struct InlineResult {
status: RunStatus,
output: Option<serde_json::Value>,
error: Option<bzzz_core::RunError>,
}
async fn execute_inline_command(
command: &str,
env: &HashMap<String, String>,
input: Option<serde_json::Value>,
timeout_secs: Option<u64>,
) -> Result<InlineResult> {
let parts: Vec<&str> = command.split_whitespace().collect();
if parts.is_empty() {
return Ok(InlineResult {
status: RunStatus::Failed,
output: None,
error: Some(bzzz_core::RunError::InvalidConfig {
message: "Empty command".into(),
}),
});
}
let program = parts[0];
let args = &parts[1..];
let mut cmd = Command::new(program);
cmd.args(args);
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
cmd.kill_on_drop(true);
for (key, value) in env {
cmd.env(key, value);
}
if let Some(input_val) = input {
if let Ok(json_str) = serde_json::to_string(&input_val) {
cmd.env("BZZZ_INPUT", &json_str);
}
}
let child = cmd.spawn().context("Failed to spawn subprocess")?;
let output_result = if let Some(secs) = timeout_secs {
tokio::time::timeout(Duration::from_secs(secs), child.wait_with_output())
.await
.context("Command timed out")?
} else {
child.wait_with_output().await
}
.context("Failed to wait for process")?;
let exit_status = output_result.status;
let stdout = String::from_utf8_lossy(&output_result.stdout);
let stderr_raw = String::from_utf8_lossy(&output_result.stderr);
let stderr_truncated = if stderr_raw.len() > 8192 {
&stderr_raw[..8192]
} else {
&stderr_raw
};
let output = if let Ok(json) = serde_json::from_str::<serde_json::Value>(stdout.trim()) {
Some(json)
} else if stdout.trim().is_empty() {
None
} else {
Some(serde_json::json!({ "stdout": stdout.trim() }))
};
let status = if exit_status.success() {
RunStatus::Completed
} else {
RunStatus::Failed
};
let error = if status == RunStatus::Failed {
let base_msg = format!(
"Process exited with code: {}",
exit_status.code().unwrap_or(-1)
);
let msg = if stderr_truncated.trim().is_empty() {
base_msg
} else {
format!("{}\nstderr: {}", base_msg, stderr_truncated.trim())
};
Some(bzzz_core::RunError::ExecutionFailed { message: msg })
} else {
None
};
Ok(InlineResult {
status,
output,
error,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_execute_inline_echo() {
let result = execute_inline_command("echo hello", &HashMap::new(), None, None)
.await
.unwrap();
assert_eq!(result.status, RunStatus::Completed);
assert!(result.output.is_some());
assert_eq!(
result.output.unwrap(),
serde_json::json!({ "stdout": "hello" })
);
}
#[tokio::test]
async fn test_execute_inline_failure() {
let result = execute_inline_command("false", &HashMap::new(), None, None)
.await
.unwrap();
assert_eq!(result.status, RunStatus::Failed);
assert!(result.error.is_some());
}
#[tokio::test]
async fn test_execute_inline_timeout() {
let result = execute_inline_command("echo quick", &HashMap::new(), None, Some(5))
.await
.unwrap();
assert_eq!(result.status, RunStatus::Completed);
}
#[test]
fn test_inline_worker_creation() {
let inline = InlineWorker::new("python process.py")
.with_env("DEBUG", "true");
assert_eq!(inline.command, "python process.py");
assert_eq!(inline.env.get("DEBUG"), Some(&"true".to_string()));
}
#[test]
fn test_swarm_with_inline_worker() {
let swarm = SwarmFile::new(
"test-spawn",
FlowPattern::Sequence {
steps: vec!["worker".into()],
},
)
.with_worker(Worker::new_inline("worker", "echo test"));
assert!(swarm.validate().is_ok());
assert!(swarm.is_simple()); }
#[tokio::test]
async fn test_env_propagation() {
let env = HashMap::from([("MY_VAR".to_string(), "my_value".to_string())]);
let result = execute_inline_command("printenv MY_VAR", &env, None, None).await;
if let Ok(res) = result {
if res.status == RunStatus::Completed {
let output = res.output.unwrap_or_default();
let stdout = output["stdout"].as_str().unwrap_or("");
assert!(
stdout.contains("my_value"),
"Expected my_value in output, got: {}",
stdout
);
}
}
}
}