Skip to main content

a3s_code_core/orchestrator/
agent.rs

1//! Agent Orchestrator 核心实现
2
3use crate::error::Result;
4use crate::orchestrator::{
5    AgentSlot, ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity,
6    SubAgentConfig, SubAgentHandle, SubAgentInfo, SubAgentState,
7};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12/// Agent Orchestrator - 主子智能体协调器
13///
14/// 基于事件总线实现统一的监控和控制机制。
15/// 默认使用内存事件通讯,支持用户自定义 NATS provider。
16pub struct AgentOrchestrator {
17    /// 配置
18    config: OrchestratorConfig,
19
20    /// Agent used to execute SubAgents (None = placeholder mode)
21    agent: Option<Arc<crate::Agent>>,
22
23    /// 事件广播通道
24    event_tx: broadcast::Sender<OrchestratorEvent>,
25
26    /// SubAgent 注册表
27    subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
28
29    /// Live session references keyed by subagent ID.
30    ///
31    /// Populated only for real-agent SubAgents (i.e., created via `from_agent()`).
32    /// Used by `complete_external_task()` to route results back into the
33    /// session's lane queue without exposing the session to the caller.
34    sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
35
36    /// 下一个 SubAgent ID
37    next_id: Arc<RwLock<u64>>,
38}
39
40impl AgentOrchestrator {
41    /// 创建新的 orchestrator(使用内存事件通讯)
42    ///
43    /// 这是默认的创建方式,适用于单进程场景。
44    /// SubAgents 将以占位符模式运行,不执行实际的 LLM 操作。
45    /// 要执行真实的 LLM 操作,请使用 `from_agent()`。
46    pub fn new_memory() -> Self {
47        Self::new(OrchestratorConfig::default())
48    }
49
50    /// 使用自定义配置创建 orchestrator(占位符模式)
51    pub fn new(config: OrchestratorConfig) -> Self {
52        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
53
54        Self {
55            config,
56            agent: None,
57            event_tx,
58            subagents: Arc::new(RwLock::new(HashMap::new())),
59            sessions: Arc::new(RwLock::new(HashMap::new())),
60            next_id: Arc::new(RwLock::new(1)),
61        }
62    }
63
64    /// Create an orchestrator backed by a real Agent for LLM execution.
65    ///
66    /// SubAgents spawned by this orchestrator will run the actual agent
67    /// definition (permissions, system prompt, model, max_steps) loaded from
68    /// the agent's configuration and any extra `agent_dirs` provided in
69    /// `SubAgentConfig`.
70    pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
71        Self::from_agent_with_config(agent, OrchestratorConfig::default())
72    }
73
74    /// Create an orchestrator backed by a real Agent with custom config.
75    pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
76        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
77
78        Self {
79            config,
80            agent: Some(agent),
81            event_tx,
82            subagents: Arc::new(RwLock::new(HashMap::new())),
83            sessions: Arc::new(RwLock::new(HashMap::new())),
84            next_id: Arc::new(RwLock::new(1)),
85        }
86    }
87
88    /// 订阅所有 SubAgent 事件
89    ///
90    /// 返回一个接收器,可以接收所有 SubAgent 的事件。
91    pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
92        self.event_tx.subscribe()
93    }
94
95    /// 订阅特定 SubAgent 的事件
96    ///
97    /// 返回一个过滤后的接收器,只接收指定 SubAgent 的事件。
98    pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
99        let rx = self.event_tx.subscribe();
100        SubAgentEventStream {
101            rx,
102            filter_id: id.to_string(),
103        }
104    }
105
106    /// 启动新的 SubAgent
107    ///
108    /// 返回 SubAgent 句柄,可用于控制和查询状态。
109    pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
110        // 检查并发限制
111        {
112            let subagents = self.subagents.read().await;
113            let active_count = subagents
114                .values()
115                .filter(|h| !h.state().is_terminal())
116                .count();
117
118            if active_count >= self.config.max_concurrent_subagents {
119                return Err(anyhow::anyhow!(
120                    "Maximum concurrent subagents ({}) reached",
121                    self.config.max_concurrent_subagents
122                )
123                .into());
124            }
125        }
126
127        // 生成 SubAgent ID
128        let id = {
129            let mut next_id = self.next_id.write().await;
130            let id = format!("subagent-{}", *next_id);
131            *next_id += 1;
132            id
133        };
134
135        // 创建控制通道
136        let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
137
138        // 创建状态
139        let state = Arc::new(RwLock::new(SubAgentState::Initializing));
140
141        // 创建活动跟踪
142        let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
143
144        // 发布启动事件
145        let _ = self.event_tx.send(OrchestratorEvent::SubAgentStarted {
146            id: id.clone(),
147            agent_type: config.agent_type.clone(),
148            description: config.description.clone(),
149            parent_id: config.parent_id.clone(),
150            config: config.clone(),
151        });
152
153        // 创建 SubAgentWrapper 并启动执行
154        let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
155            id.clone(),
156            config.clone(),
157            self.agent.clone(),
158            self.event_tx.clone(),
159            control_rx,
160            state.clone(),
161            activity.clone(),
162            Arc::clone(&self.sessions),
163        );
164
165        let task_handle = tokio::spawn(async move { wrapper.execute().await });
166
167        // 创建句柄
168        let handle = SubAgentHandle::new(
169            id.clone(),
170            config,
171            control_tx,
172            self.event_tx.clone(),
173            state.clone(),
174            activity.clone(),
175            task_handle,
176        );
177
178        // 注册到 orchestrator
179        self.subagents
180            .write()
181            .await
182            .insert(id.clone(), handle.clone());
183
184        Ok(handle)
185    }
186
187    /// Spawn a subagent from a unified `AgentSlot` declaration.
188    ///
189    /// Convenience wrapper around `spawn_subagent` that accepts the unified slot
190    /// type.  The `role` field is ignored here — for team-based workflows use
191    /// `run_team` instead.
192    pub async fn spawn(&self, slot: AgentSlot) -> Result<SubAgentHandle> {
193        self.spawn_subagent(SubAgentConfig::from(slot)).await
194    }
195
196    /// Run a goal through a Lead → Worker → Reviewer team built from `AgentSlot`s.
197    ///
198    /// Requires `from_agent()` mode — returns an error if no backing `Agent` is
199    /// configured.  Each slot's `role` field determines its position in the team;
200    /// slots without a role default to `Worker`.  Agent definitions are loaded
201    /// from each slot's `agent_dirs` and looked up by `agent_type`.
202    pub async fn run_team(
203        &self,
204        goal: impl Into<String>,
205        workspace: impl Into<String>,
206        slots: Vec<AgentSlot>,
207    ) -> Result<crate::agent_teams::TeamRunResult> {
208        let agent = self
209            .agent
210            .as_ref()
211            .ok_or_else(|| anyhow::anyhow!("run_team requires a real Agent (use from_agent())"))?;
212
213        let ws = workspace.into();
214        let goal = goal.into();
215
216        // Build a shared registry from all agent_dirs across every slot.
217        let registry = crate::subagent::AgentRegistry::new();
218        for slot in &slots {
219            for dir in &slot.agent_dirs {
220                for def in crate::subagent::load_agents_from_dir(std::path::Path::new(dir)) {
221                    registry.register(def);
222                }
223            }
224        }
225
226        // Use wall-clock millis for a unique team name.
227        let team_name = format!(
228            "team-{}",
229            std::time::SystemTime::now()
230                .duration_since(std::time::UNIX_EPOCH)
231                .unwrap_or_default()
232                .as_millis()
233        );
234
235        let team = crate::agent_teams::AgentTeam::new(
236            &team_name,
237            crate::agent_teams::TeamConfig::default(),
238        );
239        let mut runner = crate::agent_teams::TeamRunner::new(team);
240
241        for (i, slot) in slots.iter().enumerate() {
242            let role = slot.role.unwrap_or(crate::agent_teams::TeamRole::Worker);
243            let member_id = format!("{}-{}", role, i);
244            runner.team_mut().add_member(&member_id, role);
245            runner.bind_agent(&member_id, agent, &ws, &slot.agent_type, &registry)?;
246        }
247
248        runner.run_until_done(&goal).await
249    }
250
251    /// 发送控制信号到 SubAgent
252    pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
253        let subagents = self.subagents.read().await;
254        let handle = subagents
255            .get(id)
256            .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
257
258        handle.send_control(signal.clone()).await?;
259
260        // 发布控制信号接收事件
261        let _ = self
262            .event_tx
263            .send(OrchestratorEvent::ControlSignalReceived {
264                id: id.to_string(),
265                signal,
266            });
267
268        Ok(())
269    }
270
271    /// 暂停 SubAgent
272    pub async fn pause_subagent(&self, id: &str) -> Result<()> {
273        self.send_control(id, ControlSignal::Pause).await
274    }
275
276    /// 恢复 SubAgent
277    pub async fn resume_subagent(&self, id: &str) -> Result<()> {
278        self.send_control(id, ControlSignal::Resume).await
279    }
280
281    /// 取消 SubAgent
282    pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
283        self.send_control(id, ControlSignal::Cancel).await
284    }
285
286    /// 调整 SubAgent 参数
287    pub async fn adjust_subagent_params(
288        &self,
289        id: &str,
290        max_steps: Option<usize>,
291        timeout_ms: Option<u64>,
292    ) -> Result<()> {
293        self.send_control(
294            id,
295            ControlSignal::AdjustParams {
296                max_steps,
297                timeout_ms,
298            },
299        )
300        .await
301    }
302
303    /// 获取 SubAgent 状态
304    pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
305        let subagents = self.subagents.read().await;
306        subagents.get(id).map(|h| h.state())
307    }
308
309    /// 获取所有 SubAgent 的状态
310    pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
311        let subagents = self.subagents.read().await;
312        subagents
313            .iter()
314            .map(|(id, handle)| (id.clone(), handle.state()))
315            .collect()
316    }
317
318    /// 获取活跃的 SubAgent 数量
319    pub async fn active_count(&self) -> usize {
320        let subagents = self.subagents.read().await;
321        subagents
322            .values()
323            .filter(|h| !h.state().is_terminal())
324            .count()
325    }
326
327    /// 等待所有 SubAgent 完成
328    pub async fn wait_all(&self) -> Result<()> {
329        loop {
330            let active = self.active_count().await;
331            if active == 0 {
332                break;
333            }
334            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
335        }
336        Ok(())
337    }
338
339    /// 获取所有 SubAgent 的信息列表
340    pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
341        let subagents = self.subagents.read().await;
342        let mut infos = Vec::new();
343
344        for (id, handle) in subagents.iter() {
345            let state = handle.state_async().await;
346            let activity = handle.activity().await;
347            let config = handle.config();
348
349            infos.push(SubAgentInfo {
350                id: id.clone(),
351                agent_type: config.agent_type.clone(),
352                description: config.description.clone(),
353                state: format!("{:?}", state),
354                parent_id: config.parent_id.clone(),
355                created_at: handle.created_at(),
356                updated_at: std::time::SystemTime::now()
357                    .duration_since(std::time::UNIX_EPOCH)
358                    .unwrap()
359                    .as_millis() as u64,
360                current_activity: Some(activity),
361            });
362        }
363
364        infos
365    }
366
367    /// 获取特定 SubAgent 的详细信息
368    pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
369        let subagents = self.subagents.read().await;
370        let handle = subagents.get(id)?;
371
372        let state = handle.state_async().await;
373        let activity = handle.activity().await;
374        let config = handle.config();
375
376        Some(SubAgentInfo {
377            id: id.to_string(),
378            agent_type: config.agent_type.clone(),
379            description: config.description.clone(),
380            state: format!("{:?}", state),
381            parent_id: config.parent_id.clone(),
382            created_at: handle.created_at(),
383            updated_at: std::time::SystemTime::now()
384                .duration_since(std::time::UNIX_EPOCH)
385                .unwrap()
386                .as_millis() as u64,
387            current_activity: Some(activity),
388        })
389    }
390
391    /// 获取所有活跃 SubAgent 的当前活动
392    pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
393        let subagents = self.subagents.read().await;
394        let mut activities = HashMap::new();
395
396        for (id, handle) in subagents.iter() {
397            if !handle.state().is_terminal() {
398                let activity = handle.activity().await;
399                activities.insert(id.clone(), activity);
400            }
401        }
402
403        activities
404    }
405
406    /// 获取 SubAgent 句柄(用于直接控制)
407    pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
408        let subagents = self.subagents.read().await;
409        subagents.get(id).cloned()
410    }
411
412    /// Complete a pending external task for a SubAgent.
413    ///
414    /// Call this after processing an `OrchestratorEvent::ExternalTaskPending`
415    /// event.  The `subagent_id` and `task_id` identify the waiting tool call;
416    /// `result` is the outcome produced by the external worker.
417    ///
418    /// Returns `true` if the task was found and unblocked, `false` if the
419    /// subagent or task ID was not found (e.g., already timed out).
420    /// Return any external tasks currently waiting for the given SubAgent.
421    ///
422    /// Returns an empty list if the SubAgent does not exist or has no pending
423    /// external tasks (e.g. when running with the default Internal lane mode).
424    pub async fn pending_external_tasks_for(
425        &self,
426        subagent_id: &str,
427    ) -> Vec<crate::queue::ExternalTask> {
428        let sessions = self.sessions.read().await;
429        match sessions.get(subagent_id) {
430            Some(session) => session.pending_external_tasks().await,
431            None => vec![],
432        }
433    }
434
435    pub async fn complete_external_task(
436        &self,
437        subagent_id: &str,
438        task_id: &str,
439        result: crate::queue::ExternalTaskResult,
440    ) -> bool {
441        let sessions = self.sessions.read().await;
442        match sessions.get(subagent_id) {
443            Some(session) => session.complete_external_task(task_id, result).await,
444            None => false,
445        }
446    }
447}
448
449impl std::fmt::Debug for AgentOrchestrator {
450    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
451        f.debug_struct("AgentOrchestrator")
452            .field("event_buffer_size", &self.config.event_buffer_size)
453            .field(
454                "max_concurrent_subagents",
455                &self.config.max_concurrent_subagents,
456            )
457            .finish()
458    }
459}
460
461/// SubAgent 事件流(过滤特定 SubAgent 的事件)
462pub struct SubAgentEventStream {
463    pub(crate) rx: broadcast::Receiver<OrchestratorEvent>,
464    pub(crate) filter_id: String,
465}
466
467impl SubAgentEventStream {
468    /// 接收下一个事件
469    pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
470        loop {
471            match self.rx.recv().await {
472                Ok(event) => {
473                    if let Some(id) = event.subagent_id() {
474                        if id == self.filter_id {
475                            return Some(event);
476                        }
477                    }
478                }
479                Err(_) => return None,
480            }
481        }
482    }
483}