Skip to main content

pmcp_toolkit_mysql/
lib.rs

1//! MySQL connector for pmcp-server-toolkit.
2//!
3//! Pure-Rust + Lambda-deployable: sqlx 0.8.6 with `tls-rustls-aws-lc-rs` (no
4//! OpenSSL) per `feedback_avoid_docker_pure_rust_lambda` memory.
5//!
6//! [`MysqlConnector`] implements the toolkit's 3-method
7//! [`SqlConnector`](pmcp_server_toolkit::sql::SqlConnector) trait:
8//! [`dialect`](pmcp_server_toolkit::sql::SqlConnector::dialect),
9//! [`execute`](pmcp_server_toolkit::sql::SqlConnector::execute) (canonical
10//! `:name` placeholders translated to `?` via
11//! [`translate_placeholders`](pmcp_server_toolkit::sql::translate_placeholders)),
12//! and [`schema_text`](pmcp_server_toolkit::sql::SqlConnector::schema_text)
13//! (driven by `information_schema.columns` filtered by the MySQL database name).
14//!
15//! REVIEWS M3: [`MysqlConnector::connect`] uses
16//! [`MySqlPool::connect_lazy`](sqlx::mysql::MySqlPool::connect_lazy) to defer
17//! TCP I/O to first use. `connect_lazy` parses the URL synchronously, so a
18//! malformed URL returns [`ConnectorError::Connection`] immediately, while a
19//! real connection failure surfaces on the first
20//! [`execute`](SqlConnector::execute) / [`schema_text`](SqlConnector::schema_text)
21//! call. The `pub async fn` signature is retained for API symmetry with
22//! `PostgresConnector::connect` and to leave room for a future `connect_eager`
23//! variant that DOES open the connection.
24//!
25//! REVIEWS H5: the `dev_mock` feature exposes
26//! `pmcp_toolkit_mysql::dev_mock::MysqlMock` for examples + downstream
27//! integration tests. It is NOT enabled by default.
28//!
29//! # Security
30//!
31//! [`ConnectorError::Connection`] error text NEVER contains the raw URL or its
32//! password — the password segment is redacted via [`sanitize_url`] before the
33//! error is constructed (T-84-06-02).
34//!
35//! # Example
36//!
37//! ```no_run
38//! # use pmcp_toolkit_mysql::MysqlConnector;
39//! # #[tokio::main]
40//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
41//! let conn = MysqlConnector::connect("mysql://localhost/mydb").await?;
42//! # Ok(())
43//! # }
44//! ```
45
46#![allow(clippy::doc_markdown)]
47
48use async_trait::async_trait;
49use serde_json::{json, Map, Value};
50use sqlx::mysql::{MySqlArguments, MySqlPool, MySqlRow};
51use sqlx::query::Query;
52use sqlx::{Column, MySql, Row, TypeInfo};
53
54use pmcp_server_toolkit::sql::{
55    translate_placeholders, ConnectorError, Dialect, SqlConnector, TranslatedSql,
56};
57
58pub mod dev_mock;
59
60/// MySQL connector backed by a `sqlx` [`MySqlPool`].
61///
62/// Construct with [`MysqlConnector::connect`]. The pool is built lazily via
63/// [`MySqlPool::connect_lazy`](sqlx::mysql::MySqlPool::connect_lazy) (REVIEWS
64/// M3) — no TCP connection opens until the first
65/// [`execute`](SqlConnector::execute) / [`schema_text`](SqlConnector::schema_text)
66/// call. `database` is the schema name parsed from the URL, used to filter
67/// `information_schema.columns` in [`schema_text`](SqlConnector::schema_text).
68pub struct MysqlConnector {
69    pool: MySqlPool,
70    database: String,
71}
72
73/// Redact the password segment of a MySQL connection URL.
74///
75/// Returns the URL with any `:password@` segment rewritten to `:***@`. Used on
76/// the [`MysqlConnector::connect`] error path so a malformed-URL error never
77/// echoes the caller's secret (T-84-06-02). Plain best-effort string surgery —
78/// it never parses, so it cannot itself fail on malformed input.
79fn sanitize_url(url: &str) -> String {
80    let Some(scheme_end) = url.find("://") else {
81        return url.to_string();
82    };
83    let authority_start = scheme_end + 3;
84    let rest = &url[authority_start..];
85    let authority_end = rest.find('/').map_or(rest.len(), |i| i);
86    let authority = &rest[..authority_end];
87
88    // Only the userinfo segment (before '@') can carry a password.
89    let Some(at) = authority.find('@') else {
90        return url.to_string();
91    };
92    let userinfo = &authority[..at];
93    let Some(colon) = userinfo.find(':') else {
94        return url.to_string();
95    };
96
97    let user = &userinfo[..colon];
98    let after_at = &authority[at..];
99    format!(
100        "{}{}:***{}{}",
101        &url[..authority_start],
102        user,
103        after_at,
104        &rest[authority_end..]
105    )
106}
107
108/// Extract the MySQL database (schema) name from a connection URL.
109///
110/// Captures the path component after the host authority — e.g.
111/// `mysql://host/mydb` → `Some("mydb")`. Any query string after `?` is
112/// stripped. Returns `None` when no database segment is present. Used to filter
113/// `information_schema.columns` in [`schema_text`](SqlConnector::schema_text)
114/// (MySQL has no `'public'` schema like Postgres).
115fn parse_database_from_url(url: &str) -> Option<String> {
116    let scheme_end = url.find("://")?;
117    let rest = &url[scheme_end + 3..];
118    let slash = rest.find('/')?;
119    let after = &rest[slash + 1..];
120    let db = after.split(['?', '/']).next().unwrap_or("");
121    if db.is_empty() {
122        None
123    } else {
124        Some(db.to_string())
125    }
126}
127
128impl MysqlConnector {
129    /// Connect to a MySQL backend by URL, returning a pooled connector.
130    ///
131    /// REVIEWS M3: builds the pool via
132    /// [`MySqlPool::connect_lazy`](sqlx::mysql::MySqlPool::connect_lazy), which
133    /// parses the URL synchronously and defers TCP I/O to first use. The
134    /// constructor therefore returns immediately and is offline-safe — real
135    /// connection failures surface on the first
136    /// [`execute`](SqlConnector::execute) / [`schema_text`](SqlConnector::schema_text)
137    /// call.
138    ///
139    /// # Errors
140    ///
141    /// Returns [`ConnectorError::Connection`] if the URL cannot be parsed. The
142    /// error message redacts the password via [`sanitize_url`] — it never echoes
143    /// the raw secret (T-84-06-02).
144    pub async fn connect(url: &str) -> Result<Self, ConnectorError> {
145        let database = parse_database_from_url(url).unwrap_or_default();
146        // REVIEWS M3: connect_lazy parses the URL synchronously, defers TCP I/O
147        // to first use. Matches Plan 06's "returns Ok(Self) on URL parse success".
148        let pool = MySqlPool::connect_lazy(url).map_err(|e| {
149            ConnectorError::Connection(format!("mysql url ({}): {e}", sanitize_url(url)))
150        })?;
151        Ok(Self { pool, database })
152    }
153}
154
155/// Bind one [`Value`] onto a `sqlx` query, dispatching on the JSON variant.
156///
157/// `serde_json::Value` does not impl `Encode<MySql>` directly, so each variant
158/// is bound through a concrete Rust type. `Value::Null` binds an explicit typed
159/// `None`; object/array shapes are serialized to a JSON text fallback.
160fn bind_one<'q>(
161    q: Query<'q, MySql, MySqlArguments>,
162    v: &Value,
163) -> Query<'q, MySql, MySqlArguments> {
164    match v {
165        Value::Null => q.bind(None::<&str>),
166        Value::Bool(b) => q.bind(*b),
167        Value::Number(n) if n.is_i64() => q.bind(n.as_i64().unwrap_or(0)),
168        Value::Number(n) => q.bind(n.as_f64().unwrap_or(0.0)),
169        Value::String(s) => q.bind(s.clone()),
170        arr_or_obj => q.bind(serde_json::to_string(arr_or_obj).unwrap_or_default()),
171    }
172}
173
174/// Convert one column of a row into a [`Value`], dispatching on the MySQL type
175/// name (RESEARCH §1.2). Unknown column types fall through to a text read.
176fn column_to_value(row: &MySqlRow, idx: usize, type_name: &str) -> Value {
177    match type_name {
178        "BIGINT" | "INT" | "MEDIUMINT" | "SMALLINT" | "TINYINT" => row
179            .try_get::<Option<i64>, _>(idx)
180            .ok()
181            .flatten()
182            .map_or(Value::Null, |i| json!(i)),
183        "DOUBLE" | "FLOAT" => row
184            .try_get::<Option<f64>, _>(idx)
185            .ok()
186            .flatten()
187            .map_or(Value::Null, |f| json!(f)),
188        "BOOLEAN" | "BOOL" => row
189            .try_get::<Option<bool>, _>(idx)
190            .ok()
191            .flatten()
192            .map_or(Value::Null, |b| json!(b)),
193        // "VARCHAR" | "TEXT" | "CHAR" | "DECIMAL" and everything else: text read.
194        _ => row
195            .try_get::<Option<String>, _>(idx)
196            .ok()
197            .flatten()
198            .map_or(Value::Null, |s| json!(s)),
199    }
200}
201
202/// Convert a driver row into a JSON object keyed by column name (D-01 shape).
203fn row_to_value(row: &MySqlRow) -> Value {
204    let mut obj = Map::new();
205    for (idx, col) in row.columns().iter().enumerate() {
206        obj.insert(
207            col.name().to_string(),
208            column_to_value(row, idx, col.type_info().name()),
209        );
210    }
211    Value::Object(obj)
212}
213
214/// Read a string column from an `information_schema` row by name, defaulting to
215/// empty on any decode error so DDL rendering never panics mid-pass.
216fn schema_col(row: &MySqlRow, name: &str) -> String {
217    row.try_get::<String, _>(name).unwrap_or_default()
218}
219
220/// Group `information_schema.columns` rows into `CREATE TABLE` blocks with
221/// MySQL backtick identifier quoting and an `ENGINE=InnoDB` footer.
222///
223/// Each row carries `table_name`, `column_name`, `data_type`, and `is_nullable`
224/// (`"YES"`/`"NO"`). Rows arrive ordered by `table_name`, `ordinal_position`, so
225/// a single pass emits one block per table.
226fn format_information_schema_as_ddl(rows: &[MySqlRow]) -> String {
227    let mut out = String::new();
228    let mut current_table: Option<String> = None;
229
230    for row in rows {
231        let table = schema_col(row, "table_name");
232        let column = schema_col(row, "column_name");
233        let data_type = schema_col(row, "data_type");
234        let is_nullable = schema_col(row, "is_nullable");
235
236        if current_table.as_deref() != Some(table.as_str()) {
237            if current_table.is_some() {
238                out.push_str(") ENGINE=InnoDB;\n");
239            }
240            out.push_str(&format!("CREATE TABLE `{table}` (\n"));
241            current_table = Some(table);
242        }
243        let not_null = if is_nullable == "NO" { " NOT NULL" } else { "" };
244        out.push_str(&format!("  `{column}` {data_type}{not_null}\n"));
245    }
246    if current_table.is_some() {
247        out.push_str(") ENGINE=InnoDB;\n");
248    }
249    out
250}
251
252#[async_trait]
253impl SqlConnector for MysqlConnector {
254    fn dialect(&self) -> Dialect {
255        Dialect::MySql
256    }
257
258    async fn execute(
259        &self,
260        sql: &str,
261        params: &[(String, Value)],
262    ) -> Result<Vec<Value>, ConnectorError> {
263        let TranslatedSql {
264            sql: translated,
265            ordered_params,
266        } = translate_placeholders(sql, Dialect::MySql);
267        let mut q = sqlx::query(&translated);
268        for name in &ordered_params {
269            let v = params
270                .iter()
271                .find(|(k, _)| k == name)
272                .map_or(Value::Null, |(_, v)| v.clone());
273            q = bind_one(q, &v);
274        }
275        let rows = q
276            .fetch_all(&self.pool)
277            .await
278            .map_err(|e| ConnectorError::Query(e.to_string()))?;
279        Ok(rows.iter().map(row_to_value).collect())
280    }
281
282    async fn schema_text(&self) -> Result<String, ConnectorError> {
283        let rows = sqlx::query(
284            "SELECT table_name, column_name, data_type, is_nullable \
285             FROM information_schema.columns WHERE table_schema = ? \
286             ORDER BY table_name, ordinal_position",
287        )
288        .bind(&self.database)
289        .fetch_all(&self.pool)
290        .await
291        .map_err(|e| ConnectorError::Schema(e.to_string()))?;
292        Ok(format_information_schema_as_ddl(&rows))
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299
300    #[test]
301    fn test_sanitize_url_redacts_password() {
302        assert_eq!(
303            sanitize_url("mysql://user:secret@host/db"),
304            "mysql://user:***@host/db"
305        );
306    }
307
308    #[test]
309    fn test_sanitize_url_without_password_unchanged() {
310        assert_eq!(
311            sanitize_url("mysql://host/db"),
312            "mysql://host/db",
313            "no userinfo → unchanged"
314        );
315        assert_eq!(
316            sanitize_url("mysql://user@host/db"),
317            "mysql://user@host/db",
318            "user without password → unchanged"
319        );
320    }
321
322    #[test]
323    fn test_parse_database_from_url() {
324        assert_eq!(
325            parse_database_from_url("mysql://localhost/mydb"),
326            Some("mydb".to_string())
327        );
328        assert_eq!(
329            parse_database_from_url("mysql://user:pw@host:3306/shop?ssl=true"),
330            Some("shop".to_string()),
331            "query string and port are stripped"
332        );
333        assert_eq!(
334            parse_database_from_url("mysql://localhost/"),
335            None,
336            "empty database segment → None"
337        );
338        assert_eq!(
339            parse_database_from_url("not a url"),
340            None,
341            "no scheme → None"
342        );
343    }
344
345    #[test]
346    fn test_bind_one_dispatch() {
347        // Each variant must build a bindable query without panicking. We can't
348        // inspect the bound value off a `Query` (sqlx hides its arg buffer), so
349        // the assertion is that dispatch is total across every Value shape.
350        for v in [
351            Value::Null,
352            json!(true),
353            json!(42_i64),
354            json!(2.5_f64),
355            json!("hello"),
356            json!([1, 2, 3]),
357            json!({"k": "v"}),
358        ] {
359            let _ = bind_one(sqlx::query("SELECT ?"), &v);
360        }
361    }
362
363    // REVIEWS M3: connect_lazy returns Ok without opening a TCP connection.
364    #[tokio::test]
365    async fn test_connect_lazy_returns_ok_without_network() {
366        // No MySQL server runs locally; connect_lazy must still return Ok(_)
367        // because it only parses the URL and defers I/O to first use.
368        let result = MysqlConnector::connect("mysql://localhost/db").await;
369        assert!(
370            result.is_ok(),
371            "connect_lazy must return Ok without a reachable server"
372        );
373    }
374
375    // REVIEWS M3: a structurally invalid URL fails synchronously, redacted.
376    #[tokio::test]
377    async fn test_connect_invalid_url_returns_err() {
378        match MysqlConnector::connect("not a url").await {
379            Err(ConnectorError::Connection(msg)) => {
380                assert!(
381                    !msg.contains("password"),
382                    "error text must not contain the literal 'password'; got: {msg:?}"
383                );
384            },
385            Err(other) => panic!("expected ConnectorError::Connection, got {other:?}"),
386            Ok(_) => panic!("malformed URL must error"),
387        }
388    }
389
390    #[tokio::test]
391    async fn test_connect_invalid_url_does_not_echo_password() {
392        match MysqlConnector::connect("mysql://u:hunter2@@@bad url/db").await {
393            Err(ConnectorError::Connection(msg)) => {
394                assert!(
395                    !msg.contains("hunter2"),
396                    "error text must not echo the password; got: {msg:?}"
397                );
398            },
399            Err(other) => panic!("expected ConnectorError::Connection, got {other:?}"),
400            // connect_lazy may accept this — if so there is no error path to check.
401            Ok(_) => {},
402        }
403    }
404}