a3s_code_core/orchestrator/
agent.rs1use crate::error::Result;
4use crate::orchestrator::{
5 AgentSlot, ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity,
6 SubAgentConfig, SubAgentHandle, SubAgentInfo, SubAgentState,
7};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12pub struct AgentOrchestrator {
17 config: OrchestratorConfig,
19
20 agent: Option<Arc<crate::Agent>>,
22
23 event_tx: broadcast::Sender<OrchestratorEvent>,
25
26 subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
28
29 sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
35
36 next_id: Arc<RwLock<u64>>,
38}
39
40impl AgentOrchestrator {
41 pub fn new_memory() -> Self {
47 Self::new(OrchestratorConfig::default())
48 }
49
50 pub fn new(config: OrchestratorConfig) -> Self {
52 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
53
54 Self {
55 config,
56 agent: None,
57 event_tx,
58 subagents: Arc::new(RwLock::new(HashMap::new())),
59 sessions: Arc::new(RwLock::new(HashMap::new())),
60 next_id: Arc::new(RwLock::new(1)),
61 }
62 }
63
64 pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
71 Self::from_agent_with_config(agent, OrchestratorConfig::default())
72 }
73
74 pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
76 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
77
78 Self {
79 config,
80 agent: Some(agent),
81 event_tx,
82 subagents: Arc::new(RwLock::new(HashMap::new())),
83 sessions: Arc::new(RwLock::new(HashMap::new())),
84 next_id: Arc::new(RwLock::new(1)),
85 }
86 }
87
88 pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
92 self.event_tx.subscribe()
93 }
94
95 pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
99 let rx = self.event_tx.subscribe();
100 SubAgentEventStream {
101 rx,
102 filter_id: id.to_string(),
103 }
104 }
105
106 pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
110 {
112 let subagents = self.subagents.read().await;
113 let active_count = subagents
114 .values()
115 .filter(|h| !h.state().is_terminal())
116 .count();
117
118 if active_count >= self.config.max_concurrent_subagents {
119 return Err(anyhow::anyhow!(
120 "Maximum concurrent subagents ({}) reached",
121 self.config.max_concurrent_subagents
122 )
123 .into());
124 }
125 }
126
127 let id = {
129 let mut next_id = self.next_id.write().await;
130 let id = format!("subagent-{}", *next_id);
131 *next_id += 1;
132 id
133 };
134
135 let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
137
138 let state = Arc::new(RwLock::new(SubAgentState::Initializing));
140
141 let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
143
144 let _ = self.event_tx.send(OrchestratorEvent::SubAgentStarted {
146 id: id.clone(),
147 agent_type: config.agent_type.clone(),
148 description: config.description.clone(),
149 parent_id: config.parent_id.clone(),
150 config: config.clone(),
151 });
152
153 let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
155 id.clone(),
156 config.clone(),
157 self.agent.clone(),
158 self.event_tx.clone(),
159 control_rx,
160 state.clone(),
161 activity.clone(),
162 Arc::clone(&self.sessions),
163 );
164
165 let task_handle = tokio::spawn(async move { wrapper.execute().await });
166
167 let handle = SubAgentHandle::new(
169 id.clone(),
170 config,
171 control_tx,
172 state.clone(),
173 activity.clone(),
174 task_handle,
175 );
176
177 self.subagents
179 .write()
180 .await
181 .insert(id.clone(), handle.clone());
182
183 Ok(handle)
184 }
185
186 pub async fn spawn(&self, slot: AgentSlot) -> Result<SubAgentHandle> {
192 self.spawn_subagent(SubAgentConfig::from(slot)).await
193 }
194
195 pub async fn run_team(
202 &self,
203 goal: impl Into<String>,
204 workspace: impl Into<String>,
205 slots: Vec<AgentSlot>,
206 ) -> Result<crate::agent_teams::TeamRunResult> {
207 let agent = self
208 .agent
209 .as_ref()
210 .ok_or_else(|| anyhow::anyhow!("run_team requires a real Agent (use from_agent())"))?;
211
212 let ws = workspace.into();
213 let goal = goal.into();
214
215 let registry = crate::subagent::AgentRegistry::new();
217 for slot in &slots {
218 for dir in &slot.agent_dirs {
219 for def in crate::subagent::load_agents_from_dir(std::path::Path::new(dir)) {
220 registry.register(def);
221 }
222 }
223 }
224
225 let team_name = format!(
227 "team-{}",
228 std::time::SystemTime::now()
229 .duration_since(std::time::UNIX_EPOCH)
230 .unwrap_or_default()
231 .as_millis()
232 );
233
234 let team = crate::agent_teams::AgentTeam::new(
235 &team_name,
236 crate::agent_teams::TeamConfig::default(),
237 );
238 let mut runner = crate::agent_teams::TeamRunner::new(team);
239
240 for (i, slot) in slots.iter().enumerate() {
241 let role = slot.role.unwrap_or(crate::agent_teams::TeamRole::Worker);
242 let member_id = format!("{}-{}", role, i);
243 runner.team_mut().add_member(&member_id, role);
244 runner.bind_agent(&member_id, agent, &ws, &slot.agent_type, ®istry)?;
245 }
246
247 runner.run_until_done(&goal).await
248 }
249
250 pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
252 let subagents = self.subagents.read().await;
253 let handle = subagents
254 .get(id)
255 .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
256
257 handle.send_control(signal.clone()).await?;
258
259 let _ = self
261 .event_tx
262 .send(OrchestratorEvent::ControlSignalReceived {
263 id: id.to_string(),
264 signal,
265 });
266
267 Ok(())
268 }
269
270 pub async fn pause_subagent(&self, id: &str) -> Result<()> {
272 self.send_control(id, ControlSignal::Pause).await
273 }
274
275 pub async fn resume_subagent(&self, id: &str) -> Result<()> {
277 self.send_control(id, ControlSignal::Resume).await
278 }
279
280 pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
282 self.send_control(id, ControlSignal::Cancel).await
283 }
284
285 pub async fn adjust_subagent_params(
287 &self,
288 id: &str,
289 max_steps: Option<usize>,
290 timeout_ms: Option<u64>,
291 ) -> Result<()> {
292 self.send_control(
293 id,
294 ControlSignal::AdjustParams {
295 max_steps,
296 timeout_ms,
297 },
298 )
299 .await
300 }
301
302 pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
304 let subagents = self.subagents.read().await;
305 subagents.get(id).map(|h| h.state())
306 }
307
308 pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
310 let subagents = self.subagents.read().await;
311 subagents
312 .iter()
313 .map(|(id, handle)| (id.clone(), handle.state()))
314 .collect()
315 }
316
317 pub async fn active_count(&self) -> usize {
319 let subagents = self.subagents.read().await;
320 subagents
321 .values()
322 .filter(|h| !h.state().is_terminal())
323 .count()
324 }
325
326 pub async fn wait_all(&self) -> Result<()> {
328 loop {
329 let active = self.active_count().await;
330 if active == 0 {
331 break;
332 }
333 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
334 }
335 Ok(())
336 }
337
338 pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
340 let subagents = self.subagents.read().await;
341 let mut infos = Vec::new();
342
343 for (id, handle) in subagents.iter() {
344 let state = handle.state_async().await;
345 let activity = handle.activity().await;
346 let config = handle.config();
347
348 infos.push(SubAgentInfo {
349 id: id.clone(),
350 agent_type: config.agent_type.clone(),
351 description: config.description.clone(),
352 state: format!("{:?}", state),
353 parent_id: config.parent_id.clone(),
354 created_at: handle.created_at(),
355 updated_at: std::time::SystemTime::now()
356 .duration_since(std::time::UNIX_EPOCH)
357 .unwrap()
358 .as_millis() as u64,
359 current_activity: Some(activity),
360 });
361 }
362
363 infos
364 }
365
366 pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
368 let subagents = self.subagents.read().await;
369 let handle = subagents.get(id)?;
370
371 let state = handle.state_async().await;
372 let activity = handle.activity().await;
373 let config = handle.config();
374
375 Some(SubAgentInfo {
376 id: id.to_string(),
377 agent_type: config.agent_type.clone(),
378 description: config.description.clone(),
379 state: format!("{:?}", state),
380 parent_id: config.parent_id.clone(),
381 created_at: handle.created_at(),
382 updated_at: std::time::SystemTime::now()
383 .duration_since(std::time::UNIX_EPOCH)
384 .unwrap()
385 .as_millis() as u64,
386 current_activity: Some(activity),
387 })
388 }
389
390 pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
392 let subagents = self.subagents.read().await;
393 let mut activities = HashMap::new();
394
395 for (id, handle) in subagents.iter() {
396 if !handle.state().is_terminal() {
397 let activity = handle.activity().await;
398 activities.insert(id.clone(), activity);
399 }
400 }
401
402 activities
403 }
404
405 pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
407 let subagents = self.subagents.read().await;
408 subagents.get(id).cloned()
409 }
410
411 pub async fn pending_external_tasks_for(
424 &self,
425 subagent_id: &str,
426 ) -> Vec<crate::queue::ExternalTask> {
427 let sessions = self.sessions.read().await;
428 match sessions.get(subagent_id) {
429 Some(session) => session.pending_external_tasks().await,
430 None => vec![],
431 }
432 }
433
434 pub async fn complete_external_task(
435 &self,
436 subagent_id: &str,
437 task_id: &str,
438 result: crate::queue::ExternalTaskResult,
439 ) -> bool {
440 let sessions = self.sessions.read().await;
441 match sessions.get(subagent_id) {
442 Some(session) => session.complete_external_task(task_id, result).await,
443 None => false,
444 }
445 }
446}
447
448impl std::fmt::Debug for AgentOrchestrator {
449 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450 f.debug_struct("AgentOrchestrator")
451 .field("event_buffer_size", &self.config.event_buffer_size)
452 .field(
453 "max_concurrent_subagents",
454 &self.config.max_concurrent_subagents,
455 )
456 .finish()
457 }
458}
459
460pub struct SubAgentEventStream {
462 rx: broadcast::Receiver<OrchestratorEvent>,
463 filter_id: String,
464}
465
466impl SubAgentEventStream {
467 pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
469 loop {
470 match self.rx.recv().await {
471 Ok(event) => {
472 if let Some(id) = event.subagent_id() {
473 if id == self.filter_id {
474 return Some(event);
475 }
476 }
477 }
478 Err(_) => return None,
479 }
480 }
481 }
482}