Skip to main content

a3s_code_core/orchestrator/
agent.rs

1//! Advanced SubAgent control-plane implementation.
2
3use crate::error::Result;
4use crate::orchestrator::{
5    ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity, SubAgentConfig,
6    SubAgentHandle, SubAgentInfo, SubAgentState,
7};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12/// Advanced SubAgent control plane.
13///
14/// This API is for explicit SubAgent lifecycle control: spawn, pause, resume,
15/// cancel, inspect, and subscribe to events. Routine model-visible delegation
16/// should use `task` / `parallel_task`.
17pub struct AgentOrchestrator {
18    /// 配置
19    config: OrchestratorConfig,
20
21    /// Agent used to execute SubAgents.
22    agent: Option<Arc<crate::Agent>>,
23
24    /// 事件广播通道
25    event_tx: broadcast::Sender<OrchestratorEvent>,
26
27    /// SubAgent 注册表
28    subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
29
30    /// 下一个 SubAgent ID
31    next_id: Arc<RwLock<u64>>,
32}
33
34impl AgentOrchestrator {
35    /// Create a memory-backed control plane.
36    ///
37    /// This is useful for inspecting an empty control plane in tests. Spawning
38    /// SubAgents requires [`Self::from_agent`].
39    pub fn new_memory() -> Self {
40        Self::new(OrchestratorConfig::default())
41    }
42
43    /// Create a memory-backed control plane with custom config.
44    pub fn new(config: OrchestratorConfig) -> Self {
45        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
46
47        Self {
48            config,
49            agent: None,
50            event_tx,
51            subagents: Arc::new(RwLock::new(HashMap::new())),
52            next_id: Arc::new(RwLock::new(1)),
53        }
54    }
55
56    /// Create an orchestrator backed by a real Agent for LLM execution.
57    ///
58    /// SubAgents spawned by this orchestrator will run the actual agent
59    /// definition (permissions, system prompt, model, max_steps) loaded from
60    /// the agent's configuration and any extra `agent_dirs` provided in
61    /// `SubAgentConfig`.
62    pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
63        Self::from_agent_with_config(agent, OrchestratorConfig::default())
64    }
65
66    /// Create an orchestrator backed by a real Agent with custom config.
67    pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
68        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
69
70        Self {
71            config,
72            agent: Some(agent),
73            event_tx,
74            subagents: Arc::new(RwLock::new(HashMap::new())),
75            next_id: Arc::new(RwLock::new(1)),
76        }
77    }
78
79    /// Subscribe to all SubAgent events.
80    ///
81    /// 返回一个接收器,可以接收所有 SubAgent 的事件。
82    pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
83        self.event_tx.subscribe()
84    }
85
86    /// Subscribe to events for a specific SubAgent.
87    ///
88    /// 返回一个过滤后的接收器,只接收指定 SubAgent 的事件。
89    pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
90        let rx = self.event_tx.subscribe();
91        SubAgentEventStream {
92            history: VecDeque::new(),
93            rx,
94            filter_id: id.to_string(),
95        }
96    }
97
98    /// Spawn a new SubAgent.
99    ///
100    /// 返回 SubAgent 句柄,可用于控制和查询状态。
101    pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
102        let agent = self.agent.clone().ok_or_else(|| {
103            anyhow::anyhow!("SubAgent execution requires AgentOrchestrator::from_agent")
104        })?;
105
106        // 检查并发限制
107        {
108            let subagents = self.subagents.read().await;
109            let active_count = subagents
110                .values()
111                .filter(|h| !h.state().is_terminal())
112                .count();
113
114            if active_count >= self.config.max_concurrent_subagents {
115                return Err(anyhow::anyhow!(
116                    "Maximum concurrent subagents ({}) reached",
117                    self.config.max_concurrent_subagents
118                )
119                .into());
120            }
121        }
122
123        // 生成 SubAgent ID
124        let id = {
125            let mut next_id = self.next_id.write().await;
126            let id = format!("subagent-{}", *next_id);
127            *next_id += 1;
128            id
129        };
130
131        // 创建控制通道
132        let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
133        let (subagent_event_tx, _) = broadcast::channel(self.config.event_buffer_size);
134
135        // 创建状态
136        let state = Arc::new(RwLock::new(SubAgentState::Initializing));
137
138        // 创建活动跟踪
139        let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
140        let event_history = Arc::new(RwLock::new(VecDeque::with_capacity(
141            self.config.event_buffer_size,
142        )));
143
144        // 发布启动事件
145        let started_event = OrchestratorEvent::SubAgentStarted {
146            id: id.clone(),
147            agent_type: config.agent_type.clone(),
148            description: config.description.clone(),
149            parent_id: config.parent_id.clone(),
150            config: config.clone(),
151        };
152        let _ = self.event_tx.send(started_event.clone());
153        let _ = subagent_event_tx.send(started_event.clone());
154        event_history.write().await.push_back(started_event);
155
156        // 创建 SubAgentWrapper 并启动执行
157        let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
158            id.clone(),
159            config.clone(),
160            agent,
161            self.event_tx.clone(),
162            subagent_event_tx.clone(),
163            Arc::clone(&event_history),
164            control_rx,
165            state.clone(),
166            activity.clone(),
167        );
168
169        let task_handle = tokio::spawn(async move { wrapper.execute().await });
170
171        // 创建句柄
172        let handle = SubAgentHandle::new(crate::orchestrator::handle::SubAgentHandleParts {
173            id: id.clone(),
174            config,
175            control_tx,
176            subagent_event_tx,
177            event_history,
178            state: state.clone(),
179            activity: activity.clone(),
180            task_handle,
181        });
182
183        // 注册到 orchestrator
184        self.subagents
185            .write()
186            .await
187            .insert(id.clone(), handle.clone());
188
189        Ok(handle)
190    }
191
192    /// 发送控制信号到 SubAgent
193    pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
194        let subagents = self.subagents.read().await;
195        let handle = subagents
196            .get(id)
197            .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
198
199        handle.send_control(signal.clone()).await?;
200
201        // 发布控制信号接收事件
202        let _ = self
203            .event_tx
204            .send(OrchestratorEvent::ControlSignalReceived {
205                id: id.to_string(),
206                signal,
207            });
208
209        Ok(())
210    }
211
212    /// 暂停 SubAgent
213    pub async fn pause_subagent(&self, id: &str) -> Result<()> {
214        self.send_control(id, ControlSignal::Pause).await
215    }
216
217    /// 恢复 SubAgent
218    pub async fn resume_subagent(&self, id: &str) -> Result<()> {
219        self.send_control(id, ControlSignal::Resume).await
220    }
221
222    /// 取消 SubAgent
223    pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
224        self.send_control(id, ControlSignal::Cancel).await
225    }
226
227    /// 调整 SubAgent 参数
228    pub async fn adjust_subagent_params(
229        &self,
230        id: &str,
231        max_steps: Option<usize>,
232        timeout_ms: Option<u64>,
233    ) -> Result<()> {
234        self.send_control(
235            id,
236            ControlSignal::AdjustParams {
237                max_steps,
238                timeout_ms,
239            },
240        )
241        .await
242    }
243
244    /// 获取 SubAgent 状态
245    pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
246        let subagents = self.subagents.read().await;
247        subagents.get(id).map(|h| h.state())
248    }
249
250    /// 获取所有 SubAgent 的状态
251    pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
252        let subagents = self.subagents.read().await;
253        subagents
254            .iter()
255            .map(|(id, handle)| (id.clone(), handle.state()))
256            .collect()
257    }
258
259    /// 获取活跃的 SubAgent 数量
260    pub async fn active_count(&self) -> usize {
261        let subagents = self.subagents.read().await;
262        subagents
263            .values()
264            .filter(|h| !h.state().is_terminal())
265            .count()
266    }
267
268    /// 等待所有 SubAgent 完成
269    pub async fn wait_all(&self) -> Result<()> {
270        loop {
271            let active = self.active_count().await;
272            if active == 0 {
273                break;
274            }
275            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
276        }
277        Ok(())
278    }
279
280    /// 获取所有 SubAgent 的信息列表
281    pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
282        let subagents = self.subagents.read().await;
283        let mut infos = Vec::new();
284
285        for (id, handle) in subagents.iter() {
286            let state = handle.state_async().await;
287            let activity = handle.activity().await;
288            let config = handle.config();
289
290            infos.push(SubAgentInfo {
291                id: id.clone(),
292                agent_type: config.agent_type.clone(),
293                description: config.description.clone(),
294                state: format!("{:?}", state),
295                parent_id: config.parent_id.clone(),
296                created_at: handle.created_at(),
297                updated_at: std::time::SystemTime::now()
298                    .duration_since(std::time::UNIX_EPOCH)
299                    .unwrap()
300                    .as_millis() as u64,
301                current_activity: Some(activity),
302            });
303        }
304
305        infos
306    }
307
308    /// 获取特定 SubAgent 的详细信息
309    pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
310        let subagents = self.subagents.read().await;
311        let handle = subagents.get(id)?;
312
313        let state = handle.state_async().await;
314        let activity = handle.activity().await;
315        let config = handle.config();
316
317        Some(SubAgentInfo {
318            id: id.to_string(),
319            agent_type: config.agent_type.clone(),
320            description: config.description.clone(),
321            state: format!("{:?}", state),
322            parent_id: config.parent_id.clone(),
323            created_at: handle.created_at(),
324            updated_at: std::time::SystemTime::now()
325                .duration_since(std::time::UNIX_EPOCH)
326                .unwrap()
327                .as_millis() as u64,
328            current_activity: Some(activity),
329        })
330    }
331
332    /// 获取所有活跃 SubAgent 的当前活动
333    pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
334        let subagents = self.subagents.read().await;
335        let mut activities = HashMap::new();
336
337        for (id, handle) in subagents.iter() {
338            if !handle.state().is_terminal() {
339                let activity = handle.activity().await;
340                activities.insert(id.clone(), activity);
341            }
342        }
343
344        activities
345    }
346
347    /// 获取 SubAgent 句柄(用于直接控制)
348    pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
349        let subagents = self.subagents.read().await;
350        subagents.get(id).cloned()
351    }
352}
353
354impl std::fmt::Debug for AgentOrchestrator {
355    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356        f.debug_struct("AgentOrchestrator")
357            .field("event_buffer_size", &self.config.event_buffer_size)
358            .field(
359                "max_concurrent_subagents",
360                &self.config.max_concurrent_subagents,
361            )
362            .finish()
363    }
364}
365
366/// SubAgent 事件流(过滤特定 SubAgent 的事件)
367pub struct SubAgentEventStream {
368    pub(crate) history: VecDeque<OrchestratorEvent>,
369    pub(crate) rx: broadcast::Receiver<OrchestratorEvent>,
370    pub(crate) filter_id: String,
371}
372
373impl SubAgentEventStream {
374    /// 接收下一个事件
375    pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
376        if let Some(event) = self.history.pop_front() {
377            return Some(event);
378        }
379
380        loop {
381            match self.rx.recv().await {
382                Ok(event) => {
383                    if let Some(id) = event.subagent_id() {
384                        if id == self.filter_id {
385                            return Some(event);
386                        }
387                    }
388                }
389                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
390                Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
391            }
392        }
393    }
394}