1use tokio::net::{TcpListener, TcpStream};
2use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
3use serde_json::{json, Value};
4use tracing::{error, warn};
5use std::sync::Arc;
6
7use crate::config::Config;
8use crate::errors::{MCPError, Result as MCPResult};
9use crate::metrics;
10use crate::pool::ConnectionPool;
11use crate::protocol::{JsonRpcRequest, JsonRpcResponse};
12use crate::actions;
13use once_cell::sync::Lazy;
14
15static TOOLS_LIST: Lazy<Value> = Lazy::new(|| {
16 let tools_json = include_str!("../tools.json");
17 let tools: Vec<Value> = serde_json::from_str(tools_json)
18 .expect("Failed to parse tools.json");
19 json!({ "tools": tools })
20});
21
22const BUFFER_CAPACITY: usize = 4096;
23const NEWLINE: &[u8] = b"\n";
24
25#[inline]
26#[cold]
27fn parse_error(msg: String) -> JsonRpcResponse {
28 let mcp_error = MCPError::ParseError(msg);
29 JsonRpcResponse::error(None, mcp_error.error_code(), mcp_error.to_string())
30}
31
32#[inline]
33fn parse_request(line: &str) -> Result<JsonRpcRequest, String> {
34 let trimmed = line.trim();
35 if trimmed.is_empty() {
36 return Err("Empty request".to_string());
37 }
38 serde_json::from_str::<JsonRpcRequest>(trimmed)
39 .map_err(|e| e.to_string())
40}
41
42pub struct MCPServer {
43 config: Config,
44 pool: Arc<ConnectionPool>,
45}
46
47impl MCPServer {
48 pub fn new(config: Config, pool: Arc<ConnectionPool>) -> Self {
49 Self { config, pool }
50 }
51
52 pub async fn run_stdio(&self) -> MCPResult<()> {
54 let stdin = tokio::io::stdin();
55 let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, stdin);
56 let mut stdout = tokio::io::stdout();
57 let mut line = String::with_capacity(512);
58 let mut response_buf = Vec::with_capacity(65536);
59
60 loop {
61 line.clear();
62 match reader.read_line(&mut line).await {
63 Ok(0) => break,
64 Ok(_) => {
65 process_one_line(&line, &self.pool, &self.config, &mut response_buf, &mut stdout).await?;
66 }
67 Err(e) => {
68 error!("IO error: {}", e);
69 break;
70 }
71 }
72 }
73 Ok(())
74 }
75
76 pub async fn run(&self) -> MCPResult<()> {
77 let addr = format!("{}:{}", self.config.server.host, self.config.server.port);
78 let listener = TcpListener::bind(&addr).await?;
79
80 tracing::info!("MCP server listening on {}", addr);
81
82 loop {
83 let (socket, peer_addr) = listener.accept().await?;
84
85 if let Err(e) = socket.set_nodelay(true) {
86 warn!("Failed to set TCP_NODELAY: {}", e);
87 }
88
89 let pool = Arc::clone(&self.pool);
90 let config = self.config.clone();
91
92 tokio::spawn(async move {
93 if let Err(e) = handle_client(socket, pool, config).await {
94 error!("Client {} error: {}", peer_addr, e);
95 }
96 });
97 }
98 }
99}
100
101#[inline(never)]
102async fn handle_client(socket: TcpStream, pool: Arc<ConnectionPool>, config: Config) -> MCPResult<()> {
103 let (reader, mut writer) = socket.into_split();
104 let mut reader = BufReader::with_capacity(BUFFER_CAPACITY, reader);
105 let mut line = String::with_capacity(512);
106 let mut response_buf = Vec::with_capacity(65536);
107
108 loop {
109 line.clear();
110 match reader.read_line(&mut line).await {
111 Ok(0) => break,
112 Ok(_) => {
113 process_one_line(&line, &pool, &config, &mut response_buf, &mut writer).await?;
114 }
115 Err(e) => {
116 error!("IO error: {}", e);
117 break;
118 }
119 }
120 }
121
122 Ok(())
123}
124
125#[inline]
128async fn process_one_line<W: AsyncWriteExt + Unpin>(
129 line: &str,
130 pool: &Arc<ConnectionPool>,
131 config: &Config,
132 response_buf: &mut Vec<u8>,
133 writer: &mut W,
134) -> MCPResult<()> {
135 metrics::inc_requests();
136
137 let (response, is_notification) = match parse_request(line) {
138 Ok(req) => {
139 let is_notif = req.id.is_none();
140 match process_request(&req, pool, config).await {
141 Ok(result) => (JsonRpcResponse::success(req.id, result), is_notif),
142 Err(e) => {
143 metrics::inc_errors();
144 (JsonRpcResponse::error(req.id, e.error_code(), e.to_string()), is_notif)
145 }
146 }
147 }
148 Err(e) => {
149 metrics::inc_errors();
150 (parse_error(e), false)
151 }
152 };
153
154 if is_notification {
156 return Ok(());
157 }
158
159 response_buf.clear();
160 serde_json::to_writer(&mut *response_buf, &response)?;
161 response_buf.extend_from_slice(NEWLINE);
162
163 writer.write_all(response_buf).await?;
164 writer.flush().await?;
165 Ok(())
166}
167
168#[inline]
170pub async fn process_request(
171 req: &JsonRpcRequest,
172 pool: &Arc<ConnectionPool>,
173 config: &Config,
174) -> MCPResult<Value> {
175 match req.method.as_str() {
176 "initialize" => handle_initialize(req),
177 "tools/list" => handle_tools_list(),
178 "tools/call" => handle_tools_call(req, pool, config).await,
179 "ping" => handle_ping(),
180 method if method.starts_with("notifications/") => handle_notification(method),
181 _ => Err(MCPError::MethodNotFound(req.method.clone())),
182 }
183}
184
185#[inline]
187fn handle_ping() -> MCPResult<Value> {
188 Ok(Value::Null)
189}
190
191#[inline]
193fn handle_notification(method: &str) -> MCPResult<Value> {
194 tracing::trace!("Received notification: {method}");
195 Ok(Value::Null)
196}
197
198pub async fn process_request_http(
200 req: &JsonRpcRequest,
201 pool: &Arc<ConnectionPool>,
202 config: &Config,
203) -> JsonRpcResponse {
204 metrics::inc_requests();
205
206 match process_request(req, pool, config).await {
207 Ok(result) => JsonRpcResponse::success(req.id.clone(), result),
208 Err(e) => {
209 metrics::inc_errors();
210 JsonRpcResponse::error(req.id.clone(), e.error_code(), e.to_string())
211 }
212 }
213}
214
215#[inline]
216fn handle_initialize(_req: &JsonRpcRequest) -> MCPResult<Value> {
217 Ok(json!({
218 "protocolVersion": "2024-11-05",
219 "capabilities": {
220 "tools": {
221 "listChanged": false
222 },
223 "resources": {
224 "subscribe": false,
225 "listChanged": false
226 },
227 "prompts": {
228 "listChanged": false
229 }
230 },
231 "serverInfo": {
232 "name": "mcp-postgres",
233 "version": env!("CARGO_PKG_VERSION")
234 }
235 }))
236}
237
238#[inline]
239fn handle_tools_list() -> MCPResult<Value> {
240 Ok((*TOOLS_LIST).clone())
241}
242
243async fn handle_tools_call(
244 req: &JsonRpcRequest,
245 pool: &Arc<ConnectionPool>,
246 config: &Config,
247) -> MCPResult<Value> {
248 let tool_name = req
249 .params
250 .as_ref()
251 .and_then(|p| p.get("name").and_then(|v| v.as_str()))
252 .ok_or_else(|| MCPError::InvalidParams("Missing 'name' parameter".into()))?;
253
254 let tool_args = req.params.as_ref().and_then(|p| p.get("arguments"));
255
256 if config.server.access_mode == crate::config::AccessMode::Restricted
258 && crate::tools::is_write_tool(tool_name)
259 {
260 return Err(MCPError::InvalidParams(format!(
261 "Operation '{tool_name}' is not allowed in restricted (read-only) mode"
262 )));
263 }
264
265 if !crate::tools::tool_exists(tool_name) {
267 return Err(method_not_found(tool_name));
268 }
269
270 let client = pool.acquire().await?;
272
273 let result = match tool_name {
274 "list_tables" => actions::schema::list_tables(&client, &tool_args).await,
276 "describe_table" => actions::schema::describe_table(&client, &tool_args).await,
277 "list_indexes" => actions::schema::list_indexes(&client, &tool_args).await,
278 "list_schemas" => actions::schema::list_schemas(&client, &tool_args).await,
279 "show_constraints" => actions::schema::show_constraints(&client, &tool_args).await,
280 "list_triggers" => actions::schema::list_triggers(&client, &tool_args).await,
281 "create_table" => actions::schema::create_table(&client, &tool_args).await,
282 "drop_table" => actions::schema::drop_table(&client, &tool_args).await,
283 "create_view" => actions::schema::create_view(&client, &tool_args).await,
284 "drop_view" => actions::schema::drop_view(&client, &tool_args).await,
285 "alter_view" => actions::schema::alter_view(&client, &tool_args).await,
286 "create_schema" => actions::schema::create_schema(&client, &tool_args).await,
287 "drop_schema" => actions::schema::drop_schema(&client, &tool_args).await,
288 "create_sequence" => actions::schema::create_sequence(&client, &tool_args).await,
289 "drop_sequence" => actions::schema::drop_sequence(&client, &tool_args).await,
290 "alter_index" => actions::schema::alter_index(&client, &tool_args).await,
291 "list_partitions" => actions::schema::list_partitions(&client, &tool_args).await,
292 "backup_table" => actions::schema::backup_table(&client, &tool_args).await,
293 "create_index" => actions::schema::create_index(&client, &tool_args).await,
294 "drop_index" => actions::schema::drop_index(&client, &tool_args).await,
295 "create_partition" => actions::schema::create_partition(&client, &tool_args).await,
296 "drop_partition" => actions::schema::drop_partition(&client, &tool_args).await,
297 "execute_query" => actions::query::execute_query(&client, &tool_args).await,
299 "execute_insert" => actions::query::execute_insert(&client, &tool_args).await,
300 "execute_update" => actions::query::execute_update(&client, &tool_args).await,
301 "execute_delete" => actions::query::execute_delete(&client, &tool_args).await,
302 "async_execute_insert" => actions::query::async_execute_insert(&client, &tool_args).await,
303 "async_execute_update" => actions::query::async_execute_update(&client, &tool_args).await,
304 "async_execute_delete" => actions::query::async_execute_delete(&client, &tool_args).await,
305 "explain_query" => actions::query::explain_query(&client, &tool_args).await,
306 "async_batch_insert" => actions::batch::async_batch_insert(&client, &tool_args).await,
308 "async_batch_update" => actions::batch::async_batch_update(&client, &tool_args).await,
309 "async_batch_delete" => actions::batch::async_batch_delete(&client, &tool_args).await,
310 "async_batch_insert_copy" => actions::batch::async_batch_insert_copy(&client, &tool_args).await,
311 "get_table_stats" => actions::monitoring::get_table_stats(&client, &tool_args).await,
313 "get_index_stats" => actions::monitoring::get_index_stats(&client, &tool_args).await,
314 "show_database_size" => actions::monitoring::show_database_size(&client, &tool_args).await,
315 "show_table_size" => actions::monitoring::show_table_size(&client, &tool_args).await,
316 "get_cache_hit_ratio" => actions::monitoring::get_cache_hit_ratio(&client, &tool_args).await,
317 "list_connections" => actions::connections::list_connections(&client, &tool_args).await,
319 "show_current_user" => actions::connections::show_current_user(&client, &tool_args).await,
320 "show_running_queries" => actions::connections::show_running_queries(&client, &tool_args).await,
321 "show_connection_summary" => actions::connections::show_connection_summary(&client, &tool_args).await,
322 "vacuum_analyze" => actions::maintenance::vacuum_analyze(&client, &tool_args).await,
324 "analyze_table" => actions::maintenance::analyze_table(&client, &tool_args).await,
325 "reindex_table" => actions::maintenance::reindex_table(&client, &tool_args).await,
326 "get_pg_stat_statements" => actions::maintenance::get_pg_stat_statements(&client, &tool_args).await,
327 "reset_statistics" => actions::maintenance::reset_statistics(&client, &tool_args).await,
328 "truncate_table" => actions::maintenance::truncate_table(&client, &tool_args).await,
329 "list_users" => actions::security::list_users(&client, &tool_args).await,
331 "list_user_privileges" => actions::security::list_user_privileges(&client, &tool_args).await,
332 "list_role_memberships" => actions::security::list_role_memberships(&client, &tool_args).await,
333 "list_database_privileges" => actions::security::list_database_privileges(&client, &tool_args).await,
334 "show_session_info" => actions::security::show_session_info(&client, &tool_args).await,
335 "show_all_settings" => actions::config::show_all_settings(&client, &tool_args).await,
337 "get_setting" => actions::config::get_setting(&client, &tool_args).await,
338 "show_memory_settings" => actions::config::show_memory_settings(&client, &tool_args).await,
339 "show_performance_settings" => actions::config::show_performance_settings(&client, &tool_args).await,
340 "show_log_settings" => actions::config::show_log_settings(&client, &tool_args).await,
341 "show_replication_status" => actions::replication::show_replication_status(&client, &tool_args).await,
343 "list_replication_slots" => actions::replication::list_replication_slots(&client, &tool_args).await,
344 "list_standby_servers" => actions::replication::list_standby_servers(&client, &tool_args).await,
345 "show_wal_info" => actions::replication::show_wal_info(&client, &tool_args).await,
346 "show_base_backup_progress" => actions::replication::show_base_backup_progress(&client, &tool_args).await,
347 "show_active_transactions" => actions::transactions::show_active_transactions(&client, &tool_args).await,
349 "show_locks" => actions::transactions::show_locks(&client, &tool_args).await,
350 "show_waiting_locks" => actions::transactions::show_waiting_locks(&client, &tool_args).await,
351 "show_transaction_isolation" => actions::transactions::show_transaction_isolation(&client, &tool_args).await,
352 "show_deadlocks" => actions::transactions::show_deadlocks(&client, &tool_args).await,
353 "show_autocommit_status" => actions::transactions::show_autocommit_status(&client, &tool_args).await,
354 "show_transaction_timeout" => actions::transactions::show_transaction_timeout(&client, &tool_args).await,
355 "analyze_db_health" => actions::health::analyze_db_health(&client, &tool_args).await,
357 "list_unused_indexes" => actions::health::list_unused_indexes(&client, &tool_args).await,
358 "list_duplicate_indexes" => actions::health::list_duplicate_indexes(&client, &tool_args).await,
359 "show_vacuum_progress" => actions::health::show_vacuum_progress(&client, &tool_args).await,
360 "get_object_details" => actions::schema::get_object_details(&client, &tool_args).await,
362 "create_user" => actions::user_mgmt::create_user(&client, &tool_args).await,
364 "alter_user" => actions::user_mgmt::alter_user(&client, &tool_args).await,
365 "drop_user" => actions::user_mgmt::drop_user(&client, &tool_args).await,
366 "create_role" => actions::user_mgmt::create_role(&client, &tool_args).await,
367 "alter_role" => actions::user_mgmt::alter_role(&client, &tool_args).await,
368 "drop_role" => actions::user_mgmt::drop_role(&client, &tool_args).await,
369 "grant_privileges" => actions::user_mgmt::grant_privileges(&client, &tool_args).await,
370 "revoke_privileges" => actions::user_mgmt::revoke_privileges(&client, &tool_args).await,
371 "add_column" => actions::schema_alter::add_column(&client, &tool_args).await,
373 "drop_column" => actions::schema_alter::drop_column(&client, &tool_args).await,
374 "rename_column" => actions::schema_alter::rename_column(&client, &tool_args).await,
375 "alter_column_type" => actions::schema_alter::alter_column_type(&client, &tool_args).await,
376 "rename_table" => actions::schema_alter::rename_table(&client, &tool_args).await,
377 "rename_index" => actions::schema_alter::rename_index(&client, &tool_args).await,
378 "rename_schema" => actions::schema_alter::rename_schema(&client, &tool_args).await,
379 "add_foreign_key" => actions::schema_alter::add_foreign_key(&client, &tool_args).await,
380 "drop_foreign_key" => actions::schema_alter::drop_foreign_key(&client, &tool_args).await,
381 "add_unique_constraint" => actions::schema_alter::add_unique_constraint(&client, &tool_args).await,
382 "drop_constraint" => actions::schema_alter::drop_constraint(&client, &tool_args).await,
383 "cancel_query" => actions::session_mgmt::cancel_query(&client, &tool_args).await,
385 "terminate_connection" => actions::session_mgmt::terminate_connection(&client, &tool_args).await,
386 "show_blocked_queries" => actions::session_mgmt::show_blocked_queries(&client, &tool_args).await,
387 "list_extensions" => actions::ext_mgmt::list_extensions(&client, &tool_args).await,
389 "create_extension" => actions::ext_mgmt::create_extension(&client, &tool_args).await,
390 "drop_extension" => actions::ext_mgmt::drop_extension(&client, &tool_args).await,
391 "list_databases" => actions::db_mgmt::list_databases(&client, &tool_args).await,
393 "create_database" => actions::db_mgmt::create_database(&client, &tool_args).await,
394 "vacuum" => actions::maint_ext::vacuum(&client, &tool_args).await,
396 "vacuum_full" => actions::maint_ext::vacuum_full(&client, &tool_args).await,
397 "reindex_database" => actions::maint_ext::reindex_database(&client, &tool_args).await,
398 "generate_create_table_ddl" => actions::migration_helpers::generate_create_table_ddl(&client, &tool_args).await,
400 "generate_create_index_ddl" => actions::migration_helpers::generate_create_index_ddl(&client, &tool_args).await,
401 "table_dependencies" => actions::migration_helpers::table_dependencies(&client, &tool_args).await,
402 "list_vector_columns" => actions::pgvector::list_vector_columns(&client, &tool_args).await,
404 "vector_search" => actions::pgvector::vector_search(&client, &tool_args).await,
405 "create_vector_index" => actions::pgvector::create_vector_index(&client, &tool_args).await,
406 "create_hypertable" => actions::timescaledb::create_hypertable(&client, &tool_args).await,
408 "show_hypertable_details" => actions::timescaledb::show_hypertable_details(&client, &tool_args).await,
409 "show_chunks" => actions::timescaledb::show_chunks(&client, &tool_args).await,
410 "add_retention_policy" => actions::timescaledb::add_retention_policy(&client, &tool_args).await,
411 "add_compression_policy" => actions::timescaledb::add_compression_policy(&client, &tool_args).await,
412 "compress_chunk" => actions::timescaledb::compress_chunk(&client, &tool_args).await,
413 "add_continuous_aggregate" => actions::timescaledb::add_continuous_aggregate(&client, &tool_args).await,
414 "list_bm25_indexes" => actions::pg_textsearch::list_bm25_indexes(&client, &tool_args).await,
416 "search_bm25" => actions::pg_textsearch::search_bm25(&client, &tool_args).await,
417 "create_bm25_index" => actions::pg_textsearch::create_bm25_index(&client, &tool_args).await,
418 "drop_bm25_index" => actions::pg_textsearch::drop_bm25_index(&client, &tool_args).await,
419 "bm25_force_merge" => actions::pg_textsearch::bm25_force_merge(&client, &tool_args).await,
420 "bm25_index_stats" => actions::pg_textsearch::bm25_index_stats(&client, &tool_args).await,
421 "import_from_url" => actions::data_io::import_from_url(&client, &tool_args).await,
423 "export_csv" => actions::data_io::export_csv(&client, &tool_args).await,
424 "suggest_indexes" => actions::index_advisor::suggest_indexes(&client, &tool_args).await,
426 "find_tables_without_pk" => actions::schema_health::find_tables_without_pk(&client, &tool_args).await,
428 "find_missing_fk_indexes" => actions::schema_health::find_missing_fk_indexes(&client, &tool_args).await,
429 "analyze_table_bloat" => actions::schema_health::analyze_table_bloat(&client, &tool_args).await,
430 "clone_table_schema" => actions::schema_health::clone_table_schema(&client, &tool_args).await,
431 "security_audit" => actions::security_audit::security_audit(&client, &tool_args).await,
433 "audit_role_usage" => actions::security_audit::audit_role_usage(&client, &tool_args).await,
434 "sample_data" => actions::data_tools::sample_data(&client, &tool_args).await,
436 tool => Err(method_not_found(tool)),
437 };
438
439 if let Err(ref e) = result {
440 error!("Tool '{}' error: {:?}", tool_name, e);
441 }
442 drop(client);
444 result
445}
446
447#[cold]
448fn method_not_found(name: &str) -> MCPError {
449 MCPError::MethodNotFound(name.to_string())
450}
451
452#[cfg(test)]
453mod tests {
454 use super::*;
455
456 #[test]
457 fn test_parse_valid_request() {
458 let line = r#"{"jsonrpc":"2.0","method":"initialize","id":1}"#;
459 let req = parse_request(line).unwrap();
460 assert_eq!(req.method, "initialize");
461 assert_eq!(req.id, Some(Value::Number(1.into())));
462 }
463
464 #[test]
465 fn test_parse_request_with_trailing_newline() {
466 let line = r#"{"jsonrpc":"2.0","method":"tools/list","id":2}"#;
467 let req = parse_request(line).unwrap();
468 assert_eq!(req.method, "tools/list");
469 }
470
471 #[test]
472 fn test_parse_request_with_whitespace() {
473 let line = " {\"jsonrpc\":\"2.0\",\"method\":\"ping\",\"id\":3} ";
474 let req = parse_request(line).unwrap();
475 assert_eq!(req.method, "ping");
476 }
477
478 #[test]
479 fn test_parse_empty_request() {
480 let err = parse_request("").unwrap_err();
481 assert_eq!(err, "Empty request");
482 }
483
484 #[test]
485 fn test_parse_whitespace_only() {
486 let err = parse_request(" \n ").unwrap_err();
487 assert_eq!(err, "Empty request");
488 }
489
490 #[test]
491 fn test_parse_invalid_json() {
492 let err = parse_request("{invalid}").unwrap_err();
493 assert!(!err.is_empty(), "Invalid JSON should produce an error message");
494 }
495
496 #[test]
497 fn test_parse_missing_method() {
498 let err = parse_request(r#"{"jsonrpc":"2.0","id":1}"#).unwrap_err();
499 assert!(err.contains("method"));
500 }
501
502 #[test]
503 fn test_parse_wrong_version() {
504 let req = parse_request(r#"{"jsonrpc":"1.0","method":"init","id":1}"#).unwrap();
505 assert_eq!(req.jsonrpc, "1.0");
506 }
507
508 #[test]
509 fn test_method_not_found_error() {
510 let err = method_not_found("test_tool");
511 assert_eq!(err.error_code(), -32601);
512 assert!(err.to_string().contains("test_tool"));
513 }
514
515 #[test]
516 fn test_tools_list_static() {
517 let list = &*TOOLS_LIST;
518 let tools = list.get("tools").and_then(|v| v.as_array());
519 assert!(tools.is_some(), "TOOLS_LIST should contain a tools array");
520 assert!(!tools.unwrap().is_empty(), "Tools list should not be empty");
521 }
522
523 #[test]
524 fn test_process_request_method_dispatch() {
525 let _req = JsonRpcRequest {
528 jsonrpc: "2.0".to_string(),
529 method: "nonexistent".to_string(),
530 params: None,
531 id: Some(Value::Number(1.into())),
532 };
533 }
536
537 #[test]
538 fn test_handle_initialize_response() {
539 let req = JsonRpcRequest {
540 jsonrpc: "2.0".to_string(),
541 method: "initialize".to_string(),
542 params: None,
543 id: Some(Value::Number(1.into())),
544 };
545 let result = handle_initialize(&req).unwrap();
546 assert_eq!(result["protocolVersion"], "2024-11-05");
547 assert!(result["capabilities"]["tools"]["listChanged"].is_boolean());
548 assert_eq!(result["serverInfo"]["version"], env!("CARGO_PKG_VERSION"));
549 }
550
551 #[test]
555 fn test_no_bare_set_outside_transaction() {
556 let source_files = &[
557 include_str!("../src/actions/query.rs"),
558 include_str!("../src/actions/batch.rs"),
559 ];
560 for (idx, source) in source_files.iter().enumerate() {
561 for (line_no, line) in source.lines().enumerate() {
562 let trimmed = line.trim();
563 if trimmed.starts_with("//") || trimmed.starts_with("/*") || trimmed.starts_with("*") {
565 continue;
566 }
567 if trimmed.contains("UPDATE ") && trimmed.contains("SET ") {
568 continue;
569 }
570 if trimmed.contains("SET LOCAL") {
571 continue;
572 }
573 if trimmed.contains("client.execute(\"SET ") && !trimmed.contains("SET LOCAL") {
575 let names = ["query.rs", "batch.rs"];
576 panic!(
577 "Phase 1.5 violation: bare `SET` (not SET LOCAL) found in {}:{} — \
578 use BEGIN + SET LOCAL + COMMIT pattern to avoid session leakage.\n\
579 Line: {}",
580 names[idx], line_no + 1, trimmed
581 );
582 }
583 }
584 }
585 }
586}