Skip to main content

ferrule_sql/backends/
mysql.rs

1use crate::connection::{
2    AsyncConnection, BulkInsert, ConnectOptions, ExecutionSummary, ForeignKey, QueryResult,
3    SchemaInfo, StatementResult,
4};
5use crate::error::SqlError;
6use crate::stream::BoxRowStream;
7use crate::url::DatabaseUrl;
8use crate::value::{ColumnInfo, Row, TypeHint, Value};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
12use futures_util::stream::StreamExt;
13use mysql_async::prelude::Queryable;
14use secrecy::ExposeSecret;
15
16pub struct MySqlConnection {
17    conn: mysql_async::Conn,
18}
19
20#[async_trait]
21impl AsyncConnection for MySqlConnection {
22    async fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
23        self.conn
24            .query_drop(sql)
25            .await
26            .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
27        let affected = self.conn.affected_rows();
28        Ok(ExecutionSummary {
29            rows_affected: Some(affected),
30            command_tag: None,
31        })
32    }
33
34    async fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
35        let mut result = self
36            .conn
37            .query_iter(sql)
38            .await
39            .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
40
41        let columns_ref = result.columns_ref();
42        let columns: Vec<ColumnInfo> = columns_ref
43            .iter()
44            .map(|c| ColumnInfo {
45                name: c.name_str().to_string(),
46                type_hint: TypeHint::Other,
47                nullable: true,
48            })
49            .collect();
50
51        let mysql_rows = result
52            .collect::<mysql_async::Row>()
53            .await
54            .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
55
56        // Discard any remaining result sets so the connection stays clean.
57        result
58            .drop_result()
59            .await
60            .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
61
62        let rows: Vec<Row> = mysql_rows
63            .into_iter()
64            .map(|row| {
65                let col_types: Vec<_> = row
66                    .columns_ref()
67                    .iter()
68                    .map(|c| (c.column_type(), c.column_length()))
69                    .collect();
70                row.unwrap()
71                    .into_iter()
72                    .enumerate()
73                    .map(|(i, v)| mysql_to_value(v, col_types[i].0, col_types[i].1))
74                    .collect()
75            })
76            .collect();
77
78        Ok(QueryResult { columns, rows })
79    }
80
81    /// Stream rows from a MySQL `query_iter` result at bounded memory.
82    ///
83    /// `query_iter` is already lazy — it does not read rows from the
84    /// server until they are pulled. We keep that laziness by driving
85    /// `QueryResult::next` inside a `try_unfold` stream that owns the
86    /// `QueryResult` (which borrows `&mut self.conn`), so no row is read
87    /// ahead of the consumer and memory stays `O(1)` per pulled row.
88    /// Only the first result set is streamed (consistent with the eager
89    /// `query`); the connection is left ready for reuse once the stream
90    /// is fully drained or dropped.
91    async fn query_stream(
92        &mut self,
93        sql: &str,
94    ) -> Result<(Vec<ColumnInfo>, BoxRowStream<'_>), SqlError> {
95        // Pass an owned query so the resulting `QueryResult` borrows
96        // only `&mut self.conn` (not the `sql` argument), keeping the
97        // returned stream tied to the connection lifetime.
98        let result = self
99            .conn
100            .query_iter(sql.to_string())
101            .await
102            .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
103
104        let columns: Vec<ColumnInfo> = result
105            .columns_ref()
106            .iter()
107            .map(|c| ColumnInfo {
108                name: c.name_str().to_string(),
109                type_hint: TypeHint::Other,
110                nullable: true,
111            })
112            .collect();
113
114        let stream = futures_util::stream::try_unfold(result, |mut result| async move {
115            match result.next().await {
116                Ok(Some(row)) => {
117                    let col_types: Vec<_> = row
118                        .columns_ref()
119                        .iter()
120                        .map(|c| (c.column_type(), c.column_length()))
121                        .collect();
122                    let values: Row = row
123                        .unwrap()
124                        .into_iter()
125                        .enumerate()
126                        .map(|(i, v)| mysql_to_value(v, col_types[i].0, col_types[i].1))
127                        .collect();
128                    Ok(Some((values, result)))
129                }
130                Ok(None) => Ok(None),
131                Err(e) => Err(SqlError::QueryFailed(e.to_string())),
132            }
133        });
134        Ok((columns, Box::pin(stream)))
135    }
136
137    async fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
138        let mut result = self
139            .conn
140            .query_iter(sql)
141            .await
142            .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
143
144        let mut results = Vec::new();
145
146        loop {
147            let columns_ref = result.columns_ref();
148            if columns_ref.is_empty() {
149                let affected = result.affected_rows();
150                result
151                    .collect::<mysql_async::Row>()
152                    .await
153                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
154                results.push(StatementResult::Summary(ExecutionSummary {
155                    rows_affected: Some(affected),
156                    command_tag: None,
157                }));
158            } else {
159                let columns: Vec<ColumnInfo> = columns_ref
160                    .iter()
161                    .map(|c| ColumnInfo {
162                        name: c.name_str().to_string(),
163                        type_hint: TypeHint::Other,
164                        nullable: true,
165                    })
166                    .collect();
167
168                let mysql_rows = result
169                    .collect::<mysql_async::Row>()
170                    .await
171                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
172
173                let rows: Vec<Row> = mysql_rows
174                    .into_iter()
175                    .map(|row| {
176                        let col_types: Vec<_> = row
177                            .columns_ref()
178                            .iter()
179                            .map(|c| (c.column_type(), c.column_length()))
180                            .collect();
181                        row.unwrap()
182                            .into_iter()
183                            .enumerate()
184                            .map(|(i, v)| mysql_to_value(v, col_types[i].0, col_types[i].1))
185                            .collect()
186                    })
187                    .collect();
188
189                results.push(StatementResult::Query(QueryResult { columns, rows }));
190            }
191
192            if result.is_empty() {
193                break;
194            }
195        }
196
197        Ok(results)
198    }
199
200    async fn ping(&mut self) -> Result<(), SqlError> {
201        self.conn
202            .ping()
203            .await
204            .map_err(|e| SqlError::ConnectionFailed(e.to_string()))?;
205        Ok(())
206    }
207
208    async fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
209        let sql = match schema {
210            Some(s) => format!("SHOW TABLES FROM `{}`", escape_mysql_identifier(s)),
211            None => "SHOW TABLES".to_string(),
212        };
213        let result = self.query(&sql).await?;
214        let names: Vec<String> = result
215            .rows
216            .into_iter()
217            .filter_map(|row| {
218                row.into_iter().next().and_then(|v| match v {
219                    Value::String(s) => Some(s),
220                    _ => None,
221                })
222            })
223            .collect();
224        Ok(names)
225    }
226
227    async fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
228        // `information_schema.SCHEMATA` lets us compute is_default in one
229        // query (`SCHEMA_NAME = DATABASE()`) and matches the
230        // describe_table information_schema usage. `DATABASE()` is NULL
231        // when no database is selected, so the comparison decodes to
232        // is_default = false.
233        let sql = "SELECT SCHEMA_NAME, SCHEMA_NAME = DATABASE() FROM information_schema.SCHEMATA ORDER BY SCHEMA_NAME";
234        let result = self.query(sql).await?;
235        let schemas: Vec<SchemaInfo> = result
236            .rows
237            .into_iter()
238            .filter_map(|row| {
239                let name = match row.first() {
240                    Some(Value::String(s)) => s.clone(),
241                    _ => return None,
242                };
243                let is_default = crate::connection::is_default_from_value(row.get(1));
244                Some(SchemaInfo { name, is_default })
245            })
246            .collect();
247        Ok(schemas)
248    }
249
250    async fn describe_table(
251        &mut self,
252        schema: Option<&str>,
253        table: &str,
254    ) -> Result<QueryResult, SqlError> {
255        let schema = match schema {
256            Some(s) => s.to_string(),
257            None => {
258                let db_query = self.query("SELECT DATABASE()").await?;
259                db_query
260                    .rows
261                    .into_iter()
262                    .next()
263                    .and_then(|row| row.into_iter().next())
264                    .and_then(|v| match v {
265                        Value::String(s) => Some(s),
266                        _ => None,
267                    })
268                    .unwrap_or_default()
269            }
270        };
271
272        let sql = format!(
273            "SELECT column_name AS `column_name`, \
274             data_type AS `data_type`, \
275             is_nullable AS `is_nullable`, \
276             column_default AS `column_default`, \
277             numeric_precision AS `numeric_precision`, \
278             numeric_scale AS `numeric_scale` \
279             FROM information_schema.columns \
280             WHERE table_schema = '{}' AND table_name = '{}' \
281             ORDER BY ordinal_position",
282            escape_mysql_string(&schema),
283            escape_mysql_string(table)
284        );
285        self.query(&sql).await
286    }
287
288    async fn primary_key(
289        &mut self,
290        schema: Option<&str>,
291        table: &str,
292    ) -> Result<Vec<String>, SqlError> {
293        let schema = match schema {
294            Some(s) => s.to_string(),
295            None => current_database(self).await?,
296        };
297        let sql = format!(
298            "SELECT column_name FROM information_schema.key_column_usage \
299             WHERE table_schema = '{}' AND table_name = '{}' \
300               AND constraint_name = 'PRIMARY' \
301             ORDER BY ordinal_position",
302            escape_mysql_string(&schema),
303            escape_mysql_string(table)
304        );
305        let result = self.query(&sql).await?;
306        Ok(result
307            .rows
308            .into_iter()
309            .filter_map(|row| {
310                row.into_iter().next().and_then(|v| match v {
311                    Value::String(s) => Some(s),
312                    _ => None,
313                })
314            })
315            .collect())
316    }
317
318    async fn list_foreign_keys(
319        &mut self,
320        schema: Option<&str>,
321    ) -> Result<Vec<ForeignKey>, SqlError> {
322        let schema = match schema {
323            Some(s) => s.to_string(),
324            None => current_database(self).await?,
325        };
326        // KEY_COLUMN_USAGE has one row per (constraint, column position).
327        // referential_constraints gives ON DELETE; join to surface it.
328        let sql = format!(
329            "SELECT k.constraint_name, k.table_name, k.column_name, \
330                    k.referenced_table_name, k.referenced_column_name, \
331                    rc.delete_rule \
332             FROM information_schema.key_column_usage k \
333             JOIN information_schema.referential_constraints rc \
334               ON rc.constraint_schema = k.constraint_schema \
335              AND rc.constraint_name = k.constraint_name \
336             WHERE k.table_schema = '{}' AND k.referenced_table_name IS NOT NULL \
337             ORDER BY k.constraint_name, k.ordinal_position",
338            escape_mysql_string(&schema)
339        );
340        let result = self.query(&sql).await?;
341        let mut map: indexmap::IndexMap<String, ForeignKey> = indexmap::IndexMap::new();
342        for row in result.rows {
343            let mut cols = row.into_iter();
344            let conname = match cols.next() {
345                Some(Value::String(s)) => s,
346                _ => continue,
347            };
348            let child_table = match cols.next() {
349                Some(Value::String(s)) => s,
350                _ => continue,
351            };
352            let child_col = match cols.next() {
353                Some(Value::String(s)) => s,
354                _ => continue,
355            };
356            let parent_table = match cols.next() {
357                Some(Value::String(s)) => s,
358                _ => continue,
359            };
360            let parent_col = match cols.next() {
361                Some(Value::String(s)) => s,
362                _ => continue,
363            };
364            let on_delete = match cols.next() {
365                Some(Value::String(s)) => Some(s),
366                _ => None,
367            };
368            let entry = map.entry(conname).or_insert_with(|| ForeignKey {
369                child_table: child_table.clone(),
370                child_columns: Vec::new(),
371                parent_table: parent_table.clone(),
372                parent_columns: Vec::new(),
373                on_delete,
374            });
375            entry.child_columns.push(child_col);
376            entry.parent_columns.push(parent_col);
377        }
378        Ok(map.into_values().collect())
379    }
380
381    async fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError> {
382        if target.rows.is_empty() {
383            return Ok(0);
384        }
385
386        // Encode all rows upfront. Each row becomes one tab-separated
387        // newline-terminated line, with the MySQL LOAD DATA escape
388        // set applied byte-by-byte. The buffer is a `Vec<u8>` rather
389        // than a `String` so `Value::Bytes` can carry arbitrary
390        // (non-UTF-8) bytes into a BLOB column safely.
391        let hints: Vec<TypeHint> = target.columns.iter().map(|c| c.type_hint).collect();
392        let mut chunks: Vec<Bytes> = Vec::with_capacity(target.rows.len());
393        for row in target.rows {
394            let bytes = my_load_data::encode_row(row, &hints)?;
395            chunks.push(bytes);
396        }
397
398        // mysql_async exposes a *local* per-call infile handler that
399        // is consumed by a single `LOAD DATA LOCAL INFILE` and
400        // auto-cleared afterwards (also on Drop / Conn::reset). This
401        // gives us exactly the lifecycle we need: the handler is
402        // only present while this function is running, so a hostile
403        // `LOAD DATA LOCAL INFILE '/etc/passwd'` typed into
404        // `ferrule query` on this connection fails with
405        // `LocalInfileError::NoHandler` (no global handler installed
406        // at connect time). Security falls out of the API shape.
407        //
408        // The handler future must be `Sync`, so construct the stream
409        // *inside* the async block — capturing only `Vec<Bytes>`
410        // (which is `Sync`) rather than a pre-boxed stream (which is
411        // not). Matches the mysql_async docs example.
412        self.conn.set_infile_handler(async move {
413            Ok(futures_util::stream::iter(chunks).map(Ok).boxed())
414        });
415
416        // Quote the table and column list. The literal filename
417        // ('ferrule_bulk') does not name a real file — the local
418        // handler returns our encoded bytes instead, ignoring the
419        // server-supplied path entirely.
420        let qtable = my_load_data::backtick_quote(target.table);
421        let cols = target
422            .columns
423            .iter()
424            .map(|c| my_load_data::backtick_quote(&c.name))
425            .collect::<Vec<_>>()
426            .join(", ");
427        let load_sql = format!(
428            "LOAD DATA LOCAL INFILE 'ferrule_bulk' INTO TABLE {qtable} \
429             CHARACTER SET utf8mb4 \
430             FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' \
431             LINES TERMINATED BY '\\n' \
432             ({cols})"
433        );
434
435        // query_drop discards the result set but stores
436        // affected_rows on the connection.
437        let load_result = self.conn.query_drop(load_sql).await;
438
439        if let Err(e) = load_result {
440            // C2: ensure no infile handler lingers on the connection
441            // after a failed LOAD DATA. mysql_async only consumes the
442            // local handler when the *server* asks for the file (see
443            // `helpers.rs::handle_local_infile`); on a parse error,
444            // missing-table error, or any failure *before* that
445            // server prompt, the handler stays in
446            // `Conn::inner.infile_handler` ready to be honored by the
447            // *next* query on this connection — including a hostile
448            // user-issued `LOAD DATA LOCAL INFILE '/etc/passwd'`.
449            //
450            // Two-step defense:
451            //
452            // 1. `Conn::reset()` issues `COM_RESET_CONNECTION` and
453            //    explicitly clears `infile_handler = None`
454            //    (mysql_async src/conn/mod.rs:1146). On modern
455            //    servers this fully neutralizes the threat.
456            //
457            // 2. Reset returns `Ok(false)` without clearing on
458            //    pre-MySQL-5.7.3 / pre-MariaDB-10.2.4 servers, and
459            //    `Err(_)` on transport failure. In both cases we
460            //    install a *poison* local handler that refuses any
461            //    subsequent LOAD DATA on this connection. The poison
462            //    handler is one-shot per mysql_async semantics;
463            //    after it fires once `infile_handler` is `None` and
464            //    further LOAD DATA queries fail with
465            //    `LocalInfileError::NoHandler`. Installing it on top
466            //    of a stale handler overwrites it, so the leaked
467            //    bytes become unreachable even when reset is a no-op.
468            let _ = self.conn.reset().await;
469            self.conn.set_infile_handler(async {
470                Err(mysql_async::Error::from(
471                    mysql_async::LocalInfileError::other(std::io::Error::new(
472                        std::io::ErrorKind::PermissionDenied,
473                        "ferrule: LOAD DATA LOCAL INFILE refused — connection \
474                         state may be tainted after a failed bulk operation. \
475                         Reconnect to re-enable bulk_insert_rows.",
476                    )),
477                ))
478            });
479            return Err(my_load_data::classify_load_error(e));
480        }
481
482        Ok(self.conn.affected_rows() as usize)
483    }
484}
485
486pub(crate) async fn connect(
487    url: &DatabaseUrl,
488    opts: &ConnectOptions,
489) -> Result<MySqlConnection, SqlError> {
490    let mut builder = mysql_async::OptsBuilder::default()
491        .ip_or_hostname(url.host().unwrap_or("localhost"))
492        .tcp_port(url.port().unwrap_or(3306));
493
494    if !url.username().is_empty() {
495        builder = builder.user(Some(url.username()));
496    }
497    // A caller-resolved secret takes precedence over the URL password.
498    if let Some(pass) = opts.effective_password(url) {
499        builder = builder.pass(Some(pass.expose_secret()));
500    }
501    let db = url.database();
502    if !db.is_empty() {
503        builder = builder.db_name(Some(db));
504    }
505
506    if opts.insecure {
507        let ssl_opts = mysql_async::SslOpts::default()
508            .with_danger_accept_invalid_certs(true)
509            .with_danger_skip_domain_validation(true);
510        builder = builder.ssl_opts(Some(ssl_opts));
511    }
512
513    if let Some(ssl_mode) = url.params().get("ssl-mode") {
514        match ssl_mode.as_str() {
515            "disabled" | "disable" => {
516                let ssl_opts =
517                    mysql_async::SslOpts::default().with_danger_accept_invalid_certs(true);
518                builder = builder.ssl_opts(Some(ssl_opts));
519            }
520            "preferred" => {
521                // Default behavior – no-op
522            }
523            "required" => {
524                let ssl_opts =
525                    mysql_async::SslOpts::default().with_danger_accept_invalid_certs(false);
526                builder = builder.ssl_opts(Some(ssl_opts));
527            }
528            "verify-ca" | "verify-identity" => {
529                let ssl_opts = mysql_async::SslOpts::default()
530                    .with_danger_accept_invalid_certs(false)
531                    .with_danger_skip_domain_validation(false);
532                builder = builder.ssl_opts(Some(ssl_opts));
533            }
534            _ => {}
535        }
536    }
537
538    let conn_opts: mysql_async::Opts = builder.into();
539    let conn = mysql_async::Conn::new(conn_opts)
540        .await
541        .map_err(|e| SqlError::ConnectionFailed(e.to_string()))?;
542
543    Ok(MySqlConnection { conn })
544}
545
546fn mysql_to_value(
547    value: mysql_async::Value,
548    column_type: mysql_async::consts::ColumnType,
549    column_length: u32,
550) -> Value {
551    use mysql_async::consts::ColumnType as CT;
552
553    match value {
554        mysql_async::Value::NULL => Value::Null,
555        mysql_async::Value::Bytes(b) => match column_type {
556            CT::MYSQL_TYPE_JSON => serde_json::from_slice(&b)
557                .map(Value::Json)
558                .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned())),
559            CT::MYSQL_TYPE_DECIMAL | CT::MYSQL_TYPE_NEWDECIMAL => {
560                Value::Decimal(String::from_utf8_lossy(&b).into_owned())
561            }
562            CT::MYSQL_TYPE_TINY_BLOB
563            | CT::MYSQL_TYPE_MEDIUM_BLOB
564            | CT::MYSQL_TYPE_LONG_BLOB
565            | CT::MYSQL_TYPE_BLOB => Value::Bytes(b),
566            CT::MYSQL_TYPE_TINY => {
567                let s = String::from_utf8_lossy(&b);
568                if column_length == 1 {
569                    Value::Bool(s != "0")
570                } else {
571                    s.parse::<i64>()
572                        .map(Value::Int64)
573                        .unwrap_or_else(|_| Value::String(s.into_owned()))
574                }
575            }
576            CT::MYSQL_TYPE_SHORT
577            | CT::MYSQL_TYPE_LONG
578            | CT::MYSQL_TYPE_INT24
579            | CT::MYSQL_TYPE_LONGLONG
580            | CT::MYSQL_TYPE_YEAR => String::from_utf8_lossy(&b)
581                .parse::<i64>()
582                .map(Value::Int64)
583                .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned())),
584            CT::MYSQL_TYPE_FLOAT | CT::MYSQL_TYPE_DOUBLE => String::from_utf8_lossy(&b)
585                .parse::<f64>()
586                .map(Value::Float64)
587                .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned())),
588            CT::MYSQL_TYPE_DATE => {
589                NaiveDate::parse_from_str(&String::from_utf8_lossy(&b), "%Y-%m-%d")
590                    .map(Value::Date)
591                    .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned()))
592            }
593            CT::MYSQL_TYPE_TIME => {
594                NaiveTime::parse_from_str(&String::from_utf8_lossy(&b), "%H:%M:%S")
595                    .or_else(|_| {
596                        NaiveTime::parse_from_str(&String::from_utf8_lossy(&b), "%H:%M:%S%.f")
597                    })
598                    .map(Value::Time)
599                    .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned()))
600            }
601            CT::MYSQL_TYPE_DATETIME | CT::MYSQL_TYPE_DATETIME2 => {
602                parse_naive_datetime(&String::from_utf8_lossy(&b))
603                    .map(Value::DateTime)
604                    .unwrap_or_else(|| Value::String(String::from_utf8_lossy(&b).into_owned()))
605            }
606            CT::MYSQL_TYPE_TIMESTAMP | CT::MYSQL_TYPE_TIMESTAMP2 => {
607                parse_naive_datetime(&String::from_utf8_lossy(&b))
608                    .and_then(|dt| dt.and_local_timezone(Utc).single())
609                    .map(Value::DateTimeTz)
610                    .unwrap_or_else(|| Value::String(String::from_utf8_lossy(&b).into_owned()))
611            }
612            _ => String::from_utf8(b)
613                .map(Value::String)
614                .unwrap_or_else(|e| Value::Bytes(e.into_bytes())),
615        },
616        mysql_async::Value::Int(i) => {
617            if column_type == CT::MYSQL_TYPE_TINY && column_length == 1 {
618                Value::Bool(i != 0)
619            } else {
620                Value::Int64(i)
621            }
622        }
623        mysql_async::Value::UInt(u) => {
624            if column_type == CT::MYSQL_TYPE_TINY && column_length == 1 {
625                Value::Bool(u != 0)
626            } else {
627                Value::Int64(u as i64)
628            }
629        }
630        mysql_async::Value::Float(f) => Value::Float64(f64::from(f)),
631        mysql_async::Value::Double(d) => Value::Float64(d),
632        mysql_async::Value::Date(year, month, day, hour, min, sec, usec) => match column_type {
633            CT::MYSQL_TYPE_DATE => NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32)
634                .map(Value::Date)
635                .unwrap_or_else(|| Value::String(format!("{:04}-{:02}-{:02}", year, month, day))),
636            CT::MYSQL_TYPE_TIMESTAMP | CT::MYSQL_TYPE_TIMESTAMP2 => {
637                NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32)
638                    .and_then(|d| d.and_hms_micro_opt(hour as u32, min as u32, sec as u32, usec))
639                    .and_then(|dt| dt.and_local_timezone(Utc).single())
640                    .map(Value::DateTimeTz)
641                    .unwrap_or_else(|| {
642                        Value::String(format!(
643                            "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
644                            year, month, day, hour, min, sec
645                        ))
646                    })
647            }
648            _ => NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32)
649                .and_then(|d| d.and_hms_micro_opt(hour as u32, min as u32, sec as u32, usec))
650                .map(Value::DateTime)
651                .unwrap_or_else(|| {
652                    Value::String(format!(
653                        "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
654                        year, month, day, hour, min, sec
655                    ))
656                }),
657        },
658        mysql_async::Value::Time(neg, days, hours, minutes, seconds, _usec) => {
659            let total_hours = days * 24 + u32::from(hours);
660            Value::String(format!(
661                "{}{:02}:{:02}:{:02}",
662                if neg { "-" } else { "" },
663                total_hours,
664                minutes,
665                seconds
666            ))
667        }
668    }
669}
670
671fn parse_naive_datetime(s: &str) -> Option<NaiveDateTime> {
672    NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")
673        .ok()
674        .or_else(|| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").ok())
675}
676
677async fn current_database(conn: &mut MySqlConnection) -> Result<String, SqlError> {
678    let result = conn.query("SELECT DATABASE()").await?;
679    Ok(result
680        .rows
681        .into_iter()
682        .next()
683        .and_then(|row| row.into_iter().next())
684        .and_then(|v| match v {
685            Value::String(s) => Some(s),
686            _ => None,
687        })
688        .unwrap_or_default())
689}
690
/// Double every backtick in `name` so it can be embedded between
/// backticks as a quoted MySQL identifier.
fn escape_mysql_identifier(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len());
    for ch in name.chars() {
        if ch == '`' {
            quoted.push('`');
        }
        quoted.push(ch);
    }
    quoted
}
694
/// Escape `s` for use inside a single-quoted MySQL string literal.
///
/// Single quotes are doubled, and backslashes are doubled too. Under
/// MySQL's default `sql_mode`, a backslash is an escape character, so
/// leaving it alone lets input such as `x\' OR 1=1 -- ` turn the doubled
/// quote into an escaped quote plus a string terminator, which is SQL
/// injection. With `NO_BACKSLASH_ESCAPES` the doubled backslash is merely
/// literal; it stays injection-safe in both modes.
fn escape_mysql_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\'' => out.push_str("''"),
            '\\' => out.push_str("\\\\"),
            _ => out.push(ch),
        }
    }
    out
}
698
699/// MySQL `LOAD DATA LOCAL INFILE` encoder + helpers.
700///
701/// Each row is encoded as one tab-separated, newline-terminated line.
702/// MySQL's `LOAD DATA` default escape set (with `ESCAPED BY '\\'`):
703///
704/// - `\\` → literal backslash
705/// - `\t` → literal tab
706/// - `\n` → literal newline
707/// - `\r` → literal carriage return
708/// - `\0` → literal NUL byte
709/// - `\N` → SQL NULL
710///
711/// The encoder mirrors these byte-by-byte. Backslash escapes FIRST so
712/// a literal backslash isn't double-decoded.
713///
714/// The buffer is a `Vec<u8>` (not `String`) so `Value::Bytes` payloads
715/// — which may contain arbitrary non-UTF-8 bytes — can flow into a
716/// BLOB column without lossy conversion.
717mod my_load_data {
718    use crate::error::SqlError;
719    use crate::value::{TypeHint, Value};
720    use bytes::Bytes;
721
722    /// Encode one row.
723    pub fn encode_row(row: &[Value], hints: &[TypeHint]) -> Result<Bytes, SqlError> {
724        let mut buf: Vec<u8> = Vec::with_capacity(row.len() * 16 + 1);
725        for (i, value) in row.iter().enumerate() {
726            if i > 0 {
727                buf.push(b'\t');
728            }
729            let hint = hints.get(i).copied().unwrap_or(TypeHint::Other);
730            encode_value(&mut buf, value, hint)?;
731        }
732        buf.push(b'\n');
733        Ok(Bytes::from(buf))
734    }
735
736    fn encode_value(out: &mut Vec<u8>, v: &Value, hint: TypeHint) -> Result<(), SqlError> {
737        match v {
738            Value::Null => out.extend_from_slice(b"\\N"),
739            Value::Bool(b) => out.push(if *b { b'1' } else { b'0' }),
740            Value::Int64(n) => out.extend_from_slice(n.to_string().as_bytes()),
741            Value::Float64(f) => {
742                if f.is_nan() {
743                    // MySQL has no NaN literal for DOUBLE; the
744                    // generic-INSERT path also can't represent NaN.
745                    // Substitute NULL so bulk and generic agree.
746                    out.extend_from_slice(b"\\N");
747                } else if f.is_infinite() {
748                    // No literal — substitute NULL (same rationale).
749                    out.extend_from_slice(b"\\N");
750                } else {
751                    out.extend_from_slice(f.to_string().as_bytes());
752                }
753            }
754            Value::Decimal(s) => push_escaped(out, s.as_bytes()),
755            Value::String(s) => push_escaped(out, s.as_bytes()),
756            Value::Bytes(b) => push_escaped(out, b),
757            Value::Date(d) => out.extend_from_slice(d.to_string().as_bytes()),
758            Value::Time(t) => out.extend_from_slice(t.to_string().as_bytes()),
759            Value::DateTime(dt) => {
760                // MySQL DATETIME accepts 'YYYY-MM-DD HH:MM:SS[.fff]'.
761                // chrono's NaiveDateTime Display matches.
762                out.extend_from_slice(dt.to_string().as_bytes());
763            }
764            Value::DateTimeTz(dt) => {
765                // MySQL DATETIME has no timezone; the DDL translator
766                // maps DateTimeTz → DATETIME. Convert to UTC and
767                // render naive.
768                let naive = dt.naive_utc();
769                out.extend_from_slice(naive.to_string().as_bytes());
770            }
771            Value::Json(j) => {
772                let rendered = serde_json::to_string(j).map_err(|e| {
773                    SqlError::QueryFailed(format!("MySQL bulk: JSON serialize: {e}"))
774                })?;
775                push_escaped(out, rendered.as_bytes());
776            }
777            Value::Uuid(s) => push_escaped(out, s.as_bytes()),
778            Value::Array(a) => {
779                // DDL translator maps Array → JSON on MySQL.
780                let _ = hint;
781                let rendered = serde_json::to_string(a).map_err(|e| {
782                    SqlError::QueryFailed(format!("MySQL bulk: array serialize: {e}"))
783                })?;
784                push_escaped(out, rendered.as_bytes());
785            }
786        }
787        Ok(())
788    }
789
790    /// Apply MySQL LOAD DATA escape rules byte-by-byte. Backslash
791    /// is escaped FIRST, otherwise a literal backslash in field text
792    /// would be misinterpreted on decode.
793    fn push_escaped(out: &mut Vec<u8>, bytes: &[u8]) {
794        for &b in bytes {
795            match b {
796                b'\\' => out.extend_from_slice(b"\\\\"),
797                b'\t' => out.extend_from_slice(b"\\t"),
798                b'\n' => out.extend_from_slice(b"\\n"),
799                // `\r` matters because (a) some MySQL clients
800                // interpret a bare `\r` immediately before `\n` as
801                // part of a CRLF row terminator under certain
802                // `LINES TERMINATED BY` settings, and (b) even when
803                // the server doesn't, a bare `\r` survives as a data
804                // byte and corrupts `VARCHAR` round-trips.
805                b'\r' => out.extend_from_slice(b"\\r"),
806                b'\0' => out.extend_from_slice(b"\\0"),
807                other => out.push(other),
808            }
809        }
810    }
811
812    /// Quote an identifier for MySQL using backticks. Embedded
813    /// backticks are doubled.
814    pub fn backtick_quote(s: &str) -> String {
815        format!("`{}`", s.replace('`', "``"))
816    }
817
818    /// Classify a `mysql_async::Error` raised by `LOAD DATA LOCAL
819    /// INFILE`. Server error codes for "loading local data is
820    /// disabled" / "command not allowed" return
821    /// [`SqlError::BulkUnavailable`] so the Auto dispatcher can
822    /// fall back. Anything else stays `QueryFailed`.
823    pub fn classify_load_error(e: mysql_async::Error) -> SqlError {
824        match &e {
825            mysql_async::Error::Server(srv) => {
826                // 1148 ER_NOT_ALLOWED_COMMAND, 3948
827                // ER_CLIENT_LOCAL_FILES_DISABLED — the two codes the
828                // mysql_async docs page calls out for LOAD DATA
829                // LOCAL being disabled. 3950 ER_LOAD_DATA_INFILE_
830                // _DISABLED appears in some MySQL builds; include it.
831                if matches!(srv.code, 1148 | 3948 | 3950) {
832                    return SqlError::BulkUnavailable(format!(
833                        "MySQL server rejected LOAD DATA LOCAL INFILE \
834                         (error {}: {}). Enable `local_infile=ON` server-side, \
835                         or pass `--bulk-native=off` to use the generic path.",
836                        srv.code, srv.message
837                    ));
838                }
839                SqlError::QueryFailed(format!("MySQL bulk LOAD DATA: {srv}"))
840            }
841            _ => SqlError::QueryFailed(format!("MySQL bulk LOAD DATA: {e}")),
842        }
843    }
844
845    #[cfg(test)]
846    mod tests {
847        use super::*;
848        use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
849
850        fn enc1(v: Value, hint: TypeHint) -> Vec<u8> {
851            let bytes = encode_row(&[v], &[hint]).expect("encode_row");
852            // Strip trailing newline so tests assert on field content.
853            assert_eq!(bytes.last().copied(), Some(b'\n'));
854            bytes[..bytes.len() - 1].to_vec()
855        }
856
857        fn enc1_str(v: Value, hint: TypeHint) -> String {
858            String::from_utf8(enc1(v, hint)).expect("UTF-8")
859        }
860
861        #[test]
862        fn encode_null_is_backslash_n() {
863            assert_eq!(enc1_str(Value::Null, TypeHint::Null), "\\N");
864        }
865
866        #[test]
867        fn encode_bool_is_one_or_zero() {
868            assert_eq!(enc1_str(Value::Bool(true), TypeHint::Bool), "1");
869            assert_eq!(enc1_str(Value::Bool(false), TypeHint::Bool), "0");
870        }
871
872        #[test]
873        fn encode_int_and_float() {
874            assert_eq!(enc1_str(Value::Int64(42), TypeHint::Int64), "42");
875            assert_eq!(enc1_str(Value::Int64(-7), TypeHint::Int64), "-7");
876            assert_eq!(enc1_str(Value::Float64(1.5), TypeHint::Float64), "1.5");
877        }
878
879        #[test]
880        fn encode_float_nan_and_inf_become_null() {
881            // MySQL has no NaN/Inf literal for DOUBLE; the bulk path
882            // substitutes NULL to stay consistent with the generic
883            // INSERT path's reject-or-NULL outcome.
884            assert_eq!(enc1_str(Value::Float64(f64::NAN), TypeHint::Float64), "\\N");
885            assert_eq!(
886                enc1_str(Value::Float64(f64::INFINITY), TypeHint::Float64),
887                "\\N"
888            );
889        }
890
891        #[test]
892        fn encode_string_escapes_backslash_first() {
893            // Backslash MUST be the first replacement applied.
894            // Input `\t` (literal two chars `\` and `t`) → `\\t`,
895            // input `\n` → `\\n`, input `\` → `\\`. If any of those
896            // mappings ran before backslash-doubling, the result
897            // would be wrong (`\\` then `\t` → `\\t` instead of
898            // `\\\\t`, etc.).
899            assert_eq!(
900                enc1_str(Value::String("a\\b".into()), TypeHint::String),
901                "a\\\\b"
902            );
903            assert_eq!(
904                enc1_str(Value::String("a\tb".into()), TypeHint::String),
905                "a\\tb"
906            );
907            assert_eq!(
908                enc1_str(Value::String("a\nb".into()), TypeHint::String),
909                "a\\nb"
910            );
911            // H1 regression guard: \r MUST be escaped as `\r` too,
912            // otherwise CRLF source data corrupts VARCHAR round-trips
913            // and may interact badly with LINES TERMINATED BY on some
914            // server configurations.
915            assert_eq!(
916                enc1_str(Value::String("a\rb".into()), TypeHint::String),
917                "a\\rb"
918            );
919            assert_eq!(
920                enc1_str(Value::String("a\r\nb".into()), TypeHint::String),
921                "a\\r\\nb"
922            );
923        }
924
925        #[test]
926        fn encode_string_escapes_nul_byte() {
927            // `\0` is a hard error inside MySQL text columns unless
928            // escaped; the encoder produces `\0` which the receiver
929            // decodes back to a NUL byte. The byte goes into BLOB
930            // columns intact via this path.
931            let out = enc1(Value::String("a\0b".into()), TypeHint::String);
932            assert_eq!(out, b"a\\0b");
933        }
934
935        #[test]
936        fn encode_bytes_preserves_arbitrary_payload() {
937            // BLOB columns: an arbitrary byte sequence (including
938            // non-UTF-8 bytes like 0xFF) must flow through with
939            // escapes applied to the metacharacters only.
940            let raw = vec![0x01u8, b'\t', 0xFF, b'\\', b'\n', 0x00, b'Z'];
941            let out = enc1(Value::Bytes(raw), TypeHint::Bytes);
942            assert_eq!(
943                out,
944                vec![
945                    0x01u8, b'\\', b't', 0xFF, b'\\', b'\\', b'\\', b'n', b'\\', b'0', b'Z'
946                ]
947            );
948        }
949
950        #[test]
951        fn encode_date_time_datetime() {
952            let d = NaiveDate::from_ymd_opt(2026, 5, 14).unwrap();
953            let t = NaiveTime::from_hms_opt(12, 34, 56).unwrap();
954            let dt = NaiveDateTime::new(d, t);
955            assert_eq!(enc1_str(Value::Date(d), TypeHint::Date), "2026-05-14");
956            assert_eq!(enc1_str(Value::Time(t), TypeHint::Time), "12:34:56");
957            assert_eq!(
958                enc1_str(Value::DateTime(dt), TypeHint::DateTime),
959                "2026-05-14 12:34:56"
960            );
961        }
962
963        #[test]
964        fn encode_datetimetz_converts_to_utc_naive() {
965            let dt = Utc.with_ymd_and_hms(2026, 5, 14, 12, 34, 56).unwrap();
966            assert_eq!(
967                enc1_str(Value::DateTimeTz(dt), TypeHint::DateTimeTz),
968                "2026-05-14 12:34:56"
969            );
970        }
971
972        #[test]
973        fn encode_json_is_compact_then_escaped() {
974            let j = serde_json::json!({"role": "admin"});
975            let s = enc1_str(Value::Json(j), TypeHint::Json);
976            // serde_json compact form keeps no spaces between :/, ;
977            // the encoder then escapes nothing in this payload
978            // because there are no metacharacters.
979            assert!(s.contains("\"role\":\"admin\""));
980            assert!(!s.contains(' '));
981        }
982
983        #[test]
984        fn encode_array_is_compact_json() {
985            let a = Value::Array(vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)]);
986            assert_eq!(enc1_str(a, TypeHint::Array), "[1,2,3]");
987        }
988
989        #[test]
990        fn encode_uuid_passes_through() {
991            assert_eq!(
992                enc1_str(
993                    Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
994                    TypeHint::Uuid
995                ),
996                "550e8400-e29b-41d4-a716-446655440000"
997            );
998        }
999
1000        #[test]
1001        fn encode_row_with_multiple_cells_uses_tab_separator() {
1002            let row = vec![
1003                Value::Int64(1),
1004                Value::String("Alice".into()),
1005                Value::Null,
1006                Value::Bool(true),
1007            ];
1008            let hints = vec![
1009                TypeHint::Int64,
1010                TypeHint::String,
1011                TypeHint::Null,
1012                TypeHint::Bool,
1013            ];
1014            let bytes = encode_row(&row, &hints).unwrap();
1015            assert_eq!(&bytes[..], b"1\tAlice\t\\N\t1\n");
1016        }
1017
1018        #[test]
1019        fn backtick_quote_doubles_embedded_backticks() {
1020            assert_eq!(backtick_quote("plain"), "`plain`");
1021            assert_eq!(backtick_quote("with`tick"), "`with``tick`");
1022        }
1023    }
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028    use super::*;
1029    use crate::url::DatabaseUrl;
1030
1031    const TEST_MYSQL_URL: &str = "mysql://root:ferrule@127.0.0.1:13306/ferrule";
1032
1033    fn try_connect() -> Option<Box<dyn crate::Connection>> {
1034        let url = DatabaseUrl::parse(TEST_MYSQL_URL).ok()?;
1035        let conn = crate::connect(&url, &ConnectOptions::default(), None).ok()?;
1036        Some(conn)
1037    }
1038
1039    #[test]
1040    fn test_mysql_ping() {
1041        let Some(mut conn) = try_connect() else {
1042            eprintln!("MySQL test container not available, skipping test_mysql_ping");
1043            return;
1044        };
1045        conn.ping().expect("ping should succeed");
1046    }
1047
1048    #[test]
1049    fn test_mysql_query() {
1050        let Some(mut conn) = try_connect() else {
1051            eprintln!("MySQL test container not available, skipping test_mysql_query");
1052            return;
1053        };
1054        let result = conn
1055            .query("SELECT * FROM test_users")
1056            .expect("query should succeed");
1057        assert!(!result.columns.is_empty(), "should have columns");
1058        assert!(!result.rows.is_empty(), "should have rows");
1059    }
1060
1061    #[test]
1062    fn test_mysql_execute() {
1063        let Some(mut conn) = try_connect() else {
1064            eprintln!("MySQL test container not available, skipping test_mysql_execute");
1065            return;
1066        };
1067        let summary = conn
1068            .execute("INSERT INTO test_users (name, age) VALUES ('TestUser', 99)")
1069            .expect("execute should succeed");
1070        assert!(
1071            summary.rows_affected.is_some_and(|n| n > 0),
1072            "should have affected rows"
1073        );
1074    }
1075
1076    #[test]
1077    fn test_mysql_list_tables() {
1078        let Some(mut conn) = try_connect() else {
1079            eprintln!("MySQL test container not available, skipping test_mysql_list_tables");
1080            return;
1081        };
1082        let tables = conn.list_tables(None).expect("list_tables should succeed");
1083        assert!(
1084            tables.contains(&"test_users".to_string()),
1085            "should contain test_users"
1086        );
1087    }
1088
1089    #[test]
1090    fn test_mysql_list_schemas() {
1091        let Some(mut conn) = try_connect() else {
1092            eprintln!("MySQL test container not available, skipping test_mysql_list_schemas");
1093            return;
1094        };
1095        let schemas = conn.list_schemas().expect("list_schemas should succeed");
1096        // The container's seeded database is `ferrule` (see CLAUDE.md
1097        // "How to Test"); it is also the connection's default database.
1098        assert!(
1099            schemas.iter().any(|s| s.name == "ferrule"),
1100            "should contain the seeded `ferrule` database, got: {schemas:?}"
1101        );
1102        assert!(
1103            schemas.iter().filter(|s| s.is_default).count() <= 1,
1104            "at most one schema should be flagged is_default, got: {schemas:?}"
1105        );
1106    }
1107
1108    #[test]
1109    fn test_mysql_describe_table() {
1110        let Some(mut conn) = try_connect() else {
1111            eprintln!("MySQL test container not available, skipping test_mysql_describe_table");
1112            return;
1113        };
1114        let result = conn
1115            .describe_table(None, "test_users")
1116            .expect("describe_table should succeed");
1117        assert_eq!(result.columns.len(), 6, "should return 6 metadata columns");
1118        let col_names: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
1119        assert_eq!(
1120            col_names,
1121            vec![
1122                "column_name",
1123                "data_type",
1124                "is_nullable",
1125                "column_default",
1126                "numeric_precision",
1127                "numeric_scale"
1128            ]
1129        );
1130    }
1131
1132    #[test]
1133    fn test_mysql_execute_multi() {
1134        let Some(mut conn) = try_connect() else {
1135            eprintln!("MySQL test container not available, skipping test_mysql_execute_multi");
1136            return;
1137        };
1138        // Clean up any previous test row
1139        let _ = conn.execute("DELETE FROM test_users WHERE name = 'MultiUser'");
1140        let results = conn
1141            .execute_multi("INSERT INTO test_users (name, age) VALUES ('MultiUser', 42); SELECT COUNT(*) FROM test_users;")
1142            .expect("execute_multi should succeed");
1143        assert_eq!(results.len(), 2, "should have two result sets");
1144        // First result: DML summary
1145        assert!(
1146            matches!(&results[0], StatementResult::Summary(s) if s.rows_affected.is_some_and(|n| n > 0)),
1147            "first result should be a DML summary with affected rows"
1148        );
1149        // Second result: SELECT query
1150        assert!(
1151            matches!(&results[1], StatementResult::Query(_)),
1152            "second result should be a Query"
1153        );
1154    }
1155
1156    #[test]
1157    fn test_mysql_type_mapping() {
1158        let Some(mut conn) = try_connect() else {
1159            eprintln!("MySQL test container not available, skipping test_mysql_type_mapping");
1160            return;
1161        };
1162        let result = conn
1163            .query("SELECT name, age, score, active, meta FROM test_users WHERE name = 'Alice'")
1164            .expect("query should succeed");
1165        assert_eq!(result.rows.len(), 1);
1166        let row = &result.rows[0];
1167        assert!(matches!(row[0], Value::String(_)), "name should be String");
1168        assert!(matches!(row[1], Value::Int64(_)), "age should be Int64");
1169        assert!(
1170            matches!(row[2], Value::Float64(_) | Value::Decimal(_)),
1171            "score should be Float64 or Decimal"
1172        );
1173        assert!(
1174            matches!(row[3], Value::Int64(_) | Value::Bool(_)),
1175            "active should be Int64 or Bool"
1176        );
1177        assert!(
1178            matches!(row[4], Value::Json(_) | Value::String(_)),
1179            "meta should be Json or String"
1180        );
1181    }
1182
1183    /// End-to-end: bulk-load a scratch table via LOAD DATA LOCAL
1184    /// INFILE and verify the rows round-trip. Includes a row with
1185    /// backslash/tab/newline payloads, an all-NULLs row, and a BLOB
1186    /// with non-UTF-8 bytes to exercise the binary path.
1187    #[test]
1188    fn test_mysql_bulk_insert_rows_round_trip() {
1189        let Some(mut conn) = try_connect() else {
1190            eprintln!(
1191                "MySQL test container not available, skipping test_mysql_bulk_insert_rows_round_trip"
1192            );
1193            return;
1194        };
1195
1196        // The mysql_async client sets `LOCAL_INFILE` on the
1197        // connection handshake automatically, but the server-side
1198        // toggle `local_infile=ON` is required too. The CLAUDE.md
1199        // test container ships with the MySQL 8 default, which is
1200        // OFF for `LOAD DATA LOCAL`. Flip it on for the duration of
1201        // this test. If the user doesn't have SUPER, the SET fails
1202        // and we degrade by skipping the test (rather than the bulk
1203        // path silently failing).
1204        if conn.execute("SET GLOBAL local_infile = ON").is_err() {
1205            eprintln!(
1206                "MySQL test container does not allow toggling local_infile; \
1207                 skipping test_mysql_bulk_insert_rows_round_trip"
1208            );
1209            return;
1210        }
1211
1212        let pid = std::process::id();
1213        let table = format!("ferrule_bulk_test_{pid}");
1214        let _ = conn.execute(&format!("DROP TABLE IF EXISTS {table}"));
1215        conn.execute(&format!(
1216            "CREATE TABLE {table} (\
1217               id BIGINT NOT NULL, \
1218               name VARCHAR(255) NULL, \
1219               active TINYINT(1) NULL, \
1220               blob_data BLOB NULL, \
1221               meta JSON NULL, \
1222               tricky TEXT NULL\
1223             ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
1224        ))
1225        .expect("CREATE TABLE");
1226
1227        let columns = vec![
1228            ColumnInfo {
1229                name: "id".into(),
1230                type_hint: TypeHint::Int64,
1231                nullable: false,
1232            },
1233            ColumnInfo {
1234                name: "name".into(),
1235                type_hint: TypeHint::String,
1236                nullable: true,
1237            },
1238            ColumnInfo {
1239                name: "active".into(),
1240                type_hint: TypeHint::Bool,
1241                nullable: true,
1242            },
1243            ColumnInfo {
1244                name: "blob_data".into(),
1245                type_hint: TypeHint::Bytes,
1246                nullable: true,
1247            },
1248            ColumnInfo {
1249                name: "meta".into(),
1250                type_hint: TypeHint::Json,
1251                nullable: true,
1252            },
1253            ColumnInfo {
1254                name: "tricky".into(),
1255                type_hint: TypeHint::String,
1256                nullable: true,
1257            },
1258        ];
1259
1260        let rows: Vec<Row> = vec![
1261            vec![
1262                Value::Int64(1),
1263                Value::String("Alice".into()),
1264                Value::Bool(true),
1265                Value::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]),
1266                Value::Json(serde_json::json!({"role": "admin"})),
1267                Value::String("plain".into()),
1268            ],
1269            vec![
1270                Value::Int64(2),
1271                Value::String("Esc\\\t\nape".into()),
1272                Value::Bool(false),
1273                Value::Bytes(vec![0x00, 0x09, 0x0A, 0xFF]),
1274                Value::Json(serde_json::Value::Null),
1275                Value::String("\\.".into()),
1276            ],
1277            vec![
1278                Value::Int64(3),
1279                Value::Null,
1280                Value::Null,
1281                Value::Null,
1282                Value::Null,
1283                Value::Null,
1284            ],
1285        ];
1286
1287        let n = conn
1288            .bulk_insert_rows(BulkInsert {
1289                table: &table,
1290                columns: &columns,
1291                rows: &rows,
1292                copy_format: crate::copy::CopyFormat::Text,
1293            })
1294            .expect("bulk_insert_rows");
1295        assert_eq!(n, 3);
1296
1297        let result = conn
1298            .query(&format!(
1299                "SELECT id, name, active, blob_data, tricky FROM {table} ORDER BY id"
1300            ))
1301            .unwrap();
1302        assert_eq!(result.rows.len(), 3);
1303
1304        // Row 2 — verify the tricky escaped payload round-tripped.
1305        if let Value::String(s) = &result.rows[1][1] {
1306            assert_eq!(
1307                s, "Esc\\\t\nape",
1308                "row 2 name should preserve backslash/tab/nl"
1309            );
1310        } else {
1311            panic!("row 2 name should be String, got {:?}", result.rows[1][1]);
1312        }
1313        if let Value::Bytes(b) = &result.rows[1][3] {
1314            assert_eq!(b.as_slice(), &[0x00u8, 0x09, 0x0A, 0xFF]);
1315        } else {
1316            panic!(
1317                "row 2 blob_data should be Bytes, got {:?}",
1318                result.rows[1][3]
1319            );
1320        }
1321        if let Value::String(s) = &result.rows[1][4] {
1322            assert_eq!(s, "\\.", "row 2 tricky should be literal backslash-dot");
1323        } else {
1324            panic!("row 2 tricky should be String, got {:?}", result.rows[1][4]);
1325        }
1326
1327        // Row 3 — everything NULL except id.
1328        assert!(matches!(&result.rows[2][1], Value::Null));
1329        assert!(matches!(&result.rows[2][2], Value::Null));
1330        assert!(matches!(&result.rows[2][3], Value::Null));
1331
1332        // Cleanup.
1333        conn.execute(&format!("DROP TABLE {table}"))
1334            .expect("DROP TABLE");
1335    }
1336
1337    /// Security regression: a user-issued `LOAD DATA LOCAL INFILE`
1338    /// outside of `bulk_insert_rows` MUST be rejected — there is no
1339    /// global handler installed, and the per-call local handler is
1340    /// only present during a bulk_insert_rows call. mysql_async
1341    /// raises `LocalInfileError::NoHandler` in that case, which
1342    /// `MySqlConnection::execute` surfaces as a `QueryFailed`.
1343    #[test]
1344    fn test_mysql_load_data_without_bulk_in_progress_rejected() {
1345        let Some(mut conn) = try_connect() else {
1346            eprintln!(
1347                "MySQL test container not available, skipping test_mysql_load_data_without_bulk_in_progress_rejected"
1348            );
1349            return;
1350        };
1351
1352        // Enable server-side local_infile so the *server* is willing
1353        // to ask the client for the file. We need that step to fail
1354        // here, not the server config, to prove the client refused.
1355        if conn.execute("SET GLOBAL local_infile = ON").is_err() {
1356            eprintln!(
1357                "MySQL test container does not allow toggling local_infile; \
1358                 skipping test_mysql_load_data_without_bulk_in_progress_rejected"
1359            );
1360            return;
1361        }
1362
1363        let pid = std::process::id();
1364        let table = format!("ferrule_bulk_security_test_{pid}");
1365        let _ = conn.execute(&format!("DROP TABLE IF EXISTS {table}"));
1366        conn.execute(&format!(
1367            "CREATE TABLE {table} (id INT, line TEXT) ENGINE=InnoDB"
1368        ))
1369        .expect("CREATE TABLE");
1370
1371        // Try to coerce the client into shipping /etc/passwd.
1372        let result = conn.execute(&format!(
1373            "LOAD DATA LOCAL INFILE '/etc/passwd' INTO TABLE {table} \
1374                 FIELDS TERMINATED BY ':' (id, line)"
1375        ));
1376
1377        // The client refuses because no infile handler is
1378        // installed at this point. mysql_async raises a query
1379        // error; ferrule surfaces it as QueryFailed.
1380        let err = result
1381            .expect_err("LOAD DATA LOCAL INFILE without bulk_insert_rows in progress must fail");
1382        let msg = err.to_string();
1383        // Look for any hint that the failure is handler-related —
1384        // exact mysql_async wording may evolve.
1385        assert!(
1386            msg.to_lowercase().contains("handler")
1387                || msg.to_lowercase().contains("local_infile")
1388                || msg.to_lowercase().contains("infile"),
1389            "expected handler/infile rejection, got: {msg}"
1390        );
1391
1392        // Sanity: confirm nothing landed in the table.
1393        let count = conn
1394            .query(&format!("SELECT COUNT(*) FROM {table}"))
1395            .unwrap();
1396        match &count.rows[0][0] {
1397            Value::Int64(n) => assert_eq!(*n, 0, "no rows should have been inserted"),
1398            other => panic!("unexpected count shape: {other:?}"),
1399        }
1400
1401        let _ = conn.execute(&format!("DROP TABLE {table}"));
1402    }
1403
1404    #[test]
1405    fn test_mysql_primary_key() {
1406        let Some(mut conn) = try_connect() else {
1407            eprintln!("MySQL test container not available, skipping test_mysql_primary_key");
1408            return;
1409        };
1410        let pk = conn.primary_key(None, "test_users").expect("primary_key");
1411        assert_eq!(pk, vec!["id".to_string()]);
1412    }
1413
1414    #[test]
1415    fn test_mysql_list_foreign_keys() {
1416        let Some(mut conn) = try_connect() else {
1417            eprintln!("MySQL test container not available, skipping test_mysql_list_foreign_keys");
1418            return;
1419        };
1420        let pid = std::process::id();
1421        let child = format!("ferrule_fk_test_orders_{pid}");
1422        let _ = conn.execute(&format!("DROP TABLE IF EXISTS {child}"));
1423        conn.execute(&format!(
1424            "CREATE TABLE {child} (\
1425               id INT AUTO_INCREMENT PRIMARY KEY, \
1426               user_id INT, \
1427               FOREIGN KEY (user_id) REFERENCES test_users(id) ON DELETE CASCADE\
1428             )"
1429        ))
1430        .expect("CREATE TABLE");
1431
1432        let fks = conn.list_foreign_keys(None).expect("list_foreign_keys");
1433        let matching: Vec<_> = fks.iter().filter(|fk| fk.child_table == child).collect();
1434        assert_eq!(matching.len(), 1, "expected 1 FK from {child}, got {fks:?}");
1435        let fk = matching[0];
1436        assert_eq!(fk.child_columns, vec!["user_id".to_string()]);
1437        assert_eq!(fk.parent_table, "test_users");
1438        assert_eq!(fk.parent_columns, vec!["id".to_string()]);
1439        assert_eq!(fk.on_delete.as_deref(), Some("CASCADE"));
1440
1441        let _ = conn.execute(&format!("DROP TABLE {child}"));
1442    }
1443
1444    /// End-to-end `--if-exists skip` then `upsert` round-trip against
1445    /// MySQL. Uses `INSERT IGNORE` and `ON DUPLICATE KEY UPDATE`.
1446    #[test]
1447    fn test_mysql_copy_skip_then_upsert() {
1448        use crate::backend::Backend;
1449        use crate::copy::{CopyOptions, CopySource, IfExists, copy_rows};
1450
1451        let (Some(mut src), Some(mut dst)) = (try_connect(), try_connect()) else {
1452            eprintln!(
1453                "MySQL test container not available, skipping test_mysql_copy_skip_then_upsert"
1454            );
1455            return;
1456        };
1457
1458        let pid = std::process::id();
1459        let src_table = format!("ferrule_my_skip_src_{pid}");
1460        let dst_table = format!("ferrule_my_skip_dst_{pid}");
1461        let _ = src.execute(&format!("DROP TABLE IF EXISTS {src_table}"));
1462        let _ = dst.execute(&format!("DROP TABLE IF EXISTS {dst_table}"));
1463        src.execute(&format!(
1464            "CREATE TABLE {src_table} (id INT PRIMARY KEY, name VARCHAR(64), val INT)"
1465        ))
1466        .expect("CREATE src");
1467        dst.execute(&format!(
1468            "CREATE TABLE {dst_table} (id INT PRIMARY KEY, name VARCHAR(64), val INT)"
1469        ))
1470        .expect("CREATE dst");
1471        src.execute(&format!(
1472            "INSERT INTO {src_table} VALUES (1, 'new-1', 10), (2, 'new-2', 20)"
1473        ))
1474        .expect("seed src");
1475        dst.execute(&format!("INSERT INTO {dst_table} VALUES (1, 'old-1', 99)"))
1476            .expect("seed dst");
1477
1478        // --- Skip ---------------------------------------------------------
1479        let opts = CopyOptions {
1480            source: CopySource::Query {
1481                sql: format!("SELECT * FROM {src_table} ORDER BY id"),
1482                into: dst_table.clone(),
1483            },
1484            if_exists: IfExists::Skip,
1485            ..Default::default()
1486        };
1487        copy_rows(&mut src, Backend::MySql, &mut dst, Backend::MySql, &opts)
1488            .expect("copy_rows skip");
1489
1490        let out = dst
1491            .query(&format!(
1492                "SELECT id, name, val FROM {dst_table} ORDER BY id"
1493            ))
1494            .expect("verify skip");
1495        assert_eq!(out.rows.len(), 2);
1496        assert!(matches!(&out.rows[0][1], Value::String(s) if s == "old-1"));
1497        assert!(matches!(&out.rows[1][1], Value::String(s) if s == "new-2"));
1498
1499        // --- Upsert -------------------------------------------------------
1500        let opts = CopyOptions {
1501            source: CopySource::Query {
1502                sql: format!("SELECT * FROM {src_table} ORDER BY id"),
1503                into: dst_table.clone(),
1504            },
1505            if_exists: IfExists::Upsert,
1506            ..Default::default()
1507        };
1508        copy_rows(&mut src, Backend::MySql, &mut dst, Backend::MySql, &opts)
1509            .expect("copy_rows upsert");
1510
1511        let out = dst
1512            .query(&format!(
1513                "SELECT id, name, val FROM {dst_table} ORDER BY id"
1514            ))
1515            .expect("verify upsert");
1516        assert_eq!(out.rows.len(), 2);
1517        assert!(matches!(&out.rows[0][1], Value::String(s) if s == "new-1"));
1518        assert!(matches!(&out.rows[0][2], Value::Int64(10)));
1519        assert!(matches!(&out.rows[1][1], Value::String(s) if s == "new-2"));
1520
1521        let _ = src.execute(&format!("DROP TABLE {src_table}"));
1522        let _ = dst.execute(&format!("DROP TABLE {dst_table}"));
1523    }
1524
    // --- #65/#66: streaming and write paths against the gate DB (skipped without a container) ---
