use anyhow::Result;
use serde_json::json;
use std::panic;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, info, trace, warn};
use crate::config::Config;
use crate::indexer;
use crate::lock::IndexLock;
use crate::mcp::graphrag::GraphRagProvider;
use crate::mcp::logging::{
init_mcp_logging, log_critical_anyhow_error, log_critical_error, log_indexing_operation,
log_mcp_request, log_mcp_response, log_watcher_event,
};
use crate::mcp::semantic_code::SemanticCodeProvider;
use crate::mcp::types::{parse_mcp_error, JsonRpcError, JsonRpcRequest, JsonRpcResponse, McpError};
use crate::state;
use crate::store::Store;
use crate::watcher_config::{
IgnorePatterns, DEFAULT_ADDITIONAL_DELAY_MS, MCP_DEFAULT_DEBOUNCE_MS, MIN_DEBOUNCE_MS,
};
/// Debounce window for file-change events before a reindex is requested.
const MCP_DEBOUNCE_MS: u64 = MCP_DEFAULT_DEBOUNCE_MS;
/// Upper bound on buffered watcher events in the channel.
const MCP_MAX_PENDING_EVENTS: usize = 100;
/// Maximum wall-clock time allowed for a single reindex run (5 minutes).
const MCP_INDEX_TIMEOUT_MS: u64 = 300_000;
/// Gate for per-event trace logging in the watcher callback.
const MCP_ENABLE_VERBOSE_EVENTS: bool = false;
/// Size ceiling applied to incoming requests and outgoing responses (10 MiB).
const MCP_MAX_REQUEST_SIZE: usize = 10_485_760;
/// Timeout applied to stdin reads and stdout writes.
const MCP_IO_TIMEOUT_MS: u64 = 30_000;
/// Stdio/HTTP MCP server wiring the semantic-search, GraphRAG and LSP
/// providers to a file watcher that triggers debounced background reindexing.
pub struct McpServer {
    // Always-on semantic code search provider.
    semantic_code: SemanticCodeProvider,
    // Relationship-aware search; None when GraphRAG is disabled in config.
    graphrag: Option<GraphRagProvider>,
    // Lazily-initialized LSP bridge; None unless an LSP command was given.
    // Mutex because tool calls need exclusive access to the LSP session.
    lsp: Option<Arc<Mutex<crate::mcp::lsp::LspProvider>>>,
    // Enables extra debug logging in the watcher/debounce paths.
    debug: bool,
    // Project root; the process chdirs here in `new`.
    working_directory: std::path::PathBuf,
    // When true, git root detection is skipped during indexing.
    no_git: bool,
    // Task running the filesystem watcher (`run_watcher`).
    watcher_handle: Option<tokio::task::JoinHandle<()>>,
    // Task debouncing watcher events into reindex requests.
    index_handle: Option<tokio::task::JoinHandle<()>>,
    // Task performing the actual reindex (stdio mode only; in HTTP mode
    // indexing runs inline inside `run_http`'s select loop).
    indexing_handle: Option<tokio::task::JoinHandle<()>>,
    // Set while a reindex is running; cleared when it finishes.
    indexing_in_progress: Arc<AtomicBool>,
    // Vector store shared with the indexing tasks.
    store: Arc<Store>,
    config: Config,
    // Receiver for debounced reindex requests; taken (Option::take) by
    // whichever consumer owns it: start_background_indexing or run_http.
    index_rx: Option<mpsc::Receiver<()>>,
}
impl McpServer {
    /// Build a server rooted at `working_directory`.
    ///
    /// Side effects: changes the process working directory, opens and
    /// initializes the vector store collections, and installs MCP file
    /// logging. When `lsp_command` is given, LSP initialization is kicked
    /// off on a background task so construction does not block on it.
    pub async fn new(
        config: Config,
        debug: bool,
        working_directory: std::path::PathBuf,
        no_git: bool,
        lsp_command: Option<String>,
    ) -> Result<Self> {
        // NOTE: process-global chdir — affects every relative path in the
        // process, not just this server instance.
        std::env::set_current_dir(&working_directory).map_err(|e| {
            anyhow::anyhow!(
                "Failed to change to working directory '{}': {}",
                working_directory.display(),
                e
            )
        })?;
        let store = Store::new().await?;
        store.initialize_collections().await?;
        let store = Arc::new(store);
        init_mcp_logging(working_directory.clone(), debug)?;
        let semantic_code = SemanticCodeProvider::new(config.clone(), working_directory.clone());
        // GraphRagProvider::new yields None when GraphRAG is not enabled.
        let graphrag = GraphRagProvider::new(config.clone(), working_directory.clone());
        let lsp = if let Some(command) = lsp_command {
            tracing::info!(
                "LSP provider will be initialized lazily with command: {}",
                command
            );
            let provider = Arc::new(Mutex::new(crate::mcp::lsp::LspProvider::new(
                working_directory.clone(),
                command,
            )));
            let provider_clone = provider.clone();
            // Fire-and-forget: initialization failures are logged, not fatal.
            tokio::spawn(async move {
                let mut provider_guard = provider_clone.lock().await;
                if let Err(e) = provider_guard.start_initialization().await {
                    tracing::warn!("LSP initialization failed: {}", e);
                }
            });
            Some(provider)
        } else {
            None
        };
        Ok(Self {
            semantic_code,
            graphrag,
            lsp,
            debug,
            working_directory,
            no_git,
            watcher_handle: None,
            index_handle: None,
            indexing_handle: None,
            indexing_in_progress: Arc::new(AtomicBool::new(false)),
            store,
            config,
            index_rx: None,
        })
    }
    /// Main stdio loop: read newline-delimited JSON-RPC from stdin,
    /// dispatch, and write responses to stdout. Also starts the file
    /// watcher and the background indexing task. Returns when stdin hits
    /// EOF, the pipe breaks, or too many consecutive errors occur.
    pub async fn run(&mut self) -> Result<()> {
        // Route panics through the MCP log before the default hook runs.
        let original_hook = panic::take_hook();
        panic::set_hook(Box::new(move |panic_info| {
            log_critical_anyhow_error("Panic in MCP server", &anyhow::anyhow!("{}", panic_info));
            original_hook(panic_info);
        }));
        self.start_watcher().await?;
        self.start_background_indexing().await?;
        info!(
            debug_mode = self.debug,
            debounce_ms = MCP_DEBOUNCE_MS,
            timeout_ms = MCP_INDEX_TIMEOUT_MS,
            max_events = MCP_MAX_PENDING_EVENTS,
            max_request_size_mb = MCP_MAX_REQUEST_SIZE / 1_048_576,
            io_timeout_ms = MCP_IO_TIMEOUT_MS,
            "MCP Server started"
        );
        let stdin = tokio::io::stdin();
        let stdout = tokio::io::stdout();
        let mut reader = BufReader::new(stdin);
        let mut writer = stdout;
        // One reusable line buffer; cleared at the top of every iteration.
        let mut line = String::with_capacity(1024);
        // Circuit breaker: bail out after too many back-to-back failures.
        let mut consecutive_errors = 0u32;
        const MAX_CONSECUTIVE_ERRORS: u32 = 10;
        loop {
            line.clear();
            tokio::select! {
                // NOTE(review): read_line is not cancellation-safe — if the
                // timeout fires mid-line, bytes already appended to `line`
                // are discarded by the clear() above on the next iteration.
                // Acceptable for local line-oriented clients — confirm.
                result = tokio::time::timeout(
                    Duration::from_millis(MCP_IO_TIMEOUT_MS),
                    reader.read_line(&mut line)
                ) => {
                    match result {
                        Ok(Ok(0)) => {
                            debug!("MCP Server: EOF received, shutting down gracefully");
                            break;
                        }
                        Ok(Ok(bytes_read)) => {
                            // Reject oversized requests before parsing.
                            if bytes_read > MCP_MAX_REQUEST_SIZE {
                                log_critical_anyhow_error(
                                    "Request size limit exceeded",
                                    &anyhow::anyhow!("Request size {} exceeds limit {}", bytes_read, MCP_MAX_REQUEST_SIZE)
                                );
                                if let Err(e) = self.send_error_response(
                                    &mut writer,
                                    None,
                                    -32700,
                                    "Request too large",
                                    Some(json!({"max_size": MCP_MAX_REQUEST_SIZE}))
                                ).await {
                                    log_critical_anyhow_error("Failed to send error response", &e);
                                }
                                continue;
                            }
                            match self.handle_request_safe(&line).await {
                                Ok(Some(response)) => {
                                    if let Err(e) = self.send_response(&mut writer, &response).await {
                                        log_critical_anyhow_error("Failed to send response", &e);
                                        consecutive_errors += 1;
                                        if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
                                            log_critical_anyhow_error(
                                                "Too many consecutive errors",
                                                &anyhow::anyhow!("Shutting down after {} consecutive errors", consecutive_errors)
                                            );
                                            break;
                                        }
                                    } else {
                                        // Any fully successful round trip resets the breaker.
                                        consecutive_errors = 0;
                                    }
                                }
                                Ok(None) => {
                                    consecutive_errors = 0;
                                }
                                Err(e) => {
                                    log_critical_anyhow_error("Request handling failed", &e);
                                    consecutive_errors += 1;
                                    if let Err(send_err) = self.send_error_response(
                                        &mut writer,
                                        None,
                                        -32603,
                                        "Internal server error",
                                        Some(json!({"error": e.to_string()}))
                                    ).await {
                                        log_critical_anyhow_error("Failed to send error response", &send_err);
                                    }
                                    if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
                                        log_critical_anyhow_error(
                                            "Too many consecutive errors",
                                            &anyhow::anyhow!("Shutting down after {} consecutive errors", consecutive_errors)
                                        );
                                        break;
                                    }
                                }
                            }
                        }
                        Ok(Err(e)) => {
                            // Peer-disconnect errors are a normal shutdown signal.
                            if self.is_broken_pipe_error(&e) {
                                debug!("MCP Server: Broken pipe detected, shutting down gracefully");
                                break;
                            } else {
                                log_critical_error("Error reading from stdin", &e);
                                consecutive_errors += 1;
                                if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
                                    break;
                                }
                                tokio::time::sleep(Duration::from_millis(100)).await;
                            }
                        }
                        Err(_) => {
                            // Idle timeout while waiting for input — not an error.
                            trace!("MCP Server: Timeout reading from stdin (normal - waiting for client requests)");
                            consecutive_errors = 0;
                        }
                    }
                }
            }
        }
        // Tear down background tasks on any exit path from the loop.
        if let Some(handle) = self.watcher_handle.take() {
            handle.abort();
        }
        if let Some(handle) = self.index_handle.take() {
            handle.abort();
        }
        if let Some(handle) = self.indexing_handle.take() {
            handle.abort();
        }
        debug!("MCP Server stopped");
        Ok(())
    }
    /// HTTP transport loop: accept TCP connections on `bind_addr` and serve
    /// one JSON-RPC request per connection. Unlike stdio `run`, reindexing
    /// is performed inline in this loop (no start_background_indexing) —
    /// the index receiver is consumed here.
    pub async fn run_http(&mut self, bind_addr: &str) -> Result<()> {
        // Route panics through the MCP log before the default hook runs.
        let original_hook = panic::take_hook();
        panic::set_hook(Box::new(move |panic_info| {
            log_critical_anyhow_error("Panic in MCP server", &anyhow::anyhow!("{}", panic_info));
            original_hook(panic_info);
        }));
        self.start_watcher().await?;
        let addr = bind_addr
            .parse::<std::net::SocketAddr>()
            .map_err(|e| anyhow::anyhow!("Invalid bind address '{}': {}", bind_addr, e))?;
        info!(
            debug_mode = self.debug,
            bind_address = %addr,
            debounce_ms = MCP_DEBOUNCE_MS,
            timeout_ms = MCP_INDEX_TIMEOUT_MS,
            max_events = MCP_MAX_PENDING_EVENTS,
            max_request_size_mb = MCP_MAX_REQUEST_SIZE / 1_048_576,
            io_timeout_ms = MCP_IO_TIMEOUT_MS,
            "MCP Server started in HTTP mode"
        );
        // Safe: start_watcher above always populates index_rx. Would panic
        // if run_http were ever called twice on the same instance.
        let mut index_rx = self.index_rx.take().unwrap();
        // Providers cloned into shared state for per-connection tasks.
        let server_state = Arc::new(Mutex::new(HttpServerState {
            semantic_code: self.semantic_code.clone(),
            graphrag: self.graphrag.clone(),
            lsp: self.lsp.clone(),
        }));
        let listener = TcpListener::bind(&addr)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to bind to {}: {}", addr, e))?;
        info!("MCP HTTP server listening on {}", addr);
        let state_for_server = server_state.clone();
        // Accept loop runs on its own task; one task per connection.
        let mut server_handle = tokio::spawn(async move {
            loop {
                match listener.accept().await {
                    Ok((stream, addr)) => {
                        let state = state_for_server.clone();
                        tokio::spawn(async move {
                            if let Err(e) = handle_http_connection(stream, state).await {
                                debug!("HTTP connection error from {}: {}", addr, e);
                            }
                        });
                    }
                    Err(e) => {
                        log_critical_anyhow_error(
                            "HTTP server accept error",
                            &anyhow::anyhow!("{}", e),
                        );
                        break;
                    }
                }
            }
        });
        loop {
            tokio::select! {
                // Debounced reindex request from the watcher pipeline.
                Some(_) = index_rx.recv() => {
                    debug!("Processing index request");
                    // Extra settle delay so rapid bursts of writes finish.
                    sleep(Duration::from_millis(DEFAULT_ADDITIONAL_DELAY_MS)).await;
                    let indexing_result = tokio::time::timeout(
                        Duration::from_millis(MCP_INDEX_TIMEOUT_MS),
                        perform_indexing(&self.store, &self.config, &self.working_directory, self.no_git)
                    ).await;
                    match indexing_result {
                        Ok(Ok(())) => {
                            info!("Reindex completed successfully");
                            // Push the refreshed files into the LSP session, if any.
                            if let Some(ref lsp_provider) = self.lsp {
                                let mut lsp_guard = lsp_provider.lock().await;
                                if let Err(e) = Self::update_lsp_after_indexing(&mut lsp_guard, &self.working_directory).await {
                                    debug!("LSP update after indexing failed: {}", e);
                                }
                            }
                        }
                        Ok(Err(e)) => {
                            log_critical_anyhow_error("Reindex error", &e);
                        }
                        Err(_) => {
                            log_critical_anyhow_error(
                                "Reindex timeout",
                                &anyhow::anyhow!("Reindex timed out after {}ms", MCP_INDEX_TIMEOUT_MS)
                            );
                        }
                    }
                    // Clear the flag set by the debounce task so future
                    // change bursts can request another reindex.
                    self.indexing_in_progress.store(false, Ordering::SeqCst);
                }
                _ = &mut server_handle => {
                    warn!("HTTP server task completed unexpectedly");
                    break;
                }
            }
        }
        if let Some(handle) = self.watcher_handle.take() {
            handle.abort();
        }
        if let Some(handle) = self.index_handle.take() {
            handle.abort();
        }
        server_handle.abort();
        debug!("MCP HTTP Server stopped");
        Ok(())
    }
    /// Spawn the filesystem watcher and the debounce task.
    ///
    /// Pipeline: run_watcher -> file_tx/file_rx (raw change events) ->
    /// debounce task -> index_tx/index_rx (reindex requests). The receiver
    /// is stored in `self.index_rx` for the consumer (background indexing
    /// task in stdio mode, or run_http's loop in HTTP mode).
    async fn start_watcher(&mut self) -> Result<()> {
        let (file_tx, file_rx) = mpsc::channel(MCP_MAX_PENDING_EVENTS);
        let (index_tx, index_rx) = mpsc::channel(10);
        let working_dir = self.working_directory.clone();
        let debug = self.debug;
        let watcher_handle = tokio::spawn(async move {
            if let Err(e) = run_watcher(file_tx, working_dir, debug).await {
                log_critical_anyhow_error("Watcher error", &e);
            }
        });
        let indexing_in_progress = self.indexing_in_progress.clone();
        let debug_mode = self.debug;
        let index_handle = tokio::spawn(async move {
            let mut file_rx = file_rx;
            // Timestamp of the most recent event; None means "nothing pending",
            // which disables the sleep arm below.
            let mut last_event_time = None::<Instant>;
            let mut pending_events = 0u32;
            loop {
                let timeout_duration = Duration::from_millis(MCP_DEBOUNCE_MS);
                tokio::select! {
                    event_result = file_rx.recv() => {
                        match event_result {
                            Some(_) => {
                                // Each new event restarts the debounce window.
                                pending_events += 1;
                                last_event_time = Some(Instant::now());
                                log_watcher_event("file_change", None, pending_events as usize);
                            }
                            None => {
                                debug!("File watcher channel closed, stopping debouncer");
                                break;
                            }
                        }
                    }
                    // Only armed while events are pending.
                    _ = sleep(timeout_duration), if last_event_time.is_some() => {
                        if let Some(last_time) = last_event_time {
                            if last_time.elapsed() >= timeout_duration && pending_events > 0 {
                                // Claim the indexing slot atomically; if another
                                // reindex is running, keep the pending events and
                                // retry on the next timer tick.
                                if indexing_in_progress
                                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                                    .is_ok()
                                {
                                    if debug_mode {
                                        debug!(
                                            pending_events = pending_events,
                                            "Debounce period completed, requesting reindex"
                                        );
                                    }
                                    log_watcher_event("debounce_trigger", None, pending_events as usize);
                                    if (index_tx.send(()).await).is_err() {
                                        if debug_mode {
                                            debug!("Failed to send index request - server may be shutting down");
                                        }
                                        // Release the claimed slot before exiting.
                                        indexing_in_progress.store(false, Ordering::SeqCst);
                                        break;
                                    }
                                    pending_events = 0;
                                    last_event_time = None;
                                } else if debug_mode {
                                    debug!("Indexing already in progress, will retry after current indexing completes");
                                }
                            }
                        }
                    }
                }
            }
        });
        self.index_rx = Some(index_rx);
        self.watcher_handle = Some(watcher_handle);
        self.index_handle = Some(index_handle);
        Ok(())
    }
    /// Spawn the stdio-mode indexing consumer: waits on the debounced
    /// reindex channel and runs `perform_indexing` under a timeout.
    ///
    /// Precondition: `start_watcher` must have run first — this unwraps
    /// `self.index_rx` and would panic otherwise.
    async fn start_background_indexing(&mut self) -> Result<()> {
        let mut index_rx = self.index_rx.take().unwrap();
        let store = self.store.clone();
        let config = self.config.clone();
        let working_directory = self.working_directory.clone();
        let no_git = self.no_git;
        let indexing_in_progress = self.indexing_in_progress.clone();
        let indexing_handle = tokio::spawn(async move {
            loop {
                match index_rx.recv().await {
                    Some(_) => {
                        debug!("Processing index request in background");
                        // Let a burst of writes settle before reading files.
                        sleep(Duration::from_millis(DEFAULT_ADDITIONAL_DELAY_MS)).await;
                        let indexing_result = tokio::time::timeout(
                            Duration::from_millis(MCP_INDEX_TIMEOUT_MS),
                            perform_indexing(&store, &config, &working_directory, no_git),
                        )
                        .await;
                        match indexing_result {
                            Ok(Ok(())) => {
                                info!("Background reindex completed successfully");
                            }
                            Ok(Err(e)) => {
                                log_critical_anyhow_error("Background reindex error", &e);
                            }
                            Err(_) => {
                                log_critical_anyhow_error(
                                    "Background reindex timeout",
                                    &anyhow::anyhow!(
                                        "Background reindex timed out after {}ms",
                                        MCP_INDEX_TIMEOUT_MS
                                    ),
                                );
                            }
                        }
                        // Release the slot claimed by the debounce task so the
                        // next change burst can trigger another reindex.
                        indexing_in_progress.store(false, Ordering::SeqCst);
                    }
                    None => {
                        debug!("Background indexing channel closed, stopping indexing task");
                        break;
                    }
                }
            }
        });
        self.indexing_handle = Some(indexing_handle);
        Ok(())
    }
