a3s_code_core/orchestrator/
agent.rs1use crate::error::Result;
4use crate::orchestrator::{
5 AgentSlot, ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity,
6 SubAgentConfig, SubAgentHandle, SubAgentInfo, SubAgentState,
7};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12pub struct AgentOrchestrator {
17 config: OrchestratorConfig,
19
20 agent: Option<Arc<crate::Agent>>,
22
23 event_tx: broadcast::Sender<OrchestratorEvent>,
25
26 subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
28
29 sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
35
36 next_id: Arc<RwLock<u64>>,
38}
39
40impl AgentOrchestrator {
41 pub fn new_memory() -> Self {
47 Self::new(OrchestratorConfig::default())
48 }
49
50 pub fn new(config: OrchestratorConfig) -> Self {
52 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
53
54 Self {
55 config,
56 agent: None,
57 event_tx,
58 subagents: Arc::new(RwLock::new(HashMap::new())),
59 sessions: Arc::new(RwLock::new(HashMap::new())),
60 next_id: Arc::new(RwLock::new(1)),
61 }
62 }
63
64 pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
71 Self::from_agent_with_config(agent, OrchestratorConfig::default())
72 }
73
74 pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
76 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
77
78 Self {
79 config,
80 agent: Some(agent),
81 event_tx,
82 subagents: Arc::new(RwLock::new(HashMap::new())),
83 sessions: Arc::new(RwLock::new(HashMap::new())),
84 next_id: Arc::new(RwLock::new(1)),
85 }
86 }
87
88 pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
92 self.event_tx.subscribe()
93 }
94
95 pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
99 let rx = self.event_tx.subscribe();
100 SubAgentEventStream {
101 history: VecDeque::new(),
102 rx,
103 filter_id: id.to_string(),
104 }
105 }
106
107 pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
111 {
113 let subagents = self.subagents.read().await;
114 let active_count = subagents
115 .values()
116 .filter(|h| !h.state().is_terminal())
117 .count();
118
119 if active_count >= self.config.max_concurrent_subagents {
120 return Err(anyhow::anyhow!(
121 "Maximum concurrent subagents ({}) reached",
122 self.config.max_concurrent_subagents
123 )
124 .into());
125 }
126 }
127
128 let id = {
130 let mut next_id = self.next_id.write().await;
131 let id = format!("subagent-{}", *next_id);
132 *next_id += 1;
133 id
134 };
135
136 let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
138 let (subagent_event_tx, _) = broadcast::channel(self.config.event_buffer_size);
139
140 let state = Arc::new(RwLock::new(SubAgentState::Initializing));
142
143 let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
145 let event_history = Arc::new(RwLock::new(VecDeque::with_capacity(
146 self.config.event_buffer_size,
147 )));
148
149 let started_event = OrchestratorEvent::SubAgentStarted {
151 id: id.clone(),
152 agent_type: config.agent_type.clone(),
153 description: config.description.clone(),
154 parent_id: config.parent_id.clone(),
155 config: config.clone(),
156 };
157 let _ = self.event_tx.send(started_event.clone());
158 let _ = subagent_event_tx.send(started_event.clone());
159 event_history.write().await.push_back(started_event);
160
161 let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
163 id.clone(),
164 config.clone(),
165 self.agent.clone(),
166 self.event_tx.clone(),
167 subagent_event_tx.clone(),
168 Arc::clone(&event_history),
169 control_rx,
170 state.clone(),
171 activity.clone(),
172 Arc::clone(&self.sessions),
173 );
174
175 let task_handle = tokio::spawn(async move { wrapper.execute().await });
176
177 let handle = SubAgentHandle::new(crate::orchestrator::handle::SubAgentHandleParts {
179 id: id.clone(),
180 config,
181 control_tx,
182 subagent_event_tx,
183 event_history,
184 state: state.clone(),
185 activity: activity.clone(),
186 task_handle,
187 });
188
189 self.subagents
191 .write()
192 .await
193 .insert(id.clone(), handle.clone());
194
195 Ok(handle)
196 }
197
198 pub async fn spawn(&self, slot: AgentSlot) -> Result<SubAgentHandle> {
204 self.spawn_subagent(SubAgentConfig::from(slot)).await
205 }
206
207 pub async fn run_team(
214 &self,
215 goal: impl Into<String>,
216 workspace: impl Into<String>,
217 slots: Vec<AgentSlot>,
218 ) -> Result<crate::agent_teams::TeamRunResult> {
219 let agent = self
220 .agent
221 .as_ref()
222 .ok_or_else(|| anyhow::anyhow!("run_team requires a real Agent (use from_agent())"))?;
223
224 let ws = workspace.into();
225 let goal = goal.into();
226
227 let registry = crate::subagent::AgentRegistry::new();
229 for slot in &slots {
230 for dir in &slot.agent_dirs {
231 for def in crate::subagent::load_agents_from_dir(std::path::Path::new(dir)) {
232 registry.register(def);
233 }
234 }
235 }
236
237 let team_name = format!(
239 "team-{}",
240 std::time::SystemTime::now()
241 .duration_since(std::time::UNIX_EPOCH)
242 .unwrap_or_default()
243 .as_millis()
244 );
245
246 let team = crate::agent_teams::AgentTeam::new(
247 &team_name,
248 crate::agent_teams::TeamConfig::default(),
249 );
250 let mut runner = crate::agent_teams::TeamRunner::new(team);
251
252 for (i, slot) in slots.iter().enumerate() {
253 let role = slot.role.unwrap_or(crate::agent_teams::TeamRole::Worker);
254 let member_id = format!("{}-{}", role, i);
255 runner.team_mut().add_member(&member_id, role);
256 runner.bind_agent(&member_id, agent, &ws, &slot.agent_type, ®istry)?;
257 }
258
259 runner.run_until_done(&goal).await
260 }
261
262 pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
264 let subagents = self.subagents.read().await;
265 let handle = subagents
266 .get(id)
267 .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
268
269 handle.send_control(signal.clone()).await?;
270
271 let _ = self
273 .event_tx
274 .send(OrchestratorEvent::ControlSignalReceived {
275 id: id.to_string(),
276 signal,
277 });
278
279 Ok(())
280 }
281
282 pub async fn pause_subagent(&self, id: &str) -> Result<()> {
284 self.send_control(id, ControlSignal::Pause).await
285 }
286
287 pub async fn resume_subagent(&self, id: &str) -> Result<()> {
289 self.send_control(id, ControlSignal::Resume).await
290 }
291
292 pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
294 self.send_control(id, ControlSignal::Cancel).await
295 }
296
297 pub async fn adjust_subagent_params(
299 &self,
300 id: &str,
301 max_steps: Option<usize>,
302 timeout_ms: Option<u64>,
303 ) -> Result<()> {
304 self.send_control(
305 id,
306 ControlSignal::AdjustParams {
307 max_steps,
308 timeout_ms,
309 },
310 )
311 .await
312 }
313
314 pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
316 let subagents = self.subagents.read().await;
317 subagents.get(id).map(|h| h.state())
318 }
319
320 pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
322 let subagents = self.subagents.read().await;
323 subagents
324 .iter()
325 .map(|(id, handle)| (id.clone(), handle.state()))
326 .collect()
327 }
328
329 pub async fn active_count(&self) -> usize {
331 let subagents = self.subagents.read().await;
332 subagents
333 .values()
334 .filter(|h| !h.state().is_terminal())
335 .count()
336 }
337
338 pub async fn wait_all(&self) -> Result<()> {
340 loop {
341 let active = self.active_count().await;
342 if active == 0 {
343 break;
344 }
345 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
346 }
347 Ok(())
348 }
349
350 pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
352 let subagents = self.subagents.read().await;
353 let mut infos = Vec::new();
354
355 for (id, handle) in subagents.iter() {
356 let state = handle.state_async().await;
357 let activity = handle.activity().await;
358 let config = handle.config();
359
360 infos.push(SubAgentInfo {
361 id: id.clone(),
362 agent_type: config.agent_type.clone(),
363 description: config.description.clone(),
364 state: format!("{:?}", state),
365 parent_id: config.parent_id.clone(),
366 created_at: handle.created_at(),
367 updated_at: std::time::SystemTime::now()
368 .duration_since(std::time::UNIX_EPOCH)
369 .unwrap()
370 .as_millis() as u64,
371 current_activity: Some(activity),
372 });
373 }
374
375 infos
376 }
377
378 pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
380 let subagents = self.subagents.read().await;
381 let handle = subagents.get(id)?;
382
383 let state = handle.state_async().await;
384 let activity = handle.activity().await;
385 let config = handle.config();
386
387 Some(SubAgentInfo {
388 id: id.to_string(),
389 agent_type: config.agent_type.clone(),
390 description: config.description.clone(),
391 state: format!("{:?}", state),
392 parent_id: config.parent_id.clone(),
393 created_at: handle.created_at(),
394 updated_at: std::time::SystemTime::now()
395 .duration_since(std::time::UNIX_EPOCH)
396 .unwrap()
397 .as_millis() as u64,
398 current_activity: Some(activity),
399 })
400 }
401
402 pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
404 let subagents = self.subagents.read().await;
405 let mut activities = HashMap::new();
406
407 for (id, handle) in subagents.iter() {
408 if !handle.state().is_terminal() {
409 let activity = handle.activity().await;
410 activities.insert(id.clone(), activity);
411 }
412 }
413
414 activities
415 }
416
417 pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
419 let subagents = self.subagents.read().await;
420 subagents.get(id).cloned()
421 }
422
423 pub async fn pending_external_tasks_for(
436 &self,
437 subagent_id: &str,
438 ) -> Vec<crate::queue::ExternalTask> {
439 let sessions = self.sessions.read().await;
440 match sessions.get(subagent_id) {
441 Some(session) => session.pending_external_tasks().await,
442 None => vec![],
443 }
444 }
445
446 pub async fn complete_external_task(
447 &self,
448 subagent_id: &str,
449 task_id: &str,
450 result: crate::queue::ExternalTaskResult,
451 ) -> bool {
452 let sessions = self.sessions.read().await;
453 match sessions.get(subagent_id) {
454 Some(session) => session.complete_external_task(task_id, result).await,
455 None => false,
456 }
457 }
458}
459
460impl std::fmt::Debug for AgentOrchestrator {
461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462 f.debug_struct("AgentOrchestrator")
463 .field("event_buffer_size", &self.config.event_buffer_size)
464 .field(
465 "max_concurrent_subagents",
466 &self.config.max_concurrent_subagents,
467 )
468 .finish()
469 }
470}
471
472pub struct SubAgentEventStream {
474 pub(crate) history: VecDeque<OrchestratorEvent>,
475 pub(crate) rx: broadcast::Receiver<OrchestratorEvent>,
476 pub(crate) filter_id: String,
477}
478
479impl SubAgentEventStream {
480 pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
482 if let Some(event) = self.history.pop_front() {
483 return Some(event);
484 }
485
486 loop {
487 match self.rx.recv().await {
488 Ok(event) => {
489 if let Some(id) = event.subagent_id() {
490 if id == self.filter_id {
491 return Some(event);
492 }
493 }
494 }
495 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
496 Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
497 }
498 }
499 }
500}