use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{debug, info, warn};
use tree_sitter::{Node, Parser, Query, QueryCursor, Tree};
use crate::common::{
BaseServer, McpContent, McpServerBase, McpTool, McpToolRequest, McpToolResponse,
ServerCapabilities, ServerConfig,
};
use crate::{McpToolsError, Result};
/// MCP server exposing static code-analysis tools (language detection,
/// complexity, quality, security and dependency checks).
pub struct CodeAnalysisServer {
/// Shared server implementation; capability and statistics requests are
/// forwarded to it.
base: BaseServer,
}
/// Structural complexity metrics gathered from a syntax-tree walk of one source text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComplexityAnalysis {
/// Starts at 1 and grows by one for every function, branching construct and
/// short-circuit boolean operator that the walk recognises.
pub cyclomatic_complexity: u32,
/// Number of branching constructs recognised by the walk (no nesting
/// weighting is applied); reported as 1 when the language is unsupported.
pub cognitive_complexity: u32,
/// Total number of lines in the analysed text, blank and comment lines included.
pub lines_of_code: u32,
/// Number of function or method definitions found.
pub functions: u32,
/// Number of class-like definitions found (Rust `struct` and `impl` items
/// are each counted).
pub classes: u32,
/// Deepest level recorded while walking the syntax tree.
pub nested_depth: u32,
}
/// Heuristic quality indicators for one source text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetrics {
/// Heuristic score; `analyze_quality` currently derives it as
/// `100 - documentation_ratio * 0.1`.
pub maintainability_index: f64,
/// Percentage (0–100) of lines that begin with a comment marker.
pub documentation_ratio: f64,
/// Placeholder: `analyze_quality` always reports 0.0.
pub test_coverage: f64,
/// Placeholder: `analyze_quality` always reports 0.0.
pub code_duplication: f64,
/// Placeholder: `analyze_quality` always reports 0.0.
pub technical_debt: f64,
}
/// Outcome of the line-based security scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityAnalysis {
/// Findings in the order they were detected (ascending line number).
pub vulnerabilities: Vec<SecurityVulnerability>,
/// Sum of per-finding weights (High = 30, Medium = 15, Low = 5), capped at 100.
pub risk_score: u32,
/// Placeholder: `analyze_security` always leaves this empty.
pub security_hotspots: Vec<String>,
}
/// A single finding produced by the security scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityVulnerability {
/// Severity label; the risk score recognises `"High"`, `"Medium"` and `"Low"`.
pub severity: String,
/// Short category name, e.g. `"SQL Injection"` or `"Hardcoded Credentials"`.
pub category: String,
/// Human-readable description of the finding.
pub description: String,
/// 1-based line number on which the pattern was matched.
pub line: u32,
/// Suggested remediation.
pub recommendation: String,
}
/// Import information extracted by a line-based scan of the source text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DependencyAnalysis {
/// Every import found. For Rust this is the path following `use ` up to the
/// first `;`; for Python, JavaScript and TypeScript it is the whole trimmed line.
pub imports: Vec<String>,
/// Rust only: imported paths that start with neither `std::`/`core::` nor
/// `crate::`/`super::`/`self::`.
pub external_dependencies: Vec<String>,
/// Rust only: imported paths starting with `crate::`, `super::` or `self::`.
pub internal_dependencies: Vec<String>,
/// Placeholder: `analyze_dependencies` always leaves this empty.
pub circular_dependencies: Vec<String>,
/// Placeholder: `analyze_dependencies` always leaves this empty.
pub unused_imports: Vec<String>,
}
/// Aggregated result of a comprehensive analysis run; serialized into the
/// response metadata under the `analysis_result` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CodeAnalysisResult {
/// Language name, either supplied by the caller or auto-detected.
pub language: String,
/// Path supplied by the caller, or `"unknown"` when none was given.
pub file_path: String,
/// Structural complexity metrics.
pub complexity: ComplexityAnalysis,
/// Heuristic quality indicators.
pub quality: QualityMetrics,
/// Findings of the security scan.
pub security: SecurityAnalysis,
/// Import information.
pub dependencies: DependencyAnalysis,
/// Improvement hints derived from the metrics above.
pub suggestions: Vec<String>,
}
impl CodeAnalysisServer {
/// Builds a code-analysis server on top of a freshly constructed [`BaseServer`].
///
/// # Errors
///
/// Propagates any error returned by `BaseServer::new`.
pub async fn new(config: ServerConfig) -> Result<Self> {
    Ok(Self {
        base: BaseServer::new(config).await?,
    })
}
/// Guesses the programming language of a piece of source code.
///
/// The extension of `file_path` is consulted first. When it is missing or not
/// recognised, `content` is scanned for marker substrings.
///
/// Returns a lowercase language name (`"rust"`, `"python"`, `"javascript"`,
/// `"typescript"`, `"go"`, `"java"`, `"c"`, `"cpp"`), or `"unknown"` when
/// nothing matches. The content scan is a best-effort heuristic and can
/// misclassify text that lacks any of the distinctive markers.
fn detect_language(&self, file_path: &str, content: &str) -> String {
    // Content markers in priority order. Unambiguous markers (Rust, Java,
    // Go's `package main`) are tested before the generic Python/JavaScript
    // ones: `"import "` also occurs in Java and Go sources, which previously
    // caused those files to be reported as Python. The weak `"func "` marker
    // stays last because it also matches identifiers such as `my_func = ...`.
    const CONTENT_MARKERS: &[(&str, &[&str])] = &[
        ("rust", &["fn main()", "use std::"]),
        ("java", &["public class", "import java"]),
        ("go", &["package main"]),
        ("python", &["def ", "import "]),
        ("javascript", &["function ", "const "]),
        ("go", &["func "]),
    ];

    let extension = std::path::Path::new(file_path)
        .extension()
        .and_then(|ext| ext.to_str());
    let from_extension = match extension {
        Some("rs") => Some("rust"),
        Some("py") => Some("python"),
        Some("js") => Some("javascript"),
        Some("ts") => Some("typescript"),
        Some("go") => Some("go"),
        Some("java") => Some("java"),
        Some("c") => Some("c"),
        Some("cpp") | Some("cc") | Some("cxx") => Some("cpp"),
        _ => None,
    };
    if let Some(language) = from_extension {
        return language.to_string();
    }

    CONTENT_MARKERS
        .iter()
        .find(|(_, markers)| markers.iter().any(|marker| content.contains(*marker)))
        .map_or("unknown", |(language, _)| *language)
        .to_string()
}
/// Computes structural complexity metrics for `content`.
///
/// The language is detected from `file_path` and `content`, the text is parsed
/// with the matching tree-sitter grammar, and the resulting tree is walked by
/// `traverse_node`. For a language without a grammar a minimal result is
/// returned: both complexity figures are 1, the counters are 0 and only the
/// line count is measured.
///
/// # Errors
///
/// Returns `McpToolsError::Server` when the grammar cannot be loaded into the
/// parser or when parsing yields no tree.
async fn analyze_complexity(
    &self,
    file_path: &str,
    content: &str,
) -> Result<ComplexityAnalysis> {
    let line_total = content.lines().count() as u32;
    let detected = self.detect_language(file_path, content);

    let grammar = match detected.as_str() {
        "rust" => Some(tree_sitter_rust::language()),
        "python" => Some(tree_sitter_python::language()),
        "javascript" => Some(tree_sitter_javascript::language()),
        "typescript" => Some(tree_sitter_typescript::language_typescript()),
        "go" => Some(tree_sitter_go::language()),
        "java" => Some(tree_sitter_java::language()),
        "c" => Some(tree_sitter_c::language()),
        "cpp" => Some(tree_sitter_cpp::language()),
        _ => None,
    };
    let grammar = match grammar {
        Some(grammar) => grammar,
        None => {
            // No grammar available: report the line count with neutral metrics.
            return Ok(ComplexityAnalysis {
                cyclomatic_complexity: 1,
                cognitive_complexity: 1,
                lines_of_code: line_total,
                functions: 0,
                classes: 0,
                nested_depth: 0,
            });
        }
    };

    let mut parser = Parser::new();
    if let Err(e) = parser.set_language(grammar) {
        return Err(McpToolsError::Server(format!(
            "Failed to set language: {}",
            e
        )));
    }
    let tree = match parser.parse(content, None) {
        Some(tree) => tree,
        None => return Err(McpToolsError::Server("Failed to parse code".to_string())),
    };

    // Cyclomatic complexity starts at 1 (the single straight-line path).
    let mut metrics = ComplexityAnalysis {
        cyclomatic_complexity: 1,
        cognitive_complexity: 0,
        lines_of_code: line_total,
        functions: 0,
        classes: 0,
        nested_depth: 0,
    };
    let root = tree.root_node();
    self.traverse_node(&root, &mut metrics, 0);
    Ok(metrics)
}
/// Walks the syntax tree rooted at `node` and accumulates metrics into `complexity`.
///
/// * `node` – root of the subtree to inspect.
/// * `complexity` – accumulator updated in place.
/// * `depth` – number of control-flow constructs already enclosing `node`
///   (0 for the root of a file).
///
/// Counting rules:
/// * function/method definitions add one function and one cyclomatic point;
/// * struct/impl/class definitions add one class;
/// * branching constructs (`if`, `match`, loops, `try`, Python `elif`) add one
///   cyclomatic and one cognitive point;
/// * short-circuit boolean operators add one cyclomatic point.
///
/// `nested_depth` is the deepest nesting of control-flow constructs, not the
/// raw depth of the syntax tree. An `if` that sits directly in an `else`
/// clause continues the chain instead of nesting deeper. Grammars that attach
/// the chained `if` directly to the parent `if` (without an `else_clause`
/// node) still count it as nested.
fn traverse_node(&self, node: &Node, complexity: &mut ComplexityAnalysis, depth: u32) {
    complexity.nested_depth = complexity.nested_depth.max(depth);

    // Explicit work list instead of recursion: the analysed text is supplied
    // by the client, and a pathologically deep tree must not overflow the
    // call stack. Every update below is commutative, so visiting order does
    // not affect the result.
    // Entries: (node, enclosing nesting level, node is a direct child of an `else` clause).
    let mut pending = vec![(*node, depth, false)];

    while let Some((current, level, follows_else)) = pending.pop() {
        let kind = current.kind();
        let mut child_level = level;

        match kind {
            "function_item"
            | "function_declaration"
            | "function_definition"
            | "method_declaration"
            | "method_definition" => {
                complexity.functions += 1;
                complexity.cyclomatic_complexity += 1;
            }
            "struct_item" | "impl_item" | "class_declaration" | "class_specifier"
            | "class_definition" => {
                complexity.classes += 1;
            }
            // `while_expression` / `for_expression` are the Rust grammar's names
            // for loops; the `*_statement` forms cover the other grammars.
            "if_expression" | "if_statement" | "match_expression" | "while_statement"
            | "while_expression" | "for_statement" | "for_expression" | "loop_expression"
            | "try_statement" => {
                complexity.cyclomatic_complexity += 1;
                complexity.cognitive_complexity += 1;
                let continues_chain =
                    follows_else && matches!(kind, "if_expression" | "if_statement");
                if !continues_chain {
                    child_level = level + 1;
                }
                complexity.nested_depth = complexity.nested_depth.max(child_level);
            }
            // Python `elif`: a further decision point at the level of its `if`.
            "elif_clause" => {
                complexity.cyclomatic_complexity += 1;
                complexity.cognitive_complexity += 1;
            }
            // Python represents `and` / `or` as `boolean_operator` nodes.
            "binary_expression" | "boolean_operator" => {
                if let Some(operator) = current.child_by_field_name("operator") {
                    if matches!(operator.kind(), "&&" | "||" | "and" | "or") {
                        complexity.cyclomatic_complexity += 1;
                    }
                }
            }
            _ => {}
        }

        let children_follow_else = kind == "else_clause";
        for index in 0..current.child_count() {
            if let Some(child) = current.child(index) {
                pending.push((child, child_level, children_follow_else));
            }
        }
    }
}
/// Derives heuristic quality metrics from the raw text of `content`.
///
/// A line counts as documentation when, after trimming, it starts with one of
/// the comment markers `//`, `#`, `/*`, `*`, `"""` or `'''`. The check is
/// language-agnostic, so e.g. Rust attributes or C preprocessor lines starting
/// with `#` are counted as well. Coverage, duplication and technical debt are
/// not computed and are reported as 0.0.
async fn analyze_quality(&self, content: &str) -> QualityMetrics {
    const COMMENT_MARKERS: [&str; 6] = ["//", "#", "/*", "*", "\"\"\"", "'''"];

    let mut line_total = 0usize;
    let mut commented = 0usize;
    for raw in content.lines() {
        line_total += 1;
        let text = raw.trim();
        if COMMENT_MARKERS.iter().any(|marker| text.starts_with(*marker)) {
            commented += 1;
        }
    }

    let documentation_ratio = match line_total {
        0 => 0.0,
        _ => (commented as f64 / line_total as f64) * 100.0,
    };

    // NOTE(review): the score decreases as the comment share grows, which looks
    // inverted for a maintainability measure — confirm the intended formula.
    let penalty = (documentation_ratio * 0.1).max(0.0).min(100.0);

    QualityMetrics {
        maintainability_index: 100.0 - penalty,
        documentation_ratio,
        test_coverage: 0.0,
        code_duplication: 0.0,
        technical_debt: 0.0,
    }
}
/// Scans `content` line by line for a small set of risky patterns.
///
/// Checks applied to every language (case-insensitive):
/// * `select` + `where` + `+` on one line → possible SQL built by concatenation;
/// * `password` together with `=` or `:` → possible hard-coded credential.
///
/// Language-specific checks (case-sensitive):
/// * `"rust"`: any line containing `unsafe`;
/// * `"c"` / `"cpp"`: any line containing `strcpy` or `sprintf`.
///
/// The risk score adds 30 per High, 15 per Medium and 5 per Low finding and is
/// capped at 100. `security_hotspots` is always empty.
async fn analyze_security(&self, content: &str, language: &str) -> SecurityAnalysis {
    // Builds one finding; `index` is the 0-based line index.
    let finding = |severity: &str,
                   category: &str,
                   description: &str,
                   index: usize,
                   recommendation: &str| SecurityVulnerability {
        severity: severity.to_string(),
        category: category.to_string(),
        description: description.to_string(),
        line: (index + 1) as u32,
        recommendation: recommendation.to_string(),
    };

    let mut vulnerabilities = Vec::new();

    for (index, line) in content.lines().enumerate() {
        let lowered = line.to_lowercase();

        let sql_by_concatenation = ["select", "where", "+"]
            .iter()
            .all(|needle| lowered.contains(*needle));
        if sql_by_concatenation {
            vulnerabilities.push(finding(
                "High",
                "SQL Injection",
                "Potential SQL injection vulnerability",
                index,
                "Use parameterized queries",
            ));
        }

        let assigns_password =
            lowered.contains("password") && (lowered.contains("=") || lowered.contains(":"));
        if assigns_password {
            vulnerabilities.push(finding(
                "Medium",
                "Hardcoded Credentials",
                "Potential hardcoded password",
                index,
                "Use environment variables or secure storage",
            ));
        }

        if language == "rust" {
            if line.contains("unsafe") {
                vulnerabilities.push(finding(
                    "Medium",
                    "Unsafe Code",
                    "Unsafe Rust code block",
                    index,
                    "Review unsafe code for memory safety",
                ));
            }
        } else if language == "c" || language == "cpp" {
            if line.contains("strcpy") || line.contains("sprintf") {
                vulnerabilities.push(finding(
                    "High",
                    "Buffer Overflow",
                    "Unsafe string function",
                    index,
                    "Use safe string functions like strncpy or snprintf",
                ));
            }
        }
    }

    let mut weighted_total: u32 = 0;
    for item in &vulnerabilities {
        weighted_total += match item.severity.as_str() {
            "High" => 30,
            "Medium" => 15,
            "Low" => 5,
            _ => 0,
        };
    }
    let risk_score = weighted_total.min(100);

    SecurityAnalysis {
        vulnerabilities,
        risk_score,
        security_hotspots: vec![],
    }
}
/// Collects import statements from `content` with a line-based scan.
///
/// * `"rust"`: lines starting with `use `; the path up to the first `;` is
///   recorded and classified as internal (`crate::`, `super::`, `self::`),
///   standard library (`std::`, `core::` — recorded as an import only) or
///   external (everything else).
/// * `"python"`: lines starting with `import ` or `from `, recorded verbatim.
/// * `"javascript"` / `"typescript"`: lines starting with `import ` or
///   containing `require(`, recorded verbatim.
///
/// Other languages yield an empty result. Circular dependencies and unused
/// imports are not computed and are always empty.
async fn analyze_dependencies(&self, content: &str, language: &str) -> DependencyAnalysis {
    let mut imports = Vec::new();
    let mut external_dependencies = Vec::new();
    let mut internal_dependencies = Vec::new();

    for statement in content.lines().map(str::trim) {
        if language == "rust" {
            if let Some(rest) = statement.strip_prefix("use ") {
                let path = rest.split(';').next().unwrap_or("").trim();
                imports.push(path.to_string());

                let is_standard = ["std::", "core::"]
                    .iter()
                    .any(|prefix| path.starts_with(*prefix));
                let is_local = ["crate::", "super::", "self::"]
                    .iter()
                    .any(|prefix| path.starts_with(*prefix));

                // Standard-library paths are listed in `imports` only.
                if !is_standard {
                    if is_local {
                        internal_dependencies.push(path.to_string());
                    } else {
                        external_dependencies.push(path.to_string());
                    }
                }
            }
        } else if language == "python" {
            if statement.starts_with("import ") || statement.starts_with("from ") {
                imports.push(statement.to_string());
            }
        } else if language == "javascript" || language == "typescript" {
            if statement.starts_with("import ") || statement.contains("require(") {
                imports.push(statement.to_string());
            }
        }
    }

    DependencyAnalysis {
        imports,
        external_dependencies,
        internal_dependencies,
        circular_dependencies: Vec::new(),
        unused_imports: Vec::new(),
    }
}
}
#[async_trait]
impl McpServerBase for CodeAnalysisServer {
async fn get_capabilities(&self) -> Result<ServerCapabilities> {
let mut capabilities = self.base.get_capabilities().await?;
let analysis_tools = vec![
McpTool {
name: "analyze_code".to_string(),
description: "Perform comprehensive code analysis including complexity, quality, security, and dependencies".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the code file to analyze"
},
"content": {
"type": "string",
"description": "Code content to analyze (alternative to file_path)"
},
"language": {
"type": "string",
"description": "Programming language (optional, auto-detected if not provided)"
},
"analysis_type": {
"type": "string",
"description": "Type of analysis: 'complexity', 'quality', 'security', 'dependencies', or 'comprehensive'",
"enum": ["complexity", "quality", "security", "dependencies", "comprehensive"]
}
},
"required": ["file_path"],
"oneOf": [
{"required": ["file_path"]},
{"required": ["content"]}
]
}),
category: "code-analysis".to_string(),
requires_permission: false,
permissions: vec![],
},
McpTool {
name: "detect_language".to_string(),
description: "Detect programming language from file path or content".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file"
},
"content": {
"type": "string",
"description": "Code content to analyze"
}
},
"oneOf": [
{"required": ["file_path"]},
{"required": ["content"]}
]
}),
category: "code-analysis".to_string(),
requires_permission: false,
permissions: vec![],
},
McpTool {
name: "complexity_analysis".to_string(),
description: "Analyze code complexity metrics including cyclomatic complexity and nesting depth".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the code file"
},
"content": {
"type": "string",
"description": "Code content to analyze"
},
"language": {
"type": "string",
"description": "Programming language (optional)"
}
},
"oneOf": [
{"required": ["file_path"]},
{"required": ["content"]}
]
}),
category: "code-analysis".to_string(),
requires_permission: false,
permissions: vec![],
},
McpTool {
name: "security_analysis".to_string(),
description: "Analyze code for security vulnerabilities and risks".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the code file"
},
"content": {
"type": "string",
"description": "Code content to analyze"
},
"language": {
"type": "string",
"description": "Programming language (optional)"
}
},
"oneOf": [
{"required": ["file_path"]},
{"required": ["content"]}
]
}),
category: "code-analysis".to_string(),
requires_permission: false,
permissions: vec![],
},
];
capabilities.tools = analysis_tools;
Ok(capabilities)
}
async fn handle_tool_request(&self, request: McpToolRequest) -> Result<McpToolResponse> {
info!("Handling Code Analysis tool request: {}", request.tool);
let file_path = request
.arguments
.get("file_path")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let content = if let Some(content_value) = request.arguments.get("content") {
content_value.as_str().unwrap_or("").to_string()
} else if file_path != "unknown" {
return Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(
"File reading not implemented. Please provide 'content' parameter.".to_string(),
)],
is_error: true,
error: Some("File reading not implemented".to_string()),
metadata: HashMap::new(),
});
} else {
return Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(
"Either 'file_path' or 'content' parameter is required".to_string(),
)],
is_error: true,
error: Some("Missing required parameter".to_string()),
metadata: HashMap::new(),
});
};
let language = request
.arguments
.get("language")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| self.detect_language(file_path, &content));
match request.tool.as_str() {
"analyze_code" => {
debug!("Performing comprehensive code analysis for: {}", file_path);
let analysis_type = request
.arguments
.get("analysis_type")
.and_then(|v| v.as_str())
.unwrap_or("comprehensive");
match analysis_type {
"comprehensive" => {
let complexity = self.analyze_complexity(file_path, &content).await?;
let quality = self.analyze_quality(&content).await;
let security = self.analyze_security(&content, &language).await;
let dependencies = self.analyze_dependencies(&content, &language).await;
let mut suggestions = Vec::new();
if complexity.cyclomatic_complexity > 10 {
suggestions
.push("Consider breaking down complex functions".to_string());
}
if complexity.nested_depth > 4 {
suggestions
.push("Reduce nesting depth for better readability".to_string());
}
if quality.documentation_ratio < 10.0 {
suggestions.push("Add more documentation and comments".to_string());
}
let result = CodeAnalysisResult {
language: language.clone(),
file_path: file_path.to_string(),
complexity,
quality,
security,
dependencies,
suggestions,
};
let content_text = format!(
"Code Analysis Complete\n\
Language: {}\n\
Cyclomatic Complexity: {}\n\
Lines of Code: {}\n\
Functions: {}\n\
Security Issues: {}\n\
Risk Score: {}/100",
result.language,
result.complexity.cyclomatic_complexity,
result.complexity.lines_of_code,
result.complexity.functions,
result.security.vulnerabilities.len(),
result.security.risk_score
);
let mut metadata = HashMap::new();
metadata
.insert("analysis_result".to_string(), serde_json::to_value(result)?);
Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(content_text)],
is_error: false,
error: None,
metadata,
})
}
"complexity" => {
let complexity = self.analyze_complexity(file_path, &content).await?;
let content_text = format!(
"Complexity Analysis\n\
Cyclomatic Complexity: {}\n\
Cognitive Complexity: {}\n\
Lines of Code: {}\n\
Functions: {}\n\
Classes: {}\n\
Max Nesting Depth: {}",
complexity.cyclomatic_complexity,
complexity.cognitive_complexity,
complexity.lines_of_code,
complexity.functions,
complexity.classes,
complexity.nested_depth
);
let mut metadata = HashMap::new();
metadata
.insert("complexity".to_string(), serde_json::to_value(complexity)?);
Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(content_text)],
is_error: false,
error: None,
metadata,
})
}
"security" => {
let security = self.analyze_security(&content, &language).await;
let content_text = format!(
"Security Analysis\n\
Vulnerabilities Found: {}\n\
Risk Score: {}/100\n\
Security Hotspots: {}",
security.vulnerabilities.len(),
security.risk_score,
security.security_hotspots.len()
);
let mut metadata = HashMap::new();
metadata.insert("security".to_string(), serde_json::to_value(security)?);
Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(content_text)],
is_error: false,
error: None,
metadata,
})
}
_ => {
warn!("Unknown analysis type: {}", analysis_type);
Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(format!(
"Unknown analysis type: {}",
analysis_type
))],
is_error: true,
error: Some("Invalid analysis type".to_string()),
metadata: HashMap::new(),
})
}
}
}
"detect_language" => {
debug!("Detecting language for: {}", file_path);
let detected_language = self.detect_language(file_path, &content);
let content_text = format!("Detected Language: {}", detected_language);
let mut metadata = HashMap::new();
metadata.insert(
"language".to_string(),
serde_json::Value::String(detected_language),
);
Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(content_text)],
is_error: false,
error: None,
metadata,
})
}
"complexity_analysis" => {
debug!("Analyzing complexity for: {}", file_path);
let complexity = self.analyze_complexity(file_path, &content).await?;
let content_text = format!(
"Complexity Analysis\n\
Cyclomatic Complexity: {}\n\
Lines of Code: {}\n\
Functions: {}",
complexity.cyclomatic_complexity,
complexity.lines_of_code,
complexity.functions
);
let mut metadata = HashMap::new();
metadata.insert("complexity".to_string(), serde_json::to_value(complexity)?);
Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(content_text)],
is_error: false,
error: None,
metadata,
})
}
"security_analysis" => {
debug!("Analyzing security for: {}", file_path);
let security = self.analyze_security(&content, &language).await;
let content_text = format!(
"Security Analysis\n\
Vulnerabilities: {}\n\
Risk Score: {}/100",
security.vulnerabilities.len(),
security.risk_score
);
let mut metadata = HashMap::new();
metadata.insert("security".to_string(), serde_json::to_value(security)?);
Ok(McpToolResponse {
id: request.id,
content: vec![McpContent::text(content_text)],
is_error: false,
error: None,
metadata,
})
}
_ => {
warn!("Unknown Code Analysis tool: {}", request.tool);
Err(McpToolsError::Server(format!(
"Unknown Code Analysis tool: {}",
request.tool
)))
}
}
}
/// Returns the statistics reported by the underlying base server.
///
/// # Errors
///
/// Propagates any error from the base server's `get_stats`.
async fn get_stats(&self) -> Result<crate::common::ServerStats> {
    let stats = self.base.get_stats().await?;
    Ok(stats)
}
/// Initialization hook: emits an info-level log message and returns `Ok(())`.
/// No other setup is performed here.
async fn initialize(&mut self) -> Result<()> {
info!("Initializing Code Analysis MCP Server");
Ok(())
}
/// Shutdown hook: emits an info-level log message and returns `Ok(())`.
/// No resources are released here.
async fn shutdown(&mut self) -> Result<()> {
info!("Shutting down Code Analysis MCP Server");
Ok(())
}
}