async fn handle_request_safe(&mut self, line: &str) -> Result<Option<JsonRpcResponse>> {
let line = line.trim();
if line.is_empty() {
return Ok(None);
}
let clean_line = String::from_utf8_lossy(line.as_bytes()).to_string();
let line = clean_line.as_str();
let parsed_request: Result<JsonRpcRequest, _> =
panic::catch_unwind(|| serde_json::from_str(line)).unwrap_or_else(|_| {
Err(serde_json::Error::io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"JSON parsing panicked",
)))
});
let request: JsonRpcRequest = match parsed_request {
Ok(req) => {
log_mcp_request(&req.method, req.params.as_ref(), req.id.as_ref());
req
}
Err(e) => {
log_critical_error("Request parsing", &e);
return Ok(Some(JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: None,
result: None,
error: Some(JsonRpcError {
code: -32700,
message: format!("Parse error: {}", e),
data: None,
}),
}));
}
};
let start_time = std::time::Instant::now();
let request_id = request.id.clone();
let request_id_for_error = request_id.clone(); let request_method = request.method.clone();
let response = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
match request.method.as_str() {
"initialize" => self.handle_initialize(&request).await,
"tools/list" => self.handle_tools_list(&request).await,
"tools/call" => self.handle_tools_call(&request).await,
"ping" => self.handle_ping(&request).await,
_ => JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32601,
message: "Method not found".to_string(),
data: Some(json!({
"method": request.method,
"available_methods": ["initialize", "tools/list", "tools/call", "ping"]
})),
}),
},
}
})
})
})) {
Ok(response) => response,
Err(_) => {
log_critical_anyhow_error(
"Request handler panicked",
&anyhow::anyhow!("Method '{}' caused a panic", request_method),
);
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request_id_for_error,
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal server error (panic recovered)".to_string(),
data: Some(json!({"method": request_method})),
}),
}
}
};
let duration_ms = start_time.elapsed().as_millis() as u64;
log_mcp_response(
&request_method,
response.error.is_none(),
request_id.as_ref(),
Some(duration_ms),
);
if request_id.is_none() {
return Ok(None);
}
Ok(Some(response))
}
fn is_broken_pipe_error(&self, error: &std::io::Error) -> bool {
use std::io::ErrorKind;
matches!(
error.kind(),
ErrorKind::BrokenPipe
| ErrorKind::ConnectionAborted
| ErrorKind::ConnectionReset
| ErrorKind::UnexpectedEof
)
}
    /// Serialize a JSON-RPC response and write it to stdout, newline
    /// terminated.
    ///
    /// Defensive layers: serialization runs under catch_unwind; oversized
    /// or unserializable responses degrade to a canned -32603 payload; and
    /// the entire write+flush is bounded by MCP_IO_TIMEOUT_MS.
    async fn send_response(
        &self,
        writer: &mut tokio::io::Stdout,
        response: &JsonRpcResponse,
    ) -> Result<()> {
        let response_json = match panic::catch_unwind(|| serde_json::to_string(response)) {
            Ok(Ok(json)) => {
                // Reuse the request size cap as the response cap.
                if json.len() > MCP_MAX_REQUEST_SIZE {
                    log_critical_anyhow_error(
                        "Response too large",
                        &anyhow::anyhow!(
                            "Response size {} exceeds limit {}",
                            json.len(),
                            MCP_MAX_REQUEST_SIZE
                        ),
                    );
                    r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Response too large"}}"#.to_string()
                } else {
                    json
                }
            }
            Ok(Err(e)) => {
                log_critical_error("Response serialization failed", &e);
                r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Response serialization failed"}}"#.to_string()
            }
            Err(_) => {
                log_critical_anyhow_error(
                    "Response serialization panicked",
                    &anyhow::anyhow!("JSON serialization panicked"),
                );
                r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Response serialization panicked"}}"#.to_string()
            }
        };
        // Double-? unwraps first the timeout, then the inner I/O result.
        tokio::time::timeout(Duration::from_millis(MCP_IO_TIMEOUT_MS), async {
            writer.write_all(response_json.as_bytes()).await?;
            writer.write_all(b"\n").await?;
            writer.flush().await
        })
        .await
        .map_err(|_| anyhow::anyhow!("Response send timeout"))??;
        Ok(())
    }
