a3s_code_core/orchestrator/
agent.rs1use crate::error::Result;
4use crate::orchestrator::{
5 ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity, SubAgentConfig,
6 SubAgentHandle, SubAgentInfo, SubAgentState,
7};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12pub struct AgentOrchestrator {
17 config: OrchestratorConfig,
19
20 agent: Option<Arc<crate::Agent>>,
22
23 event_tx: broadcast::Sender<OrchestratorEvent>,
25
26 subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
28
29 sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
35
36 next_id: Arc<RwLock<u64>>,
38}
39
40impl AgentOrchestrator {
41 pub fn new_memory() -> Self {
47 Self::new(OrchestratorConfig::default())
48 }
49
50 pub fn new(config: OrchestratorConfig) -> Self {
52 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
53
54 Self {
55 config,
56 agent: None,
57 event_tx,
58 subagents: Arc::new(RwLock::new(HashMap::new())),
59 sessions: Arc::new(RwLock::new(HashMap::new())),
60 next_id: Arc::new(RwLock::new(1)),
61 }
62 }
63
64 pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
71 Self::from_agent_with_config(agent, OrchestratorConfig::default())
72 }
73
74 pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
76 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
77
78 Self {
79 config,
80 agent: Some(agent),
81 event_tx,
82 subagents: Arc::new(RwLock::new(HashMap::new())),
83 sessions: Arc::new(RwLock::new(HashMap::new())),
84 next_id: Arc::new(RwLock::new(1)),
85 }
86 }
87
88 pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
92 self.event_tx.subscribe()
93 }
94
95 pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
99 let rx = self.event_tx.subscribe();
100 SubAgentEventStream {
101 rx,
102 filter_id: id.to_string(),
103 }
104 }
105
106 pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
110 {
112 let subagents = self.subagents.read().await;
113 let active_count = subagents
114 .values()
115 .filter(|h| !h.state().is_terminal())
116 .count();
117
118 if active_count >= self.config.max_concurrent_subagents {
119 return Err(anyhow::anyhow!(
120 "Maximum concurrent subagents ({}) reached",
121 self.config.max_concurrent_subagents
122 )
123 .into());
124 }
125 }
126
127 let id = {
129 let mut next_id = self.next_id.write().await;
130 let id = format!("subagent-{}", *next_id);
131 *next_id += 1;
132 id
133 };
134
135 let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
137
138 let state = Arc::new(RwLock::new(SubAgentState::Initializing));
140
141 let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
143
144 let _ = self.event_tx.send(OrchestratorEvent::SubAgentStarted {
146 id: id.clone(),
147 agent_type: config.agent_type.clone(),
148 description: config.description.clone(),
149 parent_id: config.parent_id.clone(),
150 config: config.clone(),
151 });
152
153 let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
155 id.clone(),
156 config.clone(),
157 self.agent.clone(),
158 self.event_tx.clone(),
159 control_rx,
160 state.clone(),
161 activity.clone(),
162 Arc::clone(&self.sessions),
163 );
164
165 let task_handle = tokio::spawn(async move { wrapper.execute().await });
166
167 let handle = SubAgentHandle::new(
169 id.clone(),
170 config,
171 control_tx,
172 state.clone(),
173 activity.clone(),
174 task_handle,
175 );
176
177 self.subagents
179 .write()
180 .await
181 .insert(id.clone(), handle.clone());
182
183 Ok(handle)
184 }
185
186 pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
188 let subagents = self.subagents.read().await;
189 let handle = subagents
190 .get(id)
191 .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
192
193 handle.send_control(signal.clone()).await?;
194
195 let _ = self
197 .event_tx
198 .send(OrchestratorEvent::ControlSignalReceived {
199 id: id.to_string(),
200 signal,
201 });
202
203 Ok(())
204 }
205
206 pub async fn pause_subagent(&self, id: &str) -> Result<()> {
208 self.send_control(id, ControlSignal::Pause).await
209 }
210
211 pub async fn resume_subagent(&self, id: &str) -> Result<()> {
213 self.send_control(id, ControlSignal::Resume).await
214 }
215
216 pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
218 self.send_control(id, ControlSignal::Cancel).await
219 }
220
221 pub async fn adjust_subagent_params(
223 &self,
224 id: &str,
225 max_steps: Option<usize>,
226 timeout_ms: Option<u64>,
227 ) -> Result<()> {
228 self.send_control(
229 id,
230 ControlSignal::AdjustParams {
231 max_steps,
232 timeout_ms,
233 },
234 )
235 .await
236 }
237
238 pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
240 let subagents = self.subagents.read().await;
241 subagents.get(id).map(|h| h.state())
242 }
243
244 pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
246 let subagents = self.subagents.read().await;
247 subagents
248 .iter()
249 .map(|(id, handle)| (id.clone(), handle.state()))
250 .collect()
251 }
252
253 pub async fn active_count(&self) -> usize {
255 let subagents = self.subagents.read().await;
256 subagents
257 .values()
258 .filter(|h| !h.state().is_terminal())
259 .count()
260 }
261
262 pub async fn wait_all(&self) -> Result<()> {
264 loop {
265 let active = self.active_count().await;
266 if active == 0 {
267 break;
268 }
269 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
270 }
271 Ok(())
272 }
273
274 pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
276 let subagents = self.subagents.read().await;
277 let mut infos = Vec::new();
278
279 for (id, handle) in subagents.iter() {
280 let state = handle.state_async().await;
281 let activity = handle.activity().await;
282 let config = handle.config();
283
284 infos.push(SubAgentInfo {
285 id: id.clone(),
286 agent_type: config.agent_type.clone(),
287 description: config.description.clone(),
288 state: format!("{:?}", state),
289 parent_id: config.parent_id.clone(),
290 created_at: handle.created_at(),
291 updated_at: std::time::SystemTime::now()
292 .duration_since(std::time::UNIX_EPOCH)
293 .unwrap()
294 .as_millis() as u64,
295 current_activity: Some(activity),
296 });
297 }
298
299 infos
300 }
301
302 pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
304 let subagents = self.subagents.read().await;
305 let handle = subagents.get(id)?;
306
307 let state = handle.state_async().await;
308 let activity = handle.activity().await;
309 let config = handle.config();
310
311 Some(SubAgentInfo {
312 id: id.to_string(),
313 agent_type: config.agent_type.clone(),
314 description: config.description.clone(),
315 state: format!("{:?}", state),
316 parent_id: config.parent_id.clone(),
317 created_at: handle.created_at(),
318 updated_at: std::time::SystemTime::now()
319 .duration_since(std::time::UNIX_EPOCH)
320 .unwrap()
321 .as_millis() as u64,
322 current_activity: Some(activity),
323 })
324 }
325
326 pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
328 let subagents = self.subagents.read().await;
329 let mut activities = HashMap::new();
330
331 for (id, handle) in subagents.iter() {
332 if !handle.state().is_terminal() {
333 let activity = handle.activity().await;
334 activities.insert(id.clone(), activity);
335 }
336 }
337
338 activities
339 }
340
341 pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
343 let subagents = self.subagents.read().await;
344 subagents.get(id).cloned()
345 }
346
347 pub async fn pending_external_tasks_for(
360 &self,
361 subagent_id: &str,
362 ) -> Vec<crate::queue::ExternalTask> {
363 let sessions = self.sessions.read().await;
364 match sessions.get(subagent_id) {
365 Some(session) => session.pending_external_tasks().await,
366 None => vec![],
367 }
368 }
369
370 pub async fn complete_external_task(
371 &self,
372 subagent_id: &str,
373 task_id: &str,
374 result: crate::queue::ExternalTaskResult,
375 ) -> bool {
376 let sessions = self.sessions.read().await;
377 match sessions.get(subagent_id) {
378 Some(session) => session.complete_external_task(task_id, result).await,
379 None => false,
380 }
381 }
382}
383
384impl std::fmt::Debug for AgentOrchestrator {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 f.debug_struct("AgentOrchestrator")
387 .field("event_buffer_size", &self.config.event_buffer_size)
388 .field(
389 "max_concurrent_subagents",
390 &self.config.max_concurrent_subagents,
391 )
392 .finish()
393 }
394}
395
396pub struct SubAgentEventStream {
398 rx: broadcast::Receiver<OrchestratorEvent>,
399 filter_id: String,
400}
401
402impl SubAgentEventStream {
403 pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
405 loop {
406 match self.rx.recv().await {
407 Ok(event) => {
408 if let Some(id) = event.subagent_id() {
409 if id == self.filter_id {
410 return Some(event);
411 }
412 }
413 }
414 Err(_) => return None,
415 }
416 }
417 }
418}