1526
1527    /// Stream a synthetic result from MySQL via the lazy `query_iter`
1528    /// cursor and assert batch-at-a-time pulling against a real server.
1529    #[test]
1530    fn test_mysql_cursor_streams_in_bounded_batches() {
1531        let Some(mut conn) = try_connect() else {
1532            eprintln!(
1533                "MySQL test container not available, skipping test_mysql_cursor_streams_in_bounded_batches"
1534            );
1535            return;
1536        };
1537        // MySQL has no generate_series; build a numbers table on the fly.
1538        let _ = conn.execute("DROP TABLE IF EXISTS ferrule_stream_src");
1539        conn.execute("CREATE TABLE ferrule_stream_src (i INT PRIMARY KEY)")
1540            .expect("create src");
1541        // The default cte_max_recursion_depth is 1000; raise it so the
1542        // 5000-row generator CTE below runs to completion.
1543        conn.execute("SET SESSION cte_max_recursion_depth = 100000")
1544            .expect("raise cte depth");
1545        conn.execute(
1546            "INSERT INTO ferrule_stream_src (i) \
1547             WITH RECURSIVE seq(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM seq WHERE i < 5000) \
1548             SELECT i FROM seq",
1549        )
1550        .expect("seed src");
1551        const BATCH: usize = 128;
1552        let mut cursor = conn
1553            .query_cursor("SELECT i, i * 2 AS doubled FROM ferrule_stream_src ORDER BY i")
1554            .expect("open mysql cursor");
1555        let mut total = 0u64;
1556        loop {
1557            let batch = cursor.next_batch(BATCH).expect("pull mysql batch");
1558            if batch.is_empty() {
1559                break;
1560            }
1561            assert!(batch.len() <= BATCH);
1562            total += batch.len() as u64;
1563        }
1564        assert_eq!(total, 5000);
1565        drop(cursor);
1566        let _ = conn.execute("DROP TABLE ferrule_stream_src");
1567    }
1568
1569    /// Batched write into MySQL through the embeddable write path, read
1570    /// back, and clean up.
1571    #[test]
1572    fn test_mysql_write_rows_round_trip() {
1573        let Some(mut conn) = try_connect() else {
1574            eprintln!(
1575                "MySQL test container not available, skipping test_mysql_write_rows_round_trip"
1576            );
1577            return;
1578        };
1579        let _ = conn.execute("DROP TABLE IF EXISTS ferrule_write_test");
1580        conn.execute("CREATE TABLE ferrule_write_test (id INT PRIMARY KEY, name VARCHAR(64))")
1581            .expect("create write table");
1582        let columns = vec![
1583            crate::value::ColumnInfo {
1584                name: "id".into(),
1585                type_hint: TypeHint::Int64,
1586                nullable: false,
1587            },
1588            crate::value::ColumnInfo {
1589                name: "name".into(),
1590                type_hint: TypeHint::String,
1591                nullable: true,
1592            },
1593        ];
1594        let rows: Vec<crate::value::Row> = (1..=2000)
1595            .map(|i| vec![Value::Int64(i), Value::String(format!("n{i}"))])
1596            .collect();
1597        let opts = crate::write::WriteOptions {
1598            batch_size: 250,
1599            ..Default::default()
1600        };
1601        let report = crate::write::write_rows(
1602            &mut *conn,
1603            crate::Backend::MySql,
1604            "ferrule_write_test",
1605            &columns,
1606            rows,
1607            &opts,
1608        )
1609        .expect("write_rows");
1610        assert_eq!(report.rows_written, 2000);
1611        assert!(report.is_complete());
1612        let back = conn
1613            .query("SELECT COUNT(*) FROM ferrule_write_test")
1614            .expect("count");
1615        assert!(matches!(back.rows[0][0], Value::Int64(2000)));
1616        let _ = conn.execute("DROP TABLE ferrule_write_test");
1617    }
1618
1619    /// Upsert into MySQL via the write path's ON DUPLICATE KEY UPDATE
1620    /// builder reuse.
1621    #[test]
1622    fn test_mysql_write_rows_upsert() {
1623        let Some(mut conn) = try_connect() else {
1624            eprintln!("MySQL test container not available, skipping test_mysql_write_rows_upsert");
1625            return;
1626        };
1627        let _ = conn.execute("DROP TABLE IF EXISTS ferrule_write_up");
1628        conn.execute("CREATE TABLE ferrule_write_up (id INT PRIMARY KEY, v VARCHAR(32))")
1629            .expect("create");
1630        conn.execute("INSERT INTO ferrule_write_up VALUES (1, 'old')")
1631            .expect("seed");
1632        let columns = vec![
1633            crate::value::ColumnInfo {
1634                name: "id".into(),
1635                type_hint: TypeHint::Int64,
1636                nullable: false,
1637            },
1638            crate::value::ColumnInfo {
1639                name: "v".into(),
1640                type_hint: TypeHint::String,
1641                nullable: true,
1642            },
1643        ];
1644        let rows: Vec<crate::value::Row> = vec![
1645            vec![Value::Int64(1), Value::String("new".into())],
1646            vec![Value::Int64(2), Value::String("two".into())],
1647        ];
1648        let opts = crate::write::WriteOptions {
1649            mode: crate::write::WriteMode::Upsert,
1650            key_columns: vec!["id".into()],
1651            ..Default::default()
1652        };
1653        let report = crate::write::write_rows(
1654            &mut *conn,
1655            crate::Backend::MySql,
1656            "ferrule_write_up",
1657            &columns,
1658            rows,
1659            &opts,
1660        )
1661        .expect("write_rows upsert");
1662        assert!(report.is_complete());
1663        let v1 = conn
1664            .query("SELECT v FROM ferrule_write_up WHERE id = 1")
1665            .expect("read back");
1666        assert!(matches!(&v1.rows[0][0], Value::String(s) if s == "new"));
1667        let _ = conn.execute("DROP TABLE ferrule_write_up");
1668    }
1669}