Skip to main content

task_graph_mcp/tools/
tracking.rs

1//! Live status and tracking tools.
2
3use super::{
4    get_f64, get_i64, get_string, get_string_array, get_string_or_array, make_tool_with_prompts,
5};
6use crate::config::{Prompts, StatesConfig};
7use crate::db::Database;
8use crate::error::ToolError;
9use crate::format::{OutputFormat, ToolResult};
10use anyhow::Result;
11use rmcp::model::Tool;
12use serde_json::{Value, json};
13use std::collections::HashMap;
14
/// Render a millisecond duration as a short, human-readable label.
///
/// The unit is chosen from the magnitude of `ms`:
/// - below one second: whole milliseconds, e.g. `"250ms"`
/// - below one minute: seconds with one decimal place, e.g. `"1.5s"`
/// - below one hour: whole minutes and seconds, e.g. `"2m 5s"`
/// - otherwise: whole hours and minutes, e.g. `"1h 2m"`
///
/// Negative inputs fall into the first bucket and are printed as-is with an `ms` suffix.
fn format_duration_ms(ms: i64) -> String {
    const MS_PER_SECOND: i64 = 1_000;
    const MS_PER_MINUTE: i64 = 60 * MS_PER_SECOND;
    const MS_PER_HOUR: i64 = 60 * MS_PER_MINUTE;

    match ms {
        sub_second if sub_second < MS_PER_SECOND => format!("{}ms", sub_second),
        sub_minute if sub_minute < MS_PER_MINUTE => {
            // One decimal place; the formatter rounds, so 59_999 renders as "60.0s".
            format!("{:.1}s", sub_minute as f64 / 1000.0)
        }
        sub_hour if sub_hour < MS_PER_HOUR => {
            let minutes = sub_hour / MS_PER_MINUTE;
            let seconds = (sub_hour % MS_PER_MINUTE) / MS_PER_SECOND;
            format!("{}m {}s", minutes, seconds)
        }
        long => {
            let hours = long / MS_PER_HOUR;
            let minutes = (long % MS_PER_HOUR) / MS_PER_MINUTE;
            format!("{}h {}m", hours, minutes)
        }
    }
}
31
32/// Format a timestamp (ms since epoch) to ISO 8601 string.
33fn format_timestamp(ts: i64) -> String {
34    crate::types::ms_to_iso(ts)
35}
36
37pub fn get_tools(prompts: &Prompts, states_config: &StatesConfig) -> Vec<Tool> {
38    // Build state enum from config
39    let state_names: Vec<&str> = states_config.state_names();
40    let state_enum: Vec<Value> = state_names.iter().map(|s| json!(s)).collect();
41
42    vec![
43        make_tool_with_prompts(
44            "thinking",
45            "Broadcast real-time status updates (what you're doing right now). Also refreshes heartbeat. Call frequently during work to show live progress.",
46            json!({
47                "agent": {
48                    "type": "string",
49                    "description": "Agent ID"
50                },
51                "thought": {
52                    "type": "string",
53                    "description": "What the agent is currently doing"
54                },
55                "tasks": {
56                    "type": "array",
57                    "items": { "type": "string" },
58                    "description": "Specific task IDs to update (default: all claimed tasks)"
59                }
60            }),
61            vec!["agent", "thought"],
62            prompts,
63        ),
64        make_tool_with_prompts(
65            "task_history",
66            "Get the status transition history for a task, including automatic time tracking data and aggregate statistics.",
67            json!({
68                "task": {
69                    "type": "string",
70                    "description": "Task ID"
71                },
72                "states": {
73                    "type": "array",
74                    "items": { "type": "string", "enum": state_enum },
75                    "description": "Filter to only show transitions involving these statuses"
76                }
77            }),
78            vec!["task"],
79            prompts,
80        ),
81        make_tool_with_prompts(
82            "log_metrics",
83            "Log metrics and cost for a task. Values are aggregated (added to existing).",
84            json!({
85                "agent": {
86                    "type": "string",
87                    "description": "Agent ID"
88                },
89                "task": {
90                    "type": "string",
91                    "description": "Task ID"
92                },
93                "cost_usd": {
94                    "type": "number",
95                    "description": "Cost in USD to add"
96                },
97                "values": {
98                    "type": "array",
99                    "items": { "type": "integer" },
100                    "description": "Array of up to 8 integer metric values [metric_0..metric_7] to aggregate"
101                }
102            }),
103            vec!["agent", "task"],
104            prompts,
105        ),
106        make_tool_with_prompts(
107            "project_history",
108            "Get project-wide status transition history and aggregate statistics. Like task_history but across all tasks with date/time range filters.",
109            json!({
110                "from": {
111                    "type": "string",
112                    "description": "Start of time range (ISO 8601 datetime or milliseconds since epoch)"
113                },
114                "to": {
115                    "type": "string",
116                    "description": "End of time range (ISO 8601 datetime or milliseconds since epoch)"
117                },
118                "states": {
119                    "type": "array",
120                    "items": { "type": "string", "enum": state_enum },
121                    "description": "Filter to only show transitions involving these statuses"
122                },
123                "limit": {
124                    "type": "integer",
125                    "description": "Maximum number of transitions to return (default: 100)"
126                }
127            }),
128            vec![],
129            prompts,
130        ),
131        make_tool_with_prompts(
132            "get_metrics",
133            "Get metrics and cost for one or more tasks. Returns cost_usd and metrics array, aggregated across all tasks if multiple provided.",
134            json!({
135                "task": {
136                    "oneOf": [
137                        { "type": "string", "description": "Single task ID" },
138                        { "type": "array", "items": { "type": "string" }, "description": "Array of task IDs" }
139                    ],
140                    "description": "Task ID or array of task IDs to get metrics for"
141                }
142            }),
143            vec!["task"],
144            prompts,
145        ),
146    ]
147}
148
149pub fn thinking(
150    db: &Database,
151    states_config: &crate::config::StatesConfig,
152    args: Value,
153) -> Result<Value> {
154    let agent_id = get_string(&args, "agent").ok_or_else(|| ToolError::missing_field("agent"))?;
155    let thought =
156        get_string(&args, "thought").ok_or_else(|| ToolError::missing_field("thought"))?;
157    let task_ids = get_string_or_array(&args, "tasks");
158
159    // Also refresh heartbeat since updating thought implies activity
160    let _ = db.heartbeat(&agent_id, states_config);
161
162    let updated = db.set_thought(&agent_id, Some(thought), task_ids)?;
163
164    Ok(json!({
165        "success": true,
166        "updated_count": updated
167    }))
168}
169
170pub fn task_history(
171    db: &Database,
172    states_config: &StatesConfig,
173    default_format: OutputFormat,
174    args: Value,
175) -> Result<ToolResult> {
176    let task_id = get_string(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
177    let state_filter = get_string_array(&args, "states");
178    let format = get_string(&args, "format")
179        .and_then(|s| OutputFormat::parse(&s))
180        .unwrap_or(default_format);
181
182    let history = db.get_task_state_history(&task_id)?;
183    let current_duration = db.get_current_state_duration(&task_id, states_config)?;
184
185    // Filter history by statuses if specified
186    let filtered_history: Vec<_> = if let Some(ref states) = state_filter {
187        history
188            .into_iter()
189            .filter(|e| e.status.as_ref().is_some_and(|s| states.contains(s)))
190            .collect()
191    } else {
192        history
193    };
194
195    // Calculate aggregate stats
196    let mut time_per_status: HashMap<String, i64> = HashMap::new();
197    let mut time_per_agent: HashMap<String, i64> = HashMap::new();
198
199    for event in &filtered_history {
200        if let Some(end_ts) = event.end_timestamp {
201            let duration = end_ts - event.timestamp;
202            if let Some(ref status) = event.status {
203                *time_per_status.entry(status.clone()).or_insert(0) += duration;
204            }
205            if let Some(ref agent) = event.worker_id {
206                *time_per_agent.entry(agent.clone()).or_insert(0) += duration;
207            }
208        }
209    }
210
211    // Add current duration to the current state if applicable
212    if let Some(current_dur) = current_duration
213        && let Some(last_event) = filtered_history.last()
214        && last_event.end_timestamp.is_none()
215    {
216        // Include in state filter check
217        if let Some(ref status) = last_event.status
218            && (state_filter.is_none() || state_filter.as_ref().unwrap().contains(status))
219        {
220            *time_per_status.entry(status.clone()).or_insert(0) += current_dur;
221            if let Some(ref agent) = last_event.worker_id {
222                *time_per_agent.entry(agent.clone()).or_insert(0) += current_dur;
223            }
224        }
225    }
226
227    match format {
228        OutputFormat::Markdown => {
229            let mut md = String::from("# Task History\n\n");
230
231            // History table
232            md.push_str("## Status Transitions\n\n");
233            if filtered_history.is_empty() {
234                md.push_str("No status transitions found.\n");
235            } else {
236                md.push_str("| # | Status | Agent | Timestamp | Duration |\n");
237                md.push_str("|---|-------|-------|-----------|----------|\n");
238                for (i, event) in filtered_history.iter().enumerate() {
239                    let duration = if let Some(end_ts) = event.end_timestamp {
240                        format_duration_ms(end_ts - event.timestamp)
241                    } else if let Some(dur) = current_duration {
242                        format!("{} (ongoing)", format_duration_ms(dur))
243                    } else {
244                        "ongoing".to_string()
245                    };
246                    let agent = event.worker_id.as_deref().unwrap_or("-");
247                    let status = event.status.as_deref().unwrap_or("-");
248                    md.push_str(&format!(
249                        "| {} | {} | {} | {} | {} |\n",
250                        i + 1,
251                        status,
252                        agent,
253                        format_timestamp(event.timestamp),
254                        duration
255                    ));
256                }
257            }
258
259            // Aggregate stats
260            md.push_str("\n## Time per Status\n\n");
261            if time_per_status.is_empty() {
262                md.push_str("No completed status durations.\n");
263            } else {
264                md.push_str("| Status | Total Time |\n");
265                md.push_str("|--------|------------|\n");
266                let mut sorted_statuses: Vec<_> = time_per_status.iter().collect();
267                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
268                for (status, time) in sorted_statuses {
269                    md.push_str(&format!("| {} | {} |\n", status, format_duration_ms(*time)));
270                }
271            }
272
273            md.push_str("\n## Time per Agent\n\n");
274            if time_per_agent.is_empty() {
275                md.push_str("No agent time tracked.\n");
276            } else {
277                md.push_str("| Agent | Total Time |\n");
278                md.push_str("|-------|------------|\n");
279                let mut sorted_agents: Vec<_> = time_per_agent.iter().collect();
280                sorted_agents.sort_by_key(|(k, _)| k.as_str());
281                for (agent, time) in sorted_agents {
282                    md.push_str(&format!("| {} | {} |\n", agent, format_duration_ms(*time)));
283                }
284            }
285
286            Ok(ToolResult::Raw(md))
287        }
288        OutputFormat::Json => Ok(ToolResult::Json(json!({
289            "history": filtered_history,
290            "current_duration_ms": current_duration,
291            "time_per_status_ms": time_per_status,
292            "time_per_agent_ms": time_per_agent
293        }))),
294    }
295}
296
297pub fn log_metrics(db: &Database, args: Value) -> Result<Value> {
298    let task_id = get_string(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
299
300    let cost_usd = get_f64(&args, "cost_usd");
301
302    // Parse values array
303    let values: Vec<i64> = args
304        .get("values")
305        .and_then(|v| v.as_array())
306        .map(|arr| arr.iter().filter_map(|v| v.as_i64()).collect())
307        .unwrap_or_default();
308
309    let task = db.log_metrics(&task_id, cost_usd, &values)?;
310
311    Ok(json!({
312        "success": true,
313        "cost_usd": task.cost_usd,
314        "metrics": task.metrics
315    }))
316}
317
318/// Parse a timestamp from either ISO 8601 string or milliseconds.
319fn parse_timestamp(s: &str) -> Option<i64> {
320    // Try parsing as milliseconds first
321    if let Ok(ms) = s.parse::<i64>() {
322        return Some(ms);
323    }
324
325    // Try parsing as ISO 8601 datetime
326    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
327        return Some(dt.timestamp_millis());
328    }
329
330    // Try parsing common datetime formats
331    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
332        return Some(dt.and_utc().timestamp_millis());
333    }
334
335    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
336        return Some(dt.and_utc().timestamp_millis());
337    }
338
339    // Try parsing date only
340    if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
341        return Some(d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis());
342    }
343
344    None
345}
346
347pub fn project_history(
348    db: &Database,
349    default_format: OutputFormat,
350    args: Value,
351) -> Result<ToolResult> {
352    let from_timestamp = get_string(&args, "from").and_then(|s| parse_timestamp(&s));
353    let to_timestamp = get_string(&args, "to").and_then(|s| parse_timestamp(&s));
354    let state_filter = get_string_array(&args, "states");
355    let limit = get_i64(&args, "limit").or(Some(100));
356    let format = get_string(&args, "format")
357        .and_then(|s| OutputFormat::parse(&s))
358        .unwrap_or(default_format);
359
360    // Get transitions
361    let history =
362        db.get_project_state_history(from_timestamp, to_timestamp, state_filter.as_deref(), limit)?;
363
364    // Get aggregate stats
365    let stats = db.get_project_state_stats(from_timestamp, to_timestamp)?;
366
367    match format {
368        OutputFormat::Markdown => {
369            let mut md = String::from("# Project History\n\n");
370
371            // Time range info
372            md.push_str("## Time Range\n\n");
373            let from_str = from_timestamp
374                .map(format_timestamp)
375                .unwrap_or_else(|| "beginning".to_string());
376            let to_str = to_timestamp
377                .map(format_timestamp)
378                .unwrap_or_else(|| "now".to_string());
379            md.push_str(&format!("**From:** {} **To:** {}\n\n", from_str, to_str));
380
381            // Summary stats
382            md.push_str("## Summary\n\n");
383            md.push_str(&format!(
384                "- **Total Transitions:** {}\n",
385                stats.total_transitions
386            ));
387            md.push_str(&format!("- **Tasks Affected:** {}\n", stats.tasks_affected));
388            md.push_str(&format!(
389                "- **Total Time Tracked:** {}\n\n",
390                format_duration_ms(stats.total_time_ms)
391            ));
392
393            // Recent transitions table
394            md.push_str("## Recent Transitions\n\n");
395            if history.is_empty() {
396                md.push_str("No status transitions found.\n");
397            } else {
398                md.push_str("| # | Task | Status | Agent | Timestamp | Duration |\n");
399                md.push_str("|---|------|-------|-------|-----------|----------|\n");
400                for (i, event) in history.iter().enumerate() {
401                    let duration = if let Some(end_ts) = event.end_timestamp {
402                        format_duration_ms(end_ts - event.timestamp)
403                    } else {
404                        "ongoing".to_string()
405                    };
406                    let agent = event.worker_id.as_deref().unwrap_or("-");
407                    let short_task = if event.task_id.len() > 12 {
408                        format!("{}...", &event.task_id[..12])
409                    } else {
410                        event.task_id.clone()
411                    };
412                    let status = event.status.as_deref().unwrap_or("-");
413                    md.push_str(&format!(
414                        "| {} | {} | {} | {} | {} | {} |\n",
415                        i + 1,
416                        short_task,
417                        status,
418                        agent,
419                        format_timestamp(event.timestamp),
420                        duration
421                    ));
422                }
423            }
424
425            // Transitions by status
426            md.push_str("\n## Transitions by Status\n\n");
427            if stats.transitions_by_status.is_empty() {
428                md.push_str("No transitions found.\n");
429            } else {
430                md.push_str("| Status | Count | Total Time |\n");
431                md.push_str("|-------|-------|------------|\n");
432                let mut sorted_statuses: Vec<_> = stats.transitions_by_status.iter().collect();
433                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
434                for (status, count) in sorted_statuses {
435                    let time = stats.time_by_status_ms.get(status).copied().unwrap_or(0);
436                    md.push_str(&format!(
437                        "| {} | {} | {} |\n",
438                        status,
439                        count,
440                        format_duration_ms(time)
441                    ));
442                }
443            }
444
445            // Transitions by agent
446            md.push_str("\n## Transitions by Agent\n\n");
447            if stats.transitions_by_agent.is_empty() {
448                md.push_str("No agent activity tracked.\n");
449            } else {
450                md.push_str("| Agent | Count | Total Time |\n");
451                md.push_str("|-------|-------|------------|\n");
452                let mut sorted_agents: Vec<_> = stats.transitions_by_agent.iter().collect();
453                sorted_agents.sort_by(|(_, a), (_, b)| b.cmp(a)); // Sort by count descending
454                for (agent, count) in sorted_agents {
455                    let time = stats.time_by_agent_ms.get(agent).copied().unwrap_or(0);
456                    md.push_str(&format!(
457                        "| {} | {} | {} |\n",
458                        agent,
459                        count,
460                        format_duration_ms(time)
461                    ));
462                }
463            }
464
465            Ok(ToolResult::Raw(md))
466        }
467        OutputFormat::Json => Ok(ToolResult::Json(json!({
468            "time_range": {
469                "from": from_timestamp.map(crate::types::ms_to_iso),
470                "to": to_timestamp.map(crate::types::ms_to_iso)
471            },
472            "summary": {
473                "total_transitions": stats.total_transitions,
474                "tasks_affected": stats.tasks_affected,
475                "total_time_ms": stats.total_time_ms
476            },
477            "transitions": history,
478            "transitions_by_status": stats.transitions_by_status,
479            "time_by_status_ms": stats.time_by_status_ms,
480            "transitions_by_agent": stats.transitions_by_agent,
481            "time_by_agent_ms": stats.time_by_agent_ms
482        }))),
483    }
484}
485
486pub fn get_metrics(db: &Database, args: Value) -> Result<Value> {
487    use super::get_string_or_array;
488
489    let task_ids =
490        get_string_or_array(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
491
492    if task_ids.is_empty() {
493        return Err(ToolError::missing_field("task").into());
494    }
495
496    // Get metrics for all specified tasks
497    let mut total_cost_usd: f64 = 0.0;
498    let mut total_metrics: [i64; 8] = [0; 8];
499    let mut found_count = 0;
500
501    for task_id in &task_ids {
502        if let Some(task) = db.get_task(task_id)? {
503            total_cost_usd += task.cost_usd;
504            for (total, task_metric) in total_metrics.iter_mut().zip(task.metrics.iter()) {
505                *total += task_metric;
506            }
507            found_count += 1;
508        }
509    }
510
511    if found_count == 0 {
512        return Err(anyhow::anyhow!("No tasks found with the provided IDs"));
513    }
514
515    let response = if task_ids.len() == 1 {
516        // Single task - return flat response
517        json!({
518            "task": task_ids[0],
519            "cost_usd": total_cost_usd,
520            "metrics": total_metrics
521        })
522    } else {
523        // Multiple tasks - return aggregated response
524        json!({
525            "tasks": task_ids,
526            "task_count": found_count,
527            "cost_usd": total_cost_usd,
528            "metrics": total_metrics
529        })
530    };
531
532    Ok(response)
533}