Skip to main content

aster/blueprint/
agent_coordinator.rs

1//! Agent 协调器
2//!
3//!
4//! 实现蜂王-蜜蜂协作模型:
5//! - 主 Agent(蜂王):全局视野,负责任务分配和协调
6//! - 子 Agent(蜜蜂):在各自的树枝上工作,执行具体任务
7
8use chrono::Utc;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use uuid::Uuid;
12
13use super::blueprint_manager::BlueprintManager;
14use super::task_tree_manager::TaskTreeManager;
15use super::types::*;
16
17// ============================================================================
18// 协调器配置
19// ============================================================================
20
21/// 协调器配置
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct CoordinatorConfig {
24    /// 最大并发 Worker 数量
25    pub max_concurrent_workers: usize,
26    /// Worker 任务超时时间(毫秒)
27    pub worker_timeout: u64,
28    /// 主循环间隔(毫秒)
29    pub main_loop_interval: u64,
30    /// 是否自动分配任务
31    pub auto_assign_tasks: bool,
32    /// Worker 模型选择策略
33    pub model_strategy: ModelStrategy,
34    /// 默认 Worker 模型
35    pub default_worker_model: String,
36}
37
38impl Default for CoordinatorConfig {
39    fn default() -> Self {
40        Self {
41            max_concurrent_workers: 5,
42            worker_timeout: 300000,   // 5 分钟
43            main_loop_interval: 5000, // 5 秒
44            auto_assign_tasks: true,
45            model_strategy: ModelStrategy::Adaptive,
46            default_worker_model: "haiku".to_string(),
47        }
48    }
49}
50
51/// 模型选择策略
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub enum ModelStrategy {
55    Fixed,
56    Adaptive,
57    RoundRobin,
58}
59
60// ============================================================================
61// Agent 协调器
62// ============================================================================
63
64/// Agent 协调器
65pub struct AgentCoordinator {
66    config: CoordinatorConfig,
67    queen: Option<QueenAgent>,
68    workers: HashMap<String, WorkerAgent>,
69    timeline: Vec<TimelineEvent>,
70    is_running: bool,
71}
72
73impl Default for AgentCoordinator {
74    fn default() -> Self {
75        Self::new(CoordinatorConfig::default())
76    }
77}
78
79impl AgentCoordinator {
80    /// 创建新的协调器
81    pub fn new(config: CoordinatorConfig) -> Self {
82        Self {
83            config,
84            queen: None,
85            workers: HashMap::new(),
86            timeline: Vec::new(),
87            is_running: false,
88        }
89    }
90
91    // ------------------------------------------------------------------------
92    // 蜂王初始化
93    // ------------------------------------------------------------------------
94
95    /// 初始化蜂王 Agent
96    pub async fn initialize_queen(
97        &mut self,
98        blueprint_manager: &mut BlueprintManager,
99        tree_manager: &mut TaskTreeManager,
100        blueprint_id: &str,
101    ) -> Result<&QueenAgent, String> {
102        let blueprint = blueprint_manager
103            .get_blueprint(blueprint_id)
104            .await
105            .ok_or_else(|| format!("蓝图 {} 不存在", blueprint_id))?;
106
107        if blueprint.status != BlueprintStatus::Approved
108            && blueprint.status != BlueprintStatus::Executing
109        {
110            return Err(format!(
111                "蓝图必须是已批准状态才能执行,当前状态: {:?}",
112                blueprint.status
113            ));
114        }
115
116        // 生成任务树(如果还没有)
117        let task_tree_id = if let Some(ref tree_id) = blueprint.task_tree_id {
118            if tree_manager.get_task_tree(tree_id).await.is_some() {
119                tree_id.clone()
120            } else {
121                let tree = tree_manager
122                    .generate_from_blueprint(&blueprint)
123                    .await
124                    .map_err(|e| e.to_string())?;
125                blueprint_manager
126                    .start_execution(blueprint_id, tree.id.clone())
127                    .await
128                    .map_err(|e| e.to_string())?;
129                tree.id
130            }
131        } else {
132            let tree = tree_manager
133                .generate_from_blueprint(&blueprint)
134                .await
135                .map_err(|e| e.to_string())?;
136            blueprint_manager
137                .start_execution(blueprint_id, tree.id.clone())
138                .await
139                .map_err(|e| e.to_string())?;
140            tree.id
141        };
142
143        // 构建全局上下文
144        let tree = tree_manager.get_task_tree(&task_tree_id).await.unwrap();
145        let global_context = self.build_global_context(&blueprint, &tree);
146
147        // 创建蜂王
148        let queen = QueenAgent {
149            id: Uuid::new_v4().to_string(),
150            blueprint_id: blueprint_id.to_string(),
151            task_tree_id,
152            status: QueenStatus::Idle,
153            worker_agents: Vec::new(),
154            global_context,
155            decisions: Vec::new(),
156        };
157
158        self.add_timeline_event(
159            TimelineEventType::TaskStart,
160            "蜂王初始化完成".to_string(),
161            None,
162            Some(queen.id.clone()),
163        );
164
165        self.queen = Some(queen);
166        Ok(self.queen.as_ref().unwrap())
167    }
168
169    // ------------------------------------------------------------------------
170    // Worker 管理
171    // ------------------------------------------------------------------------
172
173    /// 创建 Worker Agent(蜜蜂)
174    pub fn create_worker(&mut self, task_id: String) -> Result<&WorkerAgent, String> {
175        let queen = self
176            .queen
177            .as_ref()
178            .ok_or_else(|| "蜂王未初始化".to_string())?;
179
180        // 检查并发限制
181        let active_count = self
182            .workers
183            .values()
184            .filter(|w| w.status != WorkerStatus::Idle)
185            .count();
186
187        if active_count >= self.config.max_concurrent_workers {
188            return Err(format!(
189                "已达到最大并发 Worker 数量: {}",
190                self.config.max_concurrent_workers
191            ));
192        }
193
194        let worker = WorkerAgent {
195            id: Uuid::new_v4().to_string(),
196            queen_id: queen.id.clone(),
197            task_id: task_id.clone(),
198            status: WorkerStatus::Idle,
199            tdd_cycle: TddCycleState::default(),
200            history: Vec::new(),
201        };
202
203        let worker_id = worker.id.clone();
204        self.workers.insert(worker_id.clone(), worker);
205
206        self.add_timeline_event(
207            TimelineEventType::TaskStart,
208            format!("Worker 创建: {}", worker_id),
209            None,
210            Some(worker_id.clone()),
211        );
212
213        Ok(self.workers.get(&worker_id).unwrap())
214    }
215
216    /// 分配任务给 Worker
217    pub async fn assign_task(
218        &mut self,
219        tree_manager: &mut TaskTreeManager,
220        worker_id: &str,
221        task_id: &str,
222    ) -> Result<(), String> {
223        let queen = self
224            .queen
225            .as_ref()
226            .ok_or_else(|| "蜂王未初始化".to_string())?;
227
228        // 检查任务是否可以开始
229        let (can_start, blockers) = tree_manager
230            .can_start_task(&queen.task_tree_id, task_id)
231            .await;
232        if !can_start {
233            return Err(format!(
234                "任务 {} 无法开始: {}",
235                task_id,
236                blockers.join(", ")
237            ));
238        }
239
240        // 更新 Worker 状态
241        let worker = self
242            .workers
243            .get_mut(worker_id)
244            .ok_or_else(|| format!("Worker {} 不存在", worker_id))?;
245
246        worker.task_id = task_id.to_string();
247        worker.status = WorkerStatus::TestWriting;
248
249        // 记录决策
250        self.record_decision(
251            DecisionType::TaskAssignment,
252            format!("分配任务 {} 给 Worker {}", task_id, worker_id),
253            "根据优先级和依赖关系选择".to_string(),
254        );
255
256        self.add_timeline_event(
257            TimelineEventType::TaskStart,
258            format!("任务分配: {}", task_id),
259            Some(task_id.to_string()),
260            Some(worker_id.to_string()),
261        );
262
263        Ok(())
264    }
265
266    /// Worker 完成任务
267    pub fn worker_complete_task(&mut self, worker_id: &str) -> Result<(), String> {
268        let worker = self
269            .workers
270            .get_mut(worker_id)
271            .ok_or_else(|| format!("Worker {} 不存在", worker_id))?;
272
273        let task_id = worker.task_id.clone();
274        worker.status = WorkerStatus::Idle;
275        worker.tdd_cycle.test_passed = true;
276
277        self.add_timeline_event(
278            TimelineEventType::TaskComplete,
279            format!("Worker 完成任务: {}", task_id),
280            Some(task_id),
281            Some(worker_id.to_string()),
282        );
283
284        Ok(())
285    }
286
287    /// Worker 任务失败
288    pub fn worker_fail_task(&mut self, worker_id: &str, error: &str) -> Result<(), String> {
289        let worker = self
290            .workers
291            .get_mut(worker_id)
292            .ok_or_else(|| format!("Worker {} 不存在", worker_id))?;
293
294        let task_id = worker.task_id.clone();
295        worker.status = WorkerStatus::Idle;
296
297        self.add_timeline_event(
298            TimelineEventType::TestFail,
299            format!("Worker 任务失败: {} - {}", task_id, error),
300            Some(task_id),
301            Some(worker_id.to_string()),
302        );
303
304        Ok(())
305    }
306
307    // ------------------------------------------------------------------------
308    // 决策和时间线
309    // ------------------------------------------------------------------------
310
311    /// 记录蜂王决策
312    pub fn record_decision(
313        &mut self,
314        decision_type: DecisionType,
315        description: String,
316        reasoning: String,
317    ) {
318        if let Some(ref mut queen) = self.queen {
319            let decision = AgentDecision {
320                id: Uuid::new_v4().to_string(),
321                timestamp: Utc::now(),
322                decision_type,
323                description,
324                reasoning,
325                result: None,
326            };
327            queen.decisions.push(decision);
328        }
329    }
330
331    /// 添加时间线事件
332    pub fn add_timeline_event(
333        &mut self,
334        event_type: TimelineEventType,
335        description: String,
336        task_id: Option<String>,
337        agent_id: Option<String>,
338    ) {
339        let event = TimelineEvent {
340            id: Uuid::new_v4().to_string(),
341            timestamp: Utc::now(),
342            event_type,
343            task_id,
344            agent_id,
345            description,
346            data: None,
347        };
348        self.timeline.push(event);
349    }
350
351    /// 获取时间线
352    pub fn get_timeline(&self) -> &[TimelineEvent] {
353        &self.timeline
354    }
355
356    // ------------------------------------------------------------------------
357    // 上下文构建
358    // ------------------------------------------------------------------------
359
360    /// 构建全局上下文
361    fn build_global_context(&self, blueprint: &Blueprint, tree: &TaskTree) -> String {
362        let mut lines = Vec::new();
363
364        lines.push("# 项目全局上下文".to_string());
365        lines.push(String::new());
366
367        lines.push(format!(
368            "## 蓝图: {} (v{})",
369            blueprint.name, blueprint.version
370        ));
371        lines.push(blueprint.description.clone());
372        lines.push(String::new());
373
374        // 模块边界
375        lines.push("## 模块边界(你必须严格遵守)".to_string());
376        for module in &blueprint.modules {
377            lines.push(format!("### {}", module.name));
378            lines.push(format!("- 类型: {:?}", module.module_type));
379            let responsibilities = module
380                .responsibilities
381                .iter()
382                .take(3)
383                .cloned()
384                .collect::<Vec<_>>()
385                .join("、");
386            lines.push(format!("- 职责: {}", responsibilities));
387            if let Some(ref tech) = module.tech_stack {
388                lines.push(format!("- 技术栈: {}", tech.join(" + ")));
389            }
390            let root = module
391                .root_path
392                .clone()
393                .unwrap_or_else(|| format!("src/{}", module.name.to_lowercase()));
394            lines.push(format!("- 根路径: {}", root));
395            lines.push(String::new());
396        }
397
398        // 任务树统计
399        lines.push("## 任务树统计".to_string());
400        lines.push(format!("- 总任务数: {}", tree.stats.total_tasks));
401        lines.push(format!("- 待执行: {}", tree.stats.pending_tasks));
402        lines.push(format!("- 执行中: {}", tree.stats.running_tasks));
403        lines.push(format!("- 已完成: {}", tree.stats.passed_tasks));
404        lines.push(format!("- 进度: {:.1}%", tree.stats.progress_percentage));
405
406        lines.join("\n")
407    }
408
409    // ------------------------------------------------------------------------
410    // 查询方法
411    // ------------------------------------------------------------------------
412
413    /// 获取蜂王状态
414    pub fn get_queen(&self) -> Option<&QueenAgent> {
415        self.queen.as_ref()
416    }
417
418    /// 获取所有 Worker
419    pub fn get_workers(&self) -> Vec<&WorkerAgent> {
420        self.workers.values().collect()
421    }
422
423    /// 获取指定 Worker
424    pub fn get_worker(&self, worker_id: &str) -> Option<&WorkerAgent> {
425        self.workers.get(worker_id)
426    }
427
428    /// 获取空闲 Worker
429    pub fn get_idle_workers(&self) -> Vec<&WorkerAgent> {
430        self.workers
431            .values()
432            .filter(|w| w.status == WorkerStatus::Idle)
433            .collect()
434    }
435
436    /// 获取活跃 Worker 数量
437    pub fn get_active_worker_count(&self) -> usize {
438        self.workers
439            .values()
440            .filter(|w| w.status != WorkerStatus::Idle)
441            .count()
442    }
443
444    /// 是否正在运行
445    pub fn is_running(&self) -> bool {
446        self.is_running
447    }
448
449    /// 获取配置
450    pub fn get_config(&self) -> &CoordinatorConfig {
451        &self.config
452    }
453
454    /// 更新配置
455    pub fn update_config(&mut self, config: CoordinatorConfig) {
456        self.config = config;
457    }
458}
459
460#[cfg(test)]
461mod tests {
462    use super::*;
463
464    #[test]
465    fn test_coordinator_creation() {
466        let coordinator = AgentCoordinator::default();
467        assert!(coordinator.queen.is_none());
468        assert!(coordinator.workers.is_empty());
469        assert!(!coordinator.is_running());
470    }
471
472    #[test]
473    fn test_config_defaults() {
474        let config = CoordinatorConfig::default();
475        assert_eq!(config.max_concurrent_workers, 5);
476        assert_eq!(config.worker_timeout, 300000);
477        assert!(config.auto_assign_tasks);
478    }
479
480    #[test]
481    fn test_timeline_event() {
482        let mut coordinator = AgentCoordinator::default();
483        coordinator.add_timeline_event(
484            TimelineEventType::TaskStart,
485            "测试事件".to_string(),
486            None,
487            None,
488        );
489
490        assert_eq!(coordinator.get_timeline().len(), 1);
491        assert_eq!(coordinator.get_timeline()[0].description, "测试事件");
492    }
493}