mofa_foundation/secretary/default/
monitor.rs1use super::types::*;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::{RwLock, mpsc};
7
8#[derive(Debug, Clone)]
10pub enum MonitorEvent {
11 TaskStarted { task_id: String, agent_id: String },
13 TaskProgress {
15 task_id: String,
16 progress: u32,
17 message: Option<String>,
18 },
19 TaskCompleted {
21 task_id: String,
22 result: ExecutionResult,
23 },
24 TaskFailed { task_id: String, error: String },
26 DecisionRequired { decision: CriticalDecision },
28}
29
30#[derive(Debug, Clone)]
32pub struct TaskSnapshot {
33 pub task_id: String,
35 pub agent_id: String,
37 pub status: TaskExecutionStatus,
39 pub progress: u32,
41 pub last_updated: u64,
43 pub result: Option<ExecutionResult>,
45}
46
47pub struct TaskMonitor {
49 snapshots: Arc<RwLock<HashMap<String, TaskSnapshot>>>,
51 pending_decisions: Arc<RwLock<HashMap<String, CriticalDecision>>>,
53 decision_responses: Arc<RwLock<HashMap<String, mpsc::Sender<HumanResponse>>>>,
55 event_tx: Option<mpsc::Sender<MonitorEvent>>,
57}
58
59impl TaskMonitor {
60 pub fn new() -> Self {
62 Self {
63 snapshots: Arc::new(RwLock::new(HashMap::new())),
64 pending_decisions: Arc::new(RwLock::new(HashMap::new())),
65 decision_responses: Arc::new(RwLock::new(HashMap::new())),
66 event_tx: None,
67 }
68 }
69
70 pub fn with_event_sender(mut self, tx: mpsc::Sender<MonitorEvent>) -> Self {
72 self.event_tx = Some(tx);
73 self
74 }
75
76 pub async fn start_monitoring(&self, task_id: &str, agent_id: &str) {
78 let now = std::time::SystemTime::now()
79 .duration_since(std::time::UNIX_EPOCH)
80 .unwrap_or_default()
81 .as_secs();
82
83 let snapshot = TaskSnapshot {
84 task_id: task_id.to_string(),
85 agent_id: agent_id.to_string(),
86 status: TaskExecutionStatus::Received,
87 progress: 0,
88 last_updated: now,
89 result: None,
90 };
91
92 {
93 let mut snapshots = self.snapshots.write().await;
94 snapshots.insert(task_id.to_string(), snapshot);
95 }
96
97 self.emit_event(MonitorEvent::TaskStarted {
98 task_id: task_id.to_string(),
99 agent_id: agent_id.to_string(),
100 })
101 .await;
102
103 tracing::info!("Started monitoring task {} on agent {}", task_id, agent_id);
104 }
105
106 pub async fn update_task_status(
108 &self,
109 task_id: &str,
110 status: TaskExecutionStatus,
111 progress: u32,
112 message: Option<String>,
113 ) {
114 let now = std::time::SystemTime::now()
115 .duration_since(std::time::UNIX_EPOCH)
116 .unwrap_or_default()
117 .as_secs();
118
119 {
120 let mut snapshots = self.snapshots.write().await;
121 if let Some(snapshot) = snapshots.get_mut(task_id) {
122 snapshot.status = status.clone();
123 snapshot.progress = progress;
124 snapshot.last_updated = now;
125 }
126 }
127
128 self.emit_event(MonitorEvent::TaskProgress {
129 task_id: task_id.to_string(),
130 progress,
131 message,
132 })
133 .await;
134 }
135
136 pub async fn complete_task(&self, task_id: &str, result: ExecutionResult) {
138 let now = std::time::SystemTime::now()
139 .duration_since(std::time::UNIX_EPOCH)
140 .unwrap_or_default()
141 .as_secs();
142
143 {
144 let mut snapshots = self.snapshots.write().await;
145 if let Some(snapshot) = snapshots.get_mut(task_id) {
146 snapshot.status = TaskExecutionStatus::Completed;
147 snapshot.progress = 100;
148 snapshot.last_updated = now;
149 snapshot.result = Some(result.clone());
150 }
151 }
152
153 self.emit_event(MonitorEvent::TaskCompleted {
154 task_id: task_id.to_string(),
155 result,
156 })
157 .await;
158
159 tracing::info!("Task {} completed", task_id);
160 }
161
162 pub async fn fail_task(&self, task_id: &str, error: &str) {
164 let now = std::time::SystemTime::now()
165 .duration_since(std::time::UNIX_EPOCH)
166 .unwrap_or_default()
167 .as_secs();
168
169 {
170 let mut snapshots = self.snapshots.write().await;
171 if let Some(snapshot) = snapshots.get_mut(task_id) {
172 snapshot.status = TaskExecutionStatus::Failed(error.to_string());
173 snapshot.last_updated = now;
174 }
175 }
176
177 self.emit_event(MonitorEvent::TaskFailed {
178 task_id: task_id.to_string(),
179 error: error.to_string(),
180 })
181 .await;
182
183 tracing::warn!("Task {} failed: {}", task_id, error);
184 }
185
186 pub async fn get_task_snapshot(&self, task_id: &str) -> Option<TaskSnapshot> {
188 let snapshots = self.snapshots.read().await;
189 snapshots.get(task_id).cloned()
190 }
191
192 pub async fn get_all_snapshots(&self) -> Vec<TaskSnapshot> {
194 let snapshots = self.snapshots.read().await;
195 snapshots.values().cloned().collect()
196 }
197
198 pub async fn create_decision(
200 &self,
201 todo_id: &str,
202 decision_type: DecisionType,
203 description: &str,
204 options: Vec<DecisionOption>,
205 recommended_option: Option<usize>,
206 deadline: Option<u64>,
207 ) -> CriticalDecision {
208 let now = std::time::SystemTime::now()
209 .duration_since(std::time::UNIX_EPOCH)
210 .unwrap_or_default()
211 .as_secs();
212
213 let decision_id = format!("decision_{}_{}", todo_id, now);
214
215 CriticalDecision {
216 id: decision_id,
217 todo_id: todo_id.to_string(),
218 decision_type,
219 description: description.to_string(),
220 options,
221 recommended_option,
222 deadline,
223 created_at: now,
224 human_response: None,
225 }
226 }
227
228 pub async fn request_decision(
230 &self,
231 decision: CriticalDecision,
232 ) -> anyhow::Result<HumanResponse> {
233 let decision_id = decision.id.clone();
234 let (tx, mut rx) = mpsc::channel(1);
235
236 {
237 let mut pending = self.pending_decisions.write().await;
238 pending.insert(decision_id.clone(), decision.clone());
239
240 let mut responses = self.decision_responses.write().await;
241 responses.insert(decision_id.clone(), tx);
242 }
243
244 self.emit_event(MonitorEvent::DecisionRequired { decision })
245 .await;
246
247 rx.recv()
249 .await
250 .ok_or_else(|| anyhow::anyhow!("Decision channel closed"))
251 }
252
253 pub async fn submit_human_response(
255 &self,
256 decision_id: &str,
257 selected_option: usize,
258 comment: Option<String>,
259 ) -> anyhow::Result<()> {
260 let now = std::time::SystemTime::now()
261 .duration_since(std::time::UNIX_EPOCH)
262 .unwrap_or_default()
263 .as_secs();
264
265 let response = HumanResponse {
266 selected_option,
267 comment,
268 responded_at: now,
269 };
270
271 {
273 let mut pending = self.pending_decisions.write().await;
274 if let Some(decision) = pending.get_mut(decision_id) {
275 decision.human_response = Some(response.clone());
276 }
277 }
278
279 {
281 let mut responses = self.decision_responses.write().await;
282 if let Some(tx) = responses.remove(decision_id) {
283 tx.send(response)
284 .await
285 .map_err(|_| anyhow::anyhow!("Failed to send response"))?;
286 }
287 }
288
289 {
291 let mut pending = self.pending_decisions.write().await;
292 pending.remove(decision_id);
293 }
294
295 tracing::info!("Human response submitted for decision {}", decision_id);
296 Ok(())
297 }
298
299 pub async fn get_pending_decisions(&self) -> Vec<CriticalDecision> {
301 let pending = self.pending_decisions.read().await;
302 pending.values().cloned().collect()
303 }
304
305 pub async fn handle_agent_message(&self, message: SecretaryMessage) -> anyhow::Result<()> {
307 match message {
308 SecretaryMessage::TaskStatusReport {
309 task_id,
310 status,
311 progress,
312 message,
313 } => {
314 self.update_task_status(&task_id, status, progress, message)
315 .await;
316 }
317 SecretaryMessage::TaskCompleteReport { task_id, result } => {
318 self.complete_task(&task_id, result).await;
319 }
320 SecretaryMessage::RequestDecision { decision, .. } => {
321 let mut pending = self.pending_decisions.write().await;
322 pending.insert(decision.id.clone(), decision.clone());
323
324 self.emit_event(MonitorEvent::DecisionRequired { decision })
325 .await;
326 }
327 _ => {}
328 }
329 Ok(())
330 }
331
332 async fn emit_event(&self, event: MonitorEvent) {
334 if let Some(ref tx) = self.event_tx {
335 let _ = tx.send(event).await;
336 }
337 }
338
339 pub async fn get_statistics(&self) -> HashMap<String, usize> {
341 let snapshots = self.snapshots.read().await;
342 let mut stats = HashMap::new();
343
344 stats.insert("total_tasks".to_string(), snapshots.len());
345
346 let completed = snapshots
347 .values()
348 .filter(|s| matches!(s.status, TaskExecutionStatus::Completed))
349 .count();
350 stats.insert("completed_tasks".to_string(), completed);
351
352 let in_progress = snapshots
353 .values()
354 .filter(|s| matches!(s.status, TaskExecutionStatus::Executing))
355 .count();
356 stats.insert("in_progress_tasks".to_string(), in_progress);
357
358 let pending_decisions = self.pending_decisions.read().await;
359 stats.insert("pending_decisions".to_string(), pending_decisions.len());
360
361 stats
362 }
363}
364
365impl Default for TaskMonitor {
366 fn default() -> Self {
367 Self::new()
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Task id used by every test in this module.
    const TASK: &str = "task_1";
    /// Agent id used by every test in this module.
    const AGENT: &str = "agent_1";

    /// Builds a fresh monitor that is already tracking `TASK` on `AGENT`.
    async fn monitor_with_task() -> TaskMonitor {
        let monitor = TaskMonitor::new();
        monitor.start_monitoring(TASK, AGENT).await;
        monitor
    }

    /// Starting to monitor a task makes its snapshot retrievable.
    #[tokio::test]
    async fn test_start_monitoring() {
        let monitor = monitor_with_task().await;

        let snap = monitor.get_task_snapshot(TASK).await.unwrap();
        assert_eq!(snap.task_id, "task_1");
        assert_eq!(snap.agent_id, "agent_1");
    }

    /// A status update is reflected in the stored progress value.
    #[tokio::test]
    async fn test_update_status() {
        let monitor = monitor_with_task().await;

        monitor
            .update_task_status(TASK, TaskExecutionStatus::Executing, 50, None)
            .await;

        let snap = monitor.get_task_snapshot(TASK).await.unwrap();
        assert_eq!(snap.progress, 50);
    }

    /// Completing a task sets status `Completed` and progress `100`.
    #[tokio::test]
    async fn test_complete_task() {
        let monitor = monitor_with_task().await;

        let outcome = ExecutionResult {
            success: true,
            summary: "Done".to_string(),
            details: HashMap::new(),
            artifacts: vec![],
            execution_time_ms: 1000,
            error: None,
        };
        monitor.complete_task(TASK, outcome).await;

        let snap = monitor.get_task_snapshot(TASK).await.unwrap();
        assert!(matches!(snap.status, TaskExecutionStatus::Completed));
        assert_eq!(snap.progress, 100);
    }
}