Skip to main content

task_graph_mcp/tools/
tracking.rs

1//! Live status and tracking tools.
2
3use super::{
4    get_f64, get_i64, get_string, get_string_array, get_string_or_array, make_tool_with_prompts,
5};
6use crate::config::{Prompts, StatesConfig};
7use crate::db::Database;
8use crate::error::ToolError;
9use crate::format::{OutputFormat, markdown_to_json};
10use anyhow::Result;
11use rmcp::model::Tool;
12use serde_json::{Value, json};
13use std::collections::HashMap;
14
/// Render a millisecond duration as a short human-readable string.
///
/// The unit is chosen by magnitude:
/// - below one second: whole milliseconds, e.g. `250ms`
/// - below one minute: seconds with one decimal place, e.g. `1.5s`
/// - below one hour: whole minutes and seconds, e.g. `2m 5s`
/// - otherwise: whole hours and minutes, e.g. `1h 30m`
fn format_duration_ms(ms: i64) -> String {
    const SECOND: i64 = 1_000;
    const MINUTE: i64 = 60 * SECOND;
    const HOUR: i64 = 60 * MINUTE;

    if ms < SECOND {
        return format!("{ms}ms");
    }
    if ms < MINUTE {
        let seconds = ms as f64 / 1000.0;
        return format!("{seconds:.1}s");
    }
    if ms < HOUR {
        let whole_minutes = ms / MINUTE;
        let leftover_seconds = (ms % MINUTE) / SECOND;
        return format!("{whole_minutes}m {leftover_seconds}s");
    }

    let whole_hours = ms / HOUR;
    let leftover_minutes = (ms % HOUR) / MINUTE;
    format!("{whole_hours}h {leftover_minutes}m")
}
31
32/// Format a timestamp (ms since epoch) to ISO-like string.
33fn format_timestamp(ts: i64) -> String {
34    let secs = ts / 1000;
35    let datetime = chrono::DateTime::from_timestamp(secs, 0)
36        .unwrap_or_else(|| chrono::DateTime::from_timestamp(0, 0).unwrap());
37    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
38}
39
40pub fn get_tools(prompts: &Prompts, states_config: &StatesConfig) -> Vec<Tool> {
41    // Build state enum from config
42    let state_names: Vec<&str> = states_config.state_names();
43    let state_enum: Vec<Value> = state_names.iter().map(|s| json!(s)).collect();
44
45    vec![
46        make_tool_with_prompts(
47            "thinking",
48            "Broadcast real-time status updates (what you're doing right now). Also refreshes heartbeat. Call frequently during work to show live progress.",
49            json!({
50                "agent": {
51                    "type": "string",
52                    "description": "Agent ID"
53                },
54                "thought": {
55                    "type": "string",
56                    "description": "What the agent is currently doing"
57                },
58                "tasks": {
59                    "type": "array",
60                    "items": { "type": "string" },
61                    "description": "Specific task IDs to update (default: all claimed tasks)"
62                }
63            }),
64            vec!["agent", "thought"],
65            prompts,
66        ),
67        make_tool_with_prompts(
68            "task_history",
69            "Get the status transition history for a task, including automatic time tracking data and aggregate statistics.",
70            json!({
71                "task": {
72                    "type": "string",
73                    "description": "Task ID"
74                },
75                "states": {
76                    "type": "array",
77                    "items": { "type": "string", "enum": state_enum },
78                    "description": "Filter to only show transitions involving these statuses"
79                }
80            }),
81            vec!["task"],
82            prompts,
83        ),
84        make_tool_with_prompts(
85            "log_metrics",
86            "Log metrics and cost for a task. Values are aggregated (added to existing).",
87            json!({
88                "agent": {
89                    "type": "string",
90                    "description": "Agent ID"
91                },
92                "task": {
93                    "type": "string",
94                    "description": "Task ID"
95                },
96                "cost_usd": {
97                    "type": "number",
98                    "description": "Cost in USD to add"
99                },
100                "values": {
101                    "type": "array",
102                    "items": { "type": "integer" },
103                    "description": "Array of up to 8 integer metric values [metric_0..metric_7] to aggregate"
104                }
105            }),
106            vec!["agent", "task"],
107            prompts,
108        ),
109        make_tool_with_prompts(
110            "project_history",
111            "Get project-wide status transition history and aggregate statistics. Like task_history but across all tasks with date/time range filters.",
112            json!({
113                "from": {
114                    "type": "string",
115                    "description": "Start of time range (ISO 8601 datetime or milliseconds since epoch)"
116                },
117                "to": {
118                    "type": "string",
119                    "description": "End of time range (ISO 8601 datetime or milliseconds since epoch)"
120                },
121                "states": {
122                    "type": "array",
123                    "items": { "type": "string", "enum": state_enum },
124                    "description": "Filter to only show transitions involving these statuses"
125                },
126                "limit": {
127                    "type": "integer",
128                    "description": "Maximum number of transitions to return (default: 100)"
129                }
130            }),
131            vec![],
132            prompts,
133        ),
134        make_tool_with_prompts(
135            "get_metrics",
136            "Get metrics and cost for one or more tasks. Returns cost_usd and metrics array, aggregated across all tasks if multiple provided.",
137            json!({
138                "task": {
139                    "oneOf": [
140                        { "type": "string", "description": "Single task ID" },
141                        { "type": "array", "items": { "type": "string" }, "description": "Array of task IDs" }
142                    ],
143                    "description": "Task ID or array of task IDs to get metrics for"
144                }
145            }),
146            vec!["task"],
147            prompts,
148        ),
149    ]
150}
151
152pub fn thinking(db: &Database, args: Value) -> Result<Value> {
153    let agent_id = get_string(&args, "agent").ok_or_else(|| ToolError::missing_field("agent"))?;
154    let thought =
155        get_string(&args, "thought").ok_or_else(|| ToolError::missing_field("thought"))?;
156    let task_ids = get_string_or_array(&args, "tasks");
157
158    // Also refresh heartbeat since updating thought implies activity
159    let _ = db.heartbeat(&agent_id);
160
161    let updated = db.set_thought(&agent_id, Some(thought), task_ids)?;
162
163    Ok(json!({
164        "success": true,
165        "updated_count": updated
166    }))
167}
168
169pub fn task_history(
170    db: &Database,
171    states_config: &StatesConfig,
172    default_format: OutputFormat,
173    args: Value,
174) -> Result<Value> {
175    let task_id = get_string(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
176    let state_filter = get_string_array(&args, "states");
177    let format = get_string(&args, "format")
178        .and_then(|s| OutputFormat::parse(&s))
179        .unwrap_or(default_format);
180
181    let history = db.get_task_state_history(&task_id)?;
182    let current_duration = db.get_current_state_duration(&task_id, states_config)?;
183
184    // Filter history by statuses if specified
185    let filtered_history: Vec<_> = if let Some(ref states) = state_filter {
186        history
187            .into_iter()
188            .filter(|e| e.status.as_ref().is_some_and(|s| states.contains(s)))
189            .collect()
190    } else {
191        history
192    };
193
194    // Calculate aggregate stats
195    let mut time_per_status: HashMap<String, i64> = HashMap::new();
196    let mut time_per_agent: HashMap<String, i64> = HashMap::new();
197
198    for event in &filtered_history {
199        if let Some(end_ts) = event.end_timestamp {
200            let duration = end_ts - event.timestamp;
201            if let Some(ref status) = event.status {
202                *time_per_status.entry(status.clone()).or_insert(0) += duration;
203            }
204            if let Some(ref agent) = event.worker_id {
205                *time_per_agent.entry(agent.clone()).or_insert(0) += duration;
206            }
207        }
208    }
209
210    // Add current duration to the current state if applicable
211    if let Some(current_dur) = current_duration
212        && let Some(last_event) = filtered_history.last()
213        && last_event.end_timestamp.is_none()
214    {
215        // Include in state filter check
216        if let Some(ref status) = last_event.status
217            && (state_filter.is_none() || state_filter.as_ref().unwrap().contains(status))
218        {
219            *time_per_status.entry(status.clone()).or_insert(0) += current_dur;
220            if let Some(ref agent) = last_event.worker_id {
221                *time_per_agent.entry(agent.clone()).or_insert(0) += current_dur;
222            }
223        }
224    }
225
226    match format {
227        OutputFormat::Markdown => {
228            let mut md = String::from("# Task History\n\n");
229
230            // History table
231            md.push_str("## Status Transitions\n\n");
232            if filtered_history.is_empty() {
233                md.push_str("No status transitions found.\n");
234            } else {
235                md.push_str("| # | Status | Agent | Timestamp | Duration |\n");
236                md.push_str("|---|-------|-------|-----------|----------|\n");
237                for (i, event) in filtered_history.iter().enumerate() {
238                    let duration = if let Some(end_ts) = event.end_timestamp {
239                        format_duration_ms(end_ts - event.timestamp)
240                    } else if let Some(dur) = current_duration {
241                        format!("{} (ongoing)", format_duration_ms(dur))
242                    } else {
243                        "ongoing".to_string()
244                    };
245                    let agent = event.worker_id.as_deref().unwrap_or("-");
246                    let status = event.status.as_deref().unwrap_or("-");
247                    md.push_str(&format!(
248                        "| {} | {} | {} | {} | {} |\n",
249                        i + 1,
250                        status,
251                        agent,
252                        format_timestamp(event.timestamp),
253                        duration
254                    ));
255                }
256            }
257
258            // Aggregate stats
259            md.push_str("\n## Time per Status\n\n");
260            if time_per_status.is_empty() {
261                md.push_str("No completed status durations.\n");
262            } else {
263                md.push_str("| Status | Total Time |\n");
264                md.push_str("|--------|------------|\n");
265                let mut sorted_statuses: Vec<_> = time_per_status.iter().collect();
266                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
267                for (status, time) in sorted_statuses {
268                    md.push_str(&format!("| {} | {} |\n", status, format_duration_ms(*time)));
269                }
270            }
271
272            md.push_str("\n## Time per Agent\n\n");
273            if time_per_agent.is_empty() {
274                md.push_str("No agent time tracked.\n");
275            } else {
276                md.push_str("| Agent | Total Time |\n");
277                md.push_str("|-------|------------|\n");
278                let mut sorted_agents: Vec<_> = time_per_agent.iter().collect();
279                sorted_agents.sort_by_key(|(k, _)| k.as_str());
280                for (agent, time) in sorted_agents {
281                    md.push_str(&format!("| {} | {} |\n", agent, format_duration_ms(*time)));
282                }
283            }
284
285            Ok(markdown_to_json(md))
286        }
287        OutputFormat::Json => Ok(json!({
288            "history": filtered_history,
289            "current_duration_ms": current_duration,
290            "time_per_status_ms": time_per_status,
291            "time_per_agent_ms": time_per_agent
292        })),
293    }
294}
295
296pub fn log_metrics(db: &Database, args: Value) -> Result<Value> {
297    let task_id = get_string(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
298
299    let cost_usd = get_f64(&args, "cost_usd");
300
301    // Parse values array
302    let values: Vec<i64> = args
303        .get("values")
304        .and_then(|v| v.as_array())
305        .map(|arr| arr.iter().filter_map(|v| v.as_i64()).collect())
306        .unwrap_or_default();
307
308    let task = db.log_metrics(&task_id, cost_usd, &values)?;
309
310    Ok(json!({
311        "success": true,
312        "cost_usd": task.cost_usd,
313        "metrics": task.metrics
314    }))
315}
316
317/// Parse a timestamp from either ISO 8601 string or milliseconds.
318fn parse_timestamp(s: &str) -> Option<i64> {
319    // Try parsing as milliseconds first
320    if let Ok(ms) = s.parse::<i64>() {
321        return Some(ms);
322    }
323
324    // Try parsing as ISO 8601 datetime
325    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
326        return Some(dt.timestamp_millis());
327    }
328
329    // Try parsing common datetime formats
330    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
331        return Some(dt.and_utc().timestamp_millis());
332    }
333
334    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
335        return Some(dt.and_utc().timestamp_millis());
336    }
337
338    // Try parsing date only
339    if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
340        return Some(d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis());
341    }
342
343    None
344}
345
346pub fn project_history(db: &Database, default_format: OutputFormat, args: Value) -> Result<Value> {
347    let from_timestamp = get_string(&args, "from").and_then(|s| parse_timestamp(&s));
348    let to_timestamp = get_string(&args, "to").and_then(|s| parse_timestamp(&s));
349    let state_filter = get_string_array(&args, "states");
350    let limit = get_i64(&args, "limit").or(Some(100));
351    let format = get_string(&args, "format")
352        .and_then(|s| OutputFormat::parse(&s))
353        .unwrap_or(default_format);
354
355    // Get transitions
356    let history =
357        db.get_project_state_history(from_timestamp, to_timestamp, state_filter.as_deref(), limit)?;
358
359    // Get aggregate stats
360    let stats = db.get_project_state_stats(from_timestamp, to_timestamp)?;
361
362    match format {
363        OutputFormat::Markdown => {
364            let mut md = String::from("# Project History\n\n");
365
366            // Time range info
367            md.push_str("## Time Range\n\n");
368            let from_str = from_timestamp
369                .map(format_timestamp)
370                .unwrap_or_else(|| "beginning".to_string());
371            let to_str = to_timestamp
372                .map(format_timestamp)
373                .unwrap_or_else(|| "now".to_string());
374            md.push_str(&format!("**From:** {} **To:** {}\n\n", from_str, to_str));
375
376            // Summary stats
377            md.push_str("## Summary\n\n");
378            md.push_str(&format!(
379                "- **Total Transitions:** {}\n",
380                stats.total_transitions
381            ));
382            md.push_str(&format!("- **Tasks Affected:** {}\n", stats.tasks_affected));
383            md.push_str(&format!(
384                "- **Total Time Tracked:** {}\n\n",
385                format_duration_ms(stats.total_time_ms)
386            ));
387
388            // Recent transitions table
389            md.push_str("## Recent Transitions\n\n");
390            if history.is_empty() {
391                md.push_str("No status transitions found.\n");
392            } else {
393                md.push_str("| # | Task | Status | Agent | Timestamp | Duration |\n");
394                md.push_str("|---|------|-------|-------|-----------|----------|\n");
395                for (i, event) in history.iter().enumerate() {
396                    let duration = if let Some(end_ts) = event.end_timestamp {
397                        format_duration_ms(end_ts - event.timestamp)
398                    } else {
399                        "ongoing".to_string()
400                    };
401                    let agent = event.worker_id.as_deref().unwrap_or("-");
402                    let short_task = if event.task_id.len() > 12 {
403                        format!("{}...", &event.task_id[..12])
404                    } else {
405                        event.task_id.clone()
406                    };
407                    let status = event.status.as_deref().unwrap_or("-");
408                    md.push_str(&format!(
409                        "| {} | {} | {} | {} | {} | {} |\n",
410                        i + 1,
411                        short_task,
412                        status,
413                        agent,
414                        format_timestamp(event.timestamp),
415                        duration
416                    ));
417                }
418            }
419
420            // Transitions by status
421            md.push_str("\n## Transitions by Status\n\n");
422            if stats.transitions_by_status.is_empty() {
423                md.push_str("No transitions found.\n");
424            } else {
425                md.push_str("| Status | Count | Total Time |\n");
426                md.push_str("|-------|-------|------------|\n");
427                let mut sorted_statuses: Vec<_> = stats.transitions_by_status.iter().collect();
428                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
429                for (status, count) in sorted_statuses {
430                    let time = stats.time_by_status_ms.get(status).copied().unwrap_or(0);
431                    md.push_str(&format!(
432                        "| {} | {} | {} |\n",
433                        status,
434                        count,
435                        format_duration_ms(time)
436                    ));
437                }
438            }
439
440            // Transitions by agent
441            md.push_str("\n## Transitions by Agent\n\n");
442            if stats.transitions_by_agent.is_empty() {
443                md.push_str("No agent activity tracked.\n");
444            } else {
445                md.push_str("| Agent | Count | Total Time |\n");
446                md.push_str("|-------|-------|------------|\n");
447                let mut sorted_agents: Vec<_> = stats.transitions_by_agent.iter().collect();
448                sorted_agents.sort_by(|(_, a), (_, b)| b.cmp(a)); // Sort by count descending
449                for (agent, count) in sorted_agents {
450                    let time = stats.time_by_agent_ms.get(agent).copied().unwrap_or(0);
451                    md.push_str(&format!(
452                        "| {} | {} | {} |\n",
453                        agent,
454                        count,
455                        format_duration_ms(time)
456                    ));
457                }
458            }
459
460            Ok(markdown_to_json(md))
461        }
462        OutputFormat::Json => Ok(json!({
463            "time_range": {
464                "from_ms": from_timestamp,
465                "to_ms": to_timestamp
466            },
467            "summary": {
468                "total_transitions": stats.total_transitions,
469                "tasks_affected": stats.tasks_affected,
470                "total_time_ms": stats.total_time_ms
471            },
472            "transitions": history,
473            "transitions_by_status": stats.transitions_by_status,
474            "time_by_status_ms": stats.time_by_status_ms,
475            "transitions_by_agent": stats.transitions_by_agent,
476            "time_by_agent_ms": stats.time_by_agent_ms
477        })),
478    }
479}
480
481pub fn get_metrics(db: &Database, args: Value) -> Result<Value> {
482    use super::get_string_or_array;
483
484    let task_ids =
485        get_string_or_array(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
486
487    if task_ids.is_empty() {
488        return Err(ToolError::missing_field("task").into());
489    }
490
491    // Get metrics for all specified tasks
492    let mut total_cost_usd: f64 = 0.0;
493    let mut total_metrics: [i64; 8] = [0; 8];
494    let mut found_count = 0;
495
496    for task_id in &task_ids {
497        if let Some(task) = db.get_task(task_id)? {
498            total_cost_usd += task.cost_usd;
499            for (total, task_metric) in total_metrics.iter_mut().zip(task.metrics.iter()) {
500                *total += task_metric;
501            }
502            found_count += 1;
503        }
504    }
505
506    if found_count == 0 {
507        return Err(anyhow::anyhow!("No tasks found with the provided IDs"));
508    }
509
510    let response = if task_ids.len() == 1 {
511        // Single task - return flat response
512        json!({
513            "task": task_ids[0],
514            "cost_usd": total_cost_usd,
515            "metrics": total_metrics
516        })
517    } else {
518        // Multiple tasks - return aggregated response
519        json!({
520            "tasks": task_ids,
521            "task_count": found_count,
522            "cost_usd": total_cost_usd,
523            "metrics": total_metrics
524        })
525    };
526
527    Ok(response)
528}