a3s_code_core/orchestrator/
agent.rs1use crate::error::Result;
4use crate::orchestrator::{
5 ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity, SubAgentConfig,
6 SubAgentHandle, SubAgentInfo, SubAgentState,
7};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12pub struct AgentOrchestrator {
18 config: OrchestratorConfig,
20
21 agent: Option<Arc<crate::Agent>>,
23
24 event_tx: broadcast::Sender<OrchestratorEvent>,
26
27 subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
29
30 next_id: Arc<RwLock<u64>>,
32}
33
34impl AgentOrchestrator {
35 pub fn new_memory() -> Self {
40 Self::new(OrchestratorConfig::default())
41 }
42
43 pub fn new(config: OrchestratorConfig) -> Self {
45 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
46
47 Self {
48 config,
49 agent: None,
50 event_tx,
51 subagents: Arc::new(RwLock::new(HashMap::new())),
52 next_id: Arc::new(RwLock::new(1)),
53 }
54 }
55
56 pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
63 Self::from_agent_with_config(agent, OrchestratorConfig::default())
64 }
65
66 pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
68 let (event_tx, _) = broadcast::channel(config.event_buffer_size);
69
70 Self {
71 config,
72 agent: Some(agent),
73 event_tx,
74 subagents: Arc::new(RwLock::new(HashMap::new())),
75 next_id: Arc::new(RwLock::new(1)),
76 }
77 }
78
79 pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
83 self.event_tx.subscribe()
84 }
85
86 pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
90 let rx = self.event_tx.subscribe();
91 SubAgentEventStream {
92 history: VecDeque::new(),
93 rx,
94 filter_id: id.to_string(),
95 }
96 }
97
98 pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
102 let agent = self.agent.clone().ok_or_else(|| {
103 anyhow::anyhow!("SubAgent execution requires AgentOrchestrator::from_agent")
104 })?;
105
106 {
108 let subagents = self.subagents.read().await;
109 let active_count = subagents
110 .values()
111 .filter(|h| !h.state().is_terminal())
112 .count();
113
114 if active_count >= self.config.max_concurrent_subagents {
115 return Err(anyhow::anyhow!(
116 "Maximum concurrent subagents ({}) reached",
117 self.config.max_concurrent_subagents
118 )
119 .into());
120 }
121 }
122
123 let id = {
125 let mut next_id = self.next_id.write().await;
126 let id = format!("subagent-{}", *next_id);
127 *next_id += 1;
128 id
129 };
130
131 let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
133 let (subagent_event_tx, _) = broadcast::channel(self.config.event_buffer_size);
134
135 let state = Arc::new(RwLock::new(SubAgentState::Initializing));
137
138 let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
140 let event_history = Arc::new(RwLock::new(VecDeque::with_capacity(
141 self.config.event_buffer_size,
142 )));
143
144 let started_event = OrchestratorEvent::SubAgentStarted {
146 id: id.clone(),
147 agent_type: config.agent_type.clone(),
148 description: config.description.clone(),
149 parent_id: config.parent_id.clone(),
150 config: config.clone(),
151 };
152 let _ = self.event_tx.send(started_event.clone());
153 let _ = subagent_event_tx.send(started_event.clone());
154 event_history.write().await.push_back(started_event);
155
156 let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
158 id.clone(),
159 config.clone(),
160 agent,
161 self.event_tx.clone(),
162 subagent_event_tx.clone(),
163 Arc::clone(&event_history),
164 control_rx,
165 state.clone(),
166 activity.clone(),
167 );
168
169 let task_handle = tokio::spawn(async move { wrapper.execute().await });
170
171 let handle = SubAgentHandle::new(crate::orchestrator::handle::SubAgentHandleParts {
173 id: id.clone(),
174 config,
175 control_tx,
176 subagent_event_tx,
177 event_history,
178 state: state.clone(),
179 activity: activity.clone(),
180 task_handle,
181 });
182
183 self.subagents
185 .write()
186 .await
187 .insert(id.clone(), handle.clone());
188
189 Ok(handle)
190 }
191
192 pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
194 let subagents = self.subagents.read().await;
195 let handle = subagents
196 .get(id)
197 .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
198
199 handle.send_control(signal.clone()).await?;
200
201 let _ = self
203 .event_tx
204 .send(OrchestratorEvent::ControlSignalReceived {
205 id: id.to_string(),
206 signal,
207 });
208
209 Ok(())
210 }
211
212 pub async fn pause_subagent(&self, id: &str) -> Result<()> {
214 self.send_control(id, ControlSignal::Pause).await
215 }
216
217 pub async fn resume_subagent(&self, id: &str) -> Result<()> {
219 self.send_control(id, ControlSignal::Resume).await
220 }
221
222 pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
224 self.send_control(id, ControlSignal::Cancel).await
225 }
226
227 pub async fn adjust_subagent_params(
229 &self,
230 id: &str,
231 max_steps: Option<usize>,
232 timeout_ms: Option<u64>,
233 ) -> Result<()> {
234 self.send_control(
235 id,
236 ControlSignal::AdjustParams {
237 max_steps,
238 timeout_ms,
239 },
240 )
241 .await
242 }
243
244 pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
246 let subagents = self.subagents.read().await;
247 subagents.get(id).map(|h| h.state())
248 }
249
250 pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
252 let subagents = self.subagents.read().await;
253 subagents
254 .iter()
255 .map(|(id, handle)| (id.clone(), handle.state()))
256 .collect()
257 }
258
259 pub async fn active_count(&self) -> usize {
261 let subagents = self.subagents.read().await;
262 subagents
263 .values()
264 .filter(|h| !h.state().is_terminal())
265 .count()
266 }
267
268 pub async fn wait_all(&self) -> Result<()> {
270 loop {
271 let active = self.active_count().await;
272 if active == 0 {
273 break;
274 }
275 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
276 }
277 Ok(())
278 }
279
280 pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
282 let subagents = self.subagents.read().await;
283 let mut infos = Vec::new();
284
285 for (id, handle) in subagents.iter() {
286 let state = handle.state_async().await;
287 let activity = handle.activity().await;
288 let config = handle.config();
289
290 infos.push(SubAgentInfo {
291 id: id.clone(),
292 agent_type: config.agent_type.clone(),
293 description: config.description.clone(),
294 state: format!("{:?}", state),
295 parent_id: config.parent_id.clone(),
296 created_at: handle.created_at(),
297 updated_at: std::time::SystemTime::now()
298 .duration_since(std::time::UNIX_EPOCH)
299 .unwrap()
300 .as_millis() as u64,
301 current_activity: Some(activity),
302 });
303 }
304
305 infos
306 }
307
308 pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
310 let subagents = self.subagents.read().await;
311 let handle = subagents.get(id)?;
312
313 let state = handle.state_async().await;
314 let activity = handle.activity().await;
315 let config = handle.config();
316
317 Some(SubAgentInfo {
318 id: id.to_string(),
319 agent_type: config.agent_type.clone(),
320 description: config.description.clone(),
321 state: format!("{:?}", state),
322 parent_id: config.parent_id.clone(),
323 created_at: handle.created_at(),
324 updated_at: std::time::SystemTime::now()
325 .duration_since(std::time::UNIX_EPOCH)
326 .unwrap()
327 .as_millis() as u64,
328 current_activity: Some(activity),
329 })
330 }
331
332 pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
334 let subagents = self.subagents.read().await;
335 let mut activities = HashMap::new();
336
337 for (id, handle) in subagents.iter() {
338 if !handle.state().is_terminal() {
339 let activity = handle.activity().await;
340 activities.insert(id.clone(), activity);
341 }
342 }
343
344 activities
345 }
346
347 pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
349 let subagents = self.subagents.read().await;
350 subagents.get(id).cloned()
351 }
352}
353
354impl std::fmt::Debug for AgentOrchestrator {
355 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356 f.debug_struct("AgentOrchestrator")
357 .field("event_buffer_size", &self.config.event_buffer_size)
358 .field(
359 "max_concurrent_subagents",
360 &self.config.max_concurrent_subagents,
361 )
362 .finish()
363 }
364}
365
366pub struct SubAgentEventStream {
368 pub(crate) history: VecDeque<OrchestratorEvent>,
369 pub(crate) rx: broadcast::Receiver<OrchestratorEvent>,
370 pub(crate) filter_id: String,
371}
372
373impl SubAgentEventStream {
374 pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
376 if let Some(event) = self.history.pop_front() {
377 return Some(event);
378 }
379
380 loop {
381 match self.rx.recv().await {
382 Ok(event) => {
383 if let Some(id) = event.subagent_id() {
384 if id == self.filter_id {
385 return Some(event);
386 }
387 }
388 }
389 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
390 Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
391 }
392 }
393 }
394}