task-graph-mcp 0.2.2

MCP server for agent task workflows with phases, prompts, gates, and multi-agent coordination
Documentation
//! MCP tool implementations.

pub mod agents;
pub mod attachments;
pub mod claiming;
pub mod context;
pub mod deps;
pub mod files;
pub mod gates;
pub mod query;
pub mod schema;
pub mod search;
pub mod skills;
pub mod tasks;
pub mod tracking;
pub mod workflows;

pub use context::ToolContext;

use crate::config::{AppConfig, Prompts, ServerPaths, workflows::WorkflowsConfig};
use crate::db::Database;
use crate::error::ToolError;
use crate::format::{OutputFormat, ToolResult};
use anyhow::Result;
use rmcp::model::Tool;
use serde_json::Value;
use std::path::PathBuf;
use std::sync::Arc;

/// Tool handler that processes MCP tool calls.
pub struct ToolHandler {
    pub db: Arc<Database>,
    pub media_dir: PathBuf,
    pub skills_dir: PathBuf,
    pub server_paths: Arc<ServerPaths>,
    pub prompts: Arc<Prompts>,
    /// Consolidated application configuration.
    pub config: AppConfig,
    pub default_format: OutputFormat,
    pub default_page_size: i32,
    pub path_mapper: Arc<crate::paths::PathMapper>,
}

