use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use bzzz_core::{
create_runtime,
pattern::{execute_with_timeout, get_executor, PatternContext},
AgentSpecParser, CancellationToken, ExecutionContext, Run, RunTarget, RuntimeKind,
SwarmFileParser,
};
use tokio::sync::Mutex;
use crate::commands::output::{OutputFormat, RunOutput};
use crate::run_registry::RunState;
/// Collects timing and outcome counters for one run when adaptive mode is enabled.
///
/// The run helpers in this file create a monitor only when their `adaptive`
/// flag is set. Everything the monitor prints goes to stdout and only when the
/// output format is `OutputFormat::Text`.
pub struct AdaptiveMonitor {
/// Instant captured in `new`; elapsed times in the status line and the final
/// report are measured from here.
start_time: Instant,
/// Worker count passed to `new`; only echoed in the status line.
worker_count: usize,
/// Interval shown in the status line. `new` sets it to 500; nothing in the
/// visible code schedules periodic checks with it.
check_interval_ms: u64,
/// Counters shared behind an async mutex; `metrics()` hands out clones of
/// this `Arc`.
metrics: Arc<Mutex<AdaptiveMetrics>>,
}
/// Counters accumulated by `AdaptiveMonitor`; every field starts at zero via
/// `Default`.
#[derive(Debug, Default)]
pub struct AdaptiveMetrics {
/// Incremented once per `record_completion` call.
checks_count: u64,
/// Number of completions reported with `success == true`.
workers_completed: u64,
/// Number of completions reported with `success == false`.
workers_failed: u64,
/// Running mean of the reported durations in milliseconds. It is updated
/// with integer arithmetic, so fractions are truncated on every update.
avg_worker_time_ms: u64,
/// Largest `count` passed to `record_concurrent` so far.
peak_concurrent: usize,
}
impl AdaptiveMonitor {
    /// Builds a monitor for `worker_count` workers.
    ///
    /// The clock starts immediately, the displayed check interval is fixed at
    /// 500 ms and all counters begin at their `Default` (zero) values.
    pub fn new(worker_count: usize) -> Self {
        let started = Instant::now();
        let counters = AdaptiveMetrics::default();
        Self {
            start_time: started,
            worker_count,
            check_interval_ms: 500,
            metrics: Arc::new(Mutex::new(counters)),
        }
    }

    /// Returns another handle to the shared counters.
    #[allow(dead_code)]
    pub fn metrics(&self) -> Arc<Mutex<AdaptiveMetrics>> {
        Arc::clone(&self.metrics)
    }

    /// Prints a one-line status (worker count, elapsed time, check interval).
    ///
    /// Does nothing unless `format` is `OutputFormat::Text`.
    pub fn log_status(&self, format: OutputFormat) {
        if format != OutputFormat::Text {
            return;
        }
        let elapsed_ms = self.start_time.elapsed().as_millis();
        println!(
            " [Adaptive] {} workers, {}ms elapsed, checking every {}ms",
            self.worker_count, elapsed_ms, self.check_interval_ms
        );
    }

    /// Records one finished worker and folds its duration into the running mean.
    ///
    /// `success` selects which of the completed/failed counters is bumped;
    /// `duration_ms` is the time attributed to that worker.
    pub async fn record_completion(&self, success: bool, duration_ms: u64) {
        let mut guard = self.metrics.lock().await;
        guard.checks_count += 1;

        let counter = if success {
            &mut guard.workers_completed
        } else {
            &mut guard.workers_failed
        };
        *counter += 1;

        // One counter was just incremented, so `finished` is at least 1 and
        // the `- 1` below cannot underflow.
        let finished = guard.workers_completed + guard.workers_failed;
        let accumulated = guard.avg_worker_time_ms * (finished - 1) + duration_ms;
        guard.avg_worker_time_ms = accumulated / finished;
    }

    /// Raises the recorded concurrency peak to `count` if `count` is larger.
    pub async fn record_concurrent(&self, count: usize) {
        let mut guard = self.metrics.lock().await;
        let peak = guard.peak_concurrent.max(count);
        guard.peak_concurrent = peak;
    }

