// a3s_code_core/orchestrator/agent.rs
use crate::error::Result;
4use crate::orchestrator::{
5 AgentSlot, ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity,
6 SubAgentConfig, SubAgentHandle, SubAgentInfo, SubAgentState,
7};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12pub struct AgentOrchestrator {
17 config: OrchestratorConfig,
19
20 agent: Option<Arc<crate::Agent>>,
22
23 event_tx: broadcast::Sender<OrchestratorEvent>,
25
26 subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
28
29 sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
35
36 next_id: Arc<RwLock<u64>>,
38}
39
40impl AgentOrchestrator {
41 pub fn new_memory() -> Self {
47 Self::new(OrchestratorConfig::default())
48 }
49
50 pub fn new(config: OrchestratorConfig) -> Self {
52 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
53
54 Self {
55 config,
56 agent: None,
57 event_tx,
58 subagents: Arc::new(RwLock::new(HashMap::new())),
59 sessions: Arc::new(RwLock::new(HashMap::new())),
60 next_id: Arc::new(RwLock::new(1)),
61 }
62 }
63
64 pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
71 Self::from_agent_with_config(agent, OrchestratorConfig::default())
72 }
73
74 pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
76 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
77
78 Self {
79 config,
80 agent: Some(agent),
81 event_tx,
82 subagents: Arc::new(RwLock::new(HashMap::new())),
83 sessions: Arc::new(RwLock::new(HashMap::new())),
84 next_id: Arc::new(RwLock::new(1)),
85 }
86 }
87
88 pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
92 self.event_tx.subscribe()
93 }
94
95 pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
99 let rx = self.event_tx.subscribe();
100 SubAgentEventStream {
101 rx,
102 filter_id: id.to_string(),
103 }
104 }
105
106 pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
110 {
112 let subagents = self.subagents.read().await;
113 let active_count = subagents
114 .values()
115 .filter(|h| !h.state().is_terminal())
116 .count();
117
118 if active_count >= self.config.max_concurrent_subagents {
119 return Err(anyhow::anyhow!(
120 "Maximum concurrent subagents ({}) reached",
121 self.config.max_concurrent_subagents
122 )
123 .into());
124 }
125 }
126
127 let id = {
129 let mut next_id = self.next_id.write().await;
130 let id = format!("subagent-{}", *next_id);
131 *next_id += 1;
132 id
133 };
134
135 let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
137
138 let state = Arc::new(RwLock::new(SubAgentState::Initializing));
140
141 let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
143
144 let _ = self.event_tx.send(OrchestratorEvent::SubAgentStarted {
146 id: id.clone(),
147 agent_type: config.agent_type.clone(),
148 description: config.description.clone(),
149 parent_id: config.parent_id.clone(),
150 config: config.clone(),
151 });
152
153 let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
155 id.clone(),
156 config.clone(),
157 self.agent.clone(),
158 self.event_tx.clone(),
159 control_rx,
160 state.clone(),
161 activity.clone(),
162 Arc::clone(&self.sessions),
163 );
164
165 let task_handle = tokio::spawn(async move { wrapper.execute().await });
166
167 let handle = SubAgentHandle::new(
169 id.clone(),
170 config,
171 control_tx,
172 self.event_tx.clone(),
173 state.clone(),
174 activity.clone(),
175 task_handle,
176 );
177
178 self.subagents
180 .write()
181 .await
182 .insert(id.clone(), handle.clone());
183
184 Ok(handle)
185 }
186
187 pub async fn spawn(&self, slot: AgentSlot) -> Result<SubAgentHandle> {
193 self.spawn_subagent(SubAgentConfig::from(slot)).await
194 }
195
196 pub async fn run_team(
203 &self,
204 goal: impl Into<String>,
205 workspace: impl Into<String>,
206 slots: Vec<AgentSlot>,
207 ) -> Result<crate::agent_teams::TeamRunResult> {
208 let agent = self
209 .agent
210 .as_ref()
211 .ok_or_else(|| anyhow::anyhow!("run_team requires a real Agent (use from_agent())"))?;
212
213 let ws = workspace.into();
214 let goal = goal.into();
215
216 let registry = crate::subagent::AgentRegistry::new();
218 for slot in &slots {
219 for dir in &slot.agent_dirs {
220 for def in crate::subagent::load_agents_from_dir(std::path::Path::new(dir)) {
221 registry.register(def);
222 }
223 }
224 }
225
226 let team_name = format!(
228 "team-{}",
229 std::time::SystemTime::now()
230 .duration_since(std::time::UNIX_EPOCH)
231 .unwrap_or_default()
232 .as_millis()
233 );
234
235 let team = crate::agent_teams::AgentTeam::new(
236 &team_name,
237 crate::agent_teams::TeamConfig::default(),
238 );
239 let mut runner = crate::agent_teams::TeamRunner::new(team);
240
241 for (i, slot) in slots.iter().enumerate() {
242 let role = slot.role.unwrap_or(crate::agent_teams::TeamRole::Worker);
243 let member_id = format!("{}-{}", role, i);
244 runner.team_mut().add_member(&member_id, role);
245 runner.bind_agent(&member_id, agent, &ws, &slot.agent_type, ®istry)?;
246 }
247
248 runner.run_until_done(&goal).await
249 }
250
251 pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
253 let subagents = self.subagents.read().await;
254 let handle = subagents
255 .get(id)
256 .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
257
258 handle.send_control(signal.clone()).await?;
259
260 let _ = self
262 .event_tx
263 .send(OrchestratorEvent::ControlSignalReceived {
264 id: id.to_string(),
265 signal,
266 });
267
268 Ok(())
269 }
270
271 pub async fn pause_subagent(&self, id: &str) -> Result<()> {
273 self.send_control(id, ControlSignal::Pause).await
274 }
275
276 pub async fn resume_subagent(&self, id: &str) -> Result<()> {
278 self.send_control(id, ControlSignal::Resume).await
279 }
280
281 pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
283 self.send_control(id, ControlSignal::Cancel).await
284 }
285
286 pub async fn adjust_subagent_params(
288 &self,
289 id: &str,
290 max_steps: Option<usize>,
291 timeout_ms: Option<u64>,
292 ) -> Result<()> {
293 self.send_control(
294 id,
295 ControlSignal::AdjustParams {
296 max_steps,
297 timeout_ms,
298 },
299 )
300 .await
301 }
302
303 pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
305 let subagents = self.subagents.read().await;
306 subagents.get(id).map(|h| h.state())
307 }
308
309 pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
311 let subagents = self.subagents.read().await;
312 subagents
313 .iter()
314 .map(|(id, handle)| (id.clone(), handle.state()))
315 .collect()
316 }
317
318 pub async fn active_count(&self) -> usize {
320 let subagents = self.subagents.read().await;
321 subagents
322 .values()
323 .filter(|h| !h.state().is_terminal())
324 .count()
325 }
326
327 pub async fn wait_all(&self) -> Result<()> {
329 loop {
330 let active = self.active_count().await;
331 if active == 0 {
332 break;
333 }
334 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
335 }
336 Ok(())
337 }
338
339 pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
341 let subagents = self.subagents.read().await;
342 let mut infos = Vec::new();
343
344 for (id, handle) in subagents.iter() {
345 let state = handle.state_async().await;
346 let activity = handle.activity().await;
347 let config = handle.config();
348
349 infos.push(SubAgentInfo {
350 id: id.clone(),
351 agent_type: config.agent_type.clone(),
352 description: config.description.clone(),
353 state: format!("{:?}", state),
354 parent_id: config.parent_id.clone(),
355 created_at: handle.created_at(),
356 updated_at: std::time::SystemTime::now()
357 .duration_since(std::time::UNIX_EPOCH)
358 .unwrap()
359 .as_millis() as u64,
360 current_activity: Some(activity),
361 });
362 }
363
364 infos
365 }
366
367 pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
369 let subagents = self.subagents.read().await;
370 let handle = subagents.get(id)?;
371
372 let state = handle.state_async().await;
373 let activity = handle.activity().await;
374 let config = handle.config();
375
376 Some(SubAgentInfo {
377 id: id.to_string(),
378 agent_type: config.agent_type.clone(),
379 description: config.description.clone(),
380 state: format!("{:?}", state),
381 parent_id: config.parent_id.clone(),
382 created_at: handle.created_at(),
383 updated_at: std::time::SystemTime::now()
384 .duration_since(std::time::UNIX_EPOCH)
385 .unwrap()
386 .as_millis() as u64,
387 current_activity: Some(activity),
388 })
389 }
390
391 pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
393 let subagents = self.subagents.read().await;
394 let mut activities = HashMap::new();
395
396 for (id, handle) in subagents.iter() {
397 if !handle.state().is_terminal() {
398 let activity = handle.activity().await;
399 activities.insert(id.clone(), activity);
400 }
401 }
402
403 activities
404 }
405
406 pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
408 let subagents = self.subagents.read().await;
409 subagents.get(id).cloned()
410 }
411
412 pub async fn pending_external_tasks_for(
425 &self,
426 subagent_id: &str,
427 ) -> Vec<crate::queue::ExternalTask> {
428 let sessions = self.sessions.read().await;
429 match sessions.get(subagent_id) {
430 Some(session) => session.pending_external_tasks().await,
431 None => vec![],
432 }
433 }
434
435 pub async fn complete_external_task(
436 &self,
437 subagent_id: &str,
438 task_id: &str,
439 result: crate::queue::ExternalTaskResult,
440 ) -> bool {
441 let sessions = self.sessions.read().await;
442 match sessions.get(subagent_id) {
443 Some(session) => session.complete_external_task(task_id, result).await,
444 None => false,
445 }
446 }
447}
448
449impl std::fmt::Debug for AgentOrchestrator {
450 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451 f.debug_struct("AgentOrchestrator")
452 .field("event_buffer_size", &self.config.event_buffer_size)
453 .field(
454 "max_concurrent_subagents",
455 &self.config.max_concurrent_subagents,
456 )
457 .finish()
458 }
459}
460
461pub struct SubAgentEventStream {
463 pub(crate) rx: broadcast::Receiver<OrchestratorEvent>,
464 pub(crate) filter_id: String,
465}
466
467impl SubAgentEventStream {
468 pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
470 loop {
471 match self.rx.recv().await {
472 Ok(event) => {
473 if let Some(id) = event.subagent_id() {
474 if id == self.filter_id {
475 return Some(event);
476 }
477 }
478 }
479 Err(_) => return None,
480 }
481 }
482 }
483}