Skip to main content

a3s_code_core/orchestrator/
agent.rs

1//! Agent Orchestrator 核心实现
2
3use crate::error::Result;
4use crate::orchestrator::{
5    ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity, SubAgentConfig,
6    SubAgentHandle, SubAgentInfo, SubAgentState,
7};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12/// Agent Orchestrator - 主子智能体协调器
13///
14/// 基于事件总线实现统一的监控和控制机制。
15/// 默认使用内存事件通讯,支持用户自定义 NATS provider。
16pub struct AgentOrchestrator {
17    /// 配置
18    config: OrchestratorConfig,
19
20    /// Agent used to execute SubAgents (None = placeholder mode)
21    agent: Option<Arc<crate::Agent>>,
22
23    /// 事件广播通道
24    event_tx: broadcast::Sender<OrchestratorEvent>,
25
26    /// SubAgent 注册表
27    subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
28
29    /// Live session references keyed by subagent ID.
30    ///
31    /// Populated only for real-agent SubAgents (i.e., created via `from_agent()`).
32    /// Used by `complete_external_task()` to route results back into the
33    /// session's lane queue without exposing the session to the caller.
34    sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
35
36    /// 下一个 SubAgent ID
37    next_id: Arc<RwLock<u64>>,
38}
39
40impl AgentOrchestrator {
41    /// 创建新的 orchestrator(使用内存事件通讯)
42    ///
43    /// 这是默认的创建方式,适用于单进程场景。
44    /// SubAgents 将以占位符模式运行,不执行实际的 LLM 操作。
45    /// 要执行真实的 LLM 操作,请使用 `from_agent()`。
46    pub fn new_memory() -> Self {
47        Self::new(OrchestratorConfig::default())
48    }
49
50    /// 使用自定义配置创建 orchestrator(占位符模式)
51    pub fn new(config: OrchestratorConfig) -> Self {
52        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
53
54        Self {
55            config,
56            agent: None,
57            event_tx,
58            subagents: Arc::new(RwLock::new(HashMap::new())),
59            sessions: Arc::new(RwLock::new(HashMap::new())),
60            next_id: Arc::new(RwLock::new(1)),
61        }
62    }
63
64    /// Create an orchestrator backed by a real Agent for LLM execution.
65    ///
66    /// SubAgents spawned by this orchestrator will run the actual agent
67    /// definition (permissions, system prompt, model, max_steps) loaded from
68    /// the agent's configuration and any extra `agent_dirs` provided in
69    /// `SubAgentConfig`.
70    pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
71        Self::from_agent_with_config(agent, OrchestratorConfig::default())
72    }
73
74    /// Create an orchestrator backed by a real Agent with custom config.
75    pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
76        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
77
78        Self {
79            config,
80            agent: Some(agent),
81            event_tx,
82            subagents: Arc::new(RwLock::new(HashMap::new())),
83            sessions: Arc::new(RwLock::new(HashMap::new())),
84            next_id: Arc::new(RwLock::new(1)),
85        }
86    }
87
88    /// 订阅所有 SubAgent 事件
89    ///
90    /// 返回一个接收器,可以接收所有 SubAgent 的事件。
91    pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
92        self.event_tx.subscribe()
93    }
94
95    /// 订阅特定 SubAgent 的事件
96    ///
97    /// 返回一个过滤后的接收器,只接收指定 SubAgent 的事件。
98    pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
99        let rx = self.event_tx.subscribe();
100        SubAgentEventStream {
101            rx,
102            filter_id: id.to_string(),
103        }
104    }
105
106    /// 启动新的 SubAgent
107    ///
108    /// 返回 SubAgent 句柄,可用于控制和查询状态。
109    pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
110        // 检查并发限制
111        {
112            let subagents = self.subagents.read().await;
113            let active_count = subagents
114                .values()
115                .filter(|h| !h.state().is_terminal())
116                .count();
117
118            if active_count >= self.config.max_concurrent_subagents {
119                return Err(anyhow::anyhow!(
120                    "Maximum concurrent subagents ({}) reached",
121                    self.config.max_concurrent_subagents
122                )
123                .into());
124            }
125        }
126
127        // 生成 SubAgent ID
128        let id = {
129            let mut next_id = self.next_id.write().await;
130            let id = format!("subagent-{}", *next_id);
131            *next_id += 1;
132            id
133        };
134
135        // 创建控制通道
136        let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
137
138        // 创建状态
139        let state = Arc::new(RwLock::new(SubAgentState::Initializing));
140
141        // 创建活动跟踪
142        let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
143
144        // 发布启动事件
145        let _ = self.event_tx.send(OrchestratorEvent::SubAgentStarted {
146            id: id.clone(),
147            agent_type: config.agent_type.clone(),
148            description: config.description.clone(),
149            parent_id: config.parent_id.clone(),
150            config: config.clone(),
151        });
152
153        // 创建 SubAgentWrapper 并启动执行
154        let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
155            id.clone(),
156            config.clone(),
157            self.agent.clone(),
158            self.event_tx.clone(),
159            control_rx,
160            state.clone(),
161            activity.clone(),
162            Arc::clone(&self.sessions),
163        );
164
165        let task_handle = tokio::spawn(async move { wrapper.execute().await });
166
167        // 创建句柄
168        let handle = SubAgentHandle::new(
169            id.clone(),
170            config,
171            control_tx,
172            state.clone(),
173            activity.clone(),
174            task_handle,
175        );
176
177        // 注册到 orchestrator
178        self.subagents
179            .write()
180            .await
181            .insert(id.clone(), handle.clone());
182
183        Ok(handle)
184    }
185
186    /// 发送控制信号到 SubAgent
187    pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
188        let subagents = self.subagents.read().await;
189        let handle = subagents
190            .get(id)
191            .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
192
193        handle.send_control(signal.clone()).await?;
194
195        // 发布控制信号接收事件
196        let _ = self
197            .event_tx
198            .send(OrchestratorEvent::ControlSignalReceived {
199                id: id.to_string(),
200                signal,
201            });
202
203        Ok(())
204    }
205
206    /// 暂停 SubAgent
207    pub async fn pause_subagent(&self, id: &str) -> Result<()> {
208        self.send_control(id, ControlSignal::Pause).await
209    }
210
211    /// 恢复 SubAgent
212    pub async fn resume_subagent(&self, id: &str) -> Result<()> {
213        self.send_control(id, ControlSignal::Resume).await
214    }
215
216    /// 取消 SubAgent
217    pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
218        self.send_control(id, ControlSignal::Cancel).await
219    }
220
221    /// 调整 SubAgent 参数
222    pub async fn adjust_subagent_params(
223        &self,
224        id: &str,
225        max_steps: Option<usize>,
226        timeout_ms: Option<u64>,
227    ) -> Result<()> {
228        self.send_control(
229            id,
230            ControlSignal::AdjustParams {
231                max_steps,
232                timeout_ms,
233            },
234        )
235        .await
236    }
237
238    /// 获取 SubAgent 状态
239    pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
240        let subagents = self.subagents.read().await;
241        subagents.get(id).map(|h| h.state())
242    }
243
244    /// 获取所有 SubAgent 的状态
245    pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
246        let subagents = self.subagents.read().await;
247        subagents
248            .iter()
249            .map(|(id, handle)| (id.clone(), handle.state()))
250            .collect()
251    }
252
253    /// 获取活跃的 SubAgent 数量
254    pub async fn active_count(&self) -> usize {
255        let subagents = self.subagents.read().await;
256        subagents
257            .values()
258            .filter(|h| !h.state().is_terminal())
259            .count()
260    }
261
262    /// 等待所有 SubAgent 完成
263    pub async fn wait_all(&self) -> Result<()> {
264        loop {
265            let active = self.active_count().await;
266            if active == 0 {
267                break;
268            }
269            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
270        }
271        Ok(())
272    }
273
274    /// 获取所有 SubAgent 的信息列表
275    pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
276        let subagents = self.subagents.read().await;
277        let mut infos = Vec::new();
278
279        for (id, handle) in subagents.iter() {
280            let state = handle.state_async().await;
281            let activity = handle.activity().await;
282            let config = handle.config();
283
284            infos.push(SubAgentInfo {
285                id: id.clone(),
286                agent_type: config.agent_type.clone(),
287                description: config.description.clone(),
288                state: format!("{:?}", state),
289                parent_id: config.parent_id.clone(),
290                created_at: handle.created_at(),
291                updated_at: std::time::SystemTime::now()
292                    .duration_since(std::time::UNIX_EPOCH)
293                    .unwrap()
294                    .as_millis() as u64,
295                current_activity: Some(activity),
296            });
297        }
298
299        infos
300    }
301
302    /// 获取特定 SubAgent 的详细信息
303    pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
304        let subagents = self.subagents.read().await;
305        let handle = subagents.get(id)?;
306
307        let state = handle.state_async().await;
308        let activity = handle.activity().await;
309        let config = handle.config();
310
311        Some(SubAgentInfo {
312            id: id.to_string(),
313            agent_type: config.agent_type.clone(),
314            description: config.description.clone(),
315            state: format!("{:?}", state),
316            parent_id: config.parent_id.clone(),
317            created_at: handle.created_at(),
318            updated_at: std::time::SystemTime::now()
319                .duration_since(std::time::UNIX_EPOCH)
320                .unwrap()
321                .as_millis() as u64,
322            current_activity: Some(activity),
323        })
324    }
325
326    /// 获取所有活跃 SubAgent 的当前活动
327    pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
328        let subagents = self.subagents.read().await;
329        let mut activities = HashMap::new();
330
331        for (id, handle) in subagents.iter() {
332            if !handle.state().is_terminal() {
333                let activity = handle.activity().await;
334                activities.insert(id.clone(), activity);
335            }
336        }
337
338        activities
339    }
340
341    /// 获取 SubAgent 句柄(用于直接控制)
342    pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
343        let subagents = self.subagents.read().await;
344        subagents.get(id).cloned()
345    }
346
347    /// Complete a pending external task for a SubAgent.
348    ///
349    /// Call this after processing an `OrchestratorEvent::ExternalTaskPending`
350    /// event.  The `subagent_id` and `task_id` identify the waiting tool call;
351    /// `result` is the outcome produced by the external worker.
352    ///
353    /// Returns `true` if the task was found and unblocked, `false` if the
354    /// subagent or task ID was not found (e.g., already timed out).
355    /// Return any external tasks currently waiting for the given SubAgent.
356    ///
357    /// Returns an empty list if the SubAgent does not exist or has no pending
358    /// external tasks (e.g. when running with the default Internal lane mode).
359    pub async fn pending_external_tasks_for(
360        &self,
361        subagent_id: &str,
362    ) -> Vec<crate::queue::ExternalTask> {
363        let sessions = self.sessions.read().await;
364        match sessions.get(subagent_id) {
365            Some(session) => session.pending_external_tasks().await,
366            None => vec![],
367        }
368    }
369
370    pub async fn complete_external_task(
371        &self,
372        subagent_id: &str,
373        task_id: &str,
374        result: crate::queue::ExternalTaskResult,
375    ) -> bool {
376        let sessions = self.sessions.read().await;
377        match sessions.get(subagent_id) {
378            Some(session) => session.complete_external_task(task_id, result).await,
379            None => false,
380        }
381    }
382}
383
384impl std::fmt::Debug for AgentOrchestrator {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        f.debug_struct("AgentOrchestrator")
387            .field("event_buffer_size", &self.config.event_buffer_size)
388            .field(
389                "max_concurrent_subagents",
390                &self.config.max_concurrent_subagents,
391            )
392            .finish()
393    }
394}
395
396/// SubAgent 事件流(过滤特定 SubAgent 的事件)
397pub struct SubAgentEventStream {
398    rx: broadcast::Receiver<OrchestratorEvent>,
399    filter_id: String,
400}
401
402impl SubAgentEventStream {
403    /// 接收下一个事件
404    pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
405        loop {
406            match self.rx.recv().await {
407                Ok(event) => {
408                    if let Some(id) = event.subagent_id() {
409                        if id == self.filter_id {
410                            return Some(event);
411                        }
412                    }
413                }
414                Err(_) => return None,
415            }
416        }
417    }
418}