impl ToolHandler {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        db: Arc<Database>,
        media_dir: PathBuf,
        skills_dir: PathBuf,
        server_paths: Arc<ServerPaths>,
        prompts: Arc<Prompts>,
        config: AppConfig,
        default_format: OutputFormat,
        default_page_size: i32,
        path_mapper: Arc<crate::paths::PathMapper>,
    ) -> Self {
        Self {
            db,
            media_dir,
            skills_dir,
            server_paths,
            prompts,
            config,
            default_format,
            default_page_size,
            path_mapper,
        }
    }

    /// Get the workflow config for a worker.
    /// Looks up the worker's workflow name and returns the corresponding config,
    /// or falls back to the configured default workflow, or the base config.
    pub fn get_workflow_for_worker(&self, worker_id: &str) -> Arc<WorkflowsConfig> {
        // Look up worker's workflow name from database
        if let Ok(Some(worker)) = self.db.get_worker(worker_id)
            && let Some(ref workflow_name) = worker.workflow
        {
            // Try to get from named_workflows cache
            if let Some(workflow_config) = self.config.workflows.get_named_workflow(workflow_name) {
                return Arc::clone(workflow_config);
            }
        }
        // Fall back to configured default workflow, or base config
        if let Some(default_workflow) = self.config.workflows.get_default_workflow() {
            Arc::clone(default_workflow)
        } else {
            Arc::clone(&self.config.workflows)
        }
    }

    /// Get all available tools.
    pub fn get_tools(&self) -> Vec<Tool> {
        let mut tools = Vec::new();

        // Worker tools
        tools.extend(agents::get_tools(&self.prompts));

        // Task tools (with dynamic state schema)
        tools.extend(tasks::get_tools(&self.prompts, &self.config.states));

        // Tracking tools
        tools.extend(tracking::get_tools(&self.prompts, &self.config.states));

        // Dependency tools
        tools.extend(deps::get_tools(&self.prompts, &self.config.deps));

        // Claiming tools (with dynamic state schema)
        tools.extend(claiming::get_tools(&self.prompts, &self.config.states));

        // File coordination tools
        tools.extend(files::get_tools(&self.prompts));

        // Attachment tools
        tools.extend(attachments::get_tools(&self.prompts));

        // Skill tools (no prompts needed, always available)
        tools.extend(skills::get_tools());

        // Schema introspection tools
        tools.extend(schema::get_tools());

        // Search tools
        tools.extend(search::get_tools(&self.prompts));

        // Query tools (read-only SQL)
        tools.extend(query::get_tools());

        // Gate checking tools
        tools.extend(gates::get_tools(&self.prompts));

        // Workflow discovery tools (no auth needed, callable before connect)
        tools.extend(workflows::get_tools());

        tools
    }

    /// Call a tool by name.
    #[allow(unused_variables)]
    pub async fn call_tool(
        &self,
        name: &str,
        arguments: Value,
        ctx: &ToolContext,
    ) -> Result<ToolResult> {
        // Helper to wrap JSON results
        let json = |r: Result<Value>| r.map(ToolResult::Json);

        match name {
            // Worker tools
            "connect" => {
                // Resolve workflow from args (worker isn't registered yet during connect)
                let workflow = arguments
                    .get("workflow")
                    .and_then(|v| v.as_str())
                    .and_then(|name| self.config.workflows.get_named_workflow(name))
                    .map(Arc::clone)
                    .or_else(|| self.config.workflows.get_default_workflow().map(Arc::clone))
                    .unwrap_or_else(|| Arc::clone(&self.config.workflows));
                json(agents::connect(
                    agents::ConnectOptions {
                        db: &self.db,
                        server_paths: &self.server_paths,
                        config: &self.config,
                        workflows: &workflow,
                    },
                    arguments,
                ))
            }
            "disconnect" => json(agents::disconnect(&self.db, &self.config.states, arguments)),
            "list_agents" => agents::list_agents(
                &self.db,
                &self.config.states,
                self.default_format,
                arguments,
            ),
            "cleanup_stale" => json(agents::cleanup_stale(
                &self.db,
                &self.config.states,
                arguments,
            )),

            // Task tools
            "create" => json(tasks::create(&self.db, &self.config, arguments)),
            "create_tree" => json(tasks::create_tree(&self.db, &self.config, arguments)),
            "get" => json(tasks::get(&self.db, self.default_format, arguments)),
            "list_tasks" => json(tasks::list_tasks(
                &self.db,
                &self.config.states,
                &self.config.deps,
                self.default_format,
                arguments,
            )),
            "update" => {
                // Look up worker's workflow for prompts
                let worker_id = arguments
                    .get("worker_id")
                    .and_then(|v| v.as_str())
                    .unwrap_or("");
                let workflow = self.get_workflow_for_worker(worker_id);
                json(tasks::update(
                    tasks::UpdateOptions {
                        db: &self.db,
                        config: &self.config,
                        workflows: &workflow,
                    },
                    arguments,
                ))
            }
            "delete" => json(tasks::delete(&self.db, arguments)),
            "rename" => json(tasks::rename(&self.db, arguments)),
            "scan" => json(tasks::scan(&self.db, self.default_format, arguments)),

            // Tracking tools
            "thinking" => json(tracking::thinking(&self.db, arguments)),
            "task_history" => json(tracking::task_history(
                &self.db,
                &self.config.states,
                self.default_format,
                arguments,
            )),
            "log_metrics" => json(tracking::log_metrics(&self.db, arguments)),
            "get_metrics" => json(tracking::get_metrics(&self.db, arguments)),
            "project_history" => json(tracking::project_history(
                &self.db,
                self.default_format,
                arguments,
            )),

            // Dependency tools
            "link" => json(deps::link(&self.db, &self.config.deps, arguments)),
            "unlink" => json(deps::unlink(&self.db, arguments)),
            "relink" => json(deps::relink(&self.db, &self.config.deps, arguments)),

            // Claiming tools
            "claim" => {
                // Look up worker's workflow for prompts
                let worker_id = arguments
                    .get("worker_id")
                    .and_then(|v| v.as_str())
                    .unwrap_or("");
                let workflow = self.get_workflow_for_worker(worker_id);
                json(claiming::claim(
                    &self.db,
                    &self.config,
                    &workflow,
                    arguments,
                ))
            }

            // File coordination tools
            "mark_file" => json(files::mark_file(&self.db, arguments)),
            "unmark_file" => json(files::unmark_file(&self.db, arguments)),
            "list_marks" => json(files::list_marks(&self.db, self.default_format, arguments)),
            "mark_updates" => {
                json(files::mark_updates_async(std::sync::Arc::clone(&self.db), arguments).await)
            }

            // Attachment tools
            "attach" => json(attachments::attach(
                &self.db,
                &self.media_dir,
                &self.config.attachments,
                arguments,
            )),
            "attachments" => json(attachments::attachments(
                &self.db,
                &self.media_dir,
                self.default_format,
                arguments,
            )),
            "detach" => json(attachments::detach(&self.db, &self.media_dir, arguments)),

            // Skill tools
            name if skills::is_skill_tool(name) => {
                json(skills::call_tool(&self.skills_dir, name, &arguments))
            }

            // Schema introspection tools
            "get_schema" => json(schema::get_schema(&self.db, arguments)),

            // Search tools
            "search" => json(search::search(&self.db, self.default_page_size, arguments)),

            // Query tools (read-only SQL)
            "query" => query::query(&self.db, self.default_format, arguments),

            // Gate checking tools
            "check_gates" => {
                // Look up worker's workflow for gate definitions
                // Since check_gates doesn't require worker_id, use base workflow
                json(gates::check_gates(
                    &self.db,
                    &self.config.workflows,
                    arguments,
                ))
            }

            // Workflow discovery tools (no connection required)
            "list_workflows" => json(workflows::list_workflows(&self.config.workflows)),

            _ => Err(ToolError::unknown_tool(name).into()),
        }
    }
}

