#![allow(dead_code)]
use crate::agency::agent::Agent;
use crate::agency::error::{AgencyError, AgencyResult};
use crate::agency::executor::{ExecutionContext, ExecutionResult, Executor};
use crate::agency::models::{AgencyEvent, EventType, TokenUsage};
use crate::agency::session::Session;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Strategy the orchestrator uses to run the agents of a [`Pipeline`].
///
/// Serialized through serde with `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OrchestrationType {
/// Agents run one after another; each response becomes the next agent's input.
Sequential,
/// Every agent is spawned as its own task and receives the same input.
Parallel,
/// The first agent is re-run on its own output, bounded by `Pipeline::max_iterations`.
Loop,
/// Coordinator/worker orchestration; `Orchestrator::run_pipeline` rejects this
/// variant with an error pointing callers to `Orchestrator::run_swarm`.
Hierarchical,
}
/// A group of agents together with the strategy used to run them.
#[derive(Debug, Clone)]
pub struct Pipeline {
/// Pipeline name as supplied to the constructor.
pub name: String,
/// How the orchestrator runs `agents`.
pub orchestration: OrchestrationType,
/// Agents taking part in the pipeline, each held behind an `Arc`.
pub agents: Vec<Arc<Agent>>,
/// Iteration cap consulted by loop pipelines; the sequential and parallel
/// constructors set it to 1.
pub max_iterations: u32,
}
impl Pipeline {
pub fn sequential(name: impl Into<String>, agents: Vec<Agent>) -> Self {
Self {
name: name.into(),
orchestration: OrchestrationType::Sequential,
agents: agents.into_iter().map(Arc::new).collect(),
max_iterations: 1,
}
}
pub fn parallel(name: impl Into<String>, agents: Vec<Agent>) -> Self {
Self {
name: name.into(),
orchestration: OrchestrationType::Parallel,
agents: agents.into_iter().map(Arc::new).collect(),
max_iterations: 1,
}
}
pub fn loop_agent(name: impl Into<String>, agent: Agent, max_iterations: u32) -> Self {
Self {
name: name.into(),
orchestration: OrchestrationType::Loop,
agents: vec![Arc::new(agent)],
max_iterations,
}
}
}
/// A coordinator agent plus the worker agents it is paired with for
/// `Orchestrator::run_swarm`.
#[derive(Debug, Clone)]
pub struct Swarm {
/// Swarm name as supplied to `Swarm::new`.
pub name: String,
/// Free-form description as supplied to `Swarm::new`.
pub description: String,
/// Agent that is run before and after the workers.
pub coordinator: Arc<Agent>,
/// Worker agents, each held behind an `Arc`.
pub workers: Vec<Arc<Agent>>,
/// Optional goal text; `None` until set through `with_goal`.
pub goal: Option<String>,
}
impl Swarm {
pub fn new(
name: impl Into<String>,
description: impl Into<String>,
coordinator: Agent,
workers: Vec<Agent>,
) -> Self {
Self {
name: name.into(),
description: description.into(),
coordinator: Arc::new(coordinator),
workers: workers.into_iter().map(Arc::new).collect(),
goal: None,
}
}
pub fn with_goal(mut self, goal: impl Into<String>) -> Self {
self.goal = Some(goal.into());
self
}
}
/// Runs pipelines and swarms by driving each agent through a shared [`Executor`].
pub struct Orchestrator {
/// Executor used for every agent invocation.
executor: Arc<Executor>,
}
impl Orchestrator {
pub fn new(executor: Arc<Executor>) -> Self {
Self { executor }
}
/// Runs `pipeline` on `input` using the strategy stored in `pipeline.orchestration`.
///
/// # Errors
///
/// Returns `AgencyError::OrchestrationError` for hierarchical pipelines (those go
/// through `run_swarm`), and otherwise whatever error the selected strategy reports.
pub async fn run_pipeline(
    &self,
    pipeline: &Pipeline,
    input: &str,
    ctx: &mut ExecutionContext,
) -> AgencyResult<OrchestratorResult> {
    match pipeline.orchestration {
        // Swarms carry a coordinator that a plain pipeline does not have.
        OrchestrationType::Hierarchical => {
            let reason = String::from("Use run_swarm for hierarchical orchestration");
            Err(AgencyError::OrchestrationError(reason))
        }
        OrchestrationType::Loop => self.run_loop(pipeline, input, ctx).await,
        OrchestrationType::Parallel => self.run_parallel(pipeline, input, ctx).await,
        OrchestrationType::Sequential => self.run_sequential(pipeline, input, ctx).await,
    }
}
/// Runs the pipeline's agents one after another, feeding each agent's response
/// to the next agent as its input.
///
/// Every agent gets a fresh `Session` built from its name and the caller's
/// `user_id`; the caller's `ctx` is passed to every execution.
///
/// The returned result carries the last agent's response (an empty string when
/// the pipeline has no agents), every per-agent result in execution order, the
/// concatenated events and the summed token usage. `iterations` is always 1.
///
/// # Errors
///
/// Stops at, and returns, the first error reported by the executor.
async fn run_sequential(
    &self,
    pipeline: &Pipeline,
    input: &str,
    ctx: &mut ExecutionContext,
) -> AgencyResult<OrchestratorResult> {
    let start_time = std::time::Instant::now();
    let mut results = Vec::new();
    let mut events = Vec::new();
    let mut token_usage = TokenUsage::default();
    let mut current_input = input.to_string();

    for agent_arc in &pipeline.agents {
        let agent = agent_arc.as_ref();
        let mut session = Session::new(agent.name(), ctx.user_id.clone());
        let result = self
            .executor
            .execute(agent, &mut session, &current_input, ctx)
            .await?;

        // The next agent works on what this one produced; reuse the buffer.
        current_input.clone_from(&result.response);
        token_usage.add(&result.token_usage);
        events.extend(result.events.clone());
        results.push(result);
    }

    let final_response = results
        .last()
        .map(|r| r.response.clone())
        .unwrap_or_default();

    Ok(OrchestratorResult {
        response: final_response,
        agent_results: results,
        events,
        token_usage,
        duration_ms: start_time.elapsed().as_millis() as u64,
        iterations: 1,
    })
}
async fn run_parallel(
&self,
pipeline: &Pipeline,
input: &str,
ctx: &mut ExecutionContext,
) -> AgencyResult<OrchestratorResult> {
let start_time = std::time::Instant::now();
let mut handles = Vec::new();
for agent_arc in &pipeline.agents {
let agent = agent_arc.clone();
let executor = self.executor.clone();
let input = input.to_string();
let user_id = ctx.user_id.clone();
handles.push(tokio::spawn(async move {
let mut session = Session::new(agent.name(), user_id.clone());
let mut ctx = ExecutionContext::new(&session);
ctx.user_id = user_id;
executor
.execute(agent.as_ref(), &mut session, &input, &mut ctx)
.await
}));
}
let mut results = Vec::new();
let mut events = Vec::new();
let mut token_usage = TokenUsage::default();
let mut responses = Vec::new();
for handle in handles {
match handle.await {
Ok(Ok(result)) => {
responses.push(result.response.clone());
token_usage.add(&result.token_usage);
events.extend(result.events.clone());
results.push(result);
}
Ok(Err(e)) => {
return Err(e);
}
Err(e) => {
return Err(AgencyError::ExecutionFailed(e.to_string()));
}
}
}
let final_response = responses.join("\n\n---\n\n");
Ok(OrchestratorResult {
response: final_response,
agent_results: results,
events,
token_usage,
duration_ms: start_time.elapsed().as_millis() as u64,
iterations: 1,
})
}
/// Repeatedly runs the pipeline's first agent, feeding each response back in as
/// the next input.
///
/// The loop ends after `pipeline.max_iterations` executions, or earlier as soon
/// as a response contains one of the markers `DONE`, `COMPLETE` or `FINISHED`
/// (plain substring match). Every execution uses a fresh `Session`.
///
/// The returned `iterations` is the number of executions actually performed, so
/// it never exceeds `max_iterations` and is 0 when `max_iterations` is 0. The
/// response is the last execution's response, or empty when nothing ran.
///
/// # Errors
///
/// Returns `AgencyError::OrchestrationError` when the pipeline has no agents,
/// and otherwise the first error reported by the executor.
async fn run_loop(
    &self,
    pipeline: &Pipeline,
    input: &str,
    ctx: &mut ExecutionContext,
) -> AgencyResult<OrchestratorResult> {
    // Substrings that signal the agent considers its work finished.
    const STOP_MARKERS: [&str; 3] = ["DONE", "COMPLETE", "FINISHED"];

    let start_time = std::time::Instant::now();
    let mut results = Vec::new();
    let mut events = Vec::new();
    let mut token_usage = TokenUsage::default();
    let mut current_input = input.to_string();
    let mut iterations: u32 = 0;

    let agent = pipeline
        .agents
        .first()
        .ok_or_else(|| {
            AgencyError::OrchestrationError(
                "Loop pipeline requires at least one agent".to_string(),
            )
        })?
        .as_ref();

    for _ in 0..pipeline.max_iterations {
        let mut session = Session::new(agent.name(), ctx.user_id.clone());
        let result = self
            .executor
            .execute(agent, &mut session, &current_input, ctx)
            .await?;
        // Count only executions that really happened.
        iterations += 1;

        token_usage.add(&result.token_usage);
        events.extend(result.events.clone());

        let finished = STOP_MARKERS
            .iter()
            .any(|marker| result.response.contains(*marker));
        if !finished {
            current_input.clone_from(&result.response);
        }
        results.push(result);

        if finished {
            break;
        }
    }

    let final_response = results
        .last()
        .map(|r| r.response.clone())
        .unwrap_or_default();

    Ok(OrchestratorResult {
        response: final_response,
        agent_results: results,
        events,
        token_usage,
        duration_ms: start_time.elapsed().as_millis() as u64,
        iterations,
    })
}
pub async fn run_swarm(
&self,
swarm: &Swarm,
input: &str,
ctx: &mut ExecutionContext,
) -> AgencyResult<OrchestratorResult> {
let start_time = std::time::Instant::now();
let mut results = Vec::new();
let mut events = Vec::new();
let mut token_usage = TokenUsage::default();
let coordinator = swarm.coordinator.as_ref();
let mut coord_session = Session::new(coordinator.name(), ctx.user_id.clone());
let worker_info: Vec<_> = swarm
.workers
.iter()
.map(|w| format!("- {}: {}", w.name(), w.description()))
.collect();
let coordinator_input = format!(
"Task: {}\n\nAvailable workers:\n{}\n\nAnalyze the task and delegate to appropriate workers.",
input,
worker_info.join("\n")
);
let coord_result = self
.executor
.execute(coordinator, &mut coord_session, &coordinator_input, ctx)
.await?;
token_usage.add(&coord_result.token_usage);
events.extend(coord_result.events.clone());
results.push(coord_result.clone());
let handoff_event = AgencyEvent {
event_type: EventType::Handoff,
agent_name: coordinator.name().to_string(),
data: serde_json::json!({
"from": coordinator.name(),
"task": input
}),
timestamp: Utc::now(),
session_id: Some(coord_session.id.clone()),
};
events.push(handoff_event.clone());
ctx.emit(handoff_event).await;
for worker_arc in &swarm.workers {
let worker = worker_arc.as_ref();
let mut worker_session = Session::new(worker.name(), ctx.user_id.clone());
let worker_result = self
.executor
.execute(worker, &mut worker_session, input, ctx)
.await?;
token_usage.add(&worker_result.token_usage);
events.extend(worker_result.events.clone());
results.push(worker_result);
}
let worker_results: Vec<_> = results
.iter()
.skip(1) .map(|r| format!("Result: {}", r.response))
.collect();
let synthesis_input = format!(
"Original task: {}\n\nWorker results:\n{}\n\nSynthesize these results into a final response.",
input,
worker_results.join("\n\n")
);
let final_result = self
.executor
.execute(coordinator, &mut coord_session, &synthesis_input, ctx)
.await?;
token_usage.add(&final_result.token_usage);
events.extend(final_result.events.clone());
results.push(final_result.clone());
Ok(OrchestratorResult {
response: final_result.response,
agent_results: results,
events,
token_usage,
duration_ms: start_time.elapsed().as_millis() as u64,
iterations: 1,
})
}
}
/// Aggregated outcome of one pipeline or swarm run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestratorResult {
/// Final response text of the run.
pub response: String,
/// Individual execution results, in the order the orchestrator collected them.
pub agent_results: Vec<ExecutionResult>,
/// Events gathered from the individual executions (plus the handoff event for swarms).
pub events: Vec<AgencyEvent>,
/// Token usage summed over all executions of the run.
pub token_usage: TokenUsage,
/// Elapsed time of the run in milliseconds.
pub duration_ms: u64,
/// Iteration count reported by the run; always 1 outside of loop pipelines.
pub iterations: u32,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agency::agent::AgentBuilder;
    use crate::agency::tools::ToolRegistry;

    /// Builds a minimal agent named `name` with a generic instruction.
    fn create_test_agent(name: &str) -> Agent {
        let description = format!("{} agent", name);
        AgentBuilder::new(name)
            .description(description)
            .instruction("You are a helpful assistant.")
            .model("gemini-2.5-flash")
            .build()
    }

    /// Orchestrator backed by an executor with an empty tool registry.
    fn new_orchestrator() -> Orchestrator {
        let registry = Arc::new(ToolRegistry::new());
        Orchestrator::new(Arc::new(Executor::new(registry)))
    }

    /// Two agents chained sequentially should yield two results and a response.
    #[tokio::test]
    #[ignore = "Integration test - requires API credentials"]
    async fn test_sequential_pipeline() {
        let orchestrator = new_orchestrator();
        let pipeline = Pipeline::sequential(
            "research_pipeline",
            vec![create_test_agent("researcher"), create_test_agent("writer")],
        );

        let session = Session::new("test", None);
        let mut ctx = ExecutionContext::new(&session);
        let outcome = orchestrator
            .run_pipeline(&pipeline, "Tell me about Rust", &mut ctx)
            .await
            .unwrap();

        assert!(!outcome.response.is_empty());
        assert_eq!(outcome.agent_results.len(), 2);
    }

    /// Two agents run in parallel should yield two results and a response.
    #[tokio::test]
    #[ignore = "Integration test - requires API credentials"]
    async fn test_parallel_pipeline() {
        let orchestrator = new_orchestrator();
        let pipeline = Pipeline::parallel(
            "analysis_pipeline",
            vec![create_test_agent("analyst1"), create_test_agent("analyst2")],
        );

        let session = Session::new("test", None);
        let mut ctx = ExecutionContext::new(&session);
        let outcome = orchestrator
            .run_pipeline(&pipeline, "Analyze this data", &mut ctx)
            .await
            .unwrap();

        assert!(!outcome.response.is_empty());
        assert_eq!(outcome.agent_results.len(), 2);
    }
}