Skip to main content

dbmcp_sql/
connection.rs

//! Connection abstraction shared across database backends.
//!
//! Defines [`Connection`] — the single trait every backend implements.
//! Backends provide pool resolution and timeout config; default method
//! implementations handle query execution.
6
7use crate::SqlError;
8use serde_json::Value;
9use sqlx::{Decode, Execute, Executor, FromRow, Row, Type};
10use sqlx_json::{QueryResult as _, RowExt};
11
12use crate::timeout::execute_with_timeout;
13
14/// Unified query surface every backend tool handler uses.
15///
16/// Backends supply three required items — [`DB`](Connection::DB),
17/// [`pool`](Connection::pool), and [`query_timeout`](Connection::query_timeout)
18/// — and receive default implementations for query execution.
19///
20/// Query methods accept any [`sqlx::Execute`] value: a plain `&str` for
21/// bindless statements (which routes through sqlx's unprepared text
22/// protocol, required for statements like `MySQL` `USE`), or an
23/// `sqlx::query(sql).bind(...)` value for parameterized statements.
24///
25/// # Errors
26///
27/// Query methods may return:
28///
29/// - [`SqlError::InvalidIdentifier`] — `database` failed identifier validation.
30/// - [`SqlError::Connection`] — the underlying driver failed.
31/// - [`SqlError::QueryTimeout`] — the query exceeded the configured timeout.
32#[allow(async_fn_in_trait)]
33pub trait Connection: Send + Sync
34where
35    for<'c> &'c mut <Self::DB as sqlx::Database>::Connection: Executor<'c, Database = Self::DB>,
36    usize: sqlx::ColumnIndex<<Self::DB as sqlx::Database>::Row>,
37    <Self::DB as sqlx::Database>::Row: RowExt,
38    <Self::DB as sqlx::Database>::QueryResult: sqlx_json::QueryResult,
39{
40    /// The sqlx database driver type (e.g. `sqlx::MySql`).
41    type DB: sqlx::Database;
42
43    /// Resolves the connection pool for the given target database.
44    ///
45    /// # Errors
46    ///
47    /// - [`SqlError::InvalidIdentifier`] — `target` failed validation.
48    async fn pool(&self, target: Option<&str>) -> Result<sqlx::Pool<Self::DB>, SqlError>;
49
50    /// Returns the configured query timeout in seconds, if any.
51    fn query_timeout(&self) -> Option<u64>;
52
53    /// Runs a statement that returns no meaningful rows.
54    ///
55    /// # Errors
56    ///
57    /// See trait-level documentation.
58    async fn execute<'q, E>(&self, query: E, database: Option<&str>) -> Result<u64, SqlError>
59    where
60        E: 'q + Execute<'q, Self::DB>,
61    {
62        let sql = query.sql().to_owned();
63        let pool = self.pool(database).await?;
64        execute_with_timeout(self.query_timeout(), &sql, async move {
65            Ok(pool.execute(query).await?.rows_affected())
66        })
67        .await
68    }
69
70    /// Runs a statement and collects every result row as JSON.
71    ///
72    /// # Errors
73    ///
74    /// See trait-level documentation.
75    async fn fetch_json<'q, E>(&self, query: E, database: Option<&str>) -> Result<Vec<Value>, SqlError>
76    where
77        E: 'q + Execute<'q, Self::DB>,
78    {
79        let sql = query.sql().to_owned();
80        let pool = self.pool(database).await?;
81        execute_with_timeout(self.query_timeout(), &sql, async move {
82            Ok(pool.fetch_all(query).await?.iter().map(RowExt::to_json).collect())
83        })
84        .await
85    }
86
87    /// Runs a query and extracts column 0 from the first row, if any.
88    ///
89    /// Returns `None` for both "no row returned" and "row where column 0
90    /// is NULL" (decode errors are caught, not propagated).
91    ///
92    /// # Errors
93    ///
94    /// See trait-level documentation.
95    async fn fetch_optional<'q, E, T>(&self, query: E, database: Option<&str>) -> Result<Option<T>, SqlError>
96    where
97        E: 'q + Execute<'q, Self::DB>,
98        T: for<'r> Decode<'r, Self::DB> + Type<Self::DB> + Send + Unpin,
99    {
100        let sql = query.sql().to_owned();
101        let pool = self.pool(database).await?;
102        execute_with_timeout(self.query_timeout(), &sql, async move {
103            Ok(pool.fetch_optional(query).await?.and_then(|r| r.try_get(0usize).ok()))
104        })
105        .await
106    }
107
108    /// Runs a query and extracts the first column of every row.
109    ///
110    /// # Errors
111    ///
112    /// See trait-level documentation.
113    async fn fetch_scalar<'q, E, T>(&self, query: E, database: Option<&str>) -> Result<Vec<T>, SqlError>
114    where
115        E: 'q + Execute<'q, Self::DB>,
116        T: for<'r> Decode<'r, Self::DB> + Type<Self::DB> + Send + Unpin,
117    {
118        let sql = query.sql().to_owned();
119        let pool = self.pool(database).await?;
120        execute_with_timeout(self.query_timeout(), &sql, async move {
121            let rows = pool.fetch_all(query).await?;
122            rows.iter().map(|r| r.try_get(0usize)).collect()
123        })
124        .await
125    }
126
127    /// Runs a query and decodes every row into `T` via [`sqlx::FromRow`].
128    ///
129    /// # Errors
130    ///
131    /// See trait-level documentation. Row decode failures (column type
132    /// mismatch, malformed JSON inside a [`sqlx::types::Json`] column, etc.)
133    /// surface as [`SqlError::Query`].
134    async fn fetch<'q, E, T>(&self, query: E, database: Option<&str>) -> Result<Vec<T>, SqlError>
135    where
136        E: 'q + Execute<'q, Self::DB>,
137        T: for<'r> FromRow<'r, <Self::DB as sqlx::Database>::Row> + Send + Unpin,
138    {
139        let sql = query.sql().to_owned();
140        let pool = self.pool(database).await?;
141        execute_with_timeout(self.query_timeout(), &sql, async move {
142            let rows = pool.fetch_all(query).await?;
143            rows.iter().map(T::from_row).collect()
144        })
145        .await
146    }
147}