Skip to main content

task_graph_mcp/tools/
tracking.rs

1//! Live status and tracking tools.
2
3use super::{get_f64, get_i64, get_string, get_string_array, make_tool_with_prompts};
4use crate::config::{Prompts, StatesConfig};
5use crate::db::Database;
6use crate::error::ToolError;
7use crate::format::{OutputFormat, markdown_to_json};
8use anyhow::Result;
9use rmcp::model::Tool;
10use serde_json::{Value, json};
11use std::collections::HashMap;
12
13/// Format a duration in milliseconds to a human-readable string.
14fn format_duration_ms(ms: i64) -> String {
15    if ms < 1000 {
16        format!("{}ms", ms)
17    } else if ms < 60_000 {
18        format!("{:.1}s", ms as f64 / 1000.0)
19    } else if ms < 3_600_000 {
20        let mins = ms / 60_000;
21        let secs = (ms % 60_000) / 1000;
22        format!("{}m {}s", mins, secs)
23    } else {
24        let hours = ms / 3_600_000;
25        let mins = (ms % 3_600_000) / 60_000;
26        format!("{}h {}m", hours, mins)
27    }
28}
29
30/// Format a timestamp (ms since epoch) to ISO-like string.
31fn format_timestamp(ts: i64) -> String {
32    let secs = ts / 1000;
33    let datetime = chrono::DateTime::from_timestamp(secs, 0)
34        .unwrap_or_else(|| chrono::DateTime::from_timestamp(0, 0).unwrap());
35    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
36}
37
38pub fn get_tools(prompts: &Prompts, states_config: &StatesConfig) -> Vec<Tool> {
39    // Build state enum from config
40    let state_names: Vec<&str> = states_config.state_names();
41    let state_enum: Vec<Value> = state_names.iter().map(|s| json!(s)).collect();
42
43    vec![
44        make_tool_with_prompts(
45            "thinking",
46            "Broadcast real-time status updates (what you're doing right now). Also refreshes heartbeat. Call frequently during work to show live progress.",
47            json!({
48                "agent": {
49                    "type": "string",
50                    "description": "Agent ID"
51                },
52                "thought": {
53                    "type": "string",
54                    "description": "What the agent is currently doing"
55                },
56                "tasks": {
57                    "type": "array",
58                    "items": { "type": "string" },
59                    "description": "Specific task IDs to update (default: all claimed tasks)"
60                }
61            }),
62            vec!["agent", "thought"],
63            prompts,
64        ),
65        make_tool_with_prompts(
66            "task_history",
67            "Get the status transition history for a task, including automatic time tracking data and aggregate statistics.",
68            json!({
69                "task": {
70                    "type": "string",
71                    "description": "Task ID"
72                },
73                "states": {
74                    "type": "array",
75                    "items": { "type": "string", "enum": state_enum },
76                    "description": "Filter to only show transitions involving these statuses"
77                }
78            }),
79            vec!["task"],
80            prompts,
81        ),
82        make_tool_with_prompts(
83            "log_metrics",
84            "Log metrics and cost for a task. Values are aggregated (added to existing).",
85            json!({
86                "agent": {
87                    "type": "string",
88                    "description": "Agent ID"
89                },
90                "task": {
91                    "type": "string",
92                    "description": "Task ID"
93                },
94                "cost_usd": {
95                    "type": "number",
96                    "description": "Cost in USD to add"
97                },
98                "values": {
99                    "type": "array",
100                    "items": { "type": "integer" },
101                    "description": "Array of up to 8 integer metric values [metric_0..metric_7] to aggregate"
102                }
103            }),
104            vec!["agent", "task"],
105            prompts,
106        ),
107        make_tool_with_prompts(
108            "project_history",
109            "Get project-wide status transition history and aggregate statistics. Like task_history but across all tasks with date/time range filters.",
110            json!({
111                "from": {
112                    "type": "string",
113                    "description": "Start of time range (ISO 8601 datetime or milliseconds since epoch)"
114                },
115                "to": {
116                    "type": "string",
117                    "description": "End of time range (ISO 8601 datetime or milliseconds since epoch)"
118                },
119                "states": {
120                    "type": "array",
121                    "items": { "type": "string", "enum": state_enum },
122                    "description": "Filter to only show transitions involving these statuses"
123                },
124                "limit": {
125                    "type": "integer",
126                    "description": "Maximum number of transitions to return (default: 100)"
127                }
128            }),
129            vec![],
130            prompts,
131        ),
132        make_tool_with_prompts(
133            "get_metrics",
134            "Get metrics and cost for one or more tasks. Returns cost_usd and metrics array, aggregated across all tasks if multiple provided.",
135            json!({
136                "task": {
137                    "oneOf": [
138                        { "type": "string", "description": "Single task ID" },
139                        { "type": "array", "items": { "type": "string" }, "description": "Array of task IDs" }
140                    ],
141                    "description": "Task ID or array of task IDs to get metrics for"
142                }
143            }),
144            vec!["task"],
145            prompts,
146        ),
147    ]
148}
149
150pub fn thinking(db: &Database, args: Value) -> Result<Value> {
151    let agent_id = get_string(&args, "agent").ok_or_else(|| ToolError::missing_field("agent"))?;
152    let thought =
153        get_string(&args, "thought").ok_or_else(|| ToolError::missing_field("thought"))?;
154    let task_ids = get_string_array(&args, "tasks");
155
156    // Also refresh heartbeat since updating thought implies activity
157    let _ = db.heartbeat(&agent_id);
158
159    let updated = db.set_thought(&agent_id, Some(thought), task_ids)?;
160
161    Ok(json!({
162        "success": true,
163        "updated_count": updated
164    }))
165}
166
167pub fn task_history(
168    db: &Database,
169    states_config: &StatesConfig,
170    default_format: OutputFormat,
171    args: Value,
172) -> Result<Value> {
173    let task_id = get_string(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
174    let state_filter = get_string_array(&args, "states");
175    let format = get_string(&args, "format")
176        .and_then(|s| OutputFormat::parse(&s))
177        .unwrap_or(default_format);
178
179    let history = db.get_task_state_history(&task_id)?;
180    let current_duration = db.get_current_state_duration(&task_id, states_config)?;
181
182    // Filter history by statuses if specified
183    let filtered_history: Vec<_> = if let Some(ref states) = state_filter {
184        history
185            .into_iter()
186            .filter(|e| e.status.as_ref().is_some_and(|s| states.contains(s)))
187            .collect()
188    } else {
189        history
190    };
191
192    // Calculate aggregate stats
193    let mut time_per_status: HashMap<String, i64> = HashMap::new();
194    let mut time_per_agent: HashMap<String, i64> = HashMap::new();
195
196    for event in &filtered_history {
197        if let Some(end_ts) = event.end_timestamp {
198            let duration = end_ts - event.timestamp;
199            if let Some(ref status) = event.status {
200                *time_per_status.entry(status.clone()).or_insert(0) += duration;
201            }
202            if let Some(ref agent) = event.worker_id {
203                *time_per_agent.entry(agent.clone()).or_insert(0) += duration;
204            }
205        }
206    }
207
208    // Add current duration to the current state if applicable
209    if let Some(current_dur) = current_duration
210        && let Some(last_event) = filtered_history.last()
211        && last_event.end_timestamp.is_none()
212    {
213        // Include in state filter check
214        if let Some(ref status) = last_event.status
215            && (state_filter.is_none() || state_filter.as_ref().unwrap().contains(status))
216        {
217            *time_per_status.entry(status.clone()).or_insert(0) += current_dur;
218            if let Some(ref agent) = last_event.worker_id {
219                *time_per_agent.entry(agent.clone()).or_insert(0) += current_dur;
220            }
221        }
222    }
223
224    match format {
225        OutputFormat::Markdown => {
226            let mut md = String::from("# Task History\n\n");
227
228            // History table
229            md.push_str("## Status Transitions\n\n");
230            if filtered_history.is_empty() {
231                md.push_str("No status transitions found.\n");
232            } else {
233                md.push_str("| # | Status | Agent | Timestamp | Duration |\n");
234                md.push_str("|---|-------|-------|-----------|----------|\n");
235                for (i, event) in filtered_history.iter().enumerate() {
236                    let duration = if let Some(end_ts) = event.end_timestamp {
237                        format_duration_ms(end_ts - event.timestamp)
238                    } else if let Some(dur) = current_duration {
239                        format!("{} (ongoing)", format_duration_ms(dur))
240                    } else {
241                        "ongoing".to_string()
242                    };
243                    let agent = event.worker_id.as_deref().unwrap_or("-");
244                    let status = event.status.as_deref().unwrap_or("-");
245                    md.push_str(&format!(
246                        "| {} | {} | {} | {} | {} |\n",
247                        i + 1,
248                        status,
249                        agent,
250                        format_timestamp(event.timestamp),
251                        duration
252                    ));
253                }
254            }
255
256            // Aggregate stats
257            md.push_str("\n## Time per Status\n\n");
258            if time_per_status.is_empty() {
259                md.push_str("No completed status durations.\n");
260            } else {
261                md.push_str("| Status | Total Time |\n");
262                md.push_str("|--------|------------|\n");
263                let mut sorted_statuses: Vec<_> = time_per_status.iter().collect();
264                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
265                for (status, time) in sorted_statuses {
266                    md.push_str(&format!("| {} | {} |\n", status, format_duration_ms(*time)));
267                }
268            }
269
270            md.push_str("\n## Time per Agent\n\n");
271            if time_per_agent.is_empty() {
272                md.push_str("No agent time tracked.\n");
273            } else {
274                md.push_str("| Agent | Total Time |\n");
275                md.push_str("|-------|------------|\n");
276                let mut sorted_agents: Vec<_> = time_per_agent.iter().collect();
277                sorted_agents.sort_by_key(|(k, _)| k.as_str());
278                for (agent, time) in sorted_agents {
279                    md.push_str(&format!("| {} | {} |\n", agent, format_duration_ms(*time)));
280                }
281            }
282
283            Ok(markdown_to_json(md))
284        }
285        OutputFormat::Json => Ok(json!({
286            "history": filtered_history,
287            "current_duration_ms": current_duration,
288            "time_per_status_ms": time_per_status,
289            "time_per_agent_ms": time_per_agent
290        })),
291    }
292}
293
294pub fn log_metrics(db: &Database, args: Value) -> Result<Value> {
295    let task_id = get_string(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
296
297    let cost_usd = get_f64(&args, "cost_usd");
298
299    // Parse values array
300    let values: Vec<i64> = args
301        .get("values")
302        .and_then(|v| v.as_array())
303        .map(|arr| arr.iter().filter_map(|v| v.as_i64()).collect())
304        .unwrap_or_default();
305
306    let task = db.log_metrics(&task_id, cost_usd, &values)?;
307
308    Ok(json!({
309        "success": true,
310        "cost_usd": task.cost_usd,
311        "metrics": task.metrics
312    }))
313}
314
315/// Parse a timestamp from either ISO 8601 string or milliseconds.
316fn parse_timestamp(s: &str) -> Option<i64> {
317    // Try parsing as milliseconds first
318    if let Ok(ms) = s.parse::<i64>() {
319        return Some(ms);
320    }
321
322    // Try parsing as ISO 8601 datetime
323    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
324        return Some(dt.timestamp_millis());
325    }
326
327    // Try parsing common datetime formats
328    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
329        return Some(dt.and_utc().timestamp_millis());
330    }
331
332    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
333        return Some(dt.and_utc().timestamp_millis());
334    }
335
336    // Try parsing date only
337    if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
338        return Some(d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis());
339    }
340
341    None
342}
343
344pub fn project_history(db: &Database, default_format: OutputFormat, args: Value) -> Result<Value> {
345    let from_timestamp = get_string(&args, "from").and_then(|s| parse_timestamp(&s));
346    let to_timestamp = get_string(&args, "to").and_then(|s| parse_timestamp(&s));
347    let state_filter = get_string_array(&args, "states");
348    let limit = get_i64(&args, "limit").or(Some(100));
349    let format = get_string(&args, "format")
350        .and_then(|s| OutputFormat::parse(&s))
351        .unwrap_or(default_format);
352
353    // Get transitions
354    let history =
355        db.get_project_state_history(from_timestamp, to_timestamp, state_filter.as_deref(), limit)?;
356
357    // Get aggregate stats
358    let stats = db.get_project_state_stats(from_timestamp, to_timestamp)?;
359
360    match format {
361        OutputFormat::Markdown => {
362            let mut md = String::from("# Project History\n\n");
363
364            // Time range info
365            md.push_str("## Time Range\n\n");
366            let from_str = from_timestamp
367                .map(format_timestamp)
368                .unwrap_or_else(|| "beginning".to_string());
369            let to_str = to_timestamp
370                .map(format_timestamp)
371                .unwrap_or_else(|| "now".to_string());
372            md.push_str(&format!("**From:** {} **To:** {}\n\n", from_str, to_str));
373
374            // Summary stats
375            md.push_str("## Summary\n\n");
376            md.push_str(&format!(
377                "- **Total Transitions:** {}\n",
378                stats.total_transitions
379            ));
380            md.push_str(&format!("- **Tasks Affected:** {}\n", stats.tasks_affected));
381            md.push_str(&format!(
382                "- **Total Time Tracked:** {}\n\n",
383                format_duration_ms(stats.total_time_ms)
384            ));
385
386            // Recent transitions table
387            md.push_str("## Recent Transitions\n\n");
388            if history.is_empty() {
389                md.push_str("No status transitions found.\n");
390            } else {
391                md.push_str("| # | Task | Status | Agent | Timestamp | Duration |\n");
392                md.push_str("|---|------|-------|-------|-----------|----------|\n");
393                for (i, event) in history.iter().enumerate() {
394                    let duration = if let Some(end_ts) = event.end_timestamp {
395                        format_duration_ms(end_ts - event.timestamp)
396                    } else {
397                        "ongoing".to_string()
398                    };
399                    let agent = event.worker_id.as_deref().unwrap_or("-");
400                    let short_task = if event.task_id.len() > 12 {
401                        format!("{}...", &event.task_id[..12])
402                    } else {
403                        event.task_id.clone()
404                    };
405                    let status = event.status.as_deref().unwrap_or("-");
406                    md.push_str(&format!(
407                        "| {} | {} | {} | {} | {} | {} |\n",
408                        i + 1,
409                        short_task,
410                        status,
411                        agent,
412                        format_timestamp(event.timestamp),
413                        duration
414                    ));
415                }
416            }
417
418            // Transitions by status
419            md.push_str("\n## Transitions by Status\n\n");
420            if stats.transitions_by_status.is_empty() {
421                md.push_str("No transitions found.\n");
422            } else {
423                md.push_str("| Status | Count | Total Time |\n");
424                md.push_str("|-------|-------|------------|\n");
425                let mut sorted_statuses: Vec<_> = stats.transitions_by_status.iter().collect();
426                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
427                for (status, count) in sorted_statuses {
428                    let time = stats.time_by_status_ms.get(status).copied().unwrap_or(0);
429                    md.push_str(&format!(
430                        "| {} | {} | {} |\n",
431                        status,
432                        count,
433                        format_duration_ms(time)
434                    ));
435                }
436            }
437
438            // Transitions by agent
439            md.push_str("\n## Transitions by Agent\n\n");
440            if stats.transitions_by_agent.is_empty() {
441                md.push_str("No agent activity tracked.\n");
442            } else {
443                md.push_str("| Agent | Count | Total Time |\n");
444                md.push_str("|-------|-------|------------|\n");
445                let mut sorted_agents: Vec<_> = stats.transitions_by_agent.iter().collect();
446                sorted_agents.sort_by(|(_, a), (_, b)| b.cmp(a)); // Sort by count descending
447                for (agent, count) in sorted_agents {
448                    let time = stats.time_by_agent_ms.get(agent).copied().unwrap_or(0);
449                    md.push_str(&format!(
450                        "| {} | {} | {} |\n",
451                        agent,
452                        count,
453                        format_duration_ms(time)
454                    ));
455                }
456            }
457
458            Ok(markdown_to_json(md))
459        }
460        OutputFormat::Json => Ok(json!({
461            "time_range": {
462                "from_ms": from_timestamp,
463                "to_ms": to_timestamp
464            },
465            "summary": {
466                "total_transitions": stats.total_transitions,
467                "tasks_affected": stats.tasks_affected,
468                "total_time_ms": stats.total_time_ms
469            },
470            "transitions": history,
471            "transitions_by_status": stats.transitions_by_status,
472            "time_by_status_ms": stats.time_by_status_ms,
473            "transitions_by_agent": stats.transitions_by_agent,
474            "time_by_agent_ms": stats.time_by_agent_ms
475        })),
476    }
477}
478
479pub fn get_metrics(db: &Database, args: Value) -> Result<Value> {
480    use super::get_string_or_array;
481
482    let task_ids =
483        get_string_or_array(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
484
485    if task_ids.is_empty() {
486        return Err(ToolError::missing_field("task").into());
487    }
488
489    // Get metrics for all specified tasks
490    let mut total_cost_usd: f64 = 0.0;
491    let mut total_metrics: [i64; 8] = [0; 8];
492    let mut found_count = 0;
493
494    for task_id in &task_ids {
495        if let Some(task) = db.get_task(task_id)? {
496            total_cost_usd += task.cost_usd;
497            for (total, task_metric) in total_metrics.iter_mut().zip(task.metrics.iter()) {
498                *total += task_metric;
499            }
500            found_count += 1;
501        }
502    }
503
504    if found_count == 0 {
505        return Err(anyhow::anyhow!("No tasks found with the provided IDs"));
506    }
507
508    let response = if task_ids.len() == 1 {
509        // Single task - return flat response
510        json!({
511            "task": task_ids[0],
512            "cost_usd": total_cost_usd,
513            "metrics": total_metrics
514        })
515    } else {
516        // Multiple tasks - return aggregated response
517        json!({
518            "tasks": task_ids,
519            "task_count": found_count,
520            "cost_usd": total_cost_usd,
521            "metrics": total_metrics
522        })
523    };
524
525    Ok(response)
526}