use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, info};
use crate::error::McpError;
use crate::protocol::{CallToolResult, McpTool, ToolContent};
use crate::server::ToolHandler;
/// Input arguments for a crew tool invocation.
///
/// Deserialized from the JSON arguments handed to `CrewMcpHandler::execute`.
/// Only `task` is required; every other field falls back to its default when
/// absent from the JSON object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrewMcpInput {
/// The main task description for the crew (required).
pub task: String,
/// Additional context as string key-value pairs; an empty map when omitted.
#[serde(default)]
pub context: HashMap<String, String>,
/// Requested execution mode; `None` when omitted.
///
/// The tool schema advertises `sequential`, `parallel` and `hierarchical`,
/// but this type accepts any string and performs no validation itself.
#[serde(default)]
pub mode: Option<String>,
/// Maximum number of iterations for crew execution; `None` when omitted.
#[serde(default)]
pub max_iterations: Option<u32>,
}
/// Outcome of a single task, as reported by a crew handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
/// Name of the task; shown in the per-task line of the response text.
pub name: String,
/// Output text of the task; shown after the status marker.
pub output: String,
/// Whether the task succeeded; rendered as `OK` (true) or `FAIL` (false).
pub success: bool,
}
/// Successful result returned by a crew handler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrewMcpOutput {
/// Primary result text; it forms the beginning of the response text.
pub result: String,
/// Per-task outcomes; listed in the response text when
/// `CrewMcpConfig::include_task_results` is enabled, and counted in the
/// structured summary as `task_count`.
pub task_results: Vec<TaskResult>,
/// Duration in milliseconds as supplied by the handler; echoed in the
/// structured summary.
pub duration_ms: u64,
/// Agent identifiers supplied by the handler; listed in the response text
/// (when non-empty) and in the structured summary.
pub agents_used: Vec<String>,
}
/// Configuration for how a crew is exposed as an MCP tool.
#[derive(Debug, Clone)]
pub struct CrewMcpConfig {
/// Prefix prepended to the builder's name to form the final tool name.
pub name_prefix: String,
/// Whether the per-task result lines are included in the response text.
pub include_task_results: bool,
}
impl Default for CrewMcpConfig {
fn default() -> Self {
Self {
name_prefix: "crew_".to_string(),
include_task_results: true,
}
}
}
/// Shared, type-erased async callback that runs a crew.
///
/// The callback takes a [`CrewMcpInput`] by value and returns a boxed, pinned
/// future resolving to either a [`CrewMcpOutput`] or an error message string.
/// The callback itself is `Send + Sync` and the returned future is `Send`.
pub type CrewHandlerFn = Arc<
dyn Fn(
CrewMcpInput,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<CrewMcpOutput, String>> + Send>,
> + Send
+ Sync,
>;
/// MCP tool handler that delegates tool calls to a crew callback.
///
/// Constructed through [`CrewMcpHandler::builder`].
pub struct CrewMcpHandler {
// Full tool name: the configured prefix followed by the builder's name.
name: String,
// Human-readable description used in the tool definition.
description: String,
// Capability labels appended to the description in the tool definition.
capabilities: Vec<String>,
// Callback invoked with the parsed input on every tool call.
handler: CrewHandlerFn,
// Presentation options (name prefix, whether to list task results).
config: CrewMcpConfig,
}
impl CrewMcpHandler {
    /// Starts building a handler for a crew with the given (unprefixed) name.
    pub fn builder(name: impl Into<String>) -> CrewMcpHandlerBuilder {
        CrewMcpHandlerBuilder::new(name)
    }

    /// Returns the full tool name, including the configured prefix.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the capability labels registered for this handler.
    pub fn capabilities(&self) -> &[String] {
        self.capabilities.as_slice()
    }
}
#[async_trait]
impl ToolHandler for CrewMcpHandler {
    /// Builds the MCP tool definition: the prefixed tool name, a description
    /// (with the capability list appended when there is one) and the JSON
    /// schema of the accepted arguments.
    fn definition(&self) -> McpTool {
        let schema = json!({
            "type": "object",
            "properties": {
                "task": {
                    "type": "string",
                    "description": "The main task description for the crew"
                },
                "context": {
                    "type": "object",
                    "description": "Additional context as key-value pairs",
                    "additionalProperties": { "type": "string" }
                },
                "mode": {
                    "type": "string",
                    "description": "Execution mode: sequential, parallel, or hierarchical",
                    "enum": ["sequential", "parallel", "hierarchical"]
                },
                "max_iterations": {
                    "type": "integer",
                    // The value is deserialized into a `u32`, so negative numbers
                    // would be rejected as invalid params; advertise that bound.
                    "minimum": 0,
                    "description": "Maximum number of iterations for crew execution"
                }
            },
            "required": ["task"]
        });

        let description = if self.capabilities.is_empty() {
            self.description.clone()
        } else {
            let capabilities = format!("Capabilities: {}", self.capabilities.join(", "));
            if self.description.is_empty() {
                // No base description (the builder default): avoid emitting a
                // description that starts with two blank lines.
                capabilities
            } else {
                format!("{}\n\n{}", self.description, capabilities)
            }
        };

        McpTool {
            name: self.name.clone(),
            description: Some(description),
            input_schema: schema,
        }
    }

    /// Parses `arguments` into a [`CrewMcpInput`], runs the crew callback and
    /// converts its outcome into a tool result.
    ///
    /// On success the result carries two text items: the human-readable
    /// response, followed by a pretty-printed JSON summary (`duration_ms`,
    /// `agents_used`, `task_count`).
    ///
    /// # Errors
    ///
    /// Returns `McpError::InvalidParams` when `arguments` cannot be
    /// deserialized into a [`CrewMcpInput`]. A failure reported by the crew
    /// callback is *not* an `Err`: it is returned as `Ok` with
    /// `is_error: true` and the message prefixed with `Crew error: `.
    async fn execute(&self, arguments: serde_json::Value) -> Result<CallToolResult, McpError> {
        debug!(tool = %self.name, "Executing crew MCP handler");

        let input: CrewMcpInput = serde_json::from_value(arguments)
            .map_err(|e| McpError::InvalidParams(format!("Invalid input: {}", e)))?;

        info!(
            tool = %self.name,
            task = %input.task,
            mode = ?input.mode,
            "Crew executing task"
        );

        match (self.handler)(input).await {
            Ok(output) => {
                let response_text = build_success_response(&output, &self.config);
                let structured = json!({
                    "duration_ms": output.duration_ms,
                    "agents_used": output.agents_used,
                    "task_count": output.task_results.len(),
                });
                Ok(CallToolResult {
                    content: vec![
                        ToolContent::text(response_text),
                        ToolContent::text(format!(
                            "\n---\nStructured output: {}",
                            serde_json::to_string_pretty(&structured).unwrap_or_default()
                        )),
                    ],
                    is_error: false,
                })
            }
            Err(e) => Ok(CallToolResult {
                content: vec![ToolContent::text(format!("Crew error: {}", e))],
                is_error: true,
            }),
        }
    }
}
/// Renders the human-readable response text for a successful crew run.
///
/// The text starts with `output.result`. When `config.include_task_results`
/// is set and there are task results, a "Task results" section with one line
/// per task follows. When any agents were reported, an "Agents used" line is
/// appended last.
fn build_success_response(output: &CrewMcpOutput, config: &CrewMcpConfig) -> String {
    let mut text = output.result.clone();

    let show_tasks = config.include_task_results && !output.task_results.is_empty();
    if show_tasks {
        text.push_str("\n\nTask results:\n");
        for (position, task) in output.task_results.iter().enumerate() {
            // Lines are newline-separated, with no trailing newline after the last one.
            if position > 0 {
                text.push('\n');
            }
            let status = if task.success { "OK" } else { "FAIL" };
            text.push_str(&format!(" - {} [{}]: {}", task.name, status, task.output));
        }
    }

    if !output.agents_used.is_empty() {
        text.push_str("\n\nAgents used: ");
        text.push_str(&output.agents_used.join(", "));
    }

    text
}
/// Builder for [`CrewMcpHandler`].
///
/// Collects the name, description, capabilities and configuration; the
/// handler is produced by the terminal `handler` method, which supplies the
/// crew callback.
pub struct CrewMcpHandlerBuilder {
// Unprefixed crew name; the configured prefix is applied when the handler is built.
name: String,
// Tool description; empty until `description` is called.
description: String,
// Capability labels accumulated via `capability` / `capabilities`.
capabilities: Vec<String>,
// Presentation options; starts as `CrewMcpConfig::default()`.
config: CrewMcpConfig,
}
impl CrewMcpHandlerBuilder {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
description: String::new(),
capabilities: Vec::new(),
config: CrewMcpConfig::default(),
}
}
pub fn description(mut self, description: impl Into<String>) -> Self {
self.description = description.into();
self
}
pub fn capability(mut self, capability: impl Into<String>) -> Self {
self.capabilities.push(capability.into());
self
}
pub fn capabilities(mut self, capabilities: Vec<String>) -> Self {
self.capabilities.extend(capabilities);
self
}
pub fn name_prefix(mut self, prefix: impl Into<String>) -> Self {
self.config.name_prefix = prefix.into();
self
}
pub fn include_task_results(mut self, include: bool) -> Self {
self.config.include_task_results = include;
self
}
pub fn config(mut self, config: CrewMcpConfig) -> Self {
self.config = config;
self
}
pub fn handler<F, Fut>(self, handler: F) -> CrewMcpHandler
where
F: Fn(CrewMcpInput) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<CrewMcpOutput, String>> + Send + 'static,
{
let tool_name = format!("{}{}", self.config.name_prefix, self.name);
CrewMcpHandler {
name: tool_name,
description: self.description,
capabilities: self.capabilities,
handler: Arc::new(move |input| Box::pin(handler(input))),
config: self.config,
}
}
}
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds an output carrying only a result text: no task results, no
    /// agents and a zero duration.
    fn bare_output(result: &str) -> CrewMcpOutput {
        CrewMcpOutput {
            result: result.to_string(),
            task_results: Vec::new(),
            duration_ms: 0,
            agents_used: Vec::new(),
        }
    }

    /// All fields present in the JSON are carried over into the input struct.
    #[test]
    fn test_crew_mcp_input_full_deserialization() {
        let input: CrewMcpInput = serde_json::from_value(json!({
            "task": "Research quantum computing advances",
            "context": {"domain": "physics", "depth": "detailed"},
            "mode": "parallel",
            "max_iterations": 5
        }))
        .unwrap();

        assert_eq!(input.task, "Research quantum computing advances");
        assert_eq!(input.context["domain"], "physics");
        assert_eq!(input.context["depth"], "detailed");
        assert_eq!(input.mode.as_deref(), Some("parallel"));
        assert_eq!(input.max_iterations, Some(5));
    }

    /// The definition carries the prefixed name, the description with the
    /// capabilities, and a schema describing every argument.
    #[test]
    fn test_crew_handler_definition_and_schema() {
        let handler = CrewMcpHandler::builder("research")
            .description("Research crew workflow")
            .capability("web_search")
            .capability("summarization")
            .handler(|_input: CrewMcpInput| async move { Ok(bare_output("done")) });

        let def = handler.definition();
        assert_eq!(def.name, "crew_research");

        let desc = def.description.unwrap();
        for &fragment in &["Research crew workflow", "web_search", "summarization"] {
            assert!(desc.contains(fragment));
        }

        let schema = &def.input_schema;
        assert_eq!(schema["type"], "object");
        for &property in &["task", "context", "mode", "max_iterations"] {
            assert!(schema["properties"][property].is_object());
        }
        assert_eq!(schema["required"][0], "task");
    }

    /// A custom prefix replaces the default `crew_` prefix in the tool name.
    #[test]
    fn test_crew_handler_custom_prefix() {
        let handler = CrewMcpHandler::builder("analysis")
            .description("Analysis crew")
            .name_prefix("workflow_")
            .handler(|_input: CrewMcpInput| async move { Ok(bare_output("done")) });

        assert_eq!(handler.definition().name, "workflow_analysis");
    }

    /// A successful run yields the response text plus the structured summary.
    #[tokio::test]
    async fn test_crew_handler_execution_with_mock() {
        let handler = CrewMcpHandler::builder("research")
            .description("Research crew")
            .handler(|input: CrewMcpInput| async move {
                let topic = input.context.get("topic").cloned().unwrap_or_default();
                let task_results = [("gather", "Gathered data"), ("analyze", "Analysis complete")]
                    .iter()
                    .map(|&(name, output)| TaskResult {
                        name: name.to_string(),
                        output: output.to_string(),
                        success: true,
                    })
                    .collect();
                Ok(CrewMcpOutput {
                    result: format!("Researched: {} - {}", input.task, topic),
                    task_results,
                    duration_ms: 1500,
                    agents_used: vec!["researcher".to_string(), "analyst".to_string()],
                })
            });

        let arguments = json!({
            "task": "Find trends",
            "context": {"topic": "AI"},
            "mode": "sequential"
        });
        let result = handler.execute(arguments).await.unwrap();
        assert!(!result.is_error);

        let text = result.content[0].as_text().unwrap();
        assert!(text.contains("Researched: Find trends - AI"));
        assert!(text.contains("gather [OK]"));
        assert!(text.contains("analyze [OK]"));
        assert!(text.contains("researcher"));
        assert!(text.contains("analyst"));

        let structured_text = result.content[1].as_text().unwrap();
        assert!(structured_text.contains("duration_ms"));
        assert!(structured_text.contains("1500"));
    }

    /// A failing callback is reported as a tool result flagged as an error,
    /// not as an `Err` from `execute`.
    #[tokio::test]
    async fn test_crew_handler_error_returns_is_error() {
        let handler = CrewMcpHandler::builder("failing_crew")
            .description("A crew that fails")
            .handler(|_: CrewMcpInput| async move {
                Err("Agent timeout: researcher did not respond".to_string())
            });

        let arguments = json!({"task": "do something"});
        let result = handler.execute(arguments).await.unwrap();
        assert!(result.is_error);

        let text = result.content[0].as_text().unwrap();
        assert!(text.contains("Crew error"));
        assert!(text.contains("Agent timeout"));
    }

    /// Arguments without the required `task` field are rejected.
    #[tokio::test]
    async fn test_crew_handler_invalid_input_returns_error() {
        let handler = CrewMcpHandler::builder("strict_crew")
            .description("Crew with strict input")
            .handler(|_: CrewMcpInput| async move { Ok(bare_output("ok")) });

        let outcome = handler.execute(json!({"context": {"a": "b"}})).await;
        assert!(outcome.is_err());
    }

    /// With only `task` supplied, every optional field takes its default.
    #[test]
    fn test_crew_mcp_input_minimal_deserialization() {
        let input: CrewMcpInput =
            serde_json::from_value(json!({"task": "simple task"})).unwrap();

        assert_eq!(input.task, "simple task");
        assert!(input.context.is_empty());
        assert!(input.mode.is_none());
        assert!(input.max_iterations.is_none());
    }
}