database_mcp_sql/
connection.rs1use database_mcp_server::AppError;
9use serde_json::Value;
10use sqlx::Executor;
11use sqlx_to_json::QueryResult as _;
12
13use crate::identifier;
14use crate::timeout::execute_with_timeout;
15
16#[allow(async_fn_in_trait)]
31pub trait Connection: Send + Sync {
32 type DB: sqlx::Database;
34
35 const IDENTIFIER_QUOTE: char;
37
38 async fn pool(&self, target: Option<&str>) -> Result<sqlx::Pool<Self::DB>, AppError>;
44
45 fn query_timeout(&self) -> Option<u64>;
47
48 async fn execute(&self, query: &str, database: Option<&str>) -> Result<u64, AppError>
54 where
55 for<'c> &'c mut <Self::DB as sqlx::Database>::Connection: Executor<'c, Database = Self::DB>,
56 <Self::DB as sqlx::Database>::QueryResult: sqlx_to_json::QueryResult,
57 {
58 let pool = self.pool(database).await?;
59 let sql = query.to_owned();
60 execute_with_timeout(self.query_timeout(), query, async move {
61 let mut conn = pool.acquire().await?;
62 let result = (&mut *conn).execute(sql.as_str()).await?;
63 Ok::<_, sqlx::Error>(result.rows_affected())
64 })
65 .await
66 }
67
68 async fn fetch(&self, query: &str, database: Option<&str>) -> Result<Vec<Value>, AppError>
74 where
75 for<'c> &'c mut <Self::DB as sqlx::Database>::Connection: Executor<'c, Database = Self::DB>,
76 <Self::DB as sqlx::Database>::Row: sqlx_to_json::RowExt,
77 {
78 let pool = self.pool(database).await?;
79 let sql = query.to_owned();
80 execute_with_timeout(self.query_timeout(), query, async move {
81 let mut conn = pool.acquire().await?;
82 let rows = (&mut *conn).fetch_all(sql.as_str()).await?;
83 Ok::<_, sqlx::Error>(rows.iter().map(sqlx_to_json::RowExt::to_json).collect())
84 })
85 .await
86 }
87
88 async fn fetch_optional(&self, query: &str, database: Option<&str>) -> Result<Option<Value>, AppError>
94 where
95 for<'c> &'c mut <Self::DB as sqlx::Database>::Connection: Executor<'c, Database = Self::DB>,
96 <Self::DB as sqlx::Database>::Row: sqlx_to_json::RowExt,
97 {
98 let pool = self.pool(database).await?;
99 let sql = query.to_owned();
100 execute_with_timeout(self.query_timeout(), query, async move {
101 let mut conn = pool.acquire().await?;
102 let row = (&mut *conn).fetch_optional(sql.as_str()).await?;
103 Ok::<_, sqlx::Error>(row.as_ref().map(sqlx_to_json::RowExt::to_json))
104 })
105 .await
106 }
107
108 fn quote_identifier(&self, name: &str) -> String {
110 identifier::quote_identifier(name, Self::IDENTIFIER_QUOTE)
111 }
112
113 fn quote_string(&self, value: &str) -> String {
115 identifier::quote_string(value)
116 }
117}