Skip to main content

task_graph_mcp/tools/
tracking.rs

1//! Live status and tracking tools.
2
3use super::{get_f64, get_i64, get_string, get_string_array, make_tool_with_prompts};
4use crate::config::{Prompts, StatesConfig};
5use crate::db::Database;
6use crate::error::ToolError;
7use crate::format::{markdown_to_json, OutputFormat};
8use anyhow::Result;
9use rmcp::model::Tool;
10use serde_json::{json, Value};
11use std::collections::HashMap;
12
/// Format a duration in milliseconds as a short human-readable string.
///
/// The unit is chosen from the magnitude of `ms`:
///
/// - below one second: whole milliseconds, e.g. `250ms`
/// - below one minute: seconds with one decimal place, e.g. `1.5s`
/// - below one hour: whole minutes and seconds, e.g. `3m 7s`
/// - otherwise: whole hours and minutes, e.g. `2h 15m`
///
/// Negative durations (for example an end timestamp that precedes its start
/// timestamp) are formatted by magnitude and prefixed with `-`, so `-90_000`
/// becomes `-1m 30s` rather than a raw millisecond count.
fn format_duration_ms(ms: i64) -> String {
    let sign = if ms < 0 { "-" } else { "" };
    // `unsigned_abs` cannot overflow, unlike `abs`, when `ms == i64::MIN`.
    let magnitude = ms.unsigned_abs();

    let body = if magnitude < 1000 {
        format!("{}ms", magnitude)
    } else if magnitude < 60_000 {
        format!("{:.1}s", magnitude as f64 / 1000.0)
    } else if magnitude < 3_600_000 {
        let mins = magnitude / 60_000;
        let secs = (magnitude % 60_000) / 1000;
        format!("{}m {}s", mins, secs)
    } else {
        let hours = magnitude / 3_600_000;
        let mins = (magnitude % 3_600_000) / 60_000;
        format!("{}h {}m", hours, mins)
    };

    format!("{}{}", sign, body)
}
29
30/// Format a timestamp (ms since epoch) to ISO-like string.
31fn format_timestamp(ts: i64) -> String {
32    let secs = ts / 1000;
33    let datetime = chrono::DateTime::from_timestamp(secs, 0)
34        .unwrap_or_else(|| chrono::DateTime::from_timestamp(0, 0).unwrap());
35    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
36}
37
38pub fn get_tools(prompts: &Prompts, states_config: &StatesConfig) -> Vec<Tool> {
39    // Build state enum from config
40    let state_names: Vec<&str> = states_config.state_names();
41    let state_enum: Vec<Value> = state_names.iter().map(|s| json!(s)).collect();
42
43    vec![
44        make_tool_with_prompts(
45            "thinking",
46            "Broadcast real-time status updates (what you're doing right now). Also refreshes heartbeat. Call frequently during work to show live progress.",
47            json!({
48                "agent": {
49                    "type": "string",
50                    "description": "Agent ID"
51                },
52                "thought": {
53                    "type": "string",
54                    "description": "What the agent is currently doing"
55                },
56                "tasks": {
57                    "type": "array",
58                    "items": { "type": "string" },
59                    "description": "Specific task IDs to update (default: all claimed tasks)"
60                }
61            }),
62            vec!["agent", "thought"],
63            prompts,
64        ),
65        make_tool_with_prompts(
66            "task_history",
67            "Get the status transition history for a task, including automatic time tracking data and aggregate statistics.",
68            json!({
69                "task": {
70                    "type": "string",
71                    "description": "Task ID"
72                },
73                "states": {
74                    "type": "array",
75                    "items": { "type": "string", "enum": state_enum },
76                    "description": "Filter to only show transitions involving these statuses"
77                }
78            }),
79            vec!["task"],
80            prompts,
81        ),
82        make_tool_with_prompts(
83            "log_metrics",
84            "Log metrics and cost for a task. Values are aggregated (added to existing).",
85            json!({
86                "agent": {
87                    "type": "string",
88                    "description": "Agent ID"
89                },
90                "task": {
91                    "type": "string",
92                    "description": "Task ID"
93                },
94                "cost_usd": {
95                    "type": "number",
96                    "description": "Cost in USD to add"
97                },
98                "values": {
99                    "type": "array",
100                    "items": { "type": "integer" },
101                    "description": "Array of up to 8 integer metric values [metric_0..metric_7] to aggregate"
102                }
103            }),
104            vec!["agent", "task"],
105            prompts,
106        ),
107        make_tool_with_prompts(
108            "project_history",
109            "Get project-wide status transition history and aggregate statistics. Like task_history but across all tasks with date/time range filters.",
110            json!({
111                "from": {
112                    "type": "string",
113                    "description": "Start of time range (ISO 8601 datetime or milliseconds since epoch)"
114                },
115                "to": {
116                    "type": "string",
117                    "description": "End of time range (ISO 8601 datetime or milliseconds since epoch)"
118                },
119                "states": {
120                    "type": "array",
121                    "items": { "type": "string", "enum": state_enum },
122                    "description": "Filter to only show transitions involving these statuses"
123                },
124                "limit": {
125                    "type": "integer",
126                    "description": "Maximum number of transitions to return (default: 100)"
127                }
128            }),
129            vec![],
130            prompts,
131        ),
132        make_tool_with_prompts(
133            "get_metrics",
134            "Get metrics and cost for one or more tasks. Returns cost_usd and metrics array, aggregated across all tasks if multiple provided.",
135            json!({
136                "task": {
137                    "oneOf": [
138                        { "type": "string", "description": "Single task ID" },
139                        { "type": "array", "items": { "type": "string" }, "description": "Array of task IDs" }
140                    ],
141                    "description": "Task ID or array of task IDs to get metrics for"
142                }
143            }),
144            vec!["task"],
145            prompts,
146        ),
147    ]
148}
149
150pub fn thinking(db: &Database, args: Value) -> Result<Value> {
151    let agent_id = get_string(&args, "agent")
152        .ok_or_else(|| ToolError::missing_field("agent"))?;
153    let thought = get_string(&args, "thought")
154        .ok_or_else(|| ToolError::missing_field("thought"))?;
155    let task_ids = get_string_array(&args, "tasks");
156
157    // Also refresh heartbeat since updating thought implies activity
158    let _ = db.heartbeat(&agent_id);
159
160    let updated = db.set_thought(&agent_id, Some(thought), task_ids)?;
161
162    Ok(json!({
163        "success": true,
164        "updated_count": updated
165    }))
166}
167
168pub fn task_history(db: &Database, states_config: &StatesConfig, default_format: OutputFormat, args: Value) -> Result<Value> {
169    let task_id = get_string(&args, "task")
170        .ok_or_else(|| ToolError::missing_field("task"))?;
171    let state_filter = get_string_array(&args, "states");
172    let format = get_string(&args, "format")
173        .and_then(|s| OutputFormat::from_str(&s))
174        .unwrap_or(default_format);
175
176    let history = db.get_task_state_history(&task_id)?;
177    let current_duration = db.get_current_state_duration(&task_id, states_config)?;
178
179    // Filter history by statuses if specified
180    let filtered_history: Vec<_> = if let Some(ref states) = state_filter {
181        history
182            .into_iter()
183            .filter(|e| states.contains(&e.event))
184            .collect()
185    } else {
186        history
187    };
188
189    // Calculate aggregate stats
190    let mut time_per_status: HashMap<String, i64> = HashMap::new();
191    let mut time_per_agent: HashMap<String, i64> = HashMap::new();
192
193    for event in &filtered_history {
194        if let Some(end_ts) = event.end_timestamp {
195            let duration = end_ts - event.timestamp;
196            *time_per_status.entry(event.event.clone()).or_insert(0) += duration;
197            if let Some(ref agent) = event.worker_id {
198                *time_per_agent.entry(agent.clone()).or_insert(0) += duration;
199            }
200        }
201    }
202
203    // Add current duration to the current state if applicable
204    if let Some(current_dur) = current_duration {
205        if let Some(last_event) = filtered_history.last() {
206            if last_event.end_timestamp.is_none() {
207                // Include in state filter check
208                if state_filter.is_none() || state_filter.as_ref().unwrap().contains(&last_event.event) {
209                    *time_per_status.entry(last_event.event.clone()).or_insert(0) += current_dur;
210                    if let Some(ref agent) = last_event.worker_id {
211                        *time_per_agent.entry(agent.clone()).or_insert(0) += current_dur;
212                    }
213                }
214            }
215        }
216    }
217
218    match format {
219        OutputFormat::Markdown => {
220            let mut md = String::from("# Task History\n\n");
221
222            // History table
223            md.push_str("## Status Transitions\n\n");
224            if filtered_history.is_empty() {
225                md.push_str("No status transitions found.\n");
226            } else {
227                md.push_str("| # | Status | Agent | Timestamp | Duration |\n");
228                md.push_str("|---|-------|-------|-----------|----------|\n");
229                for (i, event) in filtered_history.iter().enumerate() {
230                    let duration = if let Some(end_ts) = event.end_timestamp {
231                        format_duration_ms(end_ts - event.timestamp)
232                    } else if let Some(dur) = current_duration {
233                        format!("{} (ongoing)", format_duration_ms(dur))
234                    } else {
235                        "ongoing".to_string()
236                    };
237                    let agent = event.worker_id.as_deref().unwrap_or("-");
238                    md.push_str(&format!(
239                        "| {} | {} | {} | {} | {} |\n",
240                        i + 1,
241                        event.event,
242                        agent,
243                        format_timestamp(event.timestamp),
244                        duration
245                    ));
246                }
247            }
248
249            // Aggregate stats
250            md.push_str("\n## Time per Status\n\n");
251            if time_per_status.is_empty() {
252                md.push_str("No completed status durations.\n");
253            } else {
254                md.push_str("| Status | Total Time |\n");
255                md.push_str("|--------|------------|\n");
256                let mut sorted_statuses: Vec<_> = time_per_status.iter().collect();
257                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
258                for (status, time) in sorted_statuses {
259                    md.push_str(&format!("| {} | {} |\n", status, format_duration_ms(*time)));
260                }
261            }
262
263            md.push_str("\n## Time per Agent\n\n");
264            if time_per_agent.is_empty() {
265                md.push_str("No agent time tracked.\n");
266            } else {
267                md.push_str("| Agent | Total Time |\n");
268                md.push_str("|-------|------------|\n");
269                let mut sorted_agents: Vec<_> = time_per_agent.iter().collect();
270                sorted_agents.sort_by_key(|(k, _)| k.as_str());
271                for (agent, time) in sorted_agents {
272                    md.push_str(&format!("| {} | {} |\n", agent, format_duration_ms(*time)));
273                }
274            }
275
276            Ok(markdown_to_json(md))
277        }
278        OutputFormat::Json => {
279            Ok(json!({
280                "history": filtered_history,
281                "current_duration_ms": current_duration,
282                "time_per_status_ms": time_per_status,
283                "time_per_agent_ms": time_per_agent
284            }))
285        }
286    }
287}
288
289pub fn log_metrics(db: &Database, args: Value) -> Result<Value> {
290    let task_id = get_string(&args, "task")
291        .ok_or_else(|| ToolError::missing_field("task"))?;
292
293    let cost_usd = get_f64(&args, "cost_usd");
294
295    // Parse values array
296    let values: Vec<i64> = args
297        .get("values")
298        .and_then(|v| v.as_array())
299        .map(|arr| {
300            arr.iter()
301                .filter_map(|v| v.as_i64())
302                .collect()
303        })
304        .unwrap_or_default();
305
306    let task = db.log_metrics(
307        &task_id,
308        cost_usd,
309        &values,
310    )?;
311
312    Ok(json!({
313        "success": true,
314        "cost_usd": task.cost_usd,
315        "metrics": task.metrics
316    }))
317}
318
319/// Parse a timestamp from either ISO 8601 string or milliseconds.
320fn parse_timestamp(s: &str) -> Option<i64> {
321    // Try parsing as milliseconds first
322    if let Ok(ms) = s.parse::<i64>() {
323        return Some(ms);
324    }
325
326    // Try parsing as ISO 8601 datetime
327    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
328        return Some(dt.timestamp_millis());
329    }
330
331    // Try parsing common datetime formats
332    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
333        return Some(dt.and_utc().timestamp_millis());
334    }
335
336    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
337        return Some(dt.and_utc().timestamp_millis());
338    }
339
340    // Try parsing date only
341    if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
342        return Some(d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis());
343    }
344
345    None
346}
347
348pub fn project_history(db: &Database, default_format: OutputFormat, args: Value) -> Result<Value> {
349    let from_timestamp = get_string(&args, "from").and_then(|s| parse_timestamp(&s));
350    let to_timestamp = get_string(&args, "to").and_then(|s| parse_timestamp(&s));
351    let state_filter = get_string_array(&args, "states");
352    let limit = get_i64(&args, "limit").or(Some(100));
353    let format = get_string(&args, "format")
354        .and_then(|s| OutputFormat::from_str(&s))
355        .unwrap_or(default_format);
356
357    // Get transitions
358    let history = db.get_project_state_history(
359        from_timestamp,
360        to_timestamp,
361        state_filter.as_deref(),
362        limit,
363    )?;
364
365    // Get aggregate stats
366    let stats = db.get_project_state_stats(from_timestamp, to_timestamp)?;
367
368    match format {
369        OutputFormat::Markdown => {
370            let mut md = String::from("# Project History\n\n");
371
372            // Time range info
373            md.push_str("## Time Range\n\n");
374            let from_str = from_timestamp
375                .map(|ts| format_timestamp(ts))
376                .unwrap_or_else(|| "beginning".to_string());
377            let to_str = to_timestamp
378                .map(|ts| format_timestamp(ts))
379                .unwrap_or_else(|| "now".to_string());
380            md.push_str(&format!("**From:** {} **To:** {}\n\n", from_str, to_str));
381
382            // Summary stats
383            md.push_str("## Summary\n\n");
384            md.push_str(&format!("- **Total Transitions:** {}\n", stats.total_transitions));
385            md.push_str(&format!("- **Tasks Affected:** {}\n", stats.tasks_affected));
386            md.push_str(&format!("- **Total Time Tracked:** {}\n\n", format_duration_ms(stats.total_time_ms)));
387
388            // Recent transitions table
389            md.push_str("## Recent Transitions\n\n");
390            if history.is_empty() {
391                md.push_str("No status transitions found.\n");
392            } else {
393                md.push_str("| # | Task | Status | Agent | Timestamp | Duration |\n");
394                md.push_str("|---|------|-------|-------|-----------|----------|\n");
395                for (i, event) in history.iter().enumerate() {
396                    let duration = if let Some(end_ts) = event.end_timestamp {
397                        format_duration_ms(end_ts - event.timestamp)
398                    } else {
399                        "ongoing".to_string()
400                    };
401                    let agent = event.worker_id.as_deref().unwrap_or("-");
402                    let short_task = if event.task_id.len() > 12 {
403                        format!("{}...", &event.task_id[..12])
404                    } else {
405                        event.task_id.clone()
406                    };
407                    md.push_str(&format!(
408                        "| {} | {} | {} | {} | {} | {} |\n",
409                        i + 1,
410                        short_task,
411                        event.event,
412                        agent,
413                        format_timestamp(event.timestamp),
414                        duration
415                    ));
416                }
417            }
418
419            // Transitions by status
420            md.push_str("\n## Transitions by Status\n\n");
421            if stats.transitions_by_status.is_empty() {
422                md.push_str("No transitions found.\n");
423            } else {
424                md.push_str("| Status | Count | Total Time |\n");
425                md.push_str("|-------|-------|------------|\n");
426                let mut sorted_statuses: Vec<_> = stats.transitions_by_status.iter().collect();
427                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
428                for (status, count) in sorted_statuses {
429                    let time = stats.time_by_status_ms.get(status).copied().unwrap_or(0);
430                    md.push_str(&format!("| {} | {} | {} |\n", status, count, format_duration_ms(time)));
431                }
432            }
433
434            // Transitions by agent
435            md.push_str("\n## Transitions by Agent\n\n");
436            if stats.transitions_by_agent.is_empty() {
437                md.push_str("No agent activity tracked.\n");
438            } else {
439                md.push_str("| Agent | Count | Total Time |\n");
440                md.push_str("|-------|-------|------------|\n");
441                let mut sorted_agents: Vec<_> = stats.transitions_by_agent.iter().collect();
442                sorted_agents.sort_by(|(_, a), (_, b)| b.cmp(a)); // Sort by count descending
443                for (agent, count) in sorted_agents {
444                    let time = stats.time_by_agent_ms.get(agent).copied().unwrap_or(0);
445                    md.push_str(&format!("| {} | {} | {} |\n", agent, count, format_duration_ms(time)));
446                }
447            }
448
449            Ok(markdown_to_json(md))
450        }
451        OutputFormat::Json => {
452            Ok(json!({
453                "time_range": {
454                    "from_ms": from_timestamp,
455                    "to_ms": to_timestamp
456                },
457                "summary": {
458                    "total_transitions": stats.total_transitions,
459                    "tasks_affected": stats.tasks_affected,
460                    "total_time_ms": stats.total_time_ms
461                },
462                "transitions": history,
463                "transitions_by_status": stats.transitions_by_status,
464                "time_by_status_ms": stats.time_by_status_ms,
465                "transitions_by_agent": stats.transitions_by_agent,
466                "time_by_agent_ms": stats.time_by_agent_ms
467            }))
468        }
469    }
470}
471
472pub fn get_metrics(db: &Database, args: Value) -> Result<Value> {
473    use super::get_string_or_array;
474
475    let task_ids = get_string_or_array(&args, "task")
476        .ok_or_else(|| ToolError::missing_field("task"))?;
477
478    if task_ids.is_empty() {
479        return Err(ToolError::missing_field("task").into());
480    }
481
482    // Get metrics for all specified tasks
483    let mut total_cost_usd: f64 = 0.0;
484    let mut total_metrics: [i64; 8] = [0; 8];
485    let mut found_count = 0;
486
487    for task_id in &task_ids {
488        if let Some(task) = db.get_task(task_id)? {
489            total_cost_usd += task.cost_usd;
490            for i in 0..8 {
491                total_metrics[i] += task.metrics[i];
492            }
493            found_count += 1;
494        }
495    }
496
497    if found_count == 0 {
498        return Err(anyhow::anyhow!("No tasks found with the provided IDs").into());
499    }
500
501    let response = if task_ids.len() == 1 {
502        // Single task - return flat response
503        json!({
504            "task": task_ids[0],
505            "cost_usd": total_cost_usd,
506            "metrics": total_metrics
507        })
508    } else {
509        // Multiple tasks - return aggregated response
510        json!({
511            "tasks": task_ids,
512            "task_count": found_count,
513            "cost_usd": total_cost_usd,
514            "metrics": total_metrics
515        })
516    };
517
518    Ok(response)
519}