Skip to main content

mcp_postgres/
server.rs

1use serde_json::{Value, json};
2use std::sync::Arc;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::net::{TcpListener, TcpStream};
5use tracing::{error, warn};
6
7use crate::actions;
8use crate::config::Config;
9use crate::errors::{MCPError, Result as MCPResult};
10use crate::metrics;
11use crate::pool::ConnectionPool;
12use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
13use once_cell::sync::Lazy;
14
15/// Pre-serialized response bytes for `tools/list`.  Built once at startup;
16/// each call deserializes from this cached buffer instead of deep-cloning the
17/// entire 135-tool Value tree (~50 KB).
18static TOOLS_LIST_RESPONSE: Lazy<Vec<u8>> = Lazy::new(|| {
19    let tools_json = include_str!("../tools.json");
20    let tools: Vec<Value> = serde_json::from_str(tools_json).expect("Failed to parse tools.json");
21    let resp = json!({ "tools": tools });
22    serde_json::to_vec(&resp).expect("Failed to serialize tools/list response")
23});
24
/// Capacity, in bytes, of the `BufReader` wrapped around each transport's input.
const BUFFER_CAPACITY: usize = 4096;
/// Line terminator appended after every serialized response.
const NEWLINE: &[u8] = b"\n";
27
28#[inline]
29#[cold]
30fn parse_error(msg: String) -> JsonRpcResponse {
31    let mcp_error = MCPError::ParseError(msg);
32    JsonRpcResponse::error(None, mcp_error.error_code(), mcp_error.to_string())
33}
34
35#[inline]
36fn parse_request(line: &str) -> Result<JsonRpcRequest, String> {
37    let trimmed = line.trim();
38    if trimmed.is_empty() {
39        return Err("Empty request".to_string());
40    }
41    serde_json::from_str::<JsonRpcRequest>(trimmed).map_err(|e| e.to_string())
42}
43
/// MCP server state: the loaded configuration plus a shared handle to the
/// connection pool used to service requests.
pub struct MCPServer {
    /// Server configuration; `run` reads `server.host` / `server.port` from it.
    config: Config,
    /// Connection pool, shared via `Arc` with every spawned client task.
    pool: Arc<ConnectionPool>,
}
48
49impl MCPServer {
50    pub const fn new(config: Config, pool: Arc<ConnectionPool>) -> Self {
51        Self { config, pool }
52    }
53
54    /// Run in stdio mode for MCP compatibility (Claude Desktop, etc.)
55    pub async fn run_stdio(&self) -> MCPResult<()> {
56        let stdin = tokio::io::stdin();
57        let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, stdin);
58        let mut stdout = tokio::io::stdout();
59        let mut line = String::with_capacity(512);
60        // 4 KB initial — handles >95% of responses without resizing.
61        // Tools like `list_tables` or `execute_query` may exceed this,
62        // but Vec grows geometrically so the amortized cost is negligible.
63        let mut response_buf = Vec::with_capacity(4096);
64
65        loop {
66            line.clear();
67            match reader.read_line(&mut line).await {
68                Ok(0) => break,
69                Ok(_) => {
70                    process_one_line(
71                        &line,
72                        &self.pool,
73                        &self.config,
74                        &mut response_buf,
75                        &mut stdout,
76                    )
77                    .await?;
78                }
79                Err(e) => {
80                    error!("IO error: {}", e);
81                    break;
82                }
83            }
84        }
85        Ok(())
86    }
87
88    pub async fn run(&self) -> MCPResult<()> {
89        let addr = format!("{}:{}", self.config.server.host, self.config.server.port);
90        let listener = TcpListener::bind(&addr).await?;
91
92        tracing::info!("MCP server listening on {}", addr);
93
94        loop {
95            let (socket, peer_addr) = listener.accept().await?;
96
97            if let Err(e) = socket.set_nodelay(true) {
98                warn!("Failed to set TCP_NODELAY: {}", e);
99            }
100
101            let pool = Arc::clone(&self.pool);
102            let config = self.config.clone();
103
104            tokio::spawn(async move {
105                if let Err(e) = handle_client(socket, pool, config).await {
106                    error!("Client {} error: {}", peer_addr, e);
107                }
108            });
109        }
110    }
111}
112
113#[inline(never)]
114async fn handle_client(
115    socket: TcpStream,
116    pool: Arc<ConnectionPool>,
117    config: Config,
118) -> MCPResult<()> {
119    let (reader, mut writer) = socket.into_split();
120    let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, reader);
121    let mut line = String::with_capacity(512);
122    // 4 KB initial capacity — grows geometrically for large responses.
123    let mut response_buf = Vec::with_capacity(4096);
124
125    loop {
126        line.clear();
127        match reader.read_line(&mut line).await {
128            Ok(0) => break,
129            Ok(_) => {
130                process_one_line(&line, &pool, &config, &mut response_buf, &mut writer).await?;
131            }
132            Err(e) => {
133                error!("IO error: {}", e);
134                break;
135            }
136        }
137    }
138
139    Ok(())
140}
141
142/// Core per-line processing shared by TCP and stdio transports.
143/// For notifications (JSON-RPC messages without `id`), no response is sent.
144#[inline]
145async fn process_one_line<W: AsyncWriteExt + Unpin>(
146    line: &str,
147    pool: &Arc<ConnectionPool>,
148    config: &Config,
149    response_buf: &mut Vec<u8>,
150    writer: &mut W,
151) -> MCPResult<()> {
152    metrics::inc_requests();
153
154    let (response, is_notification) = match parse_request(line) {
155        Ok(req) => {
156            let is_notif = req.id.is_none();
157            match process_request(&req, pool, config).await {
158                Ok(result) => (JsonRpcResponse::success(req.id, result), is_notif),
159                Err(e) => {
160                    metrics::inc_errors();
161                    (
162                        JsonRpcResponse::error(req.id, e.error_code(), e.to_string()),
163                        is_notif,
164                    )
165                }
166            }
167        }
168        Err(e) => {
169            metrics::inc_errors();
170            (parse_error(e), false)
171        }
172    };
173
174    // JSON-RPC notifications (no `id`) do not expect a response
175    if is_notification {
176        return Ok(());
177    }
178
179    response_buf.clear();
180    serde_json::to_writer(&mut *response_buf, &response)?;
181    response_buf.extend_from_slice(NEWLINE);
182
183    writer.write_all(response_buf).await?;
184    writer.flush().await?;
185    Ok(())
186}
187
188/// Process a JSON-RPC request (used by both TCP and HTTP transports)
189#[inline]
190pub async fn process_request(
191    req: &JsonRpcRequest,
192    pool: &Arc<ConnectionPool>,
193    config: &Config,
194) -> MCPResult<Value> {
195    match req.method.as_str() {
196        "initialize" => handle_initialize(req),
197        "tools/list" => handle_tools_list(),
198        "tools/call" => handle_tools_call(req, pool, config).await,
199        "ping" => handle_ping(),
200        method if method.starts_with("notifications/") => handle_notification(method),
201        _ => Err(MCPError::MethodNotFound(req.method.clone())),
202    }
203}
204
205/// Handle JSON-RPC ping (respond with empty success)
206#[inline]
207const fn handle_ping() -> MCPResult<Value> {
208    Ok(Value::Null)
209}
210
/// Handle MCP notifications (silently accepted, no response needed per JSON-RPC spec)
///
/// Logs the method name at trace level and returns `Value::Null`; this
/// function never returns an error.
#[inline]
fn handle_notification(method: &str) -> MCPResult<Value> {
    tracing::trace!("Received notification: {method}");
    Ok(Value::Null)
}
217
218/// Public wrapper for HTTP handlers - returns complete JSON-RPC response
219pub async fn process_request_http(
220    req: &JsonRpcRequest,
221    pool: &Arc<ConnectionPool>,
222    config: &Config,
223) -> JsonRpcResponse {
224    metrics::inc_requests();
225
226    match process_request(req, pool, config).await {
227        Ok(result) => JsonRpcResponse::success(req.id.clone(), result),
228        Err(e) => {
229            metrics::inc_errors();
230            JsonRpcResponse::error(req.id.clone(), e.error_code(), e.to_string())
231        }
232    }
233}
234
235fn handle_initialize(_req: &JsonRpcRequest) -> MCPResult<Value> {
236    /// Cached initialize response — built once on first call.
237    static INIT_RESPONSE: Lazy<Value> = Lazy::new(|| {
238        json!({
239            "protocolVersion": "2024-11-05",
240            "capabilities": {
241                "tools": { "listChanged": false },
242                "resources": { "subscribe": false, "listChanged": false },
243                "prompts": { "listChanged": false }
244            },
245            "serverInfo": {
246                "name": "mcp-postgres",
247                "version": env!("CARGO_PKG_VERSION")
248            }
249        })
250    });
251
252    Ok(INIT_RESPONSE.clone())
253}
254
255#[inline]
256fn handle_tools_list() -> MCPResult<Value> {
257    // Deserialize from cached bytes instead of deep-cloning a 50 KB Value tree.
258    Ok(serde_json::from_slice(&TOOLS_LIST_RESPONSE)?)
259}
260
/// Executes a `tools/call` request.
///
/// Reads the tool name from `params.name` and the optional arguments from
/// `params.arguments`, then dispatches to the matching function under
/// `crate::actions`, passing it a pooled connection and the arguments.
///
/// Validation happens before a connection is taken from the pool:
/// 1. a missing or non-string `name` yields `MCPError::InvalidParams`;
/// 2. in `AccessMode::Restricted`, a tool for which
///    `crate::tools::is_write_tool` returns true is rejected with
///    `InvalidParams`;
/// 3. a name for which `crate::tools::tool_exists` returns false yields
///    `MCPError::MethodNotFound`.
///
/// # Errors
/// In addition to the cases above, propagates a failure to acquire a pooled
/// connection and any error returned by the selected action. Action errors are
/// logged at error level before being returned.
async fn handle_tools_call(
    req: &JsonRpcRequest,
    pool: &Arc<ConnectionPool>,
    config: &Config,
) -> MCPResult<Value> {
    let tool_name = req
        .params
        .as_ref()
        .and_then(|p| p.get("name").and_then(|v| v.as_str()))
        .ok_or_else(|| MCPError::InvalidParams("Missing 'name' parameter".into()))?;

    // `None` when the caller supplied no `arguments` member.
    let tool_args = req.params.as_ref().and_then(|p| p.get("arguments"));

    // Restricted mode check + unknown tool check BEFORE pool acquire
    if config.server.access_mode == crate::config::AccessMode::Restricted
        && crate::tools::is_write_tool(tool_name)
    {
        return Err(MCPError::InvalidParams(format!(
            "Operation '{tool_name}' is not allowed in restricted (read-only) mode"
        )));
    }

    // Verify tool exists before acquiring a connection
    if !crate::tools::tool_exists(tool_name) {
        return Err(method_not_found(tool_name));
    }

    // Acquire pool connection only for known tools
    let client = pool.acquire().await?;

    // Static dispatch table: every arm forwards the same (client, arguments)
    // pair to the action named after the tool.
    let result = match tool_name {
        // Schema actions
        "list_tables" => actions::schema::list_tables(&client, &tool_args).await,
        "describe_table" => actions::schema::describe_table(&client, &tool_args).await,
        "list_indexes" => actions::schema::list_indexes(&client, &tool_args).await,
        "list_schemas" => actions::schema::list_schemas(&client, &tool_args).await,
        "show_constraints" => actions::schema::show_constraints(&client, &tool_args).await,
        "list_triggers" => actions::schema::list_triggers(&client, &tool_args).await,
        "create_table" => actions::schema::create_table(&client, &tool_args).await,
        "drop_table" => actions::schema::drop_table(&client, &tool_args).await,
        "create_view" => actions::schema::create_view(&client, &tool_args).await,
        "drop_view" => actions::schema::drop_view(&client, &tool_args).await,
        "alter_view" => actions::schema::alter_view(&client, &tool_args).await,
        "create_schema" => actions::schema::create_schema(&client, &tool_args).await,
        "drop_schema" => actions::schema::drop_schema(&client, &tool_args).await,
        "create_sequence" => actions::schema::create_sequence(&client, &tool_args).await,
        "drop_sequence" => actions::schema::drop_sequence(&client, &tool_args).await,
        "alter_index" => actions::schema::alter_index(&client, &tool_args).await,
        "list_partitions" => actions::schema::list_partitions(&client, &tool_args).await,
        "backup_table" => actions::schema::backup_table(&client, &tool_args).await,
        "create_index" => actions::schema::create_index(&client, &tool_args).await,
        "drop_index" => actions::schema::drop_index(&client, &tool_args).await,
        "create_partition" => actions::schema::create_partition(&client, &tool_args).await,
        "drop_partition" => actions::schema::drop_partition(&client, &tool_args).await,
        // Query actions
        "execute_query" => actions::query::execute_query(&client, &tool_args).await,
        "execute_insert" => actions::query::execute_insert(&client, &tool_args).await,
        "execute_update" => actions::query::execute_update(&client, &tool_args).await,
        "execute_delete" => actions::query::execute_delete(&client, &tool_args).await,
        "async_execute_insert" => actions::query::async_execute_insert(&client, &tool_args).await,
        "async_execute_update" => actions::query::async_execute_update(&client, &tool_args).await,
        "async_execute_delete" => actions::query::async_execute_delete(&client, &tool_args).await,
        "explain_query" => actions::query::explain_query(&client, &tool_args).await,
        // Batch operations
        "async_batch_insert" => actions::batch::async_batch_insert(&client, &tool_args).await,
        "async_batch_update" => actions::batch::async_batch_update(&client, &tool_args).await,
        "async_batch_delete" => actions::batch::async_batch_delete(&client, &tool_args).await,
        "async_batch_insert_copy" => {
            actions::batch::async_batch_insert_copy(&client, &tool_args).await
        }
        // Monitoring actions
        "get_table_stats" => actions::monitoring::get_table_stats(&client, &tool_args).await,
        "get_index_stats" => actions::monitoring::get_index_stats(&client, &tool_args).await,
        "show_database_size" => actions::monitoring::show_database_size(&client, &tool_args).await,
        "show_table_size" => actions::monitoring::show_table_size(&client, &tool_args).await,
        "get_cache_hit_ratio" => {
            actions::monitoring::get_cache_hit_ratio(&client, &tool_args).await
        }
        // Connection actions
        "list_connections" => actions::connections::list_connections(&client, &tool_args).await,
        "show_current_user" => actions::connections::show_current_user(&client, &tool_args).await,
        "show_running_queries" => {
            actions::connections::show_running_queries(&client, &tool_args).await
        }
        "show_connection_summary" => {
            actions::connections::show_connection_summary(&client, &tool_args).await
        }
        // Maintenance actions
        "vacuum_analyze" => actions::maintenance::vacuum_analyze(&client, &tool_args).await,
        "analyze_table" => actions::maintenance::analyze_table(&client, &tool_args).await,
        "reindex_table" => actions::maintenance::reindex_table(&client, &tool_args).await,
        "get_pg_stat_statements" => {
            actions::maintenance::get_pg_stat_statements(&client, &tool_args).await
        }
        "reset_statistics" => actions::maintenance::reset_statistics(&client, &tool_args).await,
        "truncate_table" => actions::maintenance::truncate_table(&client, &tool_args).await,
        // Security actions
        "list_users" => actions::security::list_users(&client, &tool_args).await,
        "list_user_privileges" => {
            actions::security::list_user_privileges(&client, &tool_args).await
        }
        "list_role_memberships" => {
            actions::security::list_role_memberships(&client, &tool_args).await
        }
        "list_database_privileges" => {
            actions::security::list_database_privileges(&client, &tool_args).await
        }
        "show_session_info" => actions::security::show_session_info(&client, &tool_args).await,
        // Config actions
        "show_all_settings" => actions::config::show_all_settings(&client, &tool_args).await,
        "get_setting" => actions::config::get_setting(&client, &tool_args).await,
        "show_memory_settings" => actions::config::show_memory_settings(&client, &tool_args).await,
        "show_performance_settings" => {
            actions::config::show_performance_settings(&client, &tool_args).await
        }
        "show_log_settings" => actions::config::show_log_settings(&client, &tool_args).await,
        // Replication actions
        "show_replication_status" => {
            actions::replication::show_replication_status(&client, &tool_args).await
        }
        "list_replication_slots" => {
            actions::replication::list_replication_slots(&client, &tool_args).await
        }
        "list_standby_servers" => {
            actions::replication::list_standby_servers(&client, &tool_args).await
        }
        "show_wal_info" => actions::replication::show_wal_info(&client, &tool_args).await,
        "show_base_backup_progress" => {
            actions::replication::show_base_backup_progress(&client, &tool_args).await
        }
        // Transaction actions
        "show_active_transactions" => {
            actions::transactions::show_active_transactions(&client, &tool_args).await
        }
        "show_locks" => actions::transactions::show_locks(&client, &tool_args).await,
        "show_waiting_locks" => {
            actions::transactions::show_waiting_locks(&client, &tool_args).await
        }
        "show_transaction_isolation" => {
            actions::transactions::show_transaction_isolation(&client, &tool_args).await
        }
        "show_deadlocks" => actions::transactions::show_deadlocks(&client, &tool_args).await,
        "show_autocommit_status" => {
            actions::transactions::show_autocommit_status(&client, &tool_args).await
        }
        "show_transaction_timeout" => {
            actions::transactions::show_transaction_timeout(&client, &tool_args).await
        }
        // Health actions
        "analyze_db_health" => actions::health::analyze_db_health(&client, &tool_args).await,
        "list_unused_indexes" => actions::health::list_unused_indexes(&client, &tool_args).await,
        "list_duplicate_indexes" => {
            actions::health::list_duplicate_indexes(&client, &tool_args).await
        }
        "show_vacuum_progress" => actions::health::show_vacuum_progress(&client, &tool_args).await,
        // Enhanced schema
        "get_object_details" => actions::schema::get_object_details(&client, &tool_args).await,
        // User management
        "create_user" => actions::user_mgmt::create_user(&client, &tool_args).await,
        "alter_user" => actions::user_mgmt::alter_user(&client, &tool_args).await,
        "drop_user" => actions::user_mgmt::drop_user(&client, &tool_args).await,
        "create_role" => actions::user_mgmt::create_role(&client, &tool_args).await,
        "alter_role" => actions::user_mgmt::alter_role(&client, &tool_args).await,
        "drop_role" => actions::user_mgmt::drop_role(&client, &tool_args).await,
        "grant_privileges" => actions::user_mgmt::grant_privileges(&client, &tool_args).await,
        "revoke_privileges" => actions::user_mgmt::revoke_privileges(&client, &tool_args).await,
        // Schema alter
        "add_column" => actions::schema_alter::add_column(&client, &tool_args).await,
        "drop_column" => actions::schema_alter::drop_column(&client, &tool_args).await,
        "rename_column" => actions::schema_alter::rename_column(&client, &tool_args).await,
        "alter_column_type" => actions::schema_alter::alter_column_type(&client, &tool_args).await,
        "rename_table" => actions::schema_alter::rename_table(&client, &tool_args).await,
        "rename_index" => actions::schema_alter::rename_index(&client, &tool_args).await,
        "rename_schema" => actions::schema_alter::rename_schema(&client, &tool_args).await,
        "add_foreign_key" => actions::schema_alter::add_foreign_key(&client, &tool_args).await,
        "drop_foreign_key" => actions::schema_alter::drop_foreign_key(&client, &tool_args).await,
        "add_unique_constraint" => {
            actions::schema_alter::add_unique_constraint(&client, &tool_args).await
        }
        "drop_constraint" => actions::schema_alter::drop_constraint(&client, &tool_args).await,
        // Session management
        "cancel_query" => actions::session_mgmt::cancel_query(&client, &tool_args).await,
        "terminate_connection" => {
            actions::session_mgmt::terminate_connection(&client, &tool_args).await
        }
        "show_blocked_queries" => {
            actions::session_mgmt::show_blocked_queries(&client, &tool_args).await
        }
        // Extension management
        "list_extensions" => actions::ext_mgmt::list_extensions(&client, &tool_args).await,
        "create_extension" => actions::ext_mgmt::create_extension(&client, &tool_args).await,
        "drop_extension" => actions::ext_mgmt::drop_extension(&client, &tool_args).await,
        // Database management
        "list_databases" => actions::db_mgmt::list_databases(&client, &tool_args).await,
        "create_database" => actions::db_mgmt::create_database(&client, &tool_args).await,
        // Extended maintenance
        "vacuum" => actions::maint_ext::vacuum(&client, &tool_args).await,
        "vacuum_full" => actions::maint_ext::vacuum_full(&client, &tool_args).await,
        "reindex_database" => actions::maint_ext::reindex_database(&client, &tool_args).await,
        // Migration helpers
        "generate_create_table_ddl" => {
            actions::migration_helpers::generate_create_table_ddl(&client, &tool_args).await
        }
        "generate_create_index_ddl" => {
            actions::migration_helpers::generate_create_index_ddl(&client, &tool_args).await
        }
        "table_dependencies" => {
            actions::migration_helpers::table_dependencies(&client, &tool_args).await
        }
        // pgvector
        "list_vector_columns" => actions::pgvector::list_vector_columns(&client, &tool_args).await,
        "vector_search" => actions::pgvector::vector_search(&client, &tool_args).await,
        "create_vector_index" => actions::pgvector::create_vector_index(&client, &tool_args).await,
        // TimescaleDB
        "create_hypertable" => actions::timescaledb::create_hypertable(&client, &tool_args).await,
        "show_hypertable_details" => {
            actions::timescaledb::show_hypertable_details(&client, &tool_args).await
        }
        "show_chunks" => actions::timescaledb::show_chunks(&client, &tool_args).await,
        "add_retention_policy" => {
            actions::timescaledb::add_retention_policy(&client, &tool_args).await
        }
        "add_compression_policy" => {
            actions::timescaledb::add_compression_policy(&client, &tool_args).await
        }
        "compress_chunk" => actions::timescaledb::compress_chunk(&client, &tool_args).await,
        "add_continuous_aggregate" => {
            actions::timescaledb::add_continuous_aggregate(&client, &tool_args).await
        }
        // pg_textsearch (BM25)
        "list_bm25_indexes" => actions::pg_textsearch::list_bm25_indexes(&client, &tool_args).await,
        "search_bm25" => actions::pg_textsearch::search_bm25(&client, &tool_args).await,
        "create_bm25_index" => actions::pg_textsearch::create_bm25_index(&client, &tool_args).await,
        "drop_bm25_index" => actions::pg_textsearch::drop_bm25_index(&client, &tool_args).await,
        "bm25_force_merge" => actions::pg_textsearch::bm25_force_merge(&client, &tool_args).await,
        "bm25_index_stats" => actions::pg_textsearch::bm25_index_stats(&client, &tool_args).await,
        // v4.0: Data I/O
        "import_from_url" => actions::data_io::import_from_url(&client, &tool_args).await,
        "export_csv" => actions::data_io::export_csv(&client, &tool_args).await,
        // v4.0: Index Advisor
        "suggest_indexes" => actions::index_advisor::suggest_indexes(&client, &tool_args).await,
        // v4.0: Schema Health
        "find_tables_without_pk" => {
            actions::schema_health::find_tables_without_pk(&client, &tool_args).await
        }
        "find_missing_fk_indexes" => {
            actions::schema_health::find_missing_fk_indexes(&client, &tool_args).await
        }
        "analyze_table_bloat" => {
            actions::schema_health::analyze_table_bloat(&client, &tool_args).await
        }
        "clone_table_schema" => {
            actions::schema_health::clone_table_schema(&client, &tool_args).await
        }
        // v4.0: Security Audit
        "security_audit" => actions::security_audit::security_audit(&client, &tool_args).await,
        "audit_role_usage" => actions::security_audit::audit_role_usage(&client, &tool_args).await,
        // v4.0: Data Tools
        "sample_data" => actions::data_tools::sample_data(&client, &tool_args).await,
        // Reached only by a name that passed `tool_exists` above but has no
        // arm in this table.
        tool => Err(method_not_found(tool)),
    };

    if let Err(ref e) = result {
        error!("Tool '{}' error: {:?}", tool_name, e);
    }
    // client is returned to the pool automatically via Drop
    drop(client);
    result
}
530
531#[cold]
532fn method_not_found(name: &str) -> MCPError {
533    MCPError::MethodNotFound(name.to_string())
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_valid_request() {
        let line = r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#;
        let req = parse_request(line).unwrap();
        assert_eq!(req.method, "initialize");
        assert_eq!(req.id, Some(Value::Number(1.into())));
    }

    #[test]
    fn test_parse_request_with_trailing_newline() {
        // `read_line` keeps the line terminator, so feed the parser what the
        // transports actually hand it: LF- and CRLF-terminated lines.
        for line in [
            "{\"jsonrpc\":\"2.0\",\"method\":\"tools/list\",\"id\":2}\n",
            "{\"jsonrpc\":\"2.0\",\"method\":\"tools/list\",\"id\":2}\r\n",
        ] {
            let req = parse_request(line).unwrap();
            assert_eq!(req.method, "tools/list");
        }
    }

    #[test]
    fn test_parse_request_with_whitespace() {
        let line = "  {\"jsonrpc\":\"2.0\",\"method\":\"ping\",\"id\":3}  ";
        let req = parse_request(line).unwrap();
        assert_eq!(req.method, "ping");
    }

    #[test]
    fn test_parse_empty_request() {
        let err = parse_request("").unwrap_err();
        assert_eq!(err, "Empty request");
    }

    #[test]
    fn test_parse_whitespace_only() {
        let err = parse_request("   \n  ").unwrap_err();
        assert_eq!(err, "Empty request");
    }

    #[test]
    fn test_parse_invalid_json() {
        let err = parse_request("{invalid}").unwrap_err();
        assert!(
            !err.is_empty(),
            "Invalid JSON should produce an error message"
        );
    }

    #[test]
    fn test_parse_missing_method() {
        let err = parse_request(r#"{"jsonrpc":"2.0","id":1}"#).unwrap_err();
        assert!(err.contains("method"));
    }

    /// The parser does not validate the `jsonrpc` version string; it is
    /// passed through unchanged.
    #[test]
    fn test_parse_wrong_version() {
        let req = parse_request(r#"{"jsonrpc":"1.0","method":"init","id":1}"#).unwrap();
        assert_eq!(req.jsonrpc, "1.0");
    }

    #[test]
    fn test_method_not_found_error() {
        let err = method_not_found("test_tool");
        assert_eq!(err.error_code(), -32601);
        assert!(err.to_string().contains("test_tool"));
    }

    #[test]
    fn test_tools_list_static() {
        let list: Value = serde_json::from_slice(&TOOLS_LIST_RESPONSE).unwrap();
        let tools = list
            .get("tools")
            .and_then(Value::as_array)
            .expect("TOOLS_LIST_RESPONSE should contain a tools array");
        assert!(!tools.is_empty(), "Tools list should not be empty");
    }

    #[test]
    fn test_process_request_method_dispatch() {
        // Compile-only check that `JsonRpcRequest` can be built by hand with
        // these four fields. `process_request` itself is not invoked here
        // because it needs a connection pool and a config.
        let _req = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "nonexistent".to_string(),
            params: None,
            id: Some(Value::Number(1.into())),
        };
    }

    #[test]
    fn test_handle_initialize_response() {
        let req = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "initialize".to_string(),
            params: None,
            id: Some(Value::Number(1.into())),
        };
        let result = handle_initialize(&req).unwrap();
        assert_eq!(result["protocolVersion"], "2024-11-05");
        assert!(result["capabilities"]["tools"]["listChanged"].is_boolean());
        assert_eq!(result["serverInfo"]["version"], env!("CARGO_PKG_VERSION"));
    }

    /// Enforce Phase 1.5: no bare `SET ` outside transaction blocks.
    /// Every session-level SET must use `SET LOCAL` inside a `BEGIN`/`COMMIT` pair.
    /// This grep-based check fails the test run (not compilation) if a
    /// violation exists in the scanned files: `src/actions/query.rs` and
    /// `src/actions/batch.rs`.
    #[test]
    fn test_no_bare_set_outside_transaction() {
        // Each embedded source travels with its display name so the failure
        // message cannot drift out of sync with the file list.
        let sources = [
            ("query.rs", include_str!("../src/actions/query.rs")),
            ("batch.rs", include_str!("../src/actions/batch.rs")),
        ];
        for (name, source) in sources {
            for (line_no, line) in source.lines().enumerate() {
                let trimmed = line.trim();
                // Skip comment lines.
                if trimmed.starts_with("//")
                    || trimmed.starts_with("/*")
                    || trimmed.starts_with('*')
                {
                    continue;
                }
                // `UPDATE ... SET` is ordinary DML, not a session setting.
                if trimmed.contains("UPDATE ") && trimmed.contains("SET ") {
                    continue;
                }
                // Transaction-scoped settings are the allowed form.
                if trimmed.contains("SET LOCAL") {
                    continue;
                }
                // Check for bare client.execute("SET ...") outside txn
                assert!(
                    !trimmed.contains("client.execute(\"SET "),
                    "Phase 1.5 violation: bare `SET` (not SET LOCAL) found in {}:{} — \
                     use BEGIN + SET LOCAL + COMMIT pattern to avoid session leakage.\n\
                     Line: {}",
                    name,
                    line_no + 1,
                    trimmed
                );
            }
        }
    }
}