async fn send_error_response(
&self,
writer: &mut tokio::io::Stdout,
id: Option<&serde_json::Value>,
code: i32,
message: &str,
data: Option<serde_json::Value>,
) -> Result<()> {
let error_response = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: id.cloned(),
result: None,
error: Some(JsonRpcError {
code,
message: message.to_string(),
data,
}),
};
self.send_response(writer, &error_response).await
}
async fn handle_initialize(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: Some(json!({
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {
"listChanged": false
}
},
"serverInfo": {
"name": "octocode-mcp",
"version": "0.1.0",
"description": "Semantic code search server with vector embeddings and optional GraphRAG support"
},
"instructions": "This server provides modular AI tools: semantic code search and GraphRAG (if available). Use 'semantic_search' for code/documentation searches and 'graphrag' (if enabled) for relationship queries."
})),
error: None,
}
}
async fn handle_tools_list(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
let mut tools = vec![
SemanticCodeProvider::get_tool_definition(),
SemanticCodeProvider::get_view_signatures_tool_definition(),
];
if self.graphrag.is_some() {
tools.push(GraphRagProvider::get_tool_definition());
}
if self.lsp.is_some() {
tools.extend(crate::mcp::lsp::LspProvider::get_tool_definitions());
}
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: Some(json!({
"tools": tools
})),
error: None,
}
}
    /// MCP `tools/call` handler: validate params, enforce the argument size
    /// cap, dispatch to the named provider, and map the result into a
    /// JSON-RPC success or error response.
    async fn handle_tools_call(&mut self, request: &JsonRpcRequest) -> JsonRpcResponse {
        // A params object is mandatory for tool calls.
        let params = match &request.params {
            Some(params) => params,
            None => {
                return JsonRpcResponse {
                    jsonrpc: "2.0".to_string(),
                    id: request.id.clone(),
                    result: None,
                    error: Some(JsonRpcError {
                        code: -32602,
                        message: "Invalid params: missing parameters object".to_string(),
                        data: Some(json!({
                            "details": "Tool calls require a 'params' object with 'name' and 'arguments' fields"
                        })),
                    }),
                };
            }
        };
        let tool_name = match params.get("name").and_then(|v| v.as_str()) {
            Some(name) => name,
            None => {
                return JsonRpcResponse {
                    jsonrpc: "2.0".to_string(),
                    id: request.id.clone(),
                    result: None,
                    error: Some(JsonRpcError {
                        code: -32602,
                        message: "Invalid params: missing tool name".to_string(),
                        data: Some(json!({
                            "details": "Required field 'name' must be provided with the tool name to call"
                        })),
                    }),
                };
            }
        };
        // Missing arguments default to an empty object.
        let default_args = json!({});
        let arguments = params.get("arguments").unwrap_or(&default_args);
        // Size cap measured on the re-serialized arguments.
        if let Ok(args_str) = serde_json::to_string(arguments) {
            if args_str.len() > MCP_MAX_REQUEST_SIZE {
                return JsonRpcResponse {
                    jsonrpc: "2.0".to_string(),
                    id: request.id.clone(),
                    result: None,
                    error: Some(JsonRpcError {
                        code: -32602,
                        message: "Tool arguments too large".to_string(),
                        data: Some(json!({
                            "max_size": MCP_MAX_REQUEST_SIZE,
                            "actual_size": args_str.len()
                        })),
                    }),
                };
            }
        }
        // Dispatch. LSP tools lock the shared provider for the call's
        // duration; disabled providers answer with method_not_found.
        let result = match tool_name {
            "semantic_search" => self.semantic_code.execute_search(arguments).await,
            "view_signatures" => self.semantic_code.execute_view_signatures(arguments).await,
            "graphrag" => match &self.graphrag {
                Some(provider) => provider.execute(arguments).await,
                None => Err(McpError::method_not_found("GraphRAG is not enabled in the current configuration. Please enable GraphRAG in octocode.toml to use relationship-aware search.", "graphrag")),
            },
            "lsp_goto_definition" => match &self.lsp {
                Some(provider) => {
                    let mut lsp_guard = provider.lock().await;
                    lsp_guard.execute_goto_definition(arguments).await
                },
                None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_goto_definition")),
            },
            "lsp_hover" => match &self.lsp {
                Some(provider) => {
                    let mut lsp_guard = provider.lock().await;
                    lsp_guard.execute_hover(arguments).await
                },
                None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_hover")),
            },
            "lsp_find_references" => match &self.lsp {
                Some(provider) => {
                    let mut lsp_guard = provider.lock().await;
                    lsp_guard.execute_find_references(arguments).await
                },
                None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_find_references")),
            },
            "lsp_document_symbols" => match &self.lsp {
                Some(provider) => {
                    let mut lsp_guard = provider.lock().await;
                    lsp_guard.execute_document_symbols(arguments).await
                },
                None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_document_symbols")),
            },
            "lsp_workspace_symbols" => match &self.lsp {
                Some(provider) => {
                    let mut lsp_guard = provider.lock().await;
                    lsp_guard.execute_workspace_symbols(arguments).await
                },
                None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_workspace_symbols")),
            },
            "lsp_completion" => match &self.lsp {
                Some(provider) => {
                    let mut lsp_guard = provider.lock().await;
                    lsp_guard.execute_completion(arguments).await
                },
                None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_completion")),
            },
            _ => {
                // Report only the tools actually available in this configuration.
                let available_tools = format!("semantic_search, view_signatures{}{}",
                    if self.graphrag.is_some() { ", graphrag" } else { "" },
                    if self.lsp.is_some() { ", lsp_goto_definition, lsp_hover, lsp_find_references, lsp_document_symbols, lsp_workspace_symbols, lsp_completion" } else { "" }
                );
                Err(McpError::method_not_found(format!("Unknown tool '{}'. Available tools: {}", tool_name, available_tools), tool_name))
            }
        };
        match result {
            // Success: wrap the provider's text output in MCP content form.
            Ok(content) => JsonRpcResponse {
                jsonrpc: "2.0".to_string(),
                id: request.id.clone(),
                result: Some(json!({
                    "content": [{
                        "type": "text",
                        "text": content
                    }]
                })),
                error: None,
            },
            Err(e) => {
                // Prefer a structured MCP error parsed from the message;
                // fall back to a generic JSON-RPC conversion.
                let error_message = e.to_string();
                if let Some(mcp_error) = parse_mcp_error(&error_message) {
                    JsonRpcResponse {
                        jsonrpc: "2.0".to_string(),
                        id: request.id.clone(),
                        result: None,
                        error: Some(mcp_error),
                    }
                } else {
                    JsonRpcResponse {
                        jsonrpc: "2.0".to_string(),
                        id: request.id.clone(),
                        result: None,
                        error: Some(e.into_jsonrpc()),
                    }
                }
            }
        }
    }
