use std::sync::Arc;
use async_trait::async_trait;
use tokio::task::JoinSet;
use crate::{ExecutionMetrics, ExecutionResult, FlowPattern, RunError, RunId, RunStatus};
use super::{build_capability_output, execute_worker_with_arc, PatternContext, PatternExecutor};
/// Stateless executor for the `alongside` flow pattern: a main worker runs in
/// the foreground while the configured side workers run concurrently with it.
#[derive(Default)]
pub struct AlongsideExecutor;

impl AlongsideExecutor {
    /// Creates an alongside executor. The type carries no state, so this is
    /// equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self
    }
}
#[async_trait]
impl PatternExecutor for AlongsideExecutor {
fn name(&self) -> &'static str {
"alongside"
}
async fn execute(
&self,
_ctx: &PatternContext,
_runtime: &dyn crate::RuntimeAdapter,
_cancel: &crate::CancellationToken,
) -> Result<ExecutionResult, RunError> {
Err(RunError::RuntimeError {
message: "AlongsideExecutor requires Arc runtime. Use execute_with_arc() instead."
.into(),
})
}
async fn execute_with_arc(
&self,
ctx: &PatternContext,
runtime: Arc<dyn crate::RuntimeAdapter>,
cancel: &crate::CancellationToken,
) -> Result<ExecutionResult, RunError> {
let (main_worker_name, side_worker_names) = match &ctx.swarm.flow {
FlowPattern::Alongside { main, side } => (main.clone(), side.clone()),
_ => {
return Err(RunError::PatternError {
pattern: "alongside".into(),
step: "flow".into(),
message: "AlongsideExecutor requires Alongside pattern in flow".into(),
})
}
};
if cancel.is_cancelled().await {
return Ok(ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Cancelled,
artifacts: vec![],
error: Some(RunError::Cancelled {
reason: "Execution cancelled".into(),
}),
metrics: ExecutionMetrics::default(),
output: None,
});
}
let main = ctx
.get_worker(&main_worker_name)
.ok_or_else(|| RunError::PatternError {
pattern: "alongside".into(),
step: main_worker_name.clone(),
message: format!("Main worker '{}' not found in swarm", main_worker_name),
})?
.clone();
let mut side_tasks: JoinSet<Option<ExecutionResult>> = JoinSet::new();
for side_name in side_worker_names.iter() {
let side_worker = ctx
.get_worker(side_name)
.ok_or_else(|| RunError::PatternError {
pattern: "alongside".into(),
step: side_name.clone(),
message: format!("Side worker '{}' not found in swarm", side_name),
})?
.clone();
let runtime_clone = runtime.clone();
let runtime_ctx = ctx.runtime_ctx.clone();
let scope = ctx.scope.clone();
let cancel_clone = cancel.clone();
side_tasks.spawn(async move {
if cancel_clone.is_cancelled().await {
return None;
}
execute_worker_with_arc(
&side_worker,
runtime_clone,
&runtime_ctx,
&scope,
&cancel_clone,
)
.await
.ok()
});
}
let main_result =
execute_worker_with_arc(&main, runtime, &ctx.runtime_ctx, &ctx.scope, cancel).await?;
match main_result.status {
RunStatus::Completed => {
let mut all_artifacts = main_result.artifacts.clone();
while let Some(side_result) = side_tasks.join_next().await {
if let Ok(Some(result)) = side_result {
all_artifacts.extend(result.artifacts);
}
}
let mut final_scope = ctx.scope.clone();
if let Some(output) = &main_result.output {
final_scope.add_step_output(main_worker_name.clone(), output.clone());
}
let result = ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Completed,
artifacts: all_artifacts,
error: None,
metrics: main_result.metrics,
output: None,
};
Ok(build_capability_output(result, &ctx.swarm, &final_scope))
}
RunStatus::Failed => {
cancel.cancel().await;
side_tasks.abort_all();
Ok(ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Failed,
artifacts: main_result.artifacts,
error: main_result.error,
metrics: main_result.metrics,
output: None,
})
}
RunStatus::Cancelled => {
cancel.cancel().await;
side_tasks.abort_all();
Ok(ExecutionResult {
run_id: RunId::new(),
status: RunStatus::Cancelled,
artifacts: vec![],
error: Some(RunError::Cancelled {
reason: "Execution cancelled".into(),
}),
metrics: main_result.metrics,
output: None,
})
}
_ => {
cancel.cancel().await;
side_tasks.abort_all();
Ok(ExecutionResult {
run_id: RunId::new(),
status: main_result.status,
artifacts: main_result.artifacts,
error: main_result.error,
metrics: main_result.metrics,
output: None,
})
}
}
}
async fn on_failure(
&self,
ctx: &mut PatternContext,
_runtime: &dyn crate::RuntimeAdapter,
failed_worker: &str,
_error: &RunError,
) -> Result<bool, RunError> {
let main = match &ctx.swarm.flow {
FlowPattern::Alongside { main, .. } => main,
_ => return Ok(false),
};
if failed_worker == main {
return Ok(false);
}
ctx.state.failed.push(failed_worker.to_string());
Ok(true)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{CancellationToken, ExecutionContext, FlowPattern, RuntimeKind, SwarmFile, Worker};

    #[test]
    fn test_alongside_executor_name() {
        let executor = AlongsideExecutor::new();
        assert_eq!(executor.name(), "alongside");
    }

    #[tokio::test]
    async fn test_alongside_execute_requires_arc() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec![],
            },
        );
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();
        let result = executor
            .execute(&ctx, &crate::LocalRuntime::new(), &cancel)
            .await;
        assert!(result.is_err());
        match result.unwrap_err() {
            RunError::RuntimeError { message } => {
                assert!(
                    message.contains("execute_with_arc"),
                    "Error message should mention execute_with_arc, got: {}",
                    message
                );
            }
            other => panic!("Expected RuntimeError, got: {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_alongside_executor_wrong_pattern() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new("test", FlowPattern::Sequence { steps: vec![] });
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();
        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        let result = executor.execute_with_arc(&ctx, runtime, &cancel).await;
        // Pin the specific rejection, not just "some error".
        assert!(matches!(
            result,
            Err(RunError::PatternError { ref step, .. }) if step == "flow"
        ));
    }

    #[tokio::test]
    async fn test_alongside_missing_side_worker_is_pattern_error() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["ghost".into()],
            },
        )
        .with_worker(Worker::new("main", "agent.yaml"));
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();
        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        let result = executor.execute_with_arc(&ctx, runtime, &cancel).await;
        assert!(matches!(
            result,
            Err(RunError::PatternError { ref step, .. }) if step == "ghost"
        ));
    }

    #[tokio::test]
    async fn test_alongside_executor_real_execution() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["side1".into()],
            },
        )
        .with_worker(Worker::new("main", "agent.yaml"))
        .with_worker(Worker::new("side1", "agent.yaml"));
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();
        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        let result = executor.execute_with_arc(&ctx, runtime, &cancel).await;
        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.status, RunStatus::Completed);
    }

    #[tokio::test]
    async fn test_alongside_side_failure_does_not_affect_main() {
        let executor = AlongsideExecutor::new();
        // Scope the directory by process id so concurrent test runs don't share it.
        let temp_dir = std::env::temp_dir().join(format!(
            "bzzz-alongside-side-fail-test-{}",
            std::process::id()
        ));
        std::fs::create_dir_all(&temp_dir).unwrap();
        let failing_spec_path = temp_dir.join("failing.yaml");
        // `kind`/`config` are nested under `runtime`, and `command` under `config`.
        std::fs::write(
            &failing_spec_path,
            "apiVersion: v1\n\
             id: failing-agent\n\
             runtime:\n  \
             kind: Local\n  \
             config:\n    \
             command: /usr/bin/false\n",
        )
        .unwrap();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["failing-side".into()],
            },
        )
        .with_worker(Worker::new("main", "agent.yaml"))
        .with_worker(Worker::new(
            "failing-side",
            failing_spec_path.to_string_lossy().to_string(),
        ));
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();
        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        let result = executor.execute_with_arc(&ctx, runtime, &cancel).await;
        let _ = std::fs::remove_dir_all(&temp_dir);
        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.status, RunStatus::Completed);
    }

    #[tokio::test]
    async fn test_alongside_cancellation_before_start() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["side1".into()],
            },
        )
        .with_worker(Worker::new("main", "agent.yaml"))
        .with_worker(Worker::new("side1", "agent.yaml"));
        let ctx = PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let cancel = CancellationToken::new();
        cancel.cancel().await;
        let runtime = crate::create_runtime(RuntimeKind::Local).unwrap();
        let result = executor.execute_with_arc(&ctx, runtime, &cancel).await;
        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.status, RunStatus::Cancelled);
    }

    #[tokio::test]
    async fn test_alongside_on_failure_tolerates_only_side_workers() {
        let executor = AlongsideExecutor::new();
        let swarm = SwarmFile::new(
            "test",
            FlowPattern::Alongside {
                main: "main".into(),
                side: vec!["side1".into()],
            },
        );
        let mut ctx =
            PatternContext::new(swarm, ExecutionContext::new("ctx", RuntimeKind::Local));
        let runtime = crate::LocalRuntime::new();
        let error = RunError::Cancelled {
            reason: "boom".into(),
        };

        let main_recoverable = executor
            .on_failure(&mut ctx, &runtime, "main", &error)
            .await
            .unwrap();
        assert!(!main_recoverable);

        let side_recoverable = executor
            .on_failure(&mut ctx, &runtime, "side1", &error)
            .await
            .unwrap();
        assert!(side_recoverable);
        assert!(ctx.state.failed.iter().any(|worker| worker == "side1"));
    }
}