Skip to main content

mcp_postgres/
server.rs

1use tokio::net::{TcpListener, TcpStream};
2use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
3use serde_json::{json, Value};
4use tracing::{error, warn};
5use std::sync::Arc;
6
7use crate::config::Config;
8use crate::errors::{MCPError, Result as MCPResult};
9use crate::metrics;
10use crate::pool::ConnectionPool;
11use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
12use crate::actions;
13use once_cell::sync::Lazy;
14
15/// Pre-serialized response bytes for `tools/list`.  Built once at startup;
16/// each call deserializes from this cached buffer instead of deep-cloning the
17/// entire 135-tool Value tree (~50 KB).
18static TOOLS_LIST_RESPONSE: Lazy<Vec<u8>> = Lazy::new(|| {
19    let tools_json = include_str!("../tools.json");
20    let tools: Vec<Value> = serde_json::from_str(tools_json)
21        .expect("Failed to parse tools.json");
22    let resp = json!({ "tools": tools });
23    serde_json::to_vec(&resp).expect("Failed to serialize tools/list response")
24});
25
/// Capacity, in bytes, of the `BufReader` wrapped around each transport's input.
const BUFFER_CAPACITY: usize = 4 * 1024;
/// Line terminator appended after every serialized JSON-RPC response.
const NEWLINE: &[u8] = &[b'\n'];
28
29#[inline]
30#[cold]
31fn parse_error(msg: String) -> JsonRpcResponse {
32    let mcp_error = MCPError::ParseError(msg);
33    JsonRpcResponse::error(None, mcp_error.error_code(), mcp_error.to_string())
34}
35
36#[inline]
37fn parse_request(line: &str) -> Result<JsonRpcRequest, String> {
38    let trimmed = line.trim();
39    if trimmed.is_empty() {
40        return Err("Empty request".to_string());
41    }
42    serde_json::from_str::<JsonRpcRequest>(trimmed)
43        .map_err(|e| e.to_string())
44}
45
46pub struct MCPServer {
47    config: Config,
48    pool: Arc<ConnectionPool>,
49}
50
51impl MCPServer {
52    pub const fn new(config: Config, pool: Arc<ConnectionPool>) -> Self {
53        Self { config, pool }
54    }
55
56    /// Run in stdio mode for MCP compatibility (Claude Desktop, etc.)
57    pub async fn run_stdio(&self) -> MCPResult<()> {
58        let stdin = tokio::io::stdin();
59        let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, stdin);
60        let mut stdout = tokio::io::stdout();
61        let mut line = String::with_capacity(512);
62    // 4 KB initial — handles >95% of responses without resizing.
63    // Tools like `list_tables` or `execute_query` may exceed this,
64    // but Vec grows geometrically so the amortized cost is negligible.
65    let mut response_buf = Vec::with_capacity(4096);
66
67    loop {
68        line.clear();
69        match reader.read_line(&mut line).await {
70            Ok(0) => break,
71            Ok(_) => {
72                process_one_line(&line, &self.pool, &self.config, &mut response_buf, &mut stdout).await?;
73            }
74            Err(e) => {
75                error!("IO error: {}", e);
76                break;
77            }
78        }
79    }
80    Ok(())
81    }
82
83    pub async fn run(&self) -> MCPResult<()> {
84        let addr = format!("{}:{}", self.config.server.host, self.config.server.port);
85        let listener = TcpListener::bind(&addr).await?;
86
87        tracing::info!("MCP server listening on {}", addr);
88
89        loop {
90            let (socket, peer_addr) = listener.accept().await?;
91
92            if let Err(e) = socket.set_nodelay(true) {
93                warn!("Failed to set TCP_NODELAY: {}", e);
94            }
95
96            let pool = Arc::clone(&self.pool);
97            let config = self.config.clone();
98
99            tokio::spawn(async move {
100                if let Err(e) = handle_client(socket, pool, config).await {
101                    error!("Client {} error: {}", peer_addr, e);
102                }
103            });
104        }
105    }
106}
107
108#[inline(never)]
109async fn handle_client(socket: TcpStream, pool: Arc<ConnectionPool>, config: Config) -> MCPResult<()> {
110    let (reader, mut writer) = socket.into_split();
111    let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, reader);
112    let mut line = String::with_capacity(512);
113    // 4 KB initial capacity — grows geometrically for large responses.
114    let mut response_buf = Vec::with_capacity(4096);
115
116    loop {
117        line.clear();
118        match reader.read_line(&mut line).await {
119            Ok(0) => break,
120            Ok(_) => {
121                process_one_line(&line, &pool, &config, &mut response_buf, &mut writer).await?;
122            }
123            Err(e) => {
124                error!("IO error: {}", e);
125                break;
126            }
127        }
128    }
129
130    Ok(())
131}
132
133/// Core per-line processing shared by TCP and stdio transports.
134/// For notifications (JSON-RPC messages without `id`), no response is sent.
135#[inline]
136async fn process_one_line<W: AsyncWriteExt + Unpin>(
137    line: &str,
138    pool: &Arc<ConnectionPool>,
139    config: &Config,
140    response_buf: &mut Vec<u8>,
141    writer: &mut W,
142) -> MCPResult<()> {
143    metrics::inc_requests();
144
145    let (response, is_notification) = match parse_request(line) {
146        Ok(req) => {
147            let is_notif = req.id.is_none();
148            match process_request(&req, pool, config).await {
149                Ok(result) => (JsonRpcResponse::success(req.id, result), is_notif),
150                Err(e) => {
151                    metrics::inc_errors();
152                    (JsonRpcResponse::error(req.id, e.error_code(), e.to_string()), is_notif)
153                }
154            }
155        }
156        Err(e) => {
157            metrics::inc_errors();
158            (parse_error(e), false)
159        }
160    };
161
162    // JSON-RPC notifications (no `id`) do not expect a response
163    if is_notification {
164        return Ok(());
165    }
166
167    response_buf.clear();
168    serde_json::to_writer(&mut *response_buf, &response)?;
169    response_buf.extend_from_slice(NEWLINE);
170
171    writer.write_all(response_buf).await?;
172    writer.flush().await?;
173    Ok(())
174}
175
176/// Process a JSON-RPC request (used by both TCP and HTTP transports)
177#[inline]
178pub async fn process_request(
179    req: &JsonRpcRequest,
180    pool: &Arc<ConnectionPool>,
181    config: &Config,
182) -> MCPResult<Value> {
183    match req.method.as_str() {
184        "initialize" => handle_initialize(req),
185        "tools/list" => handle_tools_list(),
186        "tools/call" => handle_tools_call(req, pool, config).await,
187        "ping" => handle_ping(),
188        method if method.starts_with("notifications/") => handle_notification(method),
189        _ => Err(MCPError::MethodNotFound(req.method.clone())),
190    }
191}
192
193/// Handle JSON-RPC ping (respond with empty success)
194#[inline]
195const fn handle_ping() -> MCPResult<Value> {
196    Ok(Value::Null)
197}
198
199/// Handle MCP notifications (silently accepted, no response needed per JSON-RPC spec)
200#[inline]
201fn handle_notification(method: &str) -> MCPResult<Value> {
202    tracing::trace!("Received notification: {method}");
203    Ok(Value::Null)
204}
205
206/// Public wrapper for HTTP handlers - returns complete JSON-RPC response
207pub async fn process_request_http(
208    req: &JsonRpcRequest,
209    pool: &Arc<ConnectionPool>,
210    config: &Config,
211) -> JsonRpcResponse {
212    metrics::inc_requests();
213
214    match process_request(req, pool, config).await {
215        Ok(result) => JsonRpcResponse::success(req.id.clone(), result),
216        Err(e) => {
217            metrics::inc_errors();
218            JsonRpcResponse::error(req.id.clone(), e.error_code(), e.to_string())
219        }
220    }
221}
222
223fn handle_initialize(_req: &JsonRpcRequest) -> MCPResult<Value> {
224    /// Cached initialize response — built once on first call.
225    static INIT_RESPONSE: Lazy<Value> = Lazy::new(|| {
226        json!({
227            "protocolVersion": "2024-11-05",
228            "capabilities": {
229                "tools": { "listChanged": false },
230                "resources": { "subscribe": false, "listChanged": false },
231                "prompts": { "listChanged": false }
232            },
233            "serverInfo": {
234                "name": "mcp-postgres",
235                "version": env!("CARGO_PKG_VERSION")
236            }
237        })
238    });
239
240    Ok(INIT_RESPONSE.clone())
241}
242
243#[inline]
244fn handle_tools_list() -> MCPResult<Value> {
245    // Deserialize from cached bytes instead of deep-cloning a 50 KB Value tree.
246    Ok(serde_json::from_slice(&TOOLS_LIST_RESPONSE)?)
247}
248
249async fn handle_tools_call(
250    req: &JsonRpcRequest,
251    pool: &Arc<ConnectionPool>,
252    config: &Config,
253) -> MCPResult<Value> {
254    let tool_name = req
255        .params
256        .as_ref()
257        .and_then(|p| p.get("name").and_then(|v| v.as_str()))
258        .ok_or_else(|| MCPError::InvalidParams("Missing 'name' parameter".into()))?;
259
260    let tool_args = req.params.as_ref().and_then(|p| p.get("arguments"));
261
262    // Restricted mode check + unknown tool check BEFORE pool acquire
263    if config.server.access_mode == crate::config::AccessMode::Restricted
264        && crate::tools::is_write_tool(tool_name)
265    {
266        return Err(MCPError::InvalidParams(format!(
267            "Operation '{tool_name}' is not allowed in restricted (read-only) mode"
268        )));
269    }
270
271    // Verify tool exists before acquiring a connection
272    if !crate::tools::tool_exists(tool_name) {
273        return Err(method_not_found(tool_name));
274    }
275
276    // Acquire pool connection only for known tools
277    let client = pool.acquire().await?;
278
279    let result = match tool_name {
280        // Schema actions
281        "list_tables" => actions::schema::list_tables(&client, &tool_args).await,
282        "describe_table" => actions::schema::describe_table(&client, &tool_args).await,
283        "list_indexes" => actions::schema::list_indexes(&client, &tool_args).await,
284        "list_schemas" => actions::schema::list_schemas(&client, &tool_args).await,
285        "show_constraints" => actions::schema::show_constraints(&client, &tool_args).await,
286        "list_triggers" => actions::schema::list_triggers(&client, &tool_args).await,
287        "create_table" => actions::schema::create_table(&client, &tool_args).await,
288        "drop_table" => actions::schema::drop_table(&client, &tool_args).await,
289        "create_view" => actions::schema::create_view(&client, &tool_args).await,
290        "drop_view" => actions::schema::drop_view(&client, &tool_args).await,
291        "alter_view" => actions::schema::alter_view(&client, &tool_args).await,
292        "create_schema" => actions::schema::create_schema(&client, &tool_args).await,
293        "drop_schema" => actions::schema::drop_schema(&client, &tool_args).await,
294        "create_sequence" => actions::schema::create_sequence(&client, &tool_args).await,
295        "drop_sequence" => actions::schema::drop_sequence(&client, &tool_args).await,
296        "alter_index" => actions::schema::alter_index(&client, &tool_args).await,
297        "list_partitions" => actions::schema::list_partitions(&client, &tool_args).await,
298        "backup_table" => actions::schema::backup_table(&client, &tool_args).await,
299        "create_index" => actions::schema::create_index(&client, &tool_args).await,
300        "drop_index" => actions::schema::drop_index(&client, &tool_args).await,
301        "create_partition" => actions::schema::create_partition(&client, &tool_args).await,
302        "drop_partition" => actions::schema::drop_partition(&client, &tool_args).await,
303        // Query actions
304        "execute_query" => actions::query::execute_query(&client, &tool_args).await,
305        "execute_insert" => actions::query::execute_insert(&client, &tool_args).await,
306        "execute_update" => actions::query::execute_update(&client, &tool_args).await,
307        "execute_delete" => actions::query::execute_delete(&client, &tool_args).await,
308        "async_execute_insert" => actions::query::async_execute_insert(&client, &tool_args).await,
309        "async_execute_update" => actions::query::async_execute_update(&client, &tool_args).await,
310        "async_execute_delete" => actions::query::async_execute_delete(&client, &tool_args).await,
311        "explain_query" => actions::query::explain_query(&client, &tool_args).await,
312        // Batch operations
313        "async_batch_insert" => actions::batch::async_batch_insert(&client, &tool_args).await,
314        "async_batch_update" => actions::batch::async_batch_update(&client, &tool_args).await,
315        "async_batch_delete" => actions::batch::async_batch_delete(&client, &tool_args).await,
316        "async_batch_insert_copy" => actions::batch::async_batch_insert_copy(&client, &tool_args).await,
317        // Monitoring actions
318        "get_table_stats" => actions::monitoring::get_table_stats(&client, &tool_args).await,
319        "get_index_stats" => actions::monitoring::get_index_stats(&client, &tool_args).await,
320        "show_database_size" => actions::monitoring::show_database_size(&client, &tool_args).await,
321        "show_table_size" => actions::monitoring::show_table_size(&client, &tool_args).await,
322        "get_cache_hit_ratio" => actions::monitoring::get_cache_hit_ratio(&client, &tool_args).await,
323        // Connection actions
324        "list_connections" => actions::connections::list_connections(&client, &tool_args).await,
325        "show_current_user" => actions::connections::show_current_user(&client, &tool_args).await,
326        "show_running_queries" => actions::connections::show_running_queries(&client, &tool_args).await,
327        "show_connection_summary" => actions::connections::show_connection_summary(&client, &tool_args).await,
328        // Maintenance actions
329        "vacuum_analyze" => actions::maintenance::vacuum_analyze(&client, &tool_args).await,
330        "analyze_table" => actions::maintenance::analyze_table(&client, &tool_args).await,
331        "reindex_table" => actions::maintenance::reindex_table(&client, &tool_args).await,
332        "get_pg_stat_statements" => actions::maintenance::get_pg_stat_statements(&client, &tool_args).await,
333        "reset_statistics" => actions::maintenance::reset_statistics(&client, &tool_args).await,
334        "truncate_table" => actions::maintenance::truncate_table(&client, &tool_args).await,
335        // Security actions
336        "list_users" => actions::security::list_users(&client, &tool_args).await,
337        "list_user_privileges" => actions::security::list_user_privileges(&client, &tool_args).await,
338        "list_role_memberships" => actions::security::list_role_memberships(&client, &tool_args).await,
339        "list_database_privileges" => actions::security::list_database_privileges(&client, &tool_args).await,
340        "show_session_info" => actions::security::show_session_info(&client, &tool_args).await,
341        // Config actions
342        "show_all_settings" => actions::config::show_all_settings(&client, &tool_args).await,
343        "get_setting" => actions::config::get_setting(&client, &tool_args).await,
344        "show_memory_settings" => actions::config::show_memory_settings(&client, &tool_args).await,
345        "show_performance_settings" => actions::config::show_performance_settings(&client, &tool_args).await,
346        "show_log_settings" => actions::config::show_log_settings(&client, &tool_args).await,
347        // Replication actions
348        "show_replication_status" => actions::replication::show_replication_status(&client, &tool_args).await,
349        "list_replication_slots" => actions::replication::list_replication_slots(&client, &tool_args).await,
350        "list_standby_servers" => actions::replication::list_standby_servers(&client, &tool_args).await,
351        "show_wal_info" => actions::replication::show_wal_info(&client, &tool_args).await,
352        "show_base_backup_progress" => actions::replication::show_base_backup_progress(&client, &tool_args).await,
353        // Transaction actions
354        "show_active_transactions" => actions::transactions::show_active_transactions(&client, &tool_args).await,
355        "show_locks" => actions::transactions::show_locks(&client, &tool_args).await,
356        "show_waiting_locks" => actions::transactions::show_waiting_locks(&client, &tool_args).await,
357        "show_transaction_isolation" => actions::transactions::show_transaction_isolation(&client, &tool_args).await,
358        "show_deadlocks" => actions::transactions::show_deadlocks(&client, &tool_args).await,
359        "show_autocommit_status" => actions::transactions::show_autocommit_status(&client, &tool_args).await,
360        "show_transaction_timeout" => actions::transactions::show_transaction_timeout(&client, &tool_args).await,
361        // Health actions
362        "analyze_db_health" => actions::health::analyze_db_health(&client, &tool_args).await,
363        "list_unused_indexes" => actions::health::list_unused_indexes(&client, &tool_args).await,
364        "list_duplicate_indexes" => actions::health::list_duplicate_indexes(&client, &tool_args).await,
365        "show_vacuum_progress" => actions::health::show_vacuum_progress(&client, &tool_args).await,
366        // Enhanced schema
367        "get_object_details" => actions::schema::get_object_details(&client, &tool_args).await,
368        // User management
369        "create_user" => actions::user_mgmt::create_user(&client, &tool_args).await,
370        "alter_user" => actions::user_mgmt::alter_user(&client, &tool_args).await,
371        "drop_user" => actions::user_mgmt::drop_user(&client, &tool_args).await,
372        "create_role" => actions::user_mgmt::create_role(&client, &tool_args).await,
373        "alter_role" => actions::user_mgmt::alter_role(&client, &tool_args).await,
374        "drop_role" => actions::user_mgmt::drop_role(&client, &tool_args).await,
375        "grant_privileges" => actions::user_mgmt::grant_privileges(&client, &tool_args).await,
376        "revoke_privileges" => actions::user_mgmt::revoke_privileges(&client, &tool_args).await,
377        // Schema alter
378        "add_column" => actions::schema_alter::add_column(&client, &tool_args).await,
379        "drop_column" => actions::schema_alter::drop_column(&client, &tool_args).await,
380        "rename_column" => actions::schema_alter::rename_column(&client, &tool_args).await,
381        "alter_column_type" => actions::schema_alter::alter_column_type(&client, &tool_args).await,
382        "rename_table" => actions::schema_alter::rename_table(&client, &tool_args).await,
383        "rename_index" => actions::schema_alter::rename_index(&client, &tool_args).await,
384        "rename_schema" => actions::schema_alter::rename_schema(&client, &tool_args).await,
385        "add_foreign_key" => actions::schema_alter::add_foreign_key(&client, &tool_args).await,
386        "drop_foreign_key" => actions::schema_alter::drop_foreign_key(&client, &tool_args).await,
387        "add_unique_constraint" => actions::schema_alter::add_unique_constraint(&client, &tool_args).await,
388        "drop_constraint" => actions::schema_alter::drop_constraint(&client, &tool_args).await,
389        // Session management
390        "cancel_query" => actions::session_mgmt::cancel_query(&client, &tool_args).await,
391        "terminate_connection" => actions::session_mgmt::terminate_connection(&client, &tool_args).await,
392        "show_blocked_queries" => actions::session_mgmt::show_blocked_queries(&client, &tool_args).await,
393        // Extension management
394        "list_extensions" => actions::ext_mgmt::list_extensions(&client, &tool_args).await,
395        "create_extension" => actions::ext_mgmt::create_extension(&client, &tool_args).await,
396        "drop_extension" => actions::ext_mgmt::drop_extension(&client, &tool_args).await,
397        // Database management
398        "list_databases" => actions::db_mgmt::list_databases(&client, &tool_args).await,
399        "create_database" => actions::db_mgmt::create_database(&client, &tool_args).await,
400        // Extended maintenance
401        "vacuum" => actions::maint_ext::vacuum(&client, &tool_args).await,
402        "vacuum_full" => actions::maint_ext::vacuum_full(&client, &tool_args).await,
403        "reindex_database" => actions::maint_ext::reindex_database(&client, &tool_args).await,
404        // Migration helpers
405        "generate_create_table_ddl" => actions::migration_helpers::generate_create_table_ddl(&client, &tool_args).await,
406        "generate_create_index_ddl" => actions::migration_helpers::generate_create_index_ddl(&client, &tool_args).await,
407        "table_dependencies" => actions::migration_helpers::table_dependencies(&client, &tool_args).await,
408        // pgvector
409        "list_vector_columns" => actions::pgvector::list_vector_columns(&client, &tool_args).await,
410        "vector_search" => actions::pgvector::vector_search(&client, &tool_args).await,
411        "create_vector_index" => actions::pgvector::create_vector_index(&client, &tool_args).await,
412        // TimescaleDB
413        "create_hypertable" => actions::timescaledb::create_hypertable(&client, &tool_args).await,
414        "show_hypertable_details" => actions::timescaledb::show_hypertable_details(&client, &tool_args).await,
415        "show_chunks" => actions::timescaledb::show_chunks(&client, &tool_args).await,
416        "add_retention_policy" => actions::timescaledb::add_retention_policy(&client, &tool_args).await,
417        "add_compression_policy" => actions::timescaledb::add_compression_policy(&client, &tool_args).await,
418        "compress_chunk" => actions::timescaledb::compress_chunk(&client, &tool_args).await,
419        "add_continuous_aggregate" => actions::timescaledb::add_continuous_aggregate(&client, &tool_args).await,
420        // pg_textsearch (BM25)
421        "list_bm25_indexes" => actions::pg_textsearch::list_bm25_indexes(&client, &tool_args).await,
422        "search_bm25" => actions::pg_textsearch::search_bm25(&client, &tool_args).await,
423        "create_bm25_index" => actions::pg_textsearch::create_bm25_index(&client, &tool_args).await,
424        "drop_bm25_index" => actions::pg_textsearch::drop_bm25_index(&client, &tool_args).await,
425        "bm25_force_merge" => actions::pg_textsearch::bm25_force_merge(&client, &tool_args).await,
426        "bm25_index_stats" => actions::pg_textsearch::bm25_index_stats(&client, &tool_args).await,
427        // v4.0: Data I/O
428        "import_from_url" => actions::data_io::import_from_url(&client, &tool_args).await,
429        "export_csv" => actions::data_io::export_csv(&client, &tool_args).await,
430        // v4.0: Index Advisor
431        "suggest_indexes" => actions::index_advisor::suggest_indexes(&client, &tool_args).await,
432        // v4.0: Schema Health
433        "find_tables_without_pk" => actions::schema_health::find_tables_without_pk(&client, &tool_args).await,
434        "find_missing_fk_indexes" => actions::schema_health::find_missing_fk_indexes(&client, &tool_args).await,
435        "analyze_table_bloat" => actions::schema_health::analyze_table_bloat(&client, &tool_args).await,
436        "clone_table_schema" => actions::schema_health::clone_table_schema(&client, &tool_args).await,
437        // v4.0: Security Audit
438        "security_audit" => actions::security_audit::security_audit(&client, &tool_args).await,
439        "audit_role_usage" => actions::security_audit::audit_role_usage(&client, &tool_args).await,
440        // v4.0: Data Tools
441        "sample_data" => actions::data_tools::sample_data(&client, &tool_args).await,
442        tool => Err(method_not_found(tool)),
443    };
444
445    if let Err(ref e) = result {
446        error!("Tool '{}' error: {:?}", tool_name, e);
447    }
448    // client is returned to the pool automatically via Drop
449    drop(client);
450    result
451}
452
453#[cold]
454fn method_not_found(name: &str) -> MCPError {
455    MCPError::MethodNotFound(name.to_string())
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed request yields its method and numeric id.
    #[test]
    fn test_parse_valid_request() {
        let line = r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#;
        let req = parse_request(line).unwrap();
        assert_eq!(req.method, "initialize");
        assert_eq!(req.id, Some(Value::Number(1.into())));
    }

    /// `read_line` keeps the line terminator, so the parser must accept input
    /// that still ends in `\n` or `\r\n`.
    #[test]
    fn test_parse_request_with_trailing_newline() {
        let line = "{\"jsonrpc\":\"2.0\",\"method\":\"tools/list\",\"id\":2}\n";
        let req = parse_request(line).unwrap();
        assert_eq!(req.method, "tools/list");
        assert_eq!(req.id, Some(Value::Number(2.into())));

        let crlf = "{\"jsonrpc\":\"2.0\",\"method\":\"tools/list\",\"id\":2}\r\n";
        let req = parse_request(crlf).unwrap();
        assert_eq!(req.method, "tools/list");
    }

    /// Leading and trailing spaces around the JSON are ignored.
    #[test]
    fn test_parse_request_with_whitespace() {
        let line = "  {\"jsonrpc\":\"2.0\",\"method\":\"ping\",\"id\":3}  ";
        let req = parse_request(line).unwrap();
        assert_eq!(req.method, "ping");
    }

    #[test]
    fn test_parse_empty_request() {
        let err = parse_request("").unwrap_err();
        assert_eq!(err, "Empty request");
    }

    /// Whitespace-only input is reported the same way as empty input.
    #[test]
    fn test_parse_whitespace_only() {
        let err = parse_request("   \n  ").unwrap_err();
        assert_eq!(err, "Empty request");
    }

    #[test]
    fn test_parse_invalid_json() {
        let err = parse_request("{invalid}").unwrap_err();
        assert!(!err.is_empty(), "Invalid JSON should produce an error message");
    }

    #[test]
    fn test_parse_missing_method() {
        let err = parse_request(r#"{"jsonrpc":"2.0","id":1}"#).unwrap_err();
        assert!(err.contains("method"));
    }

    /// The parser does not validate the `jsonrpc` version string.
    #[test]
    fn test_parse_wrong_version() {
        let req = parse_request(r#"{"jsonrpc":"1.0","method":"init","id":1}"#).unwrap();
        assert_eq!(req.jsonrpc, "1.0");
    }

    #[test]
    fn test_method_not_found_error() {
        let err = method_not_found("test_tool");
        assert_eq!(err.error_code(), -32601);
        assert!(err.to_string().contains("test_tool"));
    }

    #[test]
    fn test_tools_list_static() {
        let list: Value = serde_json::from_slice(&TOOLS_LIST_RESPONSE).unwrap();
        let tools = list.get("tools").and_then(|v| v.as_array());
        assert!(tools.is_some(), "TOOLS_LIST_RESPONSE should contain a tools array");
        assert!(!tools.unwrap().is_empty(), "Tools list should not be empty");
    }

    /// Construction-only check: `process_request` needs a connection pool, so
    /// this test merely ensures a request for an unknown method can be built.
    #[test]
    fn test_process_request_method_dispatch() {
        let _req = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "nonexistent".to_string(),
            params: None,
            id: Some(Value::Number(1.into())),
        };
    }

    #[test]
    fn test_handle_initialize_response() {
        let req = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "initialize".to_string(),
            params: None,
            id: Some(Value::Number(1.into())),
        };
        let result = handle_initialize(&req).unwrap();
        assert_eq!(result["protocolVersion"], "2024-11-05");
        assert!(result["capabilities"]["tools"]["listChanged"].is_boolean());
        assert_eq!(result["serverInfo"]["version"], env!("CARGO_PKG_VERSION"));
    }

    /// Enforce Phase 1.5: no bare `SET ` outside transaction blocks.
    /// Every session-level SET must use `SET LOCAL` inside a `BEGIN`/`COMMIT` pair.
    ///
    /// This is a text scan over the embedded sources of `query.rs` and
    /// `batch.rs`; it panics at test time (it is not a compile-time check) on
    /// the first line containing `client.execute("SET ` that is not a comment,
    /// an `UPDATE ... SET` statement, or a `SET LOCAL`.
    #[test]
    fn test_no_bare_set_outside_transaction() {
        // Keep each file name next to its contents so the two cannot drift apart.
        let sources = [
            ("query.rs", include_str!("../src/actions/query.rs")),
            ("batch.rs", include_str!("../src/actions/batch.rs")),
        ];
        for (file_name, source) in sources {
            for (line_no, line) in source.lines().enumerate() {
                let trimmed = line.trim();
                // Skip comments
                if trimmed.starts_with("//") || trimmed.starts_with("/*") || trimmed.starts_with('*') {
                    continue;
                }
                // Skip UPDATE ... SET statements
                if trimmed.contains("UPDATE ") && trimmed.contains("SET ") {
                    continue;
                }
                // SET LOCAL is the allowed form
                if trimmed.contains("SET LOCAL") {
                    continue;
                }
                // Anything left that issues client.execute("SET ...") is a bare SET
                assert!(
                    !trimmed.contains("client.execute(\"SET "),
                    "Phase 1.5 violation: bare `SET` (not SET LOCAL) found in {}:{} — \
                     use BEGIN + SET LOCAL + COMMIT pattern to avoid session leakage.\n\
                     Line: {}",
                    file_name,
                    line_no + 1,
                    trimmed
                );
            }
        }
    }
}