async fn handle_ping(&self, request: &JsonRpcRequest) -> JsonRpcResponse {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: Some(json!({})),
error: None,
}
}
async fn update_lsp_after_indexing(
lsp_provider: &mut crate::mcp::lsp::LspProvider,
working_directory: &std::path::Path,
) -> Result<()> {
use crate::indexer::{detect_language, NoindexWalker, PathUtils};
debug!("Updating LSP server with changed files");
let walker = NoindexWalker::create_walker(working_directory).build();
let mut files_updated = 0;
for result in walker {
let entry = match result {
Ok(entry) => entry,
Err(_) => continue,
};
if !entry.file_type().is_some_and(|ft| ft.is_file()) {
continue;
}
if detect_language(entry.path()).is_some() {
let relative_path = PathUtils::to_relative_string(entry.path(), working_directory);
if let Err(e) = lsp_provider.update_file(&relative_path).await {
debug!("Failed to update file {} in LSP: {}", relative_path, e);
} else {
files_updated += 1;
}
}
}
debug!("LSP update completed: {} files updated", files_updated);
Ok(())
}
}
impl Drop for McpServer {
    /// Abort all background tasks on drop.
    ///
    /// Previously `indexing_handle` was not aborted here even though the
    /// graceful shutdown path in `run()` aborts all three handles, leaking
    /// the background indexing task when the server was dropped without a
    /// clean shutdown.
    fn drop(&mut self) {
        for handle in [
            self.watcher_handle.take(),
            self.index_handle.take(),
            self.indexing_handle.take(),
        ]
        .into_iter()
        .flatten()
        {
            handle.abort();
        }
    }
}
/// Run one full reindex of `working_directory`, guarded by the on-disk
/// index lock so concurrent indexers (CLI runs, other MCP instances) are
/// mutually excluded. Logs start/finish with duration for both outcomes.
async fn perform_indexing(
    store: &Store,
    config: &Config,
    working_directory: &std::path::Path,
    no_git: bool,
) -> Result<()> {
    let start_time = std::time::Instant::now();
    log_indexing_operation("direct_reindex_start", None, None, true);
    let mut lock = IndexLock::new(working_directory)?;
    lock.acquire()?;
    debug!("MCP server: acquired indexing lock");
    let state = state::create_shared_state();
    state.write().current_directory = working_directory.to_path_buf();
    // Git root detection is skipped entirely in no-git mode.
    let git_repo_root = if !no_git {
        indexer::git::find_git_root(working_directory)
    } else {
        None
    };
    let indexing_result = indexer::index_files_with_quiet(
        store,
        state.clone(),
        config,
        git_repo_root.as_deref(),
        true,
    )
    .await;
    // NOTE(review): the lock is released before the indexing result is
    // inspected; if release() itself fails, the `?` returns that error and
    // the original indexing outcome is discarded.
    lock.release()?;
    debug!("MCP server: released indexing lock");
    let duration_ms = start_time.elapsed().as_millis() as u64;
    match indexing_result {
        Ok(()) => {
            log_indexing_operation("direct_reindex_complete", None, Some(duration_ms), true);
            Ok(())
        }
        Err(e) => {
            log_indexing_operation("direct_reindex_complete", None, Some(duration_ms), false);
            log_critical_error("Direct indexing", e.as_ref());
            Err(e)
        }
    }
}
/// Watch `working_dir` recursively with notify's mini debouncer and forward
/// each relevant event batch as a unit `()` signal on `tx`.
///
/// Two debounce layers exist: notify's own MIN_DEBOUNCE_MS coalescing in
/// this function, and the MCP_DEBOUNCE_MS window applied downstream by the
/// task in `start_watcher`.
async fn run_watcher(
    tx: mpsc::Sender<()>,
    working_dir: std::path::PathBuf,
    debug: bool,
) -> Result<()> {
    use notify_debouncer_mini::notify::RecursiveMode;
    use notify_debouncer_mini::{new_debouncer, DebouncedEvent};
    let (debouncer_tx, mut debouncer_rx) = mpsc::channel(MCP_MAX_PENDING_EVENTS);
    let ignore_patterns = IgnorePatterns::new(working_dir.clone());
    // The callback runs on notify's own thread, hence the non-blocking
    // try_send into the async world.
    let mut debouncer = new_debouncer(
        Duration::from_millis(MIN_DEBOUNCE_MS),
        move |res: Result<Vec<DebouncedEvent>, notify_debouncer_mini::notify::Error>| match res {
            Ok(events) => {
                // Drop events on ignored paths (noindex / VCS / build dirs).
                let relevant_events: Vec<_> = events
                    .iter()
                    .filter(|event| !ignore_patterns.should_ignore_path(&event.path))
                    .collect();
                if !relevant_events.is_empty() {
                    log_watcher_event("file_change_batch", None, relevant_events.len());
                    // Per-event tracing is compile-time gated off by
                    // MCP_ENABLE_VERBOSE_EVENTS (currently false).
                    if debug && MCP_ENABLE_VERBOSE_EVENTS {
                        trace!(
                            event_count = relevant_events.len(),
                            "File watcher detected relevant events"
                        );
                        for event in &relevant_events {
                            trace!(
                                event_kind = ?event.kind,
                                event_path = ?event.path,
                                "File watcher event detail"
                            );
                        }
                    }
                    if let Err(e) = debouncer_tx.try_send(()) {
                        warn!(
                            error = ?e,
                            "Failed to send file event - channel may be full"
                        );
                    }
                }
            }
            Err(e) => {
                log_critical_error("File watcher error", &e);
            }
        },
    )
    .map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?;
    if let Err(e) = debouncer
        .watcher()
        .watch(&working_dir, RecursiveMode::Recursive)
    {
        log_critical_error("Failed to start watching directory", &e);
        return Err(anyhow::anyhow!("Failed to watch directory: {}", e));
    }
    debug!(
        working_dir = %working_dir.display(),
        debounce_ms = MCP_DEBOUNCE_MS,
        additional_delay_ms = DEFAULT_ADDITIONAL_DELAY_MS,
        "File watcher started with ignore patterns loaded"
    );
    // Forwarding loop: relay signals into `tx` with its own error breaker.
    // `debouncer` stays alive (owned here) for the duration of the loop.
    let mut consecutive_errors = 0u32;
    const MAX_WATCHER_ERRORS: u32 = 5;
    while let Some(()) = debouncer_rx.recv().await {
        match tx.send(()).await {
            Ok(()) => {
                consecutive_errors = 0;
            }
            Err(e) => {
                consecutive_errors += 1;
                log_critical_error("Event channel send failed", &e);
                debug!(
                    consecutive_errors = consecutive_errors,
                    "Event channel closed or failed"
                );
                if consecutive_errors >= MAX_WATCHER_ERRORS {
                    log_critical_anyhow_error(
                        "Too many watcher errors",
                        &anyhow::anyhow!(
                            "Stopping file watcher after {} consecutive errors",
                            consecutive_errors
                        ),
                    );
                    break;
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
    debug!("File watcher stopped");
    Ok(())
}
/// Provider handles shared by all HTTP connection tasks; a clone of the
/// providers held by McpServer, placed behind an Arc<Mutex> in run_http.
#[derive(Clone)]
struct HttpServerState {
    semantic_code: SemanticCodeProvider,
    graphrag: Option<GraphRagProvider>,
    lsp: Option<Arc<Mutex<crate::mcp::lsp::LspProvider>>>,
}
/// Handle one HTTP connection carrying a single JSON-RPC request: read the
/// request, validate method/path, extract the JSON body via Content-Length,
/// dispatch to the shared handlers, and write the HTTP response.
/// Notifications (no `id`) are answered with 204 No Content.
async fn handle_http_connection(
    mut stream: TcpStream,
    state: Arc<Mutex<HttpServerState>>,
) -> Result<()> {
    // NOTE(review): single read — assumes the entire request (headers +
    // body) arrives in one read of <= 8 KiB; fragmented requests are not
    // reassembled. TODO confirm acceptable for local clients.
    let mut buffer = vec![0; 8192];
    let bytes_read = stream.read(&mut buffer).await?;
    if bytes_read == 0 {
        return Ok(());
    }
    let raw = &buffer[..bytes_read];
    let request_str = String::from_utf8_lossy(raw);
    let request_line = request_str.lines().next().unwrap_or("");
    if !request_line.starts_with("POST /mcp") && !request_line.starts_with("POST / ") {
        let response = "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n";
        stream.write_all(response.as_bytes()).await?;
        return Ok(());
    }
    // Locate the end of the header section in the RAW bytes. The previous
    // implementation re-joined `lines()` output with "\n", which dropped
    // the '\r' of every CRLF line ending; the resulting body offset was
    // short by one byte per header line, so the Content-Length window
    // started inside the headers and truncated the tail of the JSON body.
    let body_start = match raw
        .windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|p| p + 4)
        // Tolerate non-conforming bare-LF clients as a fallback.
        .or_else(|| raw.windows(2).position(|w| w == b"\n\n").map(|p| p + 2))
    {
        Some(pos) => pos,
        None => {
            return send_http_error(&mut stream, 400, "Missing or invalid request body").await;
        }
    };
    // Parse Content-Length from the header section only (skip request line).
    let headers = String::from_utf8_lossy(&raw[..body_start]);
    let mut content_length = 0usize;
    for line in headers.lines().skip(1) {
        if line.to_lowercase().starts_with("content-length:") {
            if let Some(len_str) = line.split(':').nth(1) {
                content_length = len_str.trim().parse().unwrap_or(0);
            }
        }
    }
    let json_body = if content_length > 0 && body_start < bytes_read {
        let body_bytes = &raw[body_start..std::cmp::min(body_start + content_length, bytes_read)];
        String::from_utf8_lossy(body_bytes).to_string()
    } else {
        return send_http_error(&mut stream, 400, "Missing or invalid request body").await;
    };
    let request: JsonRpcRequest = match serde_json::from_str(&json_body) {
        Ok(req) => req,
        Err(e) => {
            debug!("Failed to parse JSON-RPC request: {}", e);
            return send_http_error(&mut stream, 400, "Invalid JSON-RPC request").await;
        }
    };
    log_mcp_request(
        &request.method,
        request.params.as_ref(),
        request.id.as_ref(),
    );
    let start_time = std::time::Instant::now();
    let request_id = request.id.clone();
    let request_method = request.method.clone();
    let server_state = state.lock().await;
    let response = match request.method.as_str() {
        "initialize" => handle_initialize_http(&request),
        "tools/list" => handle_tools_list_http(&request, &server_state),
        "tools/call" => handle_tools_call_http(&request, &server_state).await,
        "ping" => handle_ping_http(&request),
        _ => JsonRpcResponse {
            jsonrpc: "2.0".to_string(),
            id: request.id,
            result: None,
            error: Some(JsonRpcError {
                code: -32601,
                message: "Method not found".to_string(),
                data: None,
            }),
        },
    };
    let duration_ms = start_time.elapsed().as_millis() as u64;
    log_mcp_response(
        &request_method,
        response.error.is_none(),
        request_id.as_ref(),
        Some(duration_ms),
    );
    // JSON-RPC notifications carry no id and expect no payload.
    if request_id.is_none() {
        return send_http_no_content(&mut stream).await;
    }
    send_http_response(&mut stream, &response).await
}
/// Write a plain-text HTTP error response with the canonical reason phrase
/// for the given status code.
async fn send_http_error(stream: &mut TcpStream, status: u16, message: &str) -> Result<()> {
    let reason = match status {
        400 => "Bad Request",
        404 => "Not Found",
        500 => "Internal Server Error",
        _ => "Error",
    };
    // Headers and body are written separately; the bytes on the wire are
    // identical to a single combined write.
    let headers = format!(
        "HTTP/1.1 {status} {reason}\r\nContent-Type: text/plain\r\nContent-Length: {len}\r\nAccess-Control-Allow-Origin: *\r\n\r\n",
        len = message.len()
    );
    stream.write_all(headers.as_bytes()).await?;
    stream.write_all(message.as_bytes()).await?;
    Ok(())
}
/// Serialize a JSON-RPC response and write it as a 200 OK with permissive
/// CORS headers.
async fn send_http_response(stream: &mut TcpStream, response: &JsonRpcResponse) -> Result<()> {
    let body = serde_json::to_string(response)?;
    let headers = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nAccess-Control-Allow-Origin: *\r\nAccess-Control-Allow-Methods: POST, OPTIONS\r\nAccess-Control-Allow-Headers: Content-Type\r\n\r\n",
        body.len()
    );
    stream.write_all(headers.as_bytes()).await?;
    stream.write_all(body.as_bytes()).await?;
    Ok(())
}
/// Write a bodiless 204 response — used for JSON-RPC notifications (no id).
async fn send_http_no_content(stream: &mut TcpStream) -> Result<()> {
    const RESPONSE: &str = "HTTP/1.1 204 No Content\r\nAccess-Control-Allow-Origin: *\r\nAccess-Control-Allow-Methods: POST, OPTIONS\r\nAccess-Control-Allow-Headers: Content-Type\r\n\r\n";
    stream.write_all(RESPONSE.as_bytes()).await?;
    Ok(())
}
/// HTTP-mode `initialize` handler; mirrors McpServer::handle_initialize.
fn handle_initialize_http(request: &JsonRpcRequest) -> JsonRpcResponse {
    let result = json!({
        "protocolVersion": "2024-11-05",
        "capabilities": {
            "tools": {
                "listChanged": false
            }
        },
        "serverInfo": {
            "name": "octocode-mcp",
            "version": "0.1.0",
            "description": "Semantic code search server with vector embeddings and optional GraphRAG support"
        },
        "instructions": "This server provides modular AI tools: semantic code search and GraphRAG (if available). Use 'semantic_search' for code/documentation searches and 'graphrag' (if enabled) for relationship queries."
    });
    JsonRpcResponse {
        jsonrpc: String::from("2.0"),
        id: request.id.clone(),
        result: Some(result),
        error: None,
    }
}
/// HTTP-mode `tools/list` handler; mirrors McpServer::handle_tools_list but
/// reads provider availability from the shared HttpServerState.
fn handle_tools_list_http(request: &JsonRpcRequest, state: &HttpServerState) -> JsonRpcResponse {
    let mut tools = Vec::new();
    tools.push(SemanticCodeProvider::get_tool_definition());
    tools.push(SemanticCodeProvider::get_view_signatures_tool_definition());
    if state.graphrag.is_some() {
        tools.push(GraphRagProvider::get_tool_definition());
    }
    if state.lsp.is_some() {
        tools.extend(crate::mcp::lsp::LspProvider::get_tool_definitions());
    }
    JsonRpcResponse {
        jsonrpc: String::from("2.0"),
        id: request.id.clone(),
        result: Some(json!({ "tools": tools })),
        error: None,
    }
}
async fn handle_tools_call_http(
request: &JsonRpcRequest,
state: &HttpServerState,
) -> JsonRpcResponse {
let params = match &request.params {
Some(params) => params,
None => {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Invalid params: missing parameters object".to_string(),
data: Some(json!({
"details": "Tool calls require a 'params' object with 'name' and 'arguments' fields"
})),
}),
};
}
};
let tool_name = match params.get("name").and_then(|v| v.as_str()) {
Some(name) => name,
None => {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Invalid params: missing tool name".to_string(),
data: Some(json!({
"details": "Required field 'name' must be provided with the tool name to call"
})),
}),
};
}
};
let default_args = json!({});
let arguments = params.get("arguments").unwrap_or(&default_args);
if let Ok(args_str) = serde_json::to_string(arguments) {
if args_str.len() > MCP_MAX_REQUEST_SIZE {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Tool arguments too large".to_string(),
data: Some(json!({
"max_size": MCP_MAX_REQUEST_SIZE,
"actual_size": args_str.len()
})),
}),
};
}
}
let result = match tool_name {
"semantic_search" => state.semantic_code.execute_search(arguments).await,
"view_signatures" => state.semantic_code.execute_view_signatures(arguments).await,
"graphrag" => match &state.graphrag {
Some(provider) => provider.execute(arguments).await,
None => Err(McpError::method_not_found("GraphRAG is not enabled in the current configuration. Please enable GraphRAG in octocode.toml to use relationship-aware search.", "graphrag")),
},
"lsp_goto_definition" => match &state.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_goto_definition(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_goto_definition")),
},
"lsp_hover" => match &state.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_hover(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_hover")),
},
"lsp_find_references" => match &state.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_find_references(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_find_references")),
},
"lsp_document_symbols" => match &state.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_document_symbols(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_document_symbols")),
},
"lsp_workspace_symbols" => match &state.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_workspace_symbols(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_workspace_symbols")),
},
"lsp_completion" => match &state.lsp {
Some(provider) => {
let mut lsp_guard = provider.lock().await;
lsp_guard.execute_completion(arguments).await
},
None => Err(McpError::method_not_found("LSP server is not available. Start MCP server with --with-lsp=\"<command>\" to enable LSP features.", "lsp_completion")),
},
_ => {
let available_tools = format!("semantic_search, view_signatures{}{}",
if state.graphrag.is_some() { ", graphrag" } else { "" },
if state.lsp.is_some() { ", lsp_goto_definition, lsp_hover, lsp_find_references, lsp_document_symbols, lsp_workspace_symbols, lsp_completion" } else { "" }
);
Err(McpError::method_not_found(format!("Unknown tool '{}'. Available tools: {}", tool_name, available_tools), tool_name))
}
};
match result {
Ok(content) => JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: Some(json!({
"content": [{
"type": "text",
"text": content
}]
})),
error: None,
},
Err(e) => {
let error_message = e.to_string();
if let Some(mcp_error) = parse_mcp_error(&error_message) {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(mcp_error),
}
} else {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: None,
error: Some(e.into_jsonrpc()),
}
}
}
}
}
fn handle_ping_http(request: &JsonRpcRequest) -> JsonRpcResponse {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id.clone(),
result: Some(json!({})),
error: None,
}
}