    /// Prints the execution summary built from the counters.
    ///
    /// The counters are locked in every case, but output is produced only when
    /// `format` is `OutputFormat::Text`.
    pub async fn print_report(&self, format: OutputFormat) {
        let guard = self.metrics.lock().await;
        let elapsed = self.start_time.elapsed();
        if format != OutputFormat::Text {
            return;
        }
        println!(" [Adaptive] Execution Summary:");
        println!(" Total time: {}ms", elapsed.as_millis());
        println!(" Checks performed: {}", guard.checks_count);
        println!(" Workers completed: {}", guard.workers_completed);
        println!(" Workers failed: {}", guard.workers_failed);
        println!(" Avg worker time: {}ms", guard.avg_worker_time_ms);
        println!(" Peak concurrent: {}", guard.peak_concurrent);
    }
}
/// Entry point of the run command: loads `file` and executes it as a swarm or
/// as a single agent.
///
/// The file type is not taken from the extension. The file is first parsed as
/// a SwarmFile; if that fails it is parsed as an Agent Spec.
///
/// # Parameters
/// - `file`: path of the YAML definition to run.
/// - `background`: forwarded to the swarm/agent runner.
/// - `timeout_secs`: optional timeout in seconds, forwarded to the runner.
/// - `runtime`: `"docker"` or `"http"`; any other value selects the local
///   runtime. A swarm file that declares its own runtime overrides this.
/// - `input`: optional JSON text; only forwarded to swarm runs.
/// - `output_format`: controls whether progress/result lines are text or JSON.
/// - `adaptive`: enables the adaptive monitor output.
///
/// # Errors
/// Fails when `input` is not valid JSON, when the file parses as neither
/// format (the message includes both parser errors), or when the selected
/// runner fails.
pub async fn execute(
    file: PathBuf,
    background: bool,
    timeout_secs: Option<u64>,
    runtime: &str,
    input: Option<String>,
    output_format: OutputFormat,
    adaptive: bool,
) -> Result<()> {
    if output_format == OutputFormat::Text {
        println!("🚀 Running: {}", file.display());
        if adaptive {
            println!(" Mode: Adaptive runtime monitoring enabled");
        }
    }

    let input_value: Option<serde_json::Value> = input
        .map(|raw| {
            serde_json::from_str::<serde_json::Value>(&raw)
                .context("Failed to parse --input as JSON")
        })
        .transpose()?;

    // Unrecognised runtime names deliberately fall back to the local runtime.
    let cli_runtime_kind = match runtime {
        "docker" => RuntimeKind::Docker,
        "http" => RuntimeKind::Http,
        _ => RuntimeKind::Local,
    };

    // Each parser error is rendered to a String right away so that no error
    // value has to live across the `.await` points below.
    let swarm_error = match SwarmFileParser::from_yaml_file(&file) {
        Ok(swarm) => {
            let swarm_runtime_kind = swarm.runtime.unwrap_or(cli_runtime_kind);
            return run_swarm(
                &file,
                swarm,
                background,
                timeout_secs,
                swarm_runtime_kind,
                input_value,
                output_format,
                adaptive,
            )
            .await;
        }
        Err(err) => format!("{:#}", err),
    };

    let spec_error = match AgentSpecParser::from_yaml_file(&file) {
        Ok(spec) => {
            return run_agent(
                &file,
                spec,
                background,
                timeout_secs,
                cli_runtime_kind,
                output_format,
                adaptive,
            )
            .await;
        }
        Err(err) => format!("{:#}", err),
    };

    // Surface both causes: previously they were discarded, leaving the user
    // with no hint about what was wrong with the file.
    anyhow::bail!(
        "Failed to parse {} as SwarmFile or Agent Spec\n  as SwarmFile: {}\n  as Agent Spec: {}",
        file.display(),
        swarm_error,
        spec_error
    )
}
/// Runs a single agent spec on the chosen runtime and reports the outcome.
///
/// The run is registered in a `RunRegistry` right after it has been started.
/// With `background` set, the function prints the run id and returns without
/// waiting. Otherwise it waits for the result, stores the final status in the
/// registry and prints status, error (if any) and duration.
///
/// Output is a single JSON line when `output_format` is `OutputFormat::Json`
/// and plain text lines otherwise. When `adaptive` is set, a one-worker
/// `AdaptiveMonitor` adds its status line and, for foreground runs, its report.
///
/// # Errors
/// Propagates failures from runtime creation, run start, registry access,
/// waiting for the run, and JSON serialisation.
async fn run_agent(
    file: &std::path::Path,
    spec: bzzz_core::AgentSpec,
    background: bool,
    timeout_secs: Option<u64>,
    runtime_kind: RuntimeKind,
    output_format: OutputFormat,
    adaptive: bool,
) -> Result<()> {
    let json_mode = output_format == OutputFormat::Json;

    if output_format == OutputFormat::Text {
        println!(" Agent: {}", spec.id.as_str());
        println!(" Runtime: {:?}", runtime_kind);
    }

    let monitor = adaptive.then(|| AdaptiveMonitor::new(1));

    let runtime = create_runtime(runtime_kind)?;
    let registry = crate::run_registry::RunRegistry::default();
    let ctx = runtime.create(&spec).await?;

    let target = RunTarget::Agent {
        spec_path: file.to_path_buf(),
    };
    let base_run = Run::new(target, runtime_kind);
    let run = match timeout_secs {
        Some(secs) => base_run.with_timeout(Duration::from_secs(secs)),
        None => base_run,
    };

    let handle = if background {
        runtime.execute_background(&ctx, &run).await?
    } else {
        runtime.execute(&ctx, &run).await?
    };

    // A runtime handle of the form "pid:<n>" carries a process id; any other
    // shape leaves the pid unset.
    let pid_text = handle.runtime_handle.strip_prefix("pid:");
    let pid = pid_text.and_then(|raw| raw.parse::<u32>().ok());

    let state = RunState::new(
        handle.run_id.clone(),
        runtime_kind,
        file.to_path_buf(),
        std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
    )
    .with_pid_opt(pid);
    registry.register(&state)?;

    if let Some(m) = monitor.as_ref() {
        m.log_status(output_format);
    }

    let run_id = handle.run_id.as_str();

    if background {
        if json_mode {
            let payload = RunOutput {
                run_id: run_id.to_string(),
                status: "Running".to_string(),
                output: None,
                error: None,
                duration_ms: 0,
                artifacts_count: 0,
            };
            println!("{}", serde_json::to_string(&payload)?);
        } else {
            println!(" Run ID: {}", run_id);
            println!(" Status: Running in background");
            println!(" Use `bzzz status --id {}` to check progress", run_id);
        }
        return Ok(());
    }

    // Foreground: block until the run finishes, then record and report it.
    let wait_started = Instant::now();
    let result = runtime.wait(&handle).await?;
    let duration_ms = wait_started.elapsed().as_millis() as u64;
    registry.update_status(run_id, result.status)?;

    if let Some(m) = monitor.as_ref() {
        let succeeded = result.status == bzzz_core::RunStatus::Completed;
        m.record_completion(succeeded, duration_ms).await;
        m.print_report(output_format).await;
    }

    if json_mode {
        let payload = RunOutput {
            run_id: run_id.to_string(),
            status: format!("{:?}", result.status),
            output: None,
            error: result.error.map(|e| format!("{:?}", e)),
            duration_ms: result.metrics.wall_time_ms,
            artifacts_count: result.artifacts.len(),
        };
        println!("{}", serde_json::to_string(&payload)?);
    } else {
        println!(" Run ID: {}", run_id);
        println!(" Status: {:?}", result.status);
        if let Some(error) = result.error {
            println!(" Error: {:?}", error);
        }
        println!(" Duration: {}ms", result.metrics.wall_time_ms);
    }

    Ok(())
}
/// Executes a swarm through the executor chosen for its flow and prints the
/// result.
///
/// Swarms for which `is_simple()` returns true are handed to
/// `run_simple_swarm` unchanged. For all others the function builds an
/// execution context whose working directory is the directory containing
/// `file`, runs the executor via `execute_with_timeout` and reports run id,
/// status, error, duration, artifact count and output.
///
/// `_background` is currently unused: this function always awaits the result
/// before returning.
///
/// Output is a single JSON line when `output_format` is `OutputFormat::Json`
/// and plain text lines otherwise. When `adaptive` is set, an
/// `AdaptiveMonitor` sized to the worker count adds its status line and report.
///
/// # Errors
/// Propagates failures from runtime creation, execution and JSON
/// serialisation.
#[allow(clippy::too_many_arguments)]
async fn run_swarm(
    file: &std::path::Path,
    mut swarm: bzzz_core::SwarmFile,
    _background: bool,
    timeout_secs: Option<u64>,
    runtime_kind: RuntimeKind,
    input_value: Option<serde_json::Value>,
    output_format: OutputFormat,
    adaptive: bool,
) -> Result<()> {
    if swarm.is_simple() {
        return run_simple_swarm(
            file,
            swarm,
            timeout_secs,
            runtime_kind,
            input_value,
            output_format,
            adaptive,
        )
        .await;
    }

    if output_format == OutputFormat::Text {
        println!(" Swarm: {}", swarm.id.as_str());
        println!(" Workers: {}", swarm.workers.len());
        println!(" Pattern: {}", swarm.flow.type_name());
        println!(" Runtime: {:?}", runtime_kind);
    }

    let monitor = if adaptive {
        let m = AdaptiveMonitor::new(swarm.workers.len());
        m.log_status(output_format);
        Some(m)
    } else {
        None
    };

    if let Some(secs) = timeout_secs {
        swarm = swarm.with_timeout(Duration::from_secs(secs));
    }

    let runtime = create_runtime(runtime_kind)?;

    // `Path::parent` yields `Some("")` for a bare file name such as
    // `swarm.yaml`, so an empty parent must be treated like a missing one;
    // otherwise an empty working directory would be handed to the runtime.
    let swarm_dir = match file.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => std::path::Path::new("."),
    };
    let runtime_ctx = ExecutionContext::new(format!("swarm-{}", swarm.id.as_str()), runtime_kind)
        .with_working_dir(swarm_dir.to_path_buf());

    let pattern_ctx = match input_value {
        Some(input) => PatternContext::with_input(swarm, runtime_ctx, input),
        None => PatternContext::new(swarm, runtime_ctx),
    };
    let executor = get_executor(&pattern_ctx.swarm.flow);
    let cancel = CancellationToken::new();

    if output_format == OutputFormat::Text {
        println!(" Executor: {}", executor.name());
    }
    if let Some(ref m) = monitor {
        // The total worker count is recorded as the concurrency figure.
        m.record_concurrent(pattern_ctx.swarm.workers.len()).await;
    }

    let result = execute_with_timeout(&*executor, &pattern_ctx, runtime, &cancel).await?;

    if let Some(ref m) = monitor {
        let succeeded = result.status == bzzz_core::RunStatus::Completed;
        m.record_completion(succeeded, result.metrics.wall_time_ms)
            .await;
        m.print_report(output_format).await;
    }

    if output_format == OutputFormat::Json {
        let out = RunOutput {
            run_id: result.run_id.as_str().to_string(),
            status: format!("{:?}", result.status),
            output: result.output,
            error: result.error.map(|e| format!("{:?}", e)),
            duration_ms: result.metrics.wall_time_ms,
            artifacts_count: result.artifacts.len(),
        };
        println!("{}", serde_json::to_string(&out)?);
    } else {
        println!(" Run ID: {}", result.run_id.as_str());
        println!(" Status: {:?}", result.status);
        if let Some(error) = result.error {
            println!(" Error: {:?}", error);
        }
        println!(" Duration: {}ms", result.metrics.wall_time_ms);
        if !result.artifacts.is_empty() {
            println!(" Artifacts: {} produced", result.artifacts.len());
        }
        if let Some(output) = result.output {
            println!(" Output:");
            println!("{}", serde_json::to_string_pretty(&output)?);
        }
    }

    Ok(())
}
/// Executes a swarm that `run_swarm` classified as simple and prints the
/// result.
///
/// The swarm still goes through the executor selected for its flow. The
/// differences from the general path visible here are the `simple-` prefix on
/// the execution context id, the banner that names the first worker, a
/// one-worker `AdaptiveMonitor`, and wall-clock timing measured locally for
/// the monitor.
///
/// Output is a single JSON line when `output_format` is `OutputFormat::Json`
/// and plain text lines otherwise.
///
/// # Errors
/// Propagates failures from runtime creation, execution and JSON
/// serialisation.
#[allow(clippy::too_many_arguments)]
async fn run_simple_swarm(
    file: &std::path::Path,
    swarm: bzzz_core::SwarmFile,
    timeout_secs: Option<u64>,
    runtime_kind: RuntimeKind,
    input_value: Option<serde_json::Value>,
    output_format: OutputFormat,
    adaptive: bool,
) -> Result<()> {
    if output_format == OutputFormat::Text {
        println!(" Swarm: {} (simple task)", swarm.id.as_str());
        // `.first()` instead of `[0]`: an empty worker list must not panic
        // just to print a banner line.
        if let Some(worker) = swarm.workers.first() {
            println!(" Worker: {}", worker.name);
        }
        println!(" Runtime: {:?}", runtime_kind);
    }

    let monitor = if adaptive {
        let m = AdaptiveMonitor::new(1);
        m.log_status(output_format);
        Some(m)
    } else {
        None
    };

    let swarm = match timeout_secs {
        Some(secs) => swarm.with_timeout(Duration::from_secs(secs)),
        None => swarm,
    };

    let runtime = create_runtime(runtime_kind)?;

    // `Path::parent` yields `Some("")` for a bare file name such as
    // `task.yaml`, so an empty parent must be treated like a missing one;
    // otherwise an empty working directory would be handed to the runtime.
    let swarm_dir = match file.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => std::path::Path::new("."),
    };
    let runtime_ctx = ExecutionContext::new(format!("simple-{}", swarm.id.as_str()), runtime_kind)
        .with_working_dir(swarm_dir.to_path_buf());

    let pattern_ctx = match input_value {
        Some(input) => PatternContext::with_input(swarm, runtime_ctx, input),
        None => PatternContext::new(swarm, runtime_ctx),
    };
    let executor = get_executor(&pattern_ctx.swarm.flow);
    let cancel = CancellationToken::new();

    let start_time = Instant::now();
    let result = execute_with_timeout(&*executor, &pattern_ctx, runtime, &cancel).await?;
    let duration_ms = start_time.elapsed().as_millis() as u64;

    if let Some(ref m) = monitor {
        let succeeded = result.status == bzzz_core::RunStatus::Completed;
        m.record_completion(succeeded, duration_ms).await;
        m.print_report(output_format).await;
    }

    if output_format == OutputFormat::Json {
        let out = RunOutput {
            run_id: result.run_id.as_str().to_string(),
            status: format!("{:?}", result.status),
            output: result.output,
            error: result.error.map(|e| format!("{:?}", e)),
            duration_ms: result.metrics.wall_time_ms,
            artifacts_count: result.artifacts.len(),
        };
        println!("{}", serde_json::to_string(&out)?);
    } else {
        println!(" Run ID: {}", result.run_id.as_str());
        println!(" Status: {:?}", result.status);
        if let Some(error) = result.error {
            println!(" Error: {:?}", error);
        }
        println!(" Duration: {}ms", result.metrics.wall_time_ms);
        if let Some(output) = result.output {
            println!(" Output:");
            println!("{}", serde_json::to_string_pretty(&output)?);
        }
    }

    Ok(())
}