/// Helper to create a tool definition.
pub fn make_tool(name: &str, description: &str, properties: Value, required: Vec<&str>) -> Tool {
    let input_schema = rmcp::model::JsonObject::from_iter([
        ("type".to_string(), serde_json::json!("object")),
        ("properties".to_string(), properties),
        ("required".to_string(), serde_json::json!(required)),
    ]);

    Tool::new(name.to_string(), description.to_string(), input_schema)
}

/// Helper to create a tool definition with prompt overrides.
/// Looks up the tool description in prompts, falls back to default_description.
pub fn make_tool_with_prompts(
    name: &str,
    default_description: &str,
    properties: Value,
    required: Vec<&str>,
    prompts: &Prompts,
) -> Tool {
    let description = prompts
        .get_tool_description(name)
        .unwrap_or(default_description);
    make_tool(name, description, properties, required)
}

/// Helper to get a string from arguments.
pub fn get_string(args: &Value, key: &str) -> Option<String> {
    args.get(key).and_then(|v| v.as_str().map(String::from))
}

/// Helper to get an i32 from arguments.
pub fn get_i32(args: &Value, key: &str) -> Option<i32> {
    args.get(key).and_then(|v| v.as_i64().map(|n| n as i32))
}

/// Helper to get an i64 from arguments.
pub fn get_i64(args: &Value, key: &str) -> Option<i64> {
    args.get(key).and_then(|v| v.as_i64())
}

/// Helper to get an f64 from arguments.
pub fn get_f64(args: &Value, key: &str) -> Option<f64> {
    args.get(key).and_then(|v| v.as_f64())
}

/// Helper to get a bool from arguments.
pub fn get_bool(args: &Value, key: &str) -> Option<bool> {
    args.get(key).and_then(|v| v.as_bool())
}

/// Helper to get a string array from arguments.
pub fn get_string_array(args: &Value, key: &str) -> Option<Vec<String>> {
    args.get(key).and_then(|v| {
        v.as_array().map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect()
        })
    })
}

/// Helper to get either a single string or array of strings from arguments.
/// Normalizes to a Vec<String>.
pub fn get_string_or_array(args: &Value, key: &str) -> Option<Vec<String>> {
    args.get(key).and_then(|v| {
        if let Some(s) = v.as_str() {
            // Single string - wrap in vec
            Some(vec![s.to_string()])
        } else {
            v.as_array().map(|arr| {
                arr.iter()
                    .filter_map(|item| item.as_str().map(String::from))
                    .collect()
            })
        }
    })
}

/// Parsed result that may be a list of IDs or a wildcard "*".
pub enum IdList {
    Ids(Vec<String>),
    Wildcard,
}

/// Like get_string_or_array, but recognizes "*" as a wildcard sentinel.
pub fn get_string_or_array_or_wildcard(args: &Value, key: &str) -> Option<IdList> {
    let vals = get_string_or_array(args, key)?;
    if vals.len() == 1 && vals[0] == "*" {
        Some(IdList::Wildcard)
    } else {
        Some(IdList::Ids(vals))
    }
}