use crate::error::Result;
use crate::orchestrator::{
AgentSlot, ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity,
SubAgentConfig, SubAgentHandle, SubAgentInfo, SubAgentState,
};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
pub struct AgentOrchestrator {
config: OrchestratorConfig,
agent: Option<Arc<crate::Agent>>,
event_tx: broadcast::Sender<OrchestratorEvent>,
subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
next_id: Arc<RwLock<u64>>,
}
impl AgentOrchestrator {
pub fn new_memory() -> Self {
Self::new(OrchestratorConfig::default())
}
pub fn new(config: OrchestratorConfig) -> Self {
let (event_tx, _) = broadcast::channel(config.event_buffer_size);
Self {
config,
agent: None,
event_tx,
subagents: Arc::new(RwLock::new(HashMap::new())),
sessions: Arc::new(RwLock::new(HashMap::new())),
next_id: Arc::new(RwLock::new(1)),
}
}
pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
Self::from_agent_with_config(agent, OrchestratorConfig::default())
}
pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
let (event_tx, _) = broadcast::channel(config.event_buffer_size);
Self {
config,
agent: Some(agent),
event_tx,
subagents: Arc::new(RwLock::new(HashMap::new())),
sessions: Arc::new(RwLock::new(HashMap::new())),
next_id: Arc::new(RwLock::new(1)),
}
}
pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
self.event_tx.subscribe()
}
pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
let rx = self.event_tx.subscribe();
SubAgentEventStream {
history: VecDeque::new(),
rx,
filter_id: id.to_string(),
}
}
pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
{
let subagents = self.subagents.read().await;
let active_count = subagents
.values()
.filter(|h| !h.state().is_terminal())
.count();
if active_count >= self.config.max_concurrent_subagents {
return Err(anyhow::anyhow!(
"Maximum concurrent subagents ({}) reached",
self.config.max_concurrent_subagents
)
.into());
}
}
let id = {
let mut next_id = self.next_id.write().await;
let id = format!("subagent-{}", *next_id);
*next_id += 1;
id
};
let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
let (subagent_event_tx, _) = broadcast::channel(self.config.event_buffer_size);
let state = Arc::new(RwLock::new(SubAgentState::Initializing));
let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
let event_history = Arc::new(RwLock::new(VecDeque::with_capacity(
self.config.event_buffer_size,
)));
let started_event = OrchestratorEvent::SubAgentStarted {
id: id.clone(),
agent_type: config.agent_type.clone(),
description: config.description.clone(),
parent_id: config.parent_id.clone(),
config: config.clone(),
};
let _ = self.event_tx.send(started_event.clone());
let _ = subagent_event_tx.send(started_event.clone());
event_history.write().await.push_back(started_event);
let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
id.clone(),
config.clone(),
self.agent.clone(),
self.event_tx.clone(),
subagent_event_tx.clone(),
Arc::clone(&event_history),
control_rx,
state.clone(),
activity.clone(),
Arc::clone(&self.sessions),
);
let task_handle = tokio::spawn(async move { wrapper.execute().await });
let handle = SubAgentHandle::new(crate::orchestrator::handle::SubAgentHandleParts {
id: id.clone(),
config,
control_tx,
subagent_event_tx,
event_history,
state: state.clone(),
activity: activity.clone(),
task_handle,
});
self.subagents
.write()
.await
.insert(id.clone(), handle.clone());
Ok(handle)
}
pub async fn spawn(&self, slot: AgentSlot) -> Result<SubAgentHandle> {
self.spawn_subagent(SubAgentConfig::from(slot)).await
}
pub async fn run_team(
&self,
goal: impl Into<String>,
workspace: impl Into<String>,
slots: Vec<AgentSlot>,
) -> Result<crate::agent_teams::TeamRunResult> {
let agent = self
.agent
.as_ref()
.ok_or_else(|| anyhow::anyhow!("run_team requires a real Agent (use from_agent())"))?;
let ws = workspace.into();
let goal = goal.into();
let registry = crate::subagent::AgentRegistry::new();
for slot in &slots {
for dir in &slot.agent_dirs {
for def in crate::subagent::load_agents_from_dir(std::path::Path::new(dir)) {
registry.register(def);
}
}
}
let team_name = format!(
"team-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
);
let team = crate::agent_teams::AgentTeam::new(
&team_name,
crate::agent_teams::TeamConfig::default(),
);
let mut runner = crate::agent_teams::TeamRunner::new(team);
for (i, slot) in slots.iter().enumerate() {
let role = slot.role.unwrap_or(crate::agent_teams::TeamRole::Worker);
let member_id = format!("{}-{}", role, i);
runner.team_mut().add_member(&member_id, role);
runner.bind_agent(&member_id, agent, &ws, &slot.agent_type, ®istry)?;
}
runner.run_until_done(&goal).await
}
pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
let subagents = self.subagents.read().await;
let handle = subagents
.get(id)
.ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
handle.send_control(signal.clone()).await?;
let _ = self
.event_tx
.send(OrchestratorEvent::ControlSignalReceived {
id: id.to_string(),
signal,
});
Ok(())
}
pub async fn pause_subagent(&self, id: &str) -> Result<()> {
self.send_control(id, ControlSignal::Pause).await
}
pub async fn resume_subagent(&self, id: &str) -> Result<()> {
self.send_control(id, ControlSignal::Resume).await
}
pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
self.send_control(id, ControlSignal::Cancel).await
}
pub async fn adjust_subagent_params(
&self,
id: &str,
max_steps: Option<usize>,
timeout_ms: Option<u64>,
) -> Result<()> {
self.send_control(
id,
ControlSignal::AdjustParams {
max_steps,
timeout_ms,
},
)
.await
}
pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
let subagents = self.subagents.read().await;
subagents.get(id).map(|h| h.state())
}
pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
let subagents = self.subagents.read().await;
subagents
.iter()
.map(|(id, handle)| (id.clone(), handle.state()))
.collect()
}
pub async fn active_count(&self) -> usize {
let subagents = self.subagents.read().await;
subagents
.values()
.filter(|h| !h.state().is_terminal())
.count()
}
pub async fn wait_all(&self) -> Result<()> {
loop {
let active = self.active_count().await;
if active == 0 {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Ok(())
}
pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
let subagents = self.subagents.read().await;
let mut infos = Vec::new();
for (id, handle) in subagents.iter() {
let state = handle.state_async().await;
let activity = handle.activity().await;
let config = handle.config();
infos.push(SubAgentInfo {
id: id.clone(),
agent_type: config.agent_type.clone(),
description: config.description.clone(),
state: format!("{:?}", state),
parent_id: config.parent_id.clone(),
created_at: handle.created_at(),
updated_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
current_activity: Some(activity),
});
}
infos
}
pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
let subagents = self.subagents.read().await;
let handle = subagents.get(id)?;
let state = handle.state_async().await;
let activity = handle.activity().await;
let config = handle.config();
Some(SubAgentInfo {
id: id.to_string(),
agent_type: config.agent_type.clone(),
description: config.description.clone(),
state: format!("{:?}", state),
parent_id: config.parent_id.clone(),
created_at: handle.created_at(),
updated_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
current_activity: Some(activity),
})
}
pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
let subagents = self.subagents.read().await;
let mut activities = HashMap::new();
for (id, handle) in subagents.iter() {
if !handle.state().is_terminal() {
let activity = handle.activity().await;
activities.insert(id.clone(), activity);
}
}
activities
}
pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
let subagents = self.subagents.read().await;
subagents.get(id).cloned()
}
pub async fn pending_external_tasks_for(
&self,
subagent_id: &str,
) -> Vec<crate::queue::ExternalTask> {
let sessions = self.sessions.read().await;
match sessions.get(subagent_id) {
Some(session) => session.pending_external_tasks().await,
None => vec![],
}
}
pub async fn complete_external_task(
&self,
subagent_id: &str,
task_id: &str,
result: crate::queue::ExternalTaskResult,
) -> bool {
let sessions = self.sessions.read().await;
match sessions.get(subagent_id) {
Some(session) => session.complete_external_task(task_id, result).await,
None => false,
}
}
}
impl std::fmt::Debug for AgentOrchestrator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AgentOrchestrator")
.field("event_buffer_size", &self.config.event_buffer_size)
.field(
"max_concurrent_subagents",
&self.config.max_concurrent_subagents,
)
.finish()
}
}
pub struct SubAgentEventStream {
pub(crate) history: VecDeque<OrchestratorEvent>,
pub(crate) rx: broadcast::Receiver<OrchestratorEvent>,
pub(crate) filter_id: String,
}
impl SubAgentEventStream {
pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
if let Some(event) = self.history.pop_front() {
return Some(event);
}
loop {
match self.rx.recv().await {
Ok(event) => {
if let Some(id) = event.subagent_id() {
if id == self.filter_id {
return Some(event);
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
}
}
}
}