// a3s_code_core/orchestrator/agent.rs

//! Agent Orchestrator core implementation.
2
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use tokio::sync::{broadcast, RwLock};

use crate::error::Result;
use crate::orchestrator::{
    AgentSlot, ControlSignal, OrchestratorConfig, OrchestratorEvent, SubAgentActivity,
    SubAgentConfig, SubAgentHandle, SubAgentInfo, SubAgentState,
};
11
12/// Agent Orchestrator - 主子智能体协调器
13///
14/// 基于事件总线实现统一的监控和控制机制。
15/// 默认使用内存事件通讯,支持用户自定义 NATS provider。
16pub struct AgentOrchestrator {
17    /// 配置
18    config: OrchestratorConfig,
19
20    /// Agent used to execute SubAgents (None = placeholder mode)
21    agent: Option<Arc<crate::Agent>>,
22
23    /// 事件广播通道
24    event_tx: broadcast::Sender<OrchestratorEvent>,
25
26    /// SubAgent 注册表
27    subagents: Arc<RwLock<HashMap<String, SubAgentHandle>>>,
28
29    /// Live session references keyed by subagent ID.
30    ///
31    /// Populated only for real-agent SubAgents (i.e., created via `from_agent()`).
32    /// Used by `complete_external_task()` to route results back into the
33    /// session's lane queue without exposing the session to the caller.
34    sessions: Arc<RwLock<HashMap<String, Arc<crate::agent_api::AgentSession>>>>,
35
36    /// 下一个 SubAgent ID
37    next_id: Arc<RwLock<u64>>,
38}
39
40impl AgentOrchestrator {
41    /// 创建新的 orchestrator(使用内存事件通讯)
42    ///
43    /// 这是默认的创建方式,适用于单进程场景。
44    /// SubAgents 将以占位符模式运行,不执行实际的 LLM 操作。
45    /// 要执行真实的 LLM 操作,请使用 `from_agent()`。
46    pub fn new_memory() -> Self {
47        Self::new(OrchestratorConfig::default())
48    }
49
50    /// 使用自定义配置创建 orchestrator(占位符模式)
51    pub fn new(config: OrchestratorConfig) -> Self {
52        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
53
54        Self {
55            config,
56            agent: None,
57            event_tx,
58            subagents: Arc::new(RwLock::new(HashMap::new())),
59            sessions: Arc::new(RwLock::new(HashMap::new())),
60            next_id: Arc::new(RwLock::new(1)),
61        }
62    }
63
64    /// Create an orchestrator backed by a real Agent for LLM execution.
65    ///
66    /// SubAgents spawned by this orchestrator will run the actual agent
67    /// definition (permissions, system prompt, model, max_steps) loaded from
68    /// the agent's configuration and any extra `agent_dirs` provided in
69    /// `SubAgentConfig`.
70    pub fn from_agent(agent: Arc<crate::Agent>) -> Self {
71        Self::from_agent_with_config(agent, OrchestratorConfig::default())
72    }
73
74    /// Create an orchestrator backed by a real Agent with custom config.
75    pub fn from_agent_with_config(agent: Arc<crate::Agent>, config: OrchestratorConfig) -> Self {
76        let (event_tx, _) = broadcast::channel(config.event_buffer_size);
77
78        Self {
79            config,
80            agent: Some(agent),
81            event_tx,
82            subagents: Arc::new(RwLock::new(HashMap::new())),
83            sessions: Arc::new(RwLock::new(HashMap::new())),
84            next_id: Arc::new(RwLock::new(1)),
85        }
86    }
87
88    /// 订阅所有 SubAgent 事件
89    ///
90    /// 返回一个接收器,可以接收所有 SubAgent 的事件。
91    pub fn subscribe_all(&self) -> broadcast::Receiver<OrchestratorEvent> {
92        self.event_tx.subscribe()
93    }
94
95    /// 订阅特定 SubAgent 的事件
96    ///
97    /// 返回一个过滤后的接收器,只接收指定 SubAgent 的事件。
98    pub fn subscribe_subagent(&self, id: &str) -> SubAgentEventStream {
99        let rx = self.event_tx.subscribe();
100        SubAgentEventStream {
101            history: VecDeque::new(),
102            rx,
103            filter_id: id.to_string(),
104        }
105    }
106
107    /// 启动新的 SubAgent
108    ///
109    /// 返回 SubAgent 句柄,可用于控制和查询状态。
110    pub async fn spawn_subagent(&self, config: SubAgentConfig) -> Result<SubAgentHandle> {
111        // 检查并发限制
112        {
113            let subagents = self.subagents.read().await;
114            let active_count = subagents
115                .values()
116                .filter(|h| !h.state().is_terminal())
117                .count();
118
119            if active_count >= self.config.max_concurrent_subagents {
120                return Err(anyhow::anyhow!(
121                    "Maximum concurrent subagents ({}) reached",
122                    self.config.max_concurrent_subagents
123                )
124                .into());
125            }
126        }
127
128        // 生成 SubAgent ID
129        let id = {
130            let mut next_id = self.next_id.write().await;
131            let id = format!("subagent-{}", *next_id);
132            *next_id += 1;
133            id
134        };
135
136        // 创建控制通道
137        let (control_tx, control_rx) = tokio::sync::mpsc::channel(self.config.control_buffer_size);
138        let (subagent_event_tx, _) = broadcast::channel(self.config.event_buffer_size);
139
140        // 创建状态
141        let state = Arc::new(RwLock::new(SubAgentState::Initializing));
142
143        // 创建活动跟踪
144        let activity = Arc::new(RwLock::new(SubAgentActivity::Idle));
145        let event_history = Arc::new(RwLock::new(VecDeque::with_capacity(
146            self.config.event_buffer_size,
147        )));
148
149        // 发布启动事件
150        let started_event = OrchestratorEvent::SubAgentStarted {
151            id: id.clone(),
152            agent_type: config.agent_type.clone(),
153            description: config.description.clone(),
154            parent_id: config.parent_id.clone(),
155            config: config.clone(),
156        };
157        let _ = self.event_tx.send(started_event.clone());
158        let _ = subagent_event_tx.send(started_event.clone());
159        event_history.write().await.push_back(started_event);
160
161        // 创建 SubAgentWrapper 并启动执行
162        let wrapper = crate::orchestrator::wrapper::SubAgentWrapper::new(
163            id.clone(),
164            config.clone(),
165            self.agent.clone(),
166            self.event_tx.clone(),
167            subagent_event_tx.clone(),
168            Arc::clone(&event_history),
169            control_rx,
170            state.clone(),
171            activity.clone(),
172            Arc::clone(&self.sessions),
173        );
174
175        let task_handle = tokio::spawn(async move { wrapper.execute().await });
176
177        // 创建句柄
178        let handle = SubAgentHandle::new(crate::orchestrator::handle::SubAgentHandleParts {
179            id: id.clone(),
180            config,
181            control_tx,
182            subagent_event_tx,
183            event_history,
184            state: state.clone(),
185            activity: activity.clone(),
186            task_handle,
187        });
188
189        // 注册到 orchestrator
190        self.subagents
191            .write()
192            .await
193            .insert(id.clone(), handle.clone());
194
195        Ok(handle)
196    }
197
198    /// Spawn a subagent from a unified `AgentSlot` declaration.
199    ///
200    /// Convenience wrapper around `spawn_subagent` that accepts the unified slot
201    /// type.  The `role` field is ignored here — for team-based workflows use
202    /// `run_team` instead.
203    pub async fn spawn(&self, slot: AgentSlot) -> Result<SubAgentHandle> {
204        self.spawn_subagent(SubAgentConfig::from(slot)).await
205    }
206
207    /// Run a goal through a Lead → Worker → Reviewer team built from `AgentSlot`s.
208    ///
209    /// Requires `from_agent()` mode — returns an error if no backing `Agent` is
210    /// configured.  Each slot's `role` field determines its position in the team;
211    /// slots without a role default to `Worker`.  Agent definitions are loaded
212    /// from each slot's `agent_dirs` and looked up by `agent_type`.
213    pub async fn run_team(
214        &self,
215        goal: impl Into<String>,
216        workspace: impl Into<String>,
217        slots: Vec<AgentSlot>,
218    ) -> Result<crate::agent_teams::TeamRunResult> {
219        let agent = self
220            .agent
221            .as_ref()
222            .ok_or_else(|| anyhow::anyhow!("run_team requires a real Agent (use from_agent())"))?;
223
224        let ws = workspace.into();
225        let goal = goal.into();
226
227        // Build a shared registry from all agent_dirs across every slot.
228        let registry = crate::subagent::AgentRegistry::new();
229        for slot in &slots {
230            for dir in &slot.agent_dirs {
231                for def in crate::subagent::load_agents_from_dir(std::path::Path::new(dir)) {
232                    registry.register(def);
233                }
234            }
235        }
236
237        // Use wall-clock millis for a unique team name.
238        let team_name = format!(
239            "team-{}",
240            std::time::SystemTime::now()
241                .duration_since(std::time::UNIX_EPOCH)
242                .unwrap_or_default()
243                .as_millis()
244        );
245
246        let team = crate::agent_teams::AgentTeam::new(
247            &team_name,
248            crate::agent_teams::TeamConfig::default(),
249        );
250        let mut runner = crate::agent_teams::TeamRunner::new(team);
251
252        for (i, slot) in slots.iter().enumerate() {
253            let role = slot.role.unwrap_or(crate::agent_teams::TeamRole::Worker);
254            let member_id = format!("{}-{}", role, i);
255            runner.team_mut().add_member(&member_id, role);
256            runner.bind_agent(&member_id, agent, &ws, &slot.agent_type, &registry)?;
257        }
258
259        runner.run_until_done(&goal).await
260    }
261
262    /// 发送控制信号到 SubAgent
263    pub async fn send_control(&self, id: &str, signal: ControlSignal) -> Result<()> {
264        let subagents = self.subagents.read().await;
265        let handle = subagents
266            .get(id)
267            .ok_or_else(|| anyhow::anyhow!("SubAgent '{}' not found", id))?;
268
269        handle.send_control(signal.clone()).await?;
270
271        // 发布控制信号接收事件
272        let _ = self
273            .event_tx
274            .send(OrchestratorEvent::ControlSignalReceived {
275                id: id.to_string(),
276                signal,
277            });
278
279        Ok(())
280    }
281
282    /// 暂停 SubAgent
283    pub async fn pause_subagent(&self, id: &str) -> Result<()> {
284        self.send_control(id, ControlSignal::Pause).await
285    }
286
287    /// 恢复 SubAgent
288    pub async fn resume_subagent(&self, id: &str) -> Result<()> {
289        self.send_control(id, ControlSignal::Resume).await
290    }
291
292    /// 取消 SubAgent
293    pub async fn cancel_subagent(&self, id: &str) -> Result<()> {
294        self.send_control(id, ControlSignal::Cancel).await
295    }
296
297    /// 调整 SubAgent 参数
298    pub async fn adjust_subagent_params(
299        &self,
300        id: &str,
301        max_steps: Option<usize>,
302        timeout_ms: Option<u64>,
303    ) -> Result<()> {
304        self.send_control(
305            id,
306            ControlSignal::AdjustParams {
307                max_steps,
308                timeout_ms,
309            },
310        )
311        .await
312    }
313
314    /// 获取 SubAgent 状态
315    pub async fn get_subagent_state(&self, id: &str) -> Option<SubAgentState> {
316        let subagents = self.subagents.read().await;
317        subagents.get(id).map(|h| h.state())
318    }
319
320    /// 获取所有 SubAgent 的状态
321    pub async fn get_all_states(&self) -> HashMap<String, SubAgentState> {
322        let subagents = self.subagents.read().await;
323        subagents
324            .iter()
325            .map(|(id, handle)| (id.clone(), handle.state()))
326            .collect()
327    }
328
329    /// 获取活跃的 SubAgent 数量
330    pub async fn active_count(&self) -> usize {
331        let subagents = self.subagents.read().await;
332        subagents
333            .values()
334            .filter(|h| !h.state().is_terminal())
335            .count()
336    }
337
338    /// 等待所有 SubAgent 完成
339    pub async fn wait_all(&self) -> Result<()> {
340        loop {
341            let active = self.active_count().await;
342            if active == 0 {
343                break;
344            }
345            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
346        }
347        Ok(())
348    }
349
350    /// 获取所有 SubAgent 的信息列表
351    pub async fn list_subagents(&self) -> Vec<SubAgentInfo> {
352        let subagents = self.subagents.read().await;
353        let mut infos = Vec::new();
354
355        for (id, handle) in subagents.iter() {
356            let state = handle.state_async().await;
357            let activity = handle.activity().await;
358            let config = handle.config();
359
360            infos.push(SubAgentInfo {
361                id: id.clone(),
362                agent_type: config.agent_type.clone(),
363                description: config.description.clone(),
364                state: format!("{:?}", state),
365                parent_id: config.parent_id.clone(),
366                created_at: handle.created_at(),
367                updated_at: std::time::SystemTime::now()
368                    .duration_since(std::time::UNIX_EPOCH)
369                    .unwrap()
370                    .as_millis() as u64,
371                current_activity: Some(activity),
372            });
373        }
374
375        infos
376    }
377
378    /// 获取特定 SubAgent 的详细信息
379    pub async fn get_subagent_info(&self, id: &str) -> Option<SubAgentInfo> {
380        let subagents = self.subagents.read().await;
381        let handle = subagents.get(id)?;
382
383        let state = handle.state_async().await;
384        let activity = handle.activity().await;
385        let config = handle.config();
386
387        Some(SubAgentInfo {
388            id: id.to_string(),
389            agent_type: config.agent_type.clone(),
390            description: config.description.clone(),
391            state: format!("{:?}", state),
392            parent_id: config.parent_id.clone(),
393            created_at: handle.created_at(),
394            updated_at: std::time::SystemTime::now()
395                .duration_since(std::time::UNIX_EPOCH)
396                .unwrap()
397                .as_millis() as u64,
398            current_activity: Some(activity),
399        })
400    }
401
402    /// 获取所有活跃 SubAgent 的当前活动
403    pub async fn get_active_activities(&self) -> HashMap<String, SubAgentActivity> {
404        let subagents = self.subagents.read().await;
405        let mut activities = HashMap::new();
406
407        for (id, handle) in subagents.iter() {
408            if !handle.state().is_terminal() {
409                let activity = handle.activity().await;
410                activities.insert(id.clone(), activity);
411            }
412        }
413
414        activities
415    }
416
417    /// 获取 SubAgent 句柄(用于直接控制)
418    pub async fn get_handle(&self, id: &str) -> Option<SubAgentHandle> {
419        let subagents = self.subagents.read().await;
420        subagents.get(id).cloned()
421    }
422
423    /// Complete a pending external task for a SubAgent.
424    ///
425    /// Call this after processing an `OrchestratorEvent::ExternalTaskPending`
426    /// event.  The `subagent_id` and `task_id` identify the waiting tool call;
427    /// `result` is the outcome produced by the external worker.
428    ///
429    /// Returns `true` if the task was found and unblocked, `false` if the
430    /// subagent or task ID was not found (e.g., already timed out).
431    /// Return any external tasks currently waiting for the given SubAgent.
432    ///
433    /// Returns an empty list if the SubAgent does not exist or has no pending
434    /// external tasks (e.g. when running with the default Internal lane mode).
435    pub async fn pending_external_tasks_for(
436        &self,
437        subagent_id: &str,
438    ) -> Vec<crate::queue::ExternalTask> {
439        let sessions = self.sessions.read().await;
440        match sessions.get(subagent_id) {
441            Some(session) => session.pending_external_tasks().await,
442            None => vec![],
443        }
444    }
445
446    pub async fn complete_external_task(
447        &self,
448        subagent_id: &str,
449        task_id: &str,
450        result: crate::queue::ExternalTaskResult,
451    ) -> bool {
452        let sessions = self.sessions.read().await;
453        match sessions.get(subagent_id) {
454            Some(session) => session.complete_external_task(task_id, result).await,
455            None => false,
456        }
457    }
458}
459
460impl std::fmt::Debug for AgentOrchestrator {
461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462        f.debug_struct("AgentOrchestrator")
463            .field("event_buffer_size", &self.config.event_buffer_size)
464            .field(
465                "max_concurrent_subagents",
466                &self.config.max_concurrent_subagents,
467            )
468            .finish()
469    }
470}
471
472/// SubAgent 事件流(过滤特定 SubAgent 的事件)
473pub struct SubAgentEventStream {
474    pub(crate) history: VecDeque<OrchestratorEvent>,
475    pub(crate) rx: broadcast::Receiver<OrchestratorEvent>,
476    pub(crate) filter_id: String,
477}
478
479impl SubAgentEventStream {
480    /// 接收下一个事件
481    pub async fn recv(&mut self) -> Option<OrchestratorEvent> {
482        if let Some(event) = self.history.pop_front() {
483            return Some(event);
484        }
485
486        loop {
487            match self.rx.recv().await {
488                Ok(event) => {
489                    if let Some(id) = event.subagent_id() {
490                        if id == self.filter_id {
491                            return Some(event);
492                        }
493                    }
494                }
495                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
496                Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
497            }
498        }
499    }
500}