// a3s_code_core/orchestrator/agent.rs

//! Core implementation of the Agent Orchestrator.

use crate::error::Result;
use crate::orchestrator::{
    AgentSlot, ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity,
    SubAgentConfig, SubAgentHandle, SubAgentInfo, SubAgentState,
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, RwLock};

12/// Agent Orchestrator - 主子智能体协调器
13///
14/// 基于事件总线实现统一的监控和控制机制。
15/// 默认使用内存事件通讯,支持用户自定义 NATS provider。
16pub struct AgentOrchestrator {
17    /// 配置
18    config: OrchestratorConfig,
19
20    /// Agent used to execute SubAgents (None = placeholder mode)
21    agent: Option<Arc<crate::Agent>>,
22
23    /// 事件广播通道
24    event_tx: broadcast::Sender<OrchestratorEvent>,
25
26    /// SubAgent 注册表
27    subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
28
29    /// Live session references keyed by subagent ID.
30    ///
31    /// Populated only for real-agent SubAgents (i.e., created via `from_agent()`).
32    /// Used by `complete_external_task()` to route results back into the
33    /// session's lane queue without exposing the session to the caller.
34    sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
35
36    /// 下一个 SubAgent ID
37    next_id: Arc<RwLock<u64>>,
38}
39
40impl AgentOrchestrator {
41    /// 创建新的 orchestrator(使用内存事件通讯)
42    ///
43    /// 这是默认的创建方式,适用于单进程场景。
44    /// SubAgents 将以占位符模式运行,不执行实际的 LLM 操作。
45    /// 要执行真实的 LLM 操作,请使用 `from_agent()`。
46    pub fn new_memory() -> Self {
47        Self::new(OrchestratorConfig::default())
48    }
49
50    /// 使用自定义配置创建 orchestrator(占位符模式)
51    pub fn new(config: OrchestratorConfig) -> Self {
52        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
53
54        Self {
55            config,
56            agent: None,
57            event_tx,
58            subagents: Arc::new(RwLock::new(HashMap::new())),
59            sessions: Arc::new(RwLock::new(HashMap::new())),
60            next_id: Arc::new(RwLock::new(1)),
61        }
62    }
63
64    /// Create an orchestrator backed by a real Agent for LLM execution.
65    ///
66    /// SubAgents spawned by this orchestrator will run the actual agent
67    /// definition (permissions, system prompt, model, max_steps) loaded from
68    /// the agent's configuration and any extra `agent_dirs` provided in
69    /// `SubAgentConfig`.
70    pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
71        Self::from_agent_with_config(agent, OrchestratorConfig::default())
72    }
73
74    /// Create an orchestrator backed by a real Agent with custom config.
75    pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
76        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
77
78        Self {
79            config,
80            agent: Some(agent),
81            event_tx,
82            subagents: Arc::new(RwLock::new(HashMap::new())),
83            sessions: Arc::new(RwLock::new(HashMap::new())),
84            next_id: Arc::new(RwLock::new(1)),
85        }
86    }
87
88    /// 订阅所有 SubAgent 事件
89    ///
90    /// 返回一个接收器,可以接收所有 SubAgent 的事件。
91    pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
92        self.event_tx.subscribe()
93    }
94
95    /// 订阅特定 SubAgent 的事件
96    ///
97    /// 返回一个过滤后的接收器,只接收指定 SubAgent 的事件。
98    pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
99        let rx = self.event_tx.subscribe();
100        SubAgentEventStream {
101            rx,
102            filter_id: id.to_string(),
103        }
104    }
105
106    /// 启动新的 SubAgent
107    ///
108    /// 返回 SubAgent 句柄,可用于控制和查询状态。
109    pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
110        // 检查并发限制
111        {
112            let subagents = self.subagents.read().await;
113            let active_count = subagents
114                .values()
115                .filter(|h| !h.state().is_terminal())
116                .count();
117
118            if active_count >= self.config.max_concurrent_subagents {
119                return Err(anyhow::anyhow!(
120                    "Maximum concurrent subagents ({}) reached",
121                    self.config.max_concurrent_subagents
122                )
123                .into());
124            }
125        }
126
127        // 生成 SubAgent ID
128        let id = {
129            let mut next_id = self.next_id.write().await;
130            let id = format!("subagent-{}", *next_id);
131            *next_id += 1;
132            id
133        };
134
135        // 创建控制通道
136        let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
137
138        // 创建状态
139        let state = Arc::new(RwLock::new(SubAgentState::Initializing));
140
141        // 创建活动跟踪
142        let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
143
144        // 发布启动事件
145        let _ = self.event_tx.send(OrchestratorEvent::SubAgentStarted {
146            id: id.clone(),
147            agent_type: config.agent_type.clone(),
148            description: config.description.clone(),
149            parent_id: config.parent_id.clone(),
150            config: config.clone(),
151        });
152
153        // 创建 SubAgentWrapper 并启动执行
154        let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
155            id.clone(),
156            config.clone(),
157            self.agent.clone(),
158            self.event_tx.clone(),
159            control_rx,
160            state.clone(),
161            activity.clone(),
162            Arc::clone(&self.sessions),
163        );
164
165        let task_handle = tokio::spawn(async move { wrapper.execute().await });
166
167        // 创建句柄
168        let handle = SubAgentHandle::new(
169            id.clone(),
170            config,
171            control_tx,
172            state.clone(),
173            activity.clone(),
174            task_handle,
175        );
176
177        // 注册到 orchestrator
178        self.subagents
179            .write()
180            .await
181            .insert(id.clone(), handle.clone());
182
183        Ok(handle)
184    }
185
186    /// Spawn a subagent from a unified `AgentSlot` declaration.
187    ///
188    /// Convenience wrapper around `spawn_subagent` that accepts the unified slot
189    /// type.  The `role` field is ignored here — for team-based workflows use
190    /// `run_team` instead.
191    pub async fn spawn(&self, slot: AgentSlot) -> Result<SubAgentHandle> {
192        self.spawn_subagent(SubAgentConfig::from(slot)).await
193    }
194
195    /// Run a goal through a Lead → Worker → Reviewer team built from `AgentSlot`s.
196    ///
197    /// Requires `from_agent()` mode — returns an error if no backing `Agent` is
198    /// configured.  Each slot's `role` field determines its position in the team;
199    /// slots without a role default to `Worker`.  Agent definitions are loaded
200    /// from each slot's `agent_dirs` and looked up by `agent_type`.
201    pub async fn run_team(
202        &self,
203        goal: impl Into<String>,
204        workspace: impl Into<String>,
205        slots: Vec<AgentSlot>,
206    ) -> Result<crate::agent_teams::TeamRunResult> {
207        let agent = self
208            .agent
209            .as_ref()
210            .ok_or_else(|| anyhow::anyhow!("run_team requires a real Agent (use from_agent())"))?;
211
212        let ws = workspace.into();
213        let goal = goal.into();
214
215        // Build a shared registry from all agent_dirs across every slot.
216        let registry = crate::subagent::AgentRegistry::new();
217        for slot in &slots {
218            for dir in &slot.agent_dirs {
219                for def in crate::subagent::load_agents_from_dir(std::path::Path::new(dir)) {
220                    registry.register(def);
221                }
222            }
223        }
224
225        // Use wall-clock millis for a unique team name.
226        let team_name = format!(
227            "team-{}",
228            std::time::SystemTime::now()
229                .duration_since(std::time::UNIX_EPOCH)
230                .unwrap_or_default()
231                .as_millis()
232        );
233
234        let team = crate::agent_teams::AgentTeam::new(
235            &team_name,
236            crate::agent_teams::TeamConfig::default(),
237        );
238        let mut runner = crate::agent_teams::TeamRunner::new(team);
239
240        for (i, slot) in slots.iter().enumerate() {
241            let role = slot.role.unwrap_or(crate::agent_teams::TeamRole::Worker);
242            let member_id = format!("{}-{}", role, i);
243            runner.team_mut().add_member(&member_id, role);
244            runner.bind_agent(&member_id, agent, &ws, &slot.agent_type, &registry)?;
245        }
246
247        runner.run_until_done(&goal).await
248    }
249
250    /// 发送控制信号到 SubAgent
251    pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
252        let subagents = self.subagents.read().await;
253        let handle = subagents
254            .get(id)
255            .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
256
257        handle.send_control(signal.clone()).await?;
258
259        // 发布控制信号接收事件
260        let _ = self
261            .event_tx
262            .send(OrchestratorEvent::ControlSignalReceived {
263                id: id.to_string(),
264                signal,
265            });
266
267        Ok(())
268    }
269
270    /// 暂停 SubAgent
271    pub async fn pause_subagent(&self, id: &str) -> Result<()> {
272        self.send_control(id, ControlSignal::Pause).await
273    }
274
275    /// 恢复 SubAgent
276    pub async fn resume_subagent(&self, id: &str) -> Result<()> {
277        self.send_control(id, ControlSignal::Resume).await
278    }
279
280    /// 取消 SubAgent
281    pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
282        self.send_control(id, ControlSignal::Cancel).await
283    }
284
285    /// 调整 SubAgent 参数
286    pub async fn adjust_subagent_params(
287        &self,
288        id: &str,
289        max_steps: Option<usize>,
290        timeout_ms: Option<u64>,
291    ) -> Result<()> {
292        self.send_control(
293            id,
294            ControlSignal::AdjustParams {
295                max_steps,
296                timeout_ms,
297            },
298        )
299        .await
300    }
301
302    /// 获取 SubAgent 状态
303    pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
304        let subagents = self.subagents.read().await;
305        subagents.get(id).map(|h| h.state())
306    }
307
308    /// 获取所有 SubAgent 的状态
309    pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
310        let subagents = self.subagents.read().await;
311        subagents
312            .iter()
313            .map(|(id, handle)| (id.clone(), handle.state()))
314            .collect()
315    }
316
317    /// 获取活跃的 SubAgent 数量
318    pub async fn active_count(&self) -> usize {
319        let subagents = self.subagents.read().await;
320        subagents
321            .values()
322            .filter(|h| !h.state().is_terminal())
323            .count()
324    }
325
326    /// 等待所有 SubAgent 完成
327    pub async fn wait_all(&self) -> Result<()> {
328        loop {
329            let active = self.active_count().await;
330            if active == 0 {
331                break;
332            }
333            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
334        }
335        Ok(())
336    }
337
338    /// 获取所有 SubAgent 的信息列表
339    pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
340        let subagents = self.subagents.read().await;
341        let mut infos = Vec::new();
342
343        for (id, handle) in subagents.iter() {
344            let state = handle.state_async().await;
345            let activity = handle.activity().await;
346            let config = handle.config();
347
348            infos.push(SubAgentInfo {
349                id: id.clone(),
350                agent_type: config.agent_type.clone(),
351                description: config.description.clone(),
352                state: format!("{:?}", state),
353                parent_id: config.parent_id.clone(),
354                created_at: handle.created_at(),
355                updated_at: std::time::SystemTime::now()
356                    .duration_since(std::time::UNIX_EPOCH)
357                    .unwrap()
358                    .as_millis() as u64,
359                current_activity: Some(activity),
360            });
361        }
362
363        infos
364    }
365
366    /// 获取特定 SubAgent 的详细信息
367    pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
368        let subagents = self.subagents.read().await;
369        let handle = subagents.get(id)?;
370
371        let state = handle.state_async().await;
372        let activity = handle.activity().await;
373        let config = handle.config();
374
375        Some(SubAgentInfo {
376            id: id.to_string(),
377            agent_type: config.agent_type.clone(),
378            description: config.description.clone(),
379            state: format!("{:?}", state),
380            parent_id: config.parent_id.clone(),
381            created_at: handle.created_at(),
382            updated_at: std::time::SystemTime::now()
383                .duration_since(std::time::UNIX_EPOCH)
384                .unwrap()
385                .as_millis() as u64,
386            current_activity: Some(activity),
387        })
388    }
389
390    /// 获取所有活跃 SubAgent 的当前活动
391    pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
392        let subagents = self.subagents.read().await;
393        let mut activities = HashMap::new();
394
395        for (id, handle) in subagents.iter() {
396            if !handle.state().is_terminal() {
397                let activity = handle.activity().await;
398                activities.insert(id.clone(), activity);
399            }
400        }
401
402        activities
403    }
404
405    /// 获取 SubAgent 句柄(用于直接控制)
406    pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
407        let subagents = self.subagents.read().await;
408        subagents.get(id).cloned()
409    }
410
411    /// Complete a pending external task for a SubAgent.
412    ///
413    /// Call this after processing an `OrchestratorEvent::ExternalTaskPending`
414    /// event.  The `subagent_id` and `task_id` identify the waiting tool call;
415    /// `result` is the outcome produced by the external worker.
416    ///
417    /// Returns `true` if the task was found and unblocked, `false` if the
418    /// subagent or task ID was not found (e.g., already timed out).
419    /// Return any external tasks currently waiting for the given SubAgent.
420    ///
421    /// Returns an empty list if the SubAgent does not exist or has no pending
422    /// external tasks (e.g. when running with the default Internal lane mode).
423    pub async fn pending_external_tasks_for(
424        &self,
425        subagent_id: &str,
426    ) -> Vec<crate::queue::ExternalTask> {
427        let sessions = self.sessions.read().await;
428        match sessions.get(subagent_id) {
429            Some(session) => session.pending_external_tasks().await,
430            None => vec![],
431        }
432    }
433
434    pub async fn complete_external_task(
435        &self,
436        subagent_id: &str,
437        task_id: &str,
438        result: crate::queue::ExternalTaskResult,
439    ) -> bool {
440        let sessions = self.sessions.read().await;
441        match sessions.get(subagent_id) {
442            Some(session) => session.complete_external_task(task_id, result).await,
443            None => false,
444        }
445    }
446}
447
448impl std::fmt::Debug for AgentOrchestrator {
449    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450        f.debug_struct("AgentOrchestrator")
451            .field("event_buffer_size", &self.config.event_buffer_size)
452            .field(
453                "max_concurrent_subagents",
454                &self.config.max_concurrent_subagents,
455            )
456            .finish()
457    }
458}
459
460/// SubAgent 事件流(过滤特定 SubAgent 的事件)
461pub struct SubAgentEventStream {
462    rx: broadcast::Receiver<OrchestratorEvent>,
463    filter_id: String,
464}
465
466impl SubAgentEventStream {
467    /// 接收下一个事件
468    pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
469        loop {
470            match self.rx.recv().await {
471                Ok(event) => {
472                    if let Some(id) = event.subagent_id() {
473                        if id == self.filter_id {
474                            return Some(event);
475                        }
476                    }
477                }
478                Err(_) => return None,
479            }
480        }
481    }
482}