Skip to main content

database_mcp_sql/
connection.rs

1//! Connection abstraction shared across database backends.
2//!
3//! Defines [`Connection`] — the single trait every backend implements.
4//! Backends provide pool resolution, identifier quoting config, and
5//! timeout config; default method implementations handle query execution
6//! and SQL quoting.
7
8use database_mcp_server::AppError;
9use serde_json::Value;
10use sqlx::Executor;
11use sqlx_to_json::QueryResult as _;
12
13use crate::identifier;
14use crate::timeout::execute_with_timeout;
15
16/// Unified query and quoting surface every backend tool handler uses.
17///
18/// Backends supply four required items — [`DB`](Connection::DB),
19/// [`IDENTIFIER_QUOTE`](Connection::IDENTIFIER_QUOTE),
20/// [`pool`](Connection::pool), and [`query_timeout`](Connection::query_timeout)
21/// — and receive default implementations for query execution and SQL quoting.
22///
23/// # Errors
24///
25/// Query methods may return:
26///
27/// - [`AppError::InvalidIdentifier`] — `database` failed identifier validation.
28/// - [`AppError::Connection`] — the underlying driver failed.
29/// - [`AppError::QueryTimeout`] — the query exceeded the configured timeout.
30#[allow(async_fn_in_trait)]
31pub trait Connection: Send + Sync {
32    /// The sqlx database driver type (e.g. `sqlx::MySql`).
33    type DB: sqlx::Database;
34
35    /// Character used to quote identifiers (`` ` `` for `MySQL`, `"` for `PostgreSQL`/`SQLite`).
36    const IDENTIFIER_QUOTE: char;
37
38    /// Resolves the connection pool for the given target database.
39    ///
40    /// # Errors
41    ///
42    /// - [`AppError::InvalidIdentifier`] — `target` failed validation.
43    async fn pool(&self, target: Option<&str>) -> Result<sqlx::Pool<Self::DB>, AppError>;
44
45    /// Returns the configured query timeout in seconds, if any.
46    fn query_timeout(&self) -> Option<u64>;
47
48    /// Runs a statement that returns no meaningful rows.
49    ///
50    /// # Errors
51    ///
52    /// See trait-level documentation.
53    async fn execute(&self, query: &str, database: Option<&str>) -> Result<u64, AppError>
54    where
55        for<'c> &'c mut <Self::DB as sqlx::Database>::Connection: Executor<'c, Database = Self::DB>,
56        <Self::DB as sqlx::Database>::QueryResult: sqlx_to_json::QueryResult,
57    {
58        let pool = self.pool(database).await?;
59        let sql = query.to_owned();
60        execute_with_timeout(self.query_timeout(), query, async move {
61            let mut conn = pool.acquire().await?;
62            let result = (&mut *conn).execute(sql.as_str()).await?;
63            Ok::<_, sqlx::Error>(result.rows_affected())
64        })
65        .await
66    }
67
68    /// Runs a statement and collects every result row as JSON.
69    ///
70    /// # Errors
71    ///
72    /// See trait-level documentation.
73    async fn fetch(&self, query: &str, database: Option<&str>) -> Result<Vec<Value>, AppError>
74    where
75        for<'c> &'c mut <Self::DB as sqlx::Database>::Connection: Executor<'c, Database = Self::DB>,
76        <Self::DB as sqlx::Database>::Row: sqlx_to_json::RowExt,
77    {
78        let pool = self.pool(database).await?;
79        let sql = query.to_owned();
80        execute_with_timeout(self.query_timeout(), query, async move {
81            let mut conn = pool.acquire().await?;
82            let rows = (&mut *conn).fetch_all(sql.as_str()).await?;
83            Ok::<_, sqlx::Error>(rows.iter().map(sqlx_to_json::RowExt::to_json).collect())
84        })
85        .await
86    }
87
88    /// Runs a statement and returns at most one result row as JSON.
89    ///
90    /// # Errors
91    ///
92    /// See trait-level documentation.
93    async fn fetch_optional(&self, query: &str, database: Option<&str>) -> Result<Option<Value>, AppError>
94    where
95        for<'c> &'c mut <Self::DB as sqlx::Database>::Connection: Executor<'c, Database = Self::DB>,
96        <Self::DB as sqlx::Database>::Row: sqlx_to_json::RowExt,
97    {
98        let pool = self.pool(database).await?;
99        let sql = query.to_owned();
100        execute_with_timeout(self.query_timeout(), query, async move {
101            let mut conn = pool.acquire().await?;
102            let row = (&mut *conn).fetch_optional(sql.as_str()).await?;
103            Ok::<_, sqlx::Error>(row.as_ref().map(sqlx_to_json::RowExt::to_json))
104        })
105        .await
106    }
107
108    /// Wraps `name` in the backend's identifier quote character.
109    fn quote_identifier(&self, name: &str) -> String {
110        identifier::quote_identifier(name, Self::IDENTIFIER_QUOTE)
111    }
112
113    /// Wraps `value` in single quotes for use as a SQL string literal.
114    fn quote_string(&self, value: &str) -> String {
115        identifier::quote_string(value)
116    }
117}