Skip to main content

mofa_foundation/secretary/default/
monitor.rs

1//! 任务监控器 - 阶段4: 监控反馈,推送关键决策给人类
2
3use super::types::*;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::{RwLock, mpsc};
7
8/// 监控事件
9#[derive(Debug, Clone)]
10pub enum MonitorEvent {
11    /// 任务开始
12    TaskStarted { task_id: String, agent_id: String },
13    /// 任务进度更新
14    TaskProgress {
15        task_id: String,
16        progress: u32,
17        message: Option<String>,
18    },
19    /// 任务完成
20    TaskCompleted {
21        task_id: String,
22        result: ExecutionResult,
23    },
24    /// 任务失败
25    TaskFailed { task_id: String, error: String },
26    /// 需要决策
27    DecisionRequired { decision: CriticalDecision },
28}
29
30/// 任务快照
31#[derive(Debug, Clone)]
32pub struct TaskSnapshot {
33    /// 任务ID
34    pub task_id: String,
35    /// 执行Agent ID
36    pub agent_id: String,
37    /// 当前状态
38    pub status: TaskExecutionStatus,
39    /// 进度(0-100)
40    pub progress: u32,
41    /// 最后更新时间
42    pub last_updated: u64,
43    /// 执行结果(如果已完成)
44    pub result: Option<ExecutionResult>,
45}
46
47/// 任务监控器
48pub struct TaskMonitor {
49    /// 任务快照
50    snapshots: Arc<RwLock<HashMap<String, TaskSnapshot>>>,
51    /// 待处理的决策
52    pending_decisions: Arc<RwLock<HashMap<String, CriticalDecision>>>,
53    /// 决策响应通道
54    decision_responses: Arc<RwLock<HashMap<String, mpsc::Sender<HumanResponse>>>>,
55    /// 事件发送器
56    event_tx: Option<mpsc::Sender<MonitorEvent>>,
57}
58
59impl TaskMonitor {
60    /// 创建新的任务监控器
61    pub fn new() -> Self {
62        Self {
63            snapshots: Arc::new(RwLock::new(HashMap::new())),
64            pending_decisions: Arc::new(RwLock::new(HashMap::new())),
65            decision_responses: Arc::new(RwLock::new(HashMap::new())),
66            event_tx: None,
67        }
68    }
69
70    /// 设置事件发送器
71    pub fn with_event_sender(mut self, tx: mpsc::Sender<MonitorEvent>) -> Self {
72        self.event_tx = Some(tx);
73        self
74    }
75
76    /// 开始监控任务
77    pub async fn start_monitoring(&self, task_id: &str, agent_id: &str) {
78        let now = std::time::SystemTime::now()
79            .duration_since(std::time::UNIX_EPOCH)
80            .unwrap_or_default()
81            .as_secs();
82
83        let snapshot = TaskSnapshot {
84            task_id: task_id.to_string(),
85            agent_id: agent_id.to_string(),
86            status: TaskExecutionStatus::Received,
87            progress: 0,
88            last_updated: now,
89            result: None,
90        };
91
92        {
93            let mut snapshots = self.snapshots.write().await;
94            snapshots.insert(task_id.to_string(), snapshot);
95        }
96
97        self.emit_event(MonitorEvent::TaskStarted {
98            task_id: task_id.to_string(),
99            agent_id: agent_id.to_string(),
100        })
101        .await;
102
103        tracing::info!("Started monitoring task {} on agent {}", task_id, agent_id);
104    }
105
106    /// 更新任务状态
107    pub async fn update_task_status(
108        &self,
109        task_id: &str,
110        status: TaskExecutionStatus,
111        progress: u32,
112        message: Option<String>,
113    ) {
114        let now = std::time::SystemTime::now()
115            .duration_since(std::time::UNIX_EPOCH)
116            .unwrap_or_default()
117            .as_secs();
118
119        {
120            let mut snapshots = self.snapshots.write().await;
121            if let Some(snapshot) = snapshots.get_mut(task_id) {
122                snapshot.status = status.clone();
123                snapshot.progress = progress;
124                snapshot.last_updated = now;
125            }
126        }
127
128        self.emit_event(MonitorEvent::TaskProgress {
129            task_id: task_id.to_string(),
130            progress,
131            message,
132        })
133        .await;
134    }
135
136    /// 任务完成
137    pub async fn complete_task(&self, task_id: &str, result: ExecutionResult) {
138        let now = std::time::SystemTime::now()
139            .duration_since(std::time::UNIX_EPOCH)
140            .unwrap_or_default()
141            .as_secs();
142
143        {
144            let mut snapshots = self.snapshots.write().await;
145            if let Some(snapshot) = snapshots.get_mut(task_id) {
146                snapshot.status = TaskExecutionStatus::Completed;
147                snapshot.progress = 100;
148                snapshot.last_updated = now;
149                snapshot.result = Some(result.clone());
150            }
151        }
152
153        self.emit_event(MonitorEvent::TaskCompleted {
154            task_id: task_id.to_string(),
155            result,
156        })
157        .await;
158
159        tracing::info!("Task {} completed", task_id);
160    }
161
162    /// 任务失败
163    pub async fn fail_task(&self, task_id: &str, error: &str) {
164        let now = std::time::SystemTime::now()
165            .duration_since(std::time::UNIX_EPOCH)
166            .unwrap_or_default()
167            .as_secs();
168
169        {
170            let mut snapshots = self.snapshots.write().await;
171            if let Some(snapshot) = snapshots.get_mut(task_id) {
172                snapshot.status = TaskExecutionStatus::Failed(error.to_string());
173                snapshot.last_updated = now;
174            }
175        }
176
177        self.emit_event(MonitorEvent::TaskFailed {
178            task_id: task_id.to_string(),
179            error: error.to_string(),
180        })
181        .await;
182
183        tracing::warn!("Task {} failed: {}", task_id, error);
184    }
185
186    /// 获取任务快照
187    pub async fn get_task_snapshot(&self, task_id: &str) -> Option<TaskSnapshot> {
188        let snapshots = self.snapshots.read().await;
189        snapshots.get(task_id).cloned()
190    }
191
192    /// 获取所有任务快照
193    pub async fn get_all_snapshots(&self) -> Vec<TaskSnapshot> {
194        let snapshots = self.snapshots.read().await;
195        snapshots.values().cloned().collect()
196    }
197
198    /// 创建决策请求
199    pub async fn create_decision(
200        &self,
201        todo_id: &str,
202        decision_type: DecisionType,
203        description: &str,
204        options: Vec<DecisionOption>,
205        recommended_option: Option<usize>,
206        deadline: Option<u64>,
207    ) -> CriticalDecision {
208        let now = std::time::SystemTime::now()
209            .duration_since(std::time::UNIX_EPOCH)
210            .unwrap_or_default()
211            .as_secs();
212
213        let decision_id = format!("decision_{}_{}", todo_id, now);
214
215        CriticalDecision {
216            id: decision_id,
217            todo_id: todo_id.to_string(),
218            decision_type,
219            description: description.to_string(),
220            options,
221            recommended_option,
222            deadline,
223            created_at: now,
224            human_response: None,
225        }
226    }
227
228    /// 请求人类决策
229    pub async fn request_decision(
230        &self,
231        decision: CriticalDecision,
232    ) -> anyhow::Result<HumanResponse> {
233        let decision_id = decision.id.clone();
234        let (tx, mut rx) = mpsc::channel(1);
235
236        {
237            let mut pending = self.pending_decisions.write().await;
238            pending.insert(decision_id.clone(), decision.clone());
239
240            let mut responses = self.decision_responses.write().await;
241            responses.insert(decision_id.clone(), tx);
242        }
243
244        self.emit_event(MonitorEvent::DecisionRequired { decision })
245            .await;
246
247        // 等待人类响应
248        rx.recv()
249            .await
250            .ok_or_else(|| anyhow::anyhow!("Decision channel closed"))
251    }
252
253    /// 提交人类响应
254    pub async fn submit_human_response(
255        &self,
256        decision_id: &str,
257        selected_option: usize,
258        comment: Option<String>,
259    ) -> anyhow::Result<()> {
260        let now = std::time::SystemTime::now()
261            .duration_since(std::time::UNIX_EPOCH)
262            .unwrap_or_default()
263            .as_secs();
264
265        let response = HumanResponse {
266            selected_option,
267            comment,
268            responded_at: now,
269        };
270
271        // 更新决策
272        {
273            let mut pending = self.pending_decisions.write().await;
274            if let Some(decision) = pending.get_mut(decision_id) {
275                decision.human_response = Some(response.clone());
276            }
277        }
278
279        // 发送响应
280        {
281            let mut responses = self.decision_responses.write().await;
282            if let Some(tx) = responses.remove(decision_id) {
283                tx.send(response)
284                    .await
285                    .map_err(|_| anyhow::anyhow!("Failed to send response"))?;
286            }
287        }
288
289        // 清理
290        {
291            let mut pending = self.pending_decisions.write().await;
292            pending.remove(decision_id);
293        }
294
295        tracing::info!("Human response submitted for decision {}", decision_id);
296        Ok(())
297    }
298
299    /// 获取待处理的决策
300    pub async fn get_pending_decisions(&self) -> Vec<CriticalDecision> {
301        let pending = self.pending_decisions.read().await;
302        pending.values().cloned().collect()
303    }
304
305    /// 处理来自执行Agent的消息
306    pub async fn handle_agent_message(&self, message: SecretaryMessage) -> anyhow::Result<()> {
307        match message {
308            SecretaryMessage::TaskStatusReport {
309                task_id,
310                status,
311                progress,
312                message,
313            } => {
314                self.update_task_status(&task_id, status, progress, message)
315                    .await;
316            }
317            SecretaryMessage::TaskCompleteReport { task_id, result } => {
318                self.complete_task(&task_id, result).await;
319            }
320            SecretaryMessage::RequestDecision { decision, .. } => {
321                let mut pending = self.pending_decisions.write().await;
322                pending.insert(decision.id.clone(), decision.clone());
323
324                self.emit_event(MonitorEvent::DecisionRequired { decision })
325                    .await;
326            }
327            _ => {}
328        }
329        Ok(())
330    }
331
332    /// 发送监控事件
333    async fn emit_event(&self, event: MonitorEvent) {
334        if let Some(ref tx) = self.event_tx {
335            let _ = tx.send(event).await;
336        }
337    }
338
339    /// 获取统计信息
340    pub async fn get_statistics(&self) -> HashMap<String, usize> {
341        let snapshots = self.snapshots.read().await;
342        let mut stats = HashMap::new();
343
344        stats.insert("total_tasks".to_string(), snapshots.len());
345
346        let completed = snapshots
347            .values()
348            .filter(|s| matches!(s.status, TaskExecutionStatus::Completed))
349            .count();
350        stats.insert("completed_tasks".to_string(), completed);
351
352        let in_progress = snapshots
353            .values()
354            .filter(|s| matches!(s.status, TaskExecutionStatus::Executing))
355            .count();
356        stats.insert("in_progress_tasks".to_string(), in_progress);
357
358        let pending_decisions = self.pending_decisions.read().await;
359        stats.insert("pending_decisions".to_string(), pending_decisions.len());
360
361        stats
362    }
363}
364
365impl Default for TaskMonitor {
366    fn default() -> Self {
367        Self::new()
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal successful execution result shared by the tests.
    fn sample_result() -> ExecutionResult {
        ExecutionResult {
            success: true,
            summary: "Done".to_string(),
            details: HashMap::new(),
            artifacts: vec![],
            execution_time_ms: 1000,
            error: None,
        }
    }

    #[tokio::test]
    async fn test_start_monitoring() {
        let monitor = TaskMonitor::new();
        monitor.start_monitoring("task_1", "agent_1").await;

        let snapshot = monitor.get_task_snapshot("task_1").await.unwrap();
        assert_eq!(snapshot.task_id, "task_1");
        assert_eq!(snapshot.agent_id, "agent_1");
        assert!(matches!(snapshot.status, TaskExecutionStatus::Received));
        assert_eq!(snapshot.progress, 0);
    }

    #[tokio::test]
    async fn test_update_status() {
        let monitor = TaskMonitor::new();
        monitor.start_monitoring("task_1", "agent_1").await;

        monitor
            .update_task_status("task_1", TaskExecutionStatus::Executing, 50, None)
            .await;

        let snapshot = monitor.get_task_snapshot("task_1").await.unwrap();
        assert_eq!(snapshot.progress, 50);
        assert!(matches!(snapshot.status, TaskExecutionStatus::Executing));
    }

    #[tokio::test]
    async fn test_update_untracked_task_is_ignored() {
        let monitor = TaskMonitor::new();

        monitor
            .update_task_status("missing", TaskExecutionStatus::Executing, 10, None)
            .await;

        assert!(monitor.get_task_snapshot("missing").await.is_none());
        assert!(monitor.get_all_snapshots().await.is_empty());
    }

    #[tokio::test]
    async fn test_complete_task() {
        let monitor = TaskMonitor::new();
        monitor.start_monitoring("task_1", "agent_1").await;

        monitor.complete_task("task_1", sample_result()).await;

        let snapshot = monitor.get_task_snapshot("task_1").await.unwrap();
        assert!(matches!(snapshot.status, TaskExecutionStatus::Completed));
        assert_eq!(snapshot.progress, 100);
        assert!(snapshot.result.is_some());
    }

    #[tokio::test]
    async fn test_fail_task() {
        let monitor = TaskMonitor::new();
        monitor.start_monitoring("task_1", "agent_1").await;

        monitor.fail_task("task_1", "boom").await;

        let snapshot = monitor.get_task_snapshot("task_1").await.unwrap();
        assert!(matches!(snapshot.status, TaskExecutionStatus::Failed(ref e) if e == "boom"));
        assert!(snapshot.result.is_none());
    }

    #[tokio::test]
    async fn test_handle_status_report() {
        let monitor = TaskMonitor::new();
        monitor.start_monitoring("task_1", "agent_1").await;

        monitor
            .handle_agent_message(SecretaryMessage::TaskStatusReport {
                task_id: "task_1".into(),
                status: TaskExecutionStatus::Executing,
                progress: 30,
                message: Some("working".to_string()),
            })
            .await
            .unwrap();

        let snapshot = monitor.get_task_snapshot("task_1").await.unwrap();
        assert_eq!(snapshot.progress, 30);
        assert!(matches!(snapshot.status, TaskExecutionStatus::Executing));
    }

    #[tokio::test]
    async fn test_statistics() {
        let monitor = TaskMonitor::new();
        monitor.start_monitoring("task_1", "agent_1").await;
        monitor.start_monitoring("task_2", "agent_2").await;
        monitor
            .update_task_status("task_2", TaskExecutionStatus::Executing, 10, None)
            .await;
        monitor.complete_task("task_1", sample_result()).await;

        let stats = monitor.get_statistics().await;
        assert_eq!(stats.get("total_tasks"), Some(&2));
        assert_eq!(stats.get("completed_tasks"), Some(&1));
        assert_eq!(stats.get("in_progress_tasks"), Some(&1));
        assert_eq!(stats.get("pending_decisions"), Some(&0));
    }
}