use anyhow::Result;
use serde_json::json;
use std::panic;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, info, trace, warn};
use crate::config::Config;
use crate::indexer;
use crate::lock::IndexLock;
use crate::mcp::graphrag::GraphRagProvider;
use crate::mcp::http::{handle_http_connection, HttpServerState};
use crate::mcp::logging::{
init_mcp_logging, log_critical_anyhow_error, log_critical_error, log_indexing_operation,
log_mcp_request, log_mcp_response, log_watcher_event,
};
use crate::mcp::semantic_code::SemanticCodeProvider;
use crate::mcp::types::{parse_mcp_error, JsonRpcError, JsonRpcRequest, JsonRpcResponse, McpError};
use crate::mcp::watcher::run_watcher;
use crate::state;
use crate::store::Store;
use crate::watcher_config::{DEFAULT_ADDITIONAL_DELAY_MS, MCP_DEFAULT_DEBOUNCE_MS};
/// Debounce window for file-change events before a reindex is requested (ms).
pub const MCP_DEBOUNCE_MS: u64 = MCP_DEFAULT_DEBOUNCE_MS;
/// Upper bound on queued watcher events (also the file-event channel capacity).
pub const MCP_MAX_PENDING_EVENTS: usize = 100;
/// Hard timeout for a single indexing pass (5 minutes, in ms).
const MCP_INDEX_TIMEOUT_MS: u64 = 300_000;
/// Toggle for emitting verbose per-event watcher logs.
pub const MCP_ENABLE_VERBOSE_EVENTS: bool = false;
/// Maximum accepted JSON-RPC request/response size in bytes (10 MiB).
const MCP_MAX_REQUEST_SIZE: usize = 10_485_760;
/// Timeout applied to stdin reads and stdout writes (ms).
const MCP_IO_TIMEOUT_MS: u64 = 30_000;
/// MCP (Model Context Protocol) server.
///
/// Exposes semantic code search, optional GraphRAG relationship search,
/// and optional LSP-backed tools, served over stdio (`run`) or HTTP
/// (`run_http`). Also owns the watcher/debouncer/background-indexing
/// pipeline that keeps the index fresh.
pub struct McpServer {
// Provider backing the semantic_search and view_signatures tools.
semantic_code: SemanticCodeProvider,
// Relationship-aware search provider; None when GraphRAG is not enabled.
graphrag: Option<GraphRagProvider>,
// Lazily-initialized LSP bridge; None unless an LSP command was supplied.
lsp: Option<Arc<Mutex<crate::mcp::lsp::LspProvider>>>,
// Enables extra debug logging in the watcher/debouncer paths.
debug: bool,
// Directory the server operates in (process cwd is changed to it in `new`).
working_directory: std::path::PathBuf,
// When true, indexing is allowed outside a git repository.
no_git: bool,
// Set by run/run_http; false when not at a git repo root and --no-git unset.
indexer_enabled: bool,
// Task running the filesystem watcher (aborted on shutdown/drop).
watcher_handle: Option<tokio::task::JoinHandle<()>>,
// Task debouncing watcher events into reindex requests.
index_handle: Option<tokio::task::JoinHandle<()>>,
// Task performing background reindexing (stdio mode only).
indexing_handle: Option<tokio::task::JoinHandle<()>>,
// Guard ensuring only one reindex runs at a time.
indexing_in_progress: Arc<AtomicBool>,
// Vector store holding the index collections.
store: Arc<Store>,
// Loaded configuration (cloned into background tasks).
config: Config,
// Receiver for debounced reindex requests; taken by whichever loop
// consumes it (background task in stdio mode, select loop in HTTP mode).
index_rx: Option<mpsc::Receiver<()>>,
}
impl McpServer {
/// Creates a new MCP server rooted at `working_directory`.
///
/// Side effects: changes the process current directory to
/// `working_directory`, opens the store and initializes its collections,
/// and installs MCP file logging. When `lsp_command` is given, an LSP
/// provider is created and its initialization is kicked off on a
/// background task (failures there are logged, not fatal).
///
/// # Errors
/// Fails if the working directory cannot be entered, the store cannot be
/// opened/initialized, or logging setup fails.
pub async fn new(
config: Config,
debug: bool,
working_directory: std::path::PathBuf,
no_git: bool,
lsp_command: Option<String>,
) -> Result<Self> {
// Make relative paths (index data, logs, lock files) resolve against
// the target project directory for the whole process.
std::env::set_current_dir(&working_directory).map_err(|e| {
anyhow::anyhow!(
"Failed to change to working directory '{}': {}",
working_directory.display(),
e
)
})?;
let store = Store::new().await?;
store.initialize_collections().await?;
let store = Arc::new(store);
init_mcp_logging(working_directory.clone(), debug)?;
let semantic_code = SemanticCodeProvider::new(config.clone(), working_directory.clone());
// GraphRagProvider::new yields an Option — None disables the graphrag tool.
let graphrag = GraphRagProvider::new(config.clone(), working_directory.clone());
let lsp = if let Some(command) = lsp_command {
tracing::info!(
"LSP provider will be initialized lazily with command: {}",
command
);
let provider = Arc::new(Mutex::new(crate::mcp::lsp::LspProvider::new(
working_directory.clone(),
command,
)));
// Kick off LSP initialization without blocking server startup; tool
// calls lock the same mutex, so they wait if init is still running.
let provider_clone = provider.clone();
tokio::spawn(async move {
let mut provider_guard = provider_clone.lock().await;
if let Err(e) = provider_guard.start_initialization().await {
tracing::warn!("LSP initialization failed: {}", e);
}
});
Some(provider)
} else {
None
};
Ok(Self {
semantic_code,
graphrag,
lsp,
debug,
working_directory,
no_git,
// Decided for real once run()/run_http() gates the indexer.
indexer_enabled: false,
watcher_handle: None,
index_handle: None,
indexing_handle: None,
indexing_in_progress: Arc::new(AtomicBool::new(false)),
store,
config,
index_rx: None,
})
}
/// Serves MCP over stdio: newline-delimited JSON-RPC requests on stdin,
/// responses on stdout.
///
/// Startup: installs a logging panic hook, conditionally starts the
/// watcher and background indexer (skipped outside a git repo root
/// unless --no-git or `index.require_git = false`), then enters the read
/// loop. Shuts down on stdin EOF, broken pipe, or after
/// MAX_CONSECUTIVE_ERRORS failures in a row; background tasks are
/// aborted on the way out.
pub async fn run(&mut self) -> Result<()> {
// Chain a logging hook in front of the existing panic hook so panics
// reach the MCP log before the previous handler runs.
let original_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
log_critical_anyhow_error("Panic in MCP server", &anyhow::anyhow!("{}", panic_info));
original_hook(panic_info);
}));
// Indexing is gated on being at a git repo root unless the user opted
// out via --no-git or config.
let should_start_indexer = if !self.no_git && self.config.index.require_git {
indexer::git::is_git_repo_root(&self.working_directory)
} else {
true
};
if should_start_indexer {
self.start_watcher().await?;
self.start_background_indexing().await?;
} else {
warn!(
"Indexer not started: Not in a git repository and --no-git flag not set. Use --no-git to enable indexing outside git repos."
);
}
self.indexer_enabled = should_start_indexer;
info!(
debug_mode = self.debug,
indexer_enabled = should_start_indexer,
debounce_ms = MCP_DEBOUNCE_MS,
timeout_ms = MCP_INDEX_TIMEOUT_MS,
max_events = MCP_MAX_PENDING_EVENTS,
max_request_size_mb = MCP_MAX_REQUEST_SIZE / 1_048_576,
io_timeout_ms = MCP_IO_TIMEOUT_MS,
"MCP Server started"
);
let stdin = tokio::io::stdin();
let stdout = tokio::io::stdout();
let mut reader = BufReader::new(stdin);
let mut writer = stdout;
// One reusable line buffer; consecutive_errors is the circuit breaker.
let mut line = String::with_capacity(1024); let mut consecutive_errors = 0u32;
const MAX_CONSECUTIVE_ERRORS: u32 = 10;
loop {
line.clear();
tokio::select! {
// Bound each read; a timeout here is normal idling, handled in the
// Err(_) arm below.
// NOTE(review): tokio's read_line is documented as not
// cancellation-safe — if the timeout fires mid-line, partially read
// data can be lost. Confirm clients never stall mid-request, or
// switch to cancellation-safe framing.
result = tokio::time::timeout(
Duration::from_millis(MCP_IO_TIMEOUT_MS),
reader.read_line(&mut line)
) => {
match result {
// 0 bytes read = EOF: the client closed stdin.
Ok(Ok(0)) => {
debug!("MCP Server: EOF received, shutting down gracefully");
break;
}
Ok(Ok(bytes_read)) => {
// Reject oversized requests up front with a parse-error code.
if bytes_read > MCP_MAX_REQUEST_SIZE {
log_critical_anyhow_error(
"Request size limit exceeded",
&anyhow::anyhow!("Request size {} exceeds limit {}", bytes_read, MCP_MAX_REQUEST_SIZE)
);
if let Err(e) = self.send_error_response(
&mut writer,
None,
-32700,
"Request too large",
Some(json!({"max_size": MCP_MAX_REQUEST_SIZE}))
).await {
log_critical_anyhow_error("Failed to send error response", &e);
}
continue;
}
// Pre-parse just the method name so the indexer-disabled
// notification can be attached right after `initialize`.
let request_method_for_notif = {
serde_json::from_str::<serde_json::Value>(line.trim())
.ok()
.and_then(|v| v.get("method").and_then(|m| m.as_str()).map(|s| s.to_string()))
};
match self.handle_request_safe(&line).await {
Ok(Some(response)) => {
// Warn the client once, on initialize, when indexing is off.
if request_method_for_notif.as_deref() == Some("initialize") && !self.indexer_enabled {
let notif_params = json!({
"level": "warning",
"message": "Octocode indexer is disabled: not in a git repository root. Run with --no-git to enable indexing outside git repos."
});
if let Err(e) = self.send_notification(&mut writer, "notifications/message", notif_params).await {
log_critical_anyhow_error("Failed to send indexer-disabled notification", &e);
}
}
if let Err(e) = self.send_response(&mut writer, &response).await {
log_critical_anyhow_error("Failed to send response", &e);
consecutive_errors += 1;
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
log_critical_anyhow_error(
"Too many consecutive errors",
&anyhow::anyhow!("Shutting down after {} consecutive errors", consecutive_errors)
);
break;
}
} else {
// Any successful send resets the circuit breaker.
consecutive_errors = 0; }
}
// None = notification (no id): nothing to send back.
Ok(None) => {
consecutive_errors = 0;
}
Err(e) => {
log_critical_anyhow_error("Request handling failed", &e);
consecutive_errors += 1;
if let Err(send_err) = self.send_error_response(
&mut writer,
None,
-32603,
"Internal server error",
Some(json!({"error": e.to_string()}))
).await {
log_critical_anyhow_error("Failed to send error response", &send_err);
}
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
log_critical_anyhow_error(
"Too many consecutive errors",
&anyhow::anyhow!("Shutting down after {} consecutive errors", consecutive_errors)
);
break;
}
}
}
}
// Read failed: client disconnects shut down gracefully; other I/O
// errors count against the breaker with a small backoff.
Ok(Err(e)) => {
if self.is_broken_pipe_error(&e) {
debug!("MCP Server: Broken pipe detected, shutting down gracefully");
break;
} else {
log_critical_error("Error reading from stdin", &e);
consecutive_errors += 1;
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
// Idle timeout while waiting for the next request: benign.
Err(_) => {
trace!("MCP Server: Timeout reading from stdin (normal - waiting for client requests)");
consecutive_errors = 0;
}
}
}
}
}
// Tear down background tasks before returning.
if let Some(handle) = self.watcher_handle.take() {
handle.abort();
}
if let Some(handle) = self.index_handle.take() {
handle.abort();
}
if let Some(handle) = self.indexing_handle.take() {
handle.abort();
}
debug!("MCP Server stopped");
Ok(())
}
/// Serves MCP over HTTP on `bind_addr` instead of stdio.
///
/// Spawns an accept loop that handles each connection via
/// `handle_http_connection`, while this task drains reindex requests
/// from the watcher inline (HTTP mode has no separate
/// background-indexing task). Returns when the accept loop dies;
/// watcher tasks are aborted on the way out.
pub async fn run_http(&mut self, bind_addr: &str) -> Result<()> {
// Same panic-hook chaining as stdio mode: log, then delegate.
let original_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
log_critical_anyhow_error("Panic in MCP server", &anyhow::anyhow!("{}", panic_info));
original_hook(panic_info);
}));
// Same git gating as `run`; only the watcher is started here — reindex
// requests are serviced by this task's select loop below.
let should_start_indexer = if !self.no_git && self.config.index.require_git {
indexer::git::is_git_repo_root(&self.working_directory)
} else {
true
};
if should_start_indexer {
self.start_watcher().await?;
} else {
warn!(
"Indexer not started: Not in a git repository and --no-git flag not set. Use --no-git to enable indexing outside git repos."
);
}
self.indexer_enabled = should_start_indexer;
let addr = bind_addr
.parse::<std::net::SocketAddr>()
.map_err(|e| anyhow::anyhow!("Invalid bind address '{}': {}", bind_addr, e))?;
info!(
debug_mode = self.debug,
bind_address = %addr,
indexer_enabled = should_start_indexer,
debounce_ms = MCP_DEBOUNCE_MS,
timeout_ms = MCP_INDEX_TIMEOUT_MS,
max_events = MCP_MAX_PENDING_EVENTS,
max_request_size_mb = MCP_MAX_REQUEST_SIZE / 1_048_576,
io_timeout_ms = MCP_IO_TIMEOUT_MS,
"MCP Server started in HTTP mode"
);
// When the indexer is off, substitute a receiver whose sender is
// dropped immediately: recv() yields None, so the `Some(_)` select
// pattern below never matches and that branch stays inert.
let mut index_rx = if should_start_indexer {
// Safe: start_watcher ran above (should_start_indexer is true) and
// populated self.index_rx.
self.index_rx.take().unwrap()
} else {
let (_, rx) = mpsc::channel(1);
rx
};
// Shared provider state handed to every HTTP connection.
let server_state = Arc::new(Mutex::new(HttpServerState {
semantic_code: self.semantic_code.clone(),
graphrag: self.graphrag.clone(),
lsp: self.lsp.clone(),
}));
let listener = TcpListener::bind(&addr)
.await
.map_err(|e| anyhow::anyhow!("Failed to bind to {}: {}", addr, e))?;
info!("MCP HTTP server listening on {}", addr);
let state_for_server = server_state.clone();
// Accept loop: one spawned task per connection; exits on accept error.
let mut server_handle = tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, addr)) => {
let state = state_for_server.clone();
tokio::spawn(async move {
if let Err(e) = handle_http_connection(stream, state).await {
debug!("HTTP connection error from {}: {}", addr, e);
}
});
}
Err(e) => {
log_critical_anyhow_error(
"HTTP server accept error",
&anyhow::anyhow!("{}", e),
);
break;
}
}
}
});
loop {
tokio::select! {
// Debounced reindex request: wait the settle delay, then index
// under a hard timeout.
Some(_) = index_rx.recv() => {
debug!("Processing index request");
sleep(Duration::from_millis(DEFAULT_ADDITIONAL_DELAY_MS)).await;
let indexing_result = tokio::time::timeout(
Duration::from_millis(MCP_INDEX_TIMEOUT_MS),
perform_indexing(&self.store, &self.config, &self.working_directory, self.no_git)
).await;
match indexing_result {
Ok(Ok(())) => {
info!("Reindex completed successfully");
// Keep the LSP server's file set in sync with the fresh index.
if let Some(ref lsp_provider) = self.lsp {
let mut lsp_guard = lsp_provider.lock().await;
if let Err(e) = Self::update_lsp_after_indexing(&mut lsp_guard, &self.working_directory).await {
debug!("LSP update after indexing failed: {}", e);
}
}
}
Ok(Err(e)) => {
log_critical_anyhow_error("Reindex error", &e);
}
Err(_) => {
log_critical_anyhow_error(
"Reindex timeout",
&anyhow::anyhow!("Reindex timed out after {}ms", MCP_INDEX_TIMEOUT_MS)
);
}
}
// Always clear the flag set by the debouncer so it can fire again.
self.indexing_in_progress.store(false, Ordering::SeqCst);
}
// Poll the accept-loop task by mutable reference (not consumed);
// completion means the listener is gone and the server must stop.
_ = &mut server_handle => {
warn!("HTTP server task completed unexpectedly");
break;
}
}
}
if let Some(handle) = self.watcher_handle.take() {
handle.abort();
}
if let Some(handle) = self.index_handle.take() {
handle.abort();
}
server_handle.abort();
debug!("MCP HTTP Server stopped");
Ok(())
}
/// Spawns two background tasks: the raw filesystem watcher and a
/// debouncer that coalesces bursts of file events into single reindex
/// requests.
///
/// Wiring: watcher -> `file_tx`/`file_rx` -> debouncer -> `index_tx`;
/// the matching receiver is stored in `self.index_rx` for whichever loop
/// consumes it (`start_background_indexing` in stdio mode, `run_http`'s
/// select loop otherwise).
async fn start_watcher(&mut self) -> Result<()> {
let (file_tx, file_rx) = mpsc::channel(MCP_MAX_PENDING_EVENTS);
let (index_tx, index_rx) = mpsc::channel(10);
let working_dir = self.working_directory.clone();
let debug = self.debug;
let watcher_handle = tokio::spawn(async move {
if let Err(e) = run_watcher(file_tx, working_dir, debug, MCP_MAX_PENDING_EVENTS).await {
log_critical_anyhow_error("Watcher error", &e);
}
});
let indexing_in_progress = self.indexing_in_progress.clone();
let debug_mode = self.debug;
// Debouncer: counts events and tracks the last arrival time; once
// MCP_DEBOUNCE_MS passes with no new events, it requests a reindex.
let index_handle = tokio::spawn(async move {
let mut file_rx = file_rx;
let mut last_event_time = None::<Instant>;
let mut pending_events = 0u32;
loop {
let timeout_duration = Duration::from_millis(MCP_DEBOUNCE_MS);
tokio::select! {
event_result = file_rx.recv() => {
match event_result {
Some(_) => {
// New event: bump the counter, restart the debounce window.
pending_events += 1;
last_event_time = Some(Instant::now());
log_watcher_event("file_change", None, pending_events as usize);
}
None => {
debug!("File watcher channel closed, stopping debouncer");
break;
}
}
}
// Timer arm is only armed while events are pending.
_ = sleep(timeout_duration), if last_event_time.is_some() => {
if let Some(last_time) = last_event_time {
if last_time.elapsed() >= timeout_duration && pending_events > 0 {
// CAS gate: only one reindex in flight. On failure the
// pending state is kept, so this arm re-fires and retries.
if indexing_in_progress
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
if debug_mode {
debug!(
pending_events = pending_events,
"Debounce period completed, requesting reindex"
);
}
log_watcher_event("debounce_trigger", None, pending_events as usize);
if (index_tx.send(()).await).is_err() {
if debug_mode {
debug!("Failed to send index request - server may be shutting down");
}
// Receiver gone: clear the flag we just set, then stop.
indexing_in_progress.store(false, Ordering::SeqCst);
break;
}
pending_events = 0;
last_event_time = None;
} else if debug_mode {
debug!("Indexing already in progress, will retry after current indexing completes");
}
}
}
}
}
}
});
self.index_rx = Some(index_rx);
self.watcher_handle = Some(watcher_handle);
self.index_handle = Some(index_handle);
Ok(())
}
async fn start_background_indexing(&mut self) -> Result<()> {
let mut index_rx = self.index_rx.take().unwrap();
let store = self.store.clone();
let config = self.config.clone();
let working_directory = self.working_directory.clone();
let no_git = self.no_git;
let indexing_in_progress = self.indexing_in_progress.clone();
let indexing_handle = tokio::spawn(async move {
loop {
match index_rx.recv().await {
Some(_) => {
debug!("Processing index request in background");
sleep(Duration::from_millis(DEFAULT_ADDITIONAL_DELAY_MS)).await;
let indexing_result = tokio::time::timeout(
Duration::from_millis(MCP_INDEX_TIMEOUT_MS),
perform_indexing(&store, &config, &working_directory, no_git),
)
.await;
match indexing_result {
Ok(Ok(())) => {
info!("Background reindex completed successfully");
}
Ok(Err(e)) => {
log_critical_anyhow_error("Background reindex error", &e);
}
Err(_) => {
log_critical_anyhow_error(
"Background reindex timeout",
&anyhow::anyhow!(
"Background reindex timed out after {}ms",
MCP_INDEX_TIMEOUT_MS
),
);
}
}
indexing_in_progress.store(false, Ordering::SeqCst);
}
None => {
debug!("Background indexing channel closed, stopping indexing task");
break;
}
}
}
});
self.indexing_handle = Some(indexing_handle);
Ok(())
}
/// Sends a JSON-RPC notification (no id, no response expected) to the
/// client as a single newline-terminated line on stdout.
///
/// The write-and-flush sequence is bounded by `MCP_IO_TIMEOUT_MS`.
async fn send_notification(
    &self,
    writer: &mut tokio::io::Stdout,
    method: &str,
    params: serde_json::Value,
) -> Result<()> {
    let notification = json!({
        "jsonrpc": "2.0",
        "method": method,
        "params": params,
    });
    let payload = serde_json::to_string(&notification)?;
    let write_all = async {
        writer.write_all(payload.as_bytes()).await?;
        writer.write_all(b"\n").await?;
        writer.flush().await
    };
    match tokio::time::timeout(Duration::from_millis(MCP_IO_TIMEOUT_MS), write_all).await {
        Ok(io_result) => io_result?,
        Err(_) => return Err(anyhow::anyhow!("Notification send timeout")),
    }
    Ok(())
}
/// Parses one JSON-RPC request line and dispatches it, recovering from
/// panics in both parsing and the handler so a single bad request cannot
/// take down the server.
///
/// Returns `Ok(None)` for blank lines and for notifications (requests
/// without an id, which must not receive a response); otherwise
/// `Ok(Some(response))`.
///
/// Fix: the previous `String::from_utf8_lossy(line.as_bytes())` round
/// trip was a guaranteed no-op (a `&str` is already valid UTF-8) that
/// allocated a fresh String per request; it has been removed.
async fn handle_request_safe(&mut self, line: &str) -> Result<Option<JsonRpcResponse>> {
    let line = line.trim();
    if line.is_empty() {
        return Ok(None);
    }
    // Guard parsing with catch_unwind so a payload that panics the parser
    // is reported as a JSON-RPC parse error instead of aborting.
    let parsed_request: Result<JsonRpcRequest, _> =
        panic::catch_unwind(|| serde_json::from_str(line)).unwrap_or_else(|_| {
            Err(serde_json::Error::io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "JSON parsing panicked",
            )))
        });
    let request: JsonRpcRequest = match parsed_request {
        Ok(req) => {
            log_mcp_request(&req.method, req.params.as_ref(), req.id.as_ref());
            req
        }
        Err(e) => {
            log_critical_error("Request parsing", &e);
            // JSON-RPC -32700 Parse error, sent with a null id.
            return Ok(Some(JsonRpcResponse {
                jsonrpc: "2.0".to_string(),
                id: None,
                result: None,
                error: Some(JsonRpcError {
                    code: -32700,
                    message: format!("Parse error: {}", e),
                    data: None,
                }),
            }));
        }
    };
    let start_time = std::time::Instant::now();
    // Clone id/method up front: `request` is consumed by the handler
    // closure below, but we still need them for logging and error replies.
    let request_id = request.id.clone();
    let request_id_for_error = request_id.clone();
    let request_method = request.method.clone();
    // block_in_place + block_on lets the async handlers run inside
    // catch_unwind (which only wraps sync code).
    // NOTE(review): block_in_place requires the multi-threaded tokio
    // runtime — it panics on a current-thread runtime; confirm the server
    // is always started on the multi-threaded flavor.
    let response = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                match request.method.as_str() {
                    "initialize" => self.handle_initialize(&request).await,
                    "tools/list" => self.handle_tools_list(&request).await,
                    "tools/call" => self.handle_tools_call(&request).await,
                    "ping" => self.handle_ping(&request).await,
                    _ => JsonRpcResponse {
                        jsonrpc: "2.0".to_string(),
                        id: request.id,
                        result: None,
                        error: Some(JsonRpcError {
                            code: -32601,
                            message: "Method not found".to_string(),
                            data: Some(json!({
                                "method": request.method,
                                "available_methods": ["initialize", "tools/list", "tools/call", "ping"]
                            })),
                        }),
                    },
                }
            })
        })
    })) {
        Ok(response) => response,
        Err(_) => {
            log_critical_anyhow_error(
                "Request handler panicked",
                &anyhow::anyhow!("Method '{}' caused a panic", request_method),
            );
            JsonRpcResponse {
                jsonrpc: "2.0".to_string(),
                id: request_id_for_error,
                result: None,
                error: Some(JsonRpcError {
                    code: -32603,
                    message: "Internal server error (panic recovered)".to_string(),
                    data: Some(json!({"method": request_method})),
                }),
            }
        }
    };
    let duration_ms = start_time.elapsed().as_millis() as u64;
    log_mcp_response(
        &request_method,
        response.error.is_none(),
        request_id.as_ref(),
        Some(duration_ms),
    );
    // Notifications (no id) never get a response per JSON-RPC 2.0.
    if request_id.is_none() {
        return Ok(None);
    }
    Ok(Some(response))
}
/// Returns true when the I/O error means the peer went away (client
/// disconnect); the server treats these as a graceful-shutdown signal
/// rather than a failure.
fn is_broken_pipe_error(&self, error: &std::io::Error) -> bool {
    use std::io::ErrorKind;
    const DISCONNECT_KINDS: [ErrorKind; 4] = [
        ErrorKind::BrokenPipe,
        ErrorKind::ConnectionAborted,
        ErrorKind::ConnectionReset,
        ErrorKind::UnexpectedEof,
    ];
    DISCONNECT_KINDS.contains(&error.kind())
}
/// Serializes a JSON-RPC response and writes it to stdout as a single
/// newline-terminated line, then flushes.
///
/// Oversized responses and serialization failures (including panics) are
/// downgraded to a canned -32603 error envelope so the client always
/// receives well-formed JSON; the write itself is bounded by
/// MCP_IO_TIMEOUT_MS.
async fn send_response(
&self,
writer: &mut tokio::io::Stdout,
response: &JsonRpcResponse,
) -> Result<()> {
let response_json = match panic::catch_unwind(|| serde_json::to_string(response)) {
Ok(Ok(json)) => {
// Apply the same size cap to responses as to requests.
if json.len() > MCP_MAX_REQUEST_SIZE {
log_critical_anyhow_error(
"Response too large",
&anyhow::anyhow!(
"Response size {} exceeds limit {}",
json.len(),
MCP_MAX_REQUEST_SIZE
),
);
r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Response too large"}}"#.to_string()
} else {
json
}
}
Ok(Err(e)) => {
log_critical_error("Response serialization failed", &e);
r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Response serialization failed"}}"#.to_string()
}
// catch_unwind tripped: the serializer itself panicked.
Err(_) => {
log_critical_anyhow_error(
"Response serialization panicked",
&anyhow::anyhow!("JSON serialization panicked"),
);
r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Response serialization panicked"}}"#.to_string()
}
};
tokio::time::timeout(Duration::from_millis(MCP_IO_TIMEOUT_MS), async {
writer.write_all(response_json.as_bytes()).await?;
writer.write_all(b"\n").await?;
writer.flush().await
})
.await
.map_err(|_| anyhow::anyhow!("Response send timeout"))??;
Ok(())
}
async fn send_error_response(
&self,
writer: &mut tokio::io::Stdout,
id: Option<&serde_json::Value>,
code: i32,
message: &str,
data: Option<serde_json::Value>,
) -> Result<()> {
let error_response = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: id.cloned(),
result: None,
error: Some(JsonRpcError {
code,
message: message.to_string(),
data,
}),
};
self.send_response(writer, &error_response).await
}
/// Handles the MCP `initialize` handshake: advertises protocol version,
/// tool capability, server identity, and usage instructions. A warning is
/// appended to the instructions when the indexer is disabled.
async fn handle_initialize(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
    const BASE_INSTRUCTIONS: &str = "This server provides modular AI tools: semantic code search and GraphRAG (if available). Use semantic_search for code/documentation searches and graphrag (if enabled) for relationship queries.";
    const INDEXER_DISABLED_NOTE: &str = "\n\nWARNING: Indexer is disabled because you're not in a git repository and the --no-git flag was not set. Use --no-git to enable indexing outside git repos.";
    let instructions = if self.indexer_enabled {
        BASE_INSTRUCTIONS.to_string()
    } else {
        format!("{}{}", BASE_INSTRUCTIONS, INDEXER_DISABLED_NOTE)
    };
    JsonRpcResponse {
        jsonrpc: "2.0".to_string(),
        id: request.id.clone(),
        result: Some(json!({
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {
                    "listChanged": false
                }
            },
            "serverInfo": {
                "name": "octocode-mcp",
                "version": "0.1.0",
                "description": "Semantic code search server with vector embeddings and optional GraphRAG support"
            },
            "instructions": instructions
        })),
        error: None,
    }
}
/// Handles `tools/list`: always exposes the two semantic-search tools,
/// plus the GraphRAG and LSP tool sets when those providers exist.
async fn handle_tools_list(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
    let mut tools = Vec::new();
    tools.push(SemanticCodeProvider::get_tool_definition());
    tools.push(SemanticCodeProvider::get_view_signatures_tool_definition());
    if self.graphrag.is_some() {
        tools.push(GraphRagProvider::get_tool_definition());
    }
    if self.lsp.is_some() {
        for tool in crate::mcp::lsp::LspProvider::get_tool_definitions() {
            tools.push(tool);
        }
    }
    JsonRpcResponse {
        jsonrpc: "2.0".to_string(),
        id: request.id.clone(),
        result: Some(json!({ "tools": tools })),
        error: None,
    }
}
async fn handle_tools_call(&mut self, request: &JsonRpcRequest) -> JsonRpcResponse {
let params = match &request.params {
Some(params) => params,
None => {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Invalid params: missing parameters object".to_string(),
data: Some(json!({
"details": "Tool calls require a 'params' object with 'name' and 'arguments' fields"
})),
}),
};
}
};
let tool_name = match params.get("name").and_then(|v| v.as_str()) {
Some(name) => name,
None => {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Invalid params: missing tool name".to_string(),
data: Some(json!({
"details": "Required field 'name' must be provided with the tool name to call"
})),
}),
};
}
};
let default_args = json!({});
let arguments = params.get("arguments").unwrap_or(&default_args);
if let Ok(args_str) = serde_json::to_string(arguments) {
if args_str.len() > MCP_MAX_REQUEST_SIZE {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Tool arguments too large".to_string(),
data: Some(json!({
"max_size": MCP_MAX_REQUEST_SIZE,
"actual_size": args_str.len()
})),
}),
};
}
}
let result = match tool_name {
"semantic_search" => self.semantic_code.execute_search(arguments).await,
"view_signatures" => self.semantic_code.execute_view_signatures(arguments).await,
"graphrag" => match &self.graphrag {
Some(provider) => provider.execute(arguments).await,
None => Err(McpError::method_not_found("GraphRAG is not enabled in the current configuration. Please enable GraphRAG in octocode.toml to use relationship-aware search.", "graphrag")),
},
"lsp_goto_definition" => match &self.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_goto_definition(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_goto_definition")),
},
"lsp_hover" => match &self.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_hover(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_hover")),
},
"lsp_find_references" => match &self.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_find_references(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_find_references")),
},
"lsp_document_symbols" => match &self.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_document_symbols(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_document_symbols")),
},
"lsp_workspace_symbols" => match &self.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_workspace_symbols(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_workspace_symbols")),
},
"lsp_completion" => match &self.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_completion(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_completion")),
},
_ => {
let available_tools = format!("semantic_search, view_signatures{}{}",
if self.graphrag.is_some() { ", graphrag" } else { "" },
if self.lsp.is_some() { ", lsp_goto_definition, lsp_hover, lsp_find_references, lsp_document_symbols, lsp_workspace_symbols, lsp_completion" } else { "" }
);
Err(McpError::method_not_found(format!("Unknown tool '{}'. Available tools: {}", tool_name, available_tools), tool_name))
}
};
match result {
Ok(content) => JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: Some(json!({
"content": [{
"type": "text",
"text": content
}]
})),
error: None,
},
Err(e) => {
let error_message = e.to_string();
if let Some(mcp_error) = parse_mcp_error(&error_message) {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(mcp_error),
}
} else {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(e.into_jsonrpc()),
}
}
}
}
}
/// Handles `ping` liveness checks: echoes the request id with an
/// empty-object result.
async fn handle_ping(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
    let empty_result = json!({});
    JsonRpcResponse {
        jsonrpc: "2.0".to_string(),
        id: request.id.clone(),
        result: Some(empty_result),
        error: None,
    }
}
/// Re-syncs the LSP server's view of the workspace after a reindex:
/// walks all files (respecting noindex rules) and pushes every file with
/// a recognized language to the LSP provider. Per-file failures are
/// logged at debug level and do not abort the sweep.
async fn update_lsp_after_indexing(
    lsp_provider: &mut crate::mcp::lsp::LspProvider,
    working_directory: &std::path::Path,
) -> Result<()> {
    use crate::indexer::{detect_language, NoindexWalker, PathUtils};
    debug!("Updating LSP server with changed files");
    let mut files_updated = 0;
    for result in NoindexWalker::create_walker(working_directory).build() {
        // Skip unreadable directory entries.
        let Ok(entry) = result else { continue };
        // Only regular files can be source files.
        if !entry.file_type().is_some_and(|ft| ft.is_file()) {
            continue;
        }
        // Only push files whose language the indexer recognizes.
        if detect_language(entry.path()).is_none() {
            continue;
        }
        let relative_path = PathUtils::to_relative_string(entry.path(), working_directory);
        if let Err(e) = lsp_provider.update_file(&relative_path).await {
            debug!("Failed to update file {} in LSP: {}", relative_path, e);
        } else {
            files_updated += 1;
        }
    }
    debug!("LSP update completed: {} files updated", files_updated);
    Ok(())
}
}
/// Abort all background tasks on drop so a dropped server does not leak
/// its watcher/debouncer/indexer tasks into the runtime.
impl Drop for McpServer {
    fn drop(&mut self) {
        let handles = [
            self.watcher_handle.take(),
            self.index_handle.take(),
            self.indexing_handle.take(),
        ];
        for handle in handles.into_iter().flatten() {
            handle.abort();
        }
    }
}
/// Runs one full (re)index pass of `working_directory` while holding the
/// cross-process index lock.
///
/// Records start/complete indexing-operation log entries (with duration)
/// and forwards the git repository root to the indexer unless `no_git`
/// is set.
///
/// NOTE(review): callers wrap this future in `tokio::time::timeout`; on
/// timeout it is dropped before `lock.release()` runs, so lock cleanup
/// then depends on `IndexLock`'s drop behavior — confirm it releases on
/// drop.
async fn perform_indexing(
store: &Store,
config: &Config,
working_directory: &std::path::Path,
no_git: bool,
) -> Result<()> {
let start_time = std::time::Instant::now();
log_indexing_operation("direct_reindex_start", None, None, true);
let mut lock = IndexLock::new(working_directory)?;
lock.acquire()?;
debug!("MCP server: acquired indexing lock");
// Fresh shared state for this pass, pointed at the target directory.
let state = state::create_shared_state();
state.write().current_directory = working_directory.to_path_buf();
let git_repo_root = if !no_git {
indexer::git::find_git_root(working_directory)
} else {
None
};
let indexing_result = indexer::index_files_with_quiet(
store,
state.clone(),
config,
git_repo_root.as_deref(),
true,
)
.await;
// Release before inspecting the result so the lock is freed even when
// indexing failed. If release itself fails, its error is returned and
// the indexing result below is discarded.
lock.release()?;
debug!("MCP server: released indexing lock");
let duration_ms = start_time.elapsed().as_millis() as u64;
match indexing_result {
Ok(()) => {
log_indexing_operation("direct_reindex_complete", None, Some(duration_ms), true);
Ok(())
}
Err(e) => {
log_indexing_operation("direct_reindex_complete", None, Some(duration_ms), false);
log_critical_error("Direct indexing", e.as_ref());
Err(e)
}
}
}