Skip to main content

task_graph_mcp/tools/
tracking.rs

1//! Live status and tracking tools.
2
3use super::{get_f64, get_i64, get_string, get_string_array, make_tool_with_prompts};
4use crate::config::{Prompts, StatesConfig};
5use crate::db::Database;
6use crate::error::ToolError;
7use crate::format::{OutputFormat, markdown_to_json};
8use anyhow::Result;
9use rmcp::model::Tool;
10use serde_json::{Value, json};
11use std::collections::HashMap;
12
13/// Format a duration in milliseconds to a human-readable string.
14fn format_duration_ms(ms: i64) -> String {
15    if ms < 1000 {
16        format!("{}ms", ms)
17    } else if ms < 60_000 {
18        format!("{:.1}s", ms as f64 / 1000.0)
19    } else if ms < 3_600_000 {
20        let mins = ms / 60_000;
21        let secs = (ms % 60_000) / 1000;
22        format!("{}m {}s", mins, secs)
23    } else {
24        let hours = ms / 3_600_000;
25        let mins = (ms % 3_600_000) / 60_000;
26        format!("{}h {}m", hours, mins)
27    }
28}
29
30/// Format a timestamp (ms since epoch) to ISO-like string.
31fn format_timestamp(ts: i64) -> String {
32    let secs = ts / 1000;
33    let datetime = chrono::DateTime::from_timestamp(secs, 0)
34        .unwrap_or_else(|| chrono::DateTime::from_timestamp(0, 0).unwrap());
35    datetime.format("%Y-%m-%d %H:%M:%S").to_string()
36}
37
38pub fn get_tools(prompts: &Prompts, states_config: &StatesConfig) -> Vec<Tool> {
39    // Build state enum from config
40    let state_names: Vec<&str> = states_config.state_names();
41    let state_enum: Vec<Value> = state_names.iter().map(|s| json!(s)).collect();
42
43    vec![
44        make_tool_with_prompts(
45            "thinking",
46            "Broadcast real-time status updates (what you're doing right now). Also refreshes heartbeat. Call frequently during work to show live progress.",
47            json!({
48                "agent": {
49                    "type": "string",
50                    "description": "Agent ID"
51                },
52                "thought": {
53                    "type": "string",
54                    "description": "What the agent is currently doing"
55                },
56                "tasks": {
57                    "type": "array",
58                    "items": { "type": "string" },
59                    "description": "Specific task IDs to update (default: all claimed tasks)"
60                }
61            }),
62            vec!["agent", "thought"],
63            prompts,
64        ),
65        make_tool_with_prompts(
66            "task_history",
67            "Get the status transition history for a task, including automatic time tracking data and aggregate statistics.",
68            json!({
69                "task": {
70                    "type": "string",
71                    "description": "Task ID"
72                },
73                "states": {
74                    "type": "array",
75                    "items": { "type": "string", "enum": state_enum },
76                    "description": "Filter to only show transitions involving these statuses"
77                }
78            }),
79            vec!["task"],
80            prompts,
81        ),
82        make_tool_with_prompts(
83            "log_metrics",
84            "Log metrics and cost for a task. Values are aggregated (added to existing).",
85            json!({
86                "agent": {
87                    "type": "string",
88                    "description": "Agent ID"
89                },
90                "task": {
91                    "type": "string",
92                    "description": "Task ID"
93                },
94                "cost_usd": {
95                    "type": "number",
96                    "description": "Cost in USD to add"
97                },
98                "values": {
99                    "type": "array",
100                    "items": { "type": "integer" },
101                    "description": "Array of up to 8 integer metric values [metric_0..metric_7] to aggregate"
102                }
103            }),
104            vec!["agent", "task"],
105            prompts,
106        ),
107        make_tool_with_prompts(
108            "project_history",
109            "Get project-wide status transition history and aggregate statistics. Like task_history but across all tasks with date/time range filters.",
110            json!({
111                "from": {
112                    "type": "string",
113                    "description": "Start of time range (ISO 8601 datetime or milliseconds since epoch)"
114                },
115                "to": {
116                    "type": "string",
117                    "description": "End of time range (ISO 8601 datetime or milliseconds since epoch)"
118                },
119                "states": {
120                    "type": "array",
121                    "items": { "type": "string", "enum": state_enum },
122                    "description": "Filter to only show transitions involving these statuses"
123                },
124                "limit": {
125                    "type": "integer",
126                    "description": "Maximum number of transitions to return (default: 100)"
127                }
128            }),
129            vec![],
130            prompts,
131        ),
132        make_tool_with_prompts(
133            "get_metrics",
134            "Get metrics and cost for one or more tasks. Returns cost_usd and metrics array, aggregated across all tasks if multiple provided.",
135            json!({
136                "task": {
137                    "oneOf": [
138                        { "type": "string", "description": "Single task ID" },
139                        { "type": "array", "items": { "type": "string" }, "description": "Array of task IDs" }
140                    ],
141                    "description": "Task ID or array of task IDs to get metrics for"
142                }
143            }),
144            vec!["task"],
145            prompts,
146        ),
147    ]
148}
149
150pub fn thinking(db: &Database, args: Value) -> Result<Value> {
151    let agent_id = get_string(&args, "agent").ok_or_else(|| ToolError::missing_field("agent"))?;
152    let thought =
153        get_string(&args, "thought").ok_or_else(|| ToolError::missing_field("thought"))?;
154    let task_ids = get_string_array(&args, "tasks");
155
156    // Also refresh heartbeat since updating thought implies activity
157    let _ = db.heartbeat(&agent_id);
158
159    let updated = db.set_thought(&agent_id, Some(thought), task_ids)?;
160
161    Ok(json!({
162        "success": true,
163        "updated_count": updated
164    }))
165}
166
167pub fn task_history(
168    db: &Database,
169    states_config: &StatesConfig,
170    default_format: OutputFormat,
171    args: Value,
172) -> Result<Value> {
173    let task_id = get_string(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
174    let state_filter = get_string_array(&args, "states");
175    let format = get_string(&args, "format")
176        .and_then(|s| OutputFormat::parse(&s))
177        .unwrap_or(default_format);
178
179    let history = db.get_task_state_history(&task_id)?;
180    let current_duration = db.get_current_state_duration(&task_id, states_config)?;
181
182    // Filter history by statuses if specified
183    let filtered_history: Vec<_> = if let Some(ref states) = state_filter {
184        history
185            .into_iter()
186            .filter(|e| states.contains(&e.event))
187            .collect()
188    } else {
189        history
190    };
191
192    // Calculate aggregate stats
193    let mut time_per_status: HashMap<String, i64> = HashMap::new();
194    let mut time_per_agent: HashMap<String, i64> = HashMap::new();
195
196    for event in &filtered_history {
197        if let Some(end_ts) = event.end_timestamp {
198            let duration = end_ts - event.timestamp;
199            *time_per_status.entry(event.event.clone()).or_insert(0) += duration;
200            if let Some(ref agent) = event.worker_id {
201                *time_per_agent.entry(agent.clone()).or_insert(0) += duration;
202            }
203        }
204    }
205
206    // Add current duration to the current state if applicable
207    if let Some(current_dur) = current_duration
208        && let Some(last_event) = filtered_history.last()
209            && last_event.end_timestamp.is_none() {
210                // Include in state filter check
211                if state_filter.is_none()
212                    || state_filter.as_ref().unwrap().contains(&last_event.event)
213                {
214                    *time_per_status.entry(last_event.event.clone()).or_insert(0) += current_dur;
215                    if let Some(ref agent) = last_event.worker_id {
216                        *time_per_agent.entry(agent.clone()).or_insert(0) += current_dur;
217                    }
218                }
219            }
220
221    match format {
222        OutputFormat::Markdown => {
223            let mut md = String::from("# Task History\n\n");
224
225            // History table
226            md.push_str("## Status Transitions\n\n");
227            if filtered_history.is_empty() {
228                md.push_str("No status transitions found.\n");
229            } else {
230                md.push_str("| # | Status | Agent | Timestamp | Duration |\n");
231                md.push_str("|---|-------|-------|-----------|----------|\n");
232                for (i, event) in filtered_history.iter().enumerate() {
233                    let duration = if let Some(end_ts) = event.end_timestamp {
234                        format_duration_ms(end_ts - event.timestamp)
235                    } else if let Some(dur) = current_duration {
236                        format!("{} (ongoing)", format_duration_ms(dur))
237                    } else {
238                        "ongoing".to_string()
239                    };
240                    let agent = event.worker_id.as_deref().unwrap_or("-");
241                    md.push_str(&format!(
242                        "| {} | {} | {} | {} | {} |\n",
243                        i + 1,
244                        event.event,
245                        agent,
246                        format_timestamp(event.timestamp),
247                        duration
248                    ));
249                }
250            }
251
252            // Aggregate stats
253            md.push_str("\n## Time per Status\n\n");
254            if time_per_status.is_empty() {
255                md.push_str("No completed status durations.\n");
256            } else {
257                md.push_str("| Status | Total Time |\n");
258                md.push_str("|--------|------------|\n");
259                let mut sorted_statuses: Vec<_> = time_per_status.iter().collect();
260                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
261                for (status, time) in sorted_statuses {
262                    md.push_str(&format!("| {} | {} |\n", status, format_duration_ms(*time)));
263                }
264            }
265
266            md.push_str("\n## Time per Agent\n\n");
267            if time_per_agent.is_empty() {
268                md.push_str("No agent time tracked.\n");
269            } else {
270                md.push_str("| Agent | Total Time |\n");
271                md.push_str("|-------|------------|\n");
272                let mut sorted_agents: Vec<_> = time_per_agent.iter().collect();
273                sorted_agents.sort_by_key(|(k, _)| k.as_str());
274                for (agent, time) in sorted_agents {
275                    md.push_str(&format!("| {} | {} |\n", agent, format_duration_ms(*time)));
276                }
277            }
278
279            Ok(markdown_to_json(md))
280        }
281        OutputFormat::Json => Ok(json!({
282            "history": filtered_history,
283            "current_duration_ms": current_duration,
284            "time_per_status_ms": time_per_status,
285            "time_per_agent_ms": time_per_agent
286        })),
287    }
288}
289
290pub fn log_metrics(db: &Database, args: Value) -> Result<Value> {
291    let task_id = get_string(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
292
293    let cost_usd = get_f64(&args, "cost_usd");
294
295    // Parse values array
296    let values: Vec<i64> = args
297        .get("values")
298        .and_then(|v| v.as_array())
299        .map(|arr| arr.iter().filter_map(|v| v.as_i64()).collect())
300        .unwrap_or_default();
301
302    let task = db.log_metrics(&task_id, cost_usd, &values)?;
303
304    Ok(json!({
305        "success": true,
306        "cost_usd": task.cost_usd,
307        "metrics": task.metrics
308    }))
309}
310
311/// Parse a timestamp from either ISO 8601 string or milliseconds.
312fn parse_timestamp(s: &str) -> Option<i64> {
313    // Try parsing as milliseconds first
314    if let Ok(ms) = s.parse::<i64>() {
315        return Some(ms);
316    }
317
318    // Try parsing as ISO 8601 datetime
319    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) {
320        return Some(dt.timestamp_millis());
321    }
322
323    // Try parsing common datetime formats
324    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
325        return Some(dt.and_utc().timestamp_millis());
326    }
327
328    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
329        return Some(dt.and_utc().timestamp_millis());
330    }
331
332    // Try parsing date only
333    if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
334        return Some(d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis());
335    }
336
337    None
338}
339
340pub fn project_history(db: &Database, default_format: OutputFormat, args: Value) -> Result<Value> {
341    let from_timestamp = get_string(&args, "from").and_then(|s| parse_timestamp(&s));
342    let to_timestamp = get_string(&args, "to").and_then(|s| parse_timestamp(&s));
343    let state_filter = get_string_array(&args, "states");
344    let limit = get_i64(&args, "limit").or(Some(100));
345    let format = get_string(&args, "format")
346        .and_then(|s| OutputFormat::parse(&s))
347        .unwrap_or(default_format);
348
349    // Get transitions
350    let history =
351        db.get_project_state_history(from_timestamp, to_timestamp, state_filter.as_deref(), limit)?;
352
353    // Get aggregate stats
354    let stats = db.get_project_state_stats(from_timestamp, to_timestamp)?;
355
356    match format {
357        OutputFormat::Markdown => {
358            let mut md = String::from("# Project History\n\n");
359
360            // Time range info
361            md.push_str("## Time Range\n\n");
362            let from_str = from_timestamp
363                .map(format_timestamp)
364                .unwrap_or_else(|| "beginning".to_string());
365            let to_str = to_timestamp
366                .map(format_timestamp)
367                .unwrap_or_else(|| "now".to_string());
368            md.push_str(&format!("**From:** {} **To:** {}\n\n", from_str, to_str));
369
370            // Summary stats
371            md.push_str("## Summary\n\n");
372            md.push_str(&format!(
373                "- **Total Transitions:** {}\n",
374                stats.total_transitions
375            ));
376            md.push_str(&format!("- **Tasks Affected:** {}\n", stats.tasks_affected));
377            md.push_str(&format!(
378                "- **Total Time Tracked:** {}\n\n",
379                format_duration_ms(stats.total_time_ms)
380            ));
381
382            // Recent transitions table
383            md.push_str("## Recent Transitions\n\n");
384            if history.is_empty() {
385                md.push_str("No status transitions found.\n");
386            } else {
387                md.push_str("| # | Task | Status | Agent | Timestamp | Duration |\n");
388                md.push_str("|---|------|-------|-------|-----------|----------|\n");
389                for (i, event) in history.iter().enumerate() {
390                    let duration = if let Some(end_ts) = event.end_timestamp {
391                        format_duration_ms(end_ts - event.timestamp)
392                    } else {
393                        "ongoing".to_string()
394                    };
395                    let agent = event.worker_id.as_deref().unwrap_or("-");
396                    let short_task = if event.task_id.len() > 12 {
397                        format!("{}...", &event.task_id[..12])
398                    } else {
399                        event.task_id.clone()
400                    };
401                    md.push_str(&format!(
402                        "| {} | {} | {} | {} | {} | {} |\n",
403                        i + 1,
404                        short_task,
405                        event.event,
406                        agent,
407                        format_timestamp(event.timestamp),
408                        duration
409                    ));
410                }
411            }
412
413            // Transitions by status
414            md.push_str("\n## Transitions by Status\n\n");
415            if stats.transitions_by_status.is_empty() {
416                md.push_str("No transitions found.\n");
417            } else {
418                md.push_str("| Status | Count | Total Time |\n");
419                md.push_str("|-------|-------|------------|\n");
420                let mut sorted_statuses: Vec<_> = stats.transitions_by_status.iter().collect();
421                sorted_statuses.sort_by_key(|(k, _)| k.as_str());
422                for (status, count) in sorted_statuses {
423                    let time = stats.time_by_status_ms.get(status).copied().unwrap_or(0);
424                    md.push_str(&format!(
425                        "| {} | {} | {} |\n",
426                        status,
427                        count,
428                        format_duration_ms(time)
429                    ));
430                }
431            }
432
433            // Transitions by agent
434            md.push_str("\n## Transitions by Agent\n\n");
435            if stats.transitions_by_agent.is_empty() {
436                md.push_str("No agent activity tracked.\n");
437            } else {
438                md.push_str("| Agent | Count | Total Time |\n");
439                md.push_str("|-------|-------|------------|\n");
440                let mut sorted_agents: Vec<_> = stats.transitions_by_agent.iter().collect();
441                sorted_agents.sort_by(|(_, a), (_, b)| b.cmp(a)); // Sort by count descending
442                for (agent, count) in sorted_agents {
443                    let time = stats.time_by_agent_ms.get(agent).copied().unwrap_or(0);
444                    md.push_str(&format!(
445                        "| {} | {} | {} |\n",
446                        agent,
447                        count,
448                        format_duration_ms(time)
449                    ));
450                }
451            }
452
453            Ok(markdown_to_json(md))
454        }
455        OutputFormat::Json => Ok(json!({
456            "time_range": {
457                "from_ms": from_timestamp,
458                "to_ms": to_timestamp
459            },
460            "summary": {
461                "total_transitions": stats.total_transitions,
462                "tasks_affected": stats.tasks_affected,
463                "total_time_ms": stats.total_time_ms
464            },
465            "transitions": history,
466            "transitions_by_status": stats.transitions_by_status,
467            "time_by_status_ms": stats.time_by_status_ms,
468            "transitions_by_agent": stats.transitions_by_agent,
469            "time_by_agent_ms": stats.time_by_agent_ms
470        })),
471    }
472}
473
474pub fn get_metrics(db: &Database, args: Value) -> Result<Value> {
475    use super::get_string_or_array;
476
477    let task_ids =
478        get_string_or_array(&args, "task").ok_or_else(|| ToolError::missing_field("task"))?;
479
480    if task_ids.is_empty() {
481        return Err(ToolError::missing_field("task").into());
482    }
483
484    // Get metrics for all specified tasks
485    let mut total_cost_usd: f64 = 0.0;
486    let mut total_metrics: [i64; 8] = [0; 8];
487    let mut found_count = 0;
488
489    for task_id in &task_ids {
490        if let Some(task) = db.get_task(task_id)? {
491            total_cost_usd += task.cost_usd;
492            for (total, task_metric) in total_metrics.iter_mut().zip(task.metrics.iter()) {
493                *total += task_metric;
494            }
495            found_count += 1;
496        }
497    }
498
499    if found_count == 0 {
500        return Err(anyhow::anyhow!("No tasks found with the provided IDs"));
501    }
502
503    let response = if task_ids.len() == 1 {
504        // Single task - return flat response
505        json!({
506            "task": task_ids[0],
507            "cost_usd": total_cost_usd,
508            "metrics": total_metrics
509        })
510    } else {
511        // Multiple tasks - return aggregated response
512        json!({
513            "tasks": task_ids,
514            "task_count": found_count,
515            "cost_usd": total_cost_usd,
516            "metrics": total_metrics
517        })
518    };
519
520    Ok(response)
521}