1use crate::connection::{
2 AsyncConnection, BulkInsert, ConnectOptions, ExecutionSummary, ForeignKey, QueryResult,
3 SchemaInfo, StatementResult,
4};
5use crate::error::SqlError;
6use crate::stream::BoxRowStream;
7use crate::url::DatabaseUrl;
8use crate::value::{ColumnInfo, Row, TypeHint, Value};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
12use futures_util::stream::StreamExt;
13use mysql_async::prelude::Queryable;
14use secrecy::ExposeSecret;
15
16pub struct MySqlConnection {
17 conn: mysql_async::Conn,
18}
19
20#[async_trait]
21impl AsyncConnection for MySqlConnection {
22 async fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
23 self.conn
24 .query_drop(sql)
25 .await
26 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
27 let affected = self.conn.affected_rows();
28 Ok(ExecutionSummary {
29 rows_affected: Some(affected),
30 command_tag: None,
31 })
32 }
33
34 async fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
35 let mut result = self
36 .conn
37 .query_iter(sql)
38 .await
39 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
40
41 let columns_ref = result.columns_ref();
42 let columns: Vec<ColumnInfo> = columns_ref
43 .iter()
44 .map(|c| ColumnInfo {
45 name: c.name_str().to_string(),
46 type_hint: TypeHint::Other,
47 nullable: true,
48 })
49 .collect();
50
51 let mysql_rows = result
52 .collect::<mysql_async::Row>()
53 .await
54 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
55
56 result
58 .drop_result()
59 .await
60 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
61
62 let rows: Vec<Row> = mysql_rows
63 .into_iter()
64 .map(|row| {
65 let col_types: Vec<_> = row
66 .columns_ref()
67 .iter()
68 .map(|c| (c.column_type(), c.column_length()))
69 .collect();
70 row.unwrap()
71 .into_iter()
72 .enumerate()
73 .map(|(i, v)| mysql_to_value(v, col_types[i].0, col_types[i].1))
74 .collect()
75 })
76 .collect();
77
78 Ok(QueryResult { columns, rows })
79 }
80
81 async fn query_stream(
92 &mut self,
93 sql: &str,
94 ) -> Result<(Vec<ColumnInfo>, BoxRowStream<'_>), SqlError> {
95 let result = self
99 .conn
100 .query_iter(sql.to_string())
101 .await
102 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
103
104 let columns: Vec<ColumnInfo> = result
105 .columns_ref()
106 .iter()
107 .map(|c| ColumnInfo {
108 name: c.name_str().to_string(),
109 type_hint: TypeHint::Other,
110 nullable: true,
111 })
112 .collect();
113
114 let stream = futures_util::stream::try_unfold(result, |mut result| async move {
115 match result.next().await {
116 Ok(Some(row)) => {
117 let col_types: Vec<_> = row
118 .columns_ref()
119 .iter()
120 .map(|c| (c.column_type(), c.column_length()))
121 .collect();
122 let values: Row = row
123 .unwrap()
124 .into_iter()
125 .enumerate()
126 .map(|(i, v)| mysql_to_value(v, col_types[i].0, col_types[i].1))
127 .collect();
128 Ok(Some((values, result)))
129 }
130 Ok(None) => Ok(None),
131 Err(e) => Err(SqlError::QueryFailed(e.to_string())),
132 }
133 });
134 Ok((columns, Box::pin(stream)))
135 }
136
137 async fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
138 let mut result = self
139 .conn
140 .query_iter(sql)
141 .await
142 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
143
144 let mut results = Vec::new();
145
146 loop {
147 let columns_ref = result.columns_ref();
148 if columns_ref.is_empty() {
149 let affected = result.affected_rows();
150 result
151 .collect::<mysql_async::Row>()
152 .await
153 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
154 results.push(StatementResult::Summary(ExecutionSummary {
155 rows_affected: Some(affected),
156 command_tag: None,
157 }));
158 } else {
159 let columns: Vec<ColumnInfo> = columns_ref
160 .iter()
161 .map(|c| ColumnInfo {
162 name: c.name_str().to_string(),
163 type_hint: TypeHint::Other,
164 nullable: true,
165 })
166 .collect();
167
168 let mysql_rows = result
169 .collect::<mysql_async::Row>()
170 .await
171 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
172
173 let rows: Vec<Row> = mysql_rows
174 .into_iter()
175 .map(|row| {
176 let col_types: Vec<_> = row
177 .columns_ref()
178 .iter()
179 .map(|c| (c.column_type(), c.column_length()))
180 .collect();
181 row.unwrap()
182 .into_iter()
183 .enumerate()
184 .map(|(i, v)| mysql_to_value(v, col_types[i].0, col_types[i].1))
185 .collect()
186 })
187 .collect();
188
189 results.push(StatementResult::Query(QueryResult { columns, rows }));
190 }
191
192 if result.is_empty() {
193 break;
194 }
195 }
196
197 Ok(results)
198 }
199
200 async fn ping(&mut self) -> Result<(), SqlError> {
201 self.conn
202 .ping()
203 .await
204 .map_err(|e| SqlError::ConnectionFailed(e.to_string()))?;
205 Ok(())
206 }
207
208 async fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
209 let sql = match schema {
210 Some(s) => format!("SHOW TABLES FROM `{}`", escape_mysql_identifier(s)),
211 None => "SHOW TABLES".to_string(),
212 };
213 let result = self.query(&sql).await?;
214 let names: Vec<String> = result
215 .rows
216 .into_iter()
217 .filter_map(|row| {
218 row.into_iter().next().and_then(|v| match v {
219 Value::String(s) => Some(s),
220 _ => None,
221 })
222 })
223 .collect();
224 Ok(names)
225 }
226
227 async fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
228 let sql = "SELECT SCHEMA_NAME, SCHEMA_NAME = DATABASE() FROM information_schema.SCHEMATA ORDER BY SCHEMA_NAME";
234 let result = self.query(sql).await?;
235 let schemas: Vec<SchemaInfo> = result
236 .rows
237 .into_iter()
238 .filter_map(|row| {
239 let name = match row.first() {
240 Some(Value::String(s)) => s.clone(),
241 _ => return None,
242 };
243 let is_default = crate::connection::is_default_from_value(row.get(1));
244 Some(SchemaInfo { name, is_default })
245 })
246 .collect();
247 Ok(schemas)
248 }
249
250 async fn describe_table(
251 &mut self,
252 schema: Option<&str>,
253 table: &str,
254 ) -> Result<QueryResult, SqlError> {
255 let schema = match schema {
256 Some(s) => s.to_string(),
257 None => {
258 let db_query = self.query("SELECT DATABASE()").await?;
259 db_query
260 .rows
261 .into_iter()
262 .next()
263 .and_then(|row| row.into_iter().next())
264 .and_then(|v| match v {
265 Value::String(s) => Some(s),
266 _ => None,
267 })
268 .unwrap_or_default()
269 }
270 };
271
272 let sql = format!(
273 "SELECT column_name AS `column_name`, \
274 data_type AS `data_type`, \
275 is_nullable AS `is_nullable`, \
276 column_default AS `column_default`, \
277 numeric_precision AS `numeric_precision`, \
278 numeric_scale AS `numeric_scale` \
279 FROM information_schema.columns \
280 WHERE table_schema = '{}' AND table_name = '{}' \
281 ORDER BY ordinal_position",
282 escape_mysql_string(&schema),
283 escape_mysql_string(table)
284 );
285 self.query(&sql).await
286 }
287
288 async fn primary_key(
289 &mut self,
290 schema: Option<&str>,
291 table: &str,
292 ) -> Result<Vec<String>, SqlError> {
293 let schema = match schema {
294 Some(s) => s.to_string(),
295 None => current_database(self).await?,
296 };
297 let sql = format!(
298 "SELECT column_name FROM information_schema.key_column_usage \
299 WHERE table_schema = '{}' AND table_name = '{}' \
300 AND constraint_name = 'PRIMARY' \
301 ORDER BY ordinal_position",
302 escape_mysql_string(&schema),
303 escape_mysql_string(table)
304 );
305 let result = self.query(&sql).await?;
306 Ok(result
307 .rows
308 .into_iter()
309 .filter_map(|row| {
310 row.into_iter().next().and_then(|v| match v {
311 Value::String(s) => Some(s),
312 _ => None,
313 })
314 })
315 .collect())
316 }
317
318 async fn list_foreign_keys(
319 &mut self,
320 schema: Option<&str>,
321 ) -> Result<Vec<ForeignKey>, SqlError> {
322 let schema = match schema {
323 Some(s) => s.to_string(),
324 None => current_database(self).await?,
325 };
326 let sql = format!(
329 "SELECT k.constraint_name, k.table_name, k.column_name, \
330 k.referenced_table_name, k.referenced_column_name, \
331 rc.delete_rule \
332 FROM information_schema.key_column_usage k \
333 JOIN information_schema.referential_constraints rc \
334 ON rc.constraint_schema = k.constraint_schema \
335 AND rc.constraint_name = k.constraint_name \
336 WHERE k.table_schema = '{}' AND k.referenced_table_name IS NOT NULL \
337 ORDER BY k.constraint_name, k.ordinal_position",
338 escape_mysql_string(&schema)
339 );
340 let result = self.query(&sql).await?;
341 let mut map: indexmap::IndexMap<String, ForeignKey> = indexmap::IndexMap::new();
342 for row in result.rows {
343 let mut cols = row.into_iter();
344 let conname = match cols.next() {
345 Some(Value::String(s)) => s,
346 _ => continue,
347 };
348 let child_table = match cols.next() {
349 Some(Value::String(s)) => s,
350 _ => continue,
351 };
352 let child_col = match cols.next() {
353 Some(Value::String(s)) => s,
354 _ => continue,
355 };
356 let parent_table = match cols.next() {
357 Some(Value::String(s)) => s,
358 _ => continue,
359 };
360 let parent_col = match cols.next() {
361 Some(Value::String(s)) => s,
362 _ => continue,
363 };
364 let on_delete = match cols.next() {
365 Some(Value::String(s)) => Some(s),
366 _ => None,
367 };
368 let entry = map.entry(conname).or_insert_with(|| ForeignKey {
369 child_table: child_table.clone(),
370 child_columns: Vec::new(),
371 parent_table: parent_table.clone(),
372 parent_columns: Vec::new(),
373 on_delete,
374 });
375 entry.child_columns.push(child_col);
376 entry.parent_columns.push(parent_col);
377 }
378 Ok(map.into_values().collect())
379 }
380
381 async fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError> {
382 if target.rows.is_empty() {
383 return Ok(0);
384 }
385
386 let hints: Vec<TypeHint> = target.columns.iter().map(|c| c.type_hint).collect();
392 let mut chunks: Vec<Bytes> = Vec::with_capacity(target.rows.len());
393 for row in target.rows {
394 let bytes = my_load_data::encode_row(row, &hints)?;
395 chunks.push(bytes);
396 }
397
398 self.conn.set_infile_handler(async move {
413 Ok(futures_util::stream::iter(chunks).map(Ok).boxed())
414 });
415
416 let qtable = my_load_data::backtick_quote(target.table);
421 let cols = target
422 .columns
423 .iter()
424 .map(|c| my_load_data::backtick_quote(&c.name))
425 .collect::<Vec<_>>()
426 .join(", ");
427 let load_sql = format!(
428 "LOAD DATA LOCAL INFILE 'ferrule_bulk' INTO TABLE {qtable} \
429 CHARACTER SET utf8mb4 \
430 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' \
431 LINES TERMINATED BY '\\n' \
432 ({cols})"
433 );
434
435 let load_result = self.conn.query_drop(load_sql).await;
438
439 if let Err(e) = load_result {
440 let _ = self.conn.reset().await;
469 self.conn.set_infile_handler(async {
470 Err(mysql_async::Error::from(
471 mysql_async::LocalInfileError::other(std::io::Error::new(
472 std::io::ErrorKind::PermissionDenied,
473 "ferrule: LOAD DATA LOCAL INFILE refused — connection \
474 state may be tainted after a failed bulk operation. \
475 Reconnect to re-enable bulk_insert_rows.",
476 )),
477 ))
478 });
479 return Err(my_load_data::classify_load_error(e));
480 }
481
482 Ok(self.conn.affected_rows() as usize)
483 }
484}
485
486pub(crate) async fn connect(
487 url: &DatabaseUrl,
488 opts: &ConnectOptions,
489) -> Result<MySqlConnection, SqlError> {
490 let mut builder = mysql_async::OptsBuilder::default()
491 .ip_or_hostname(url.host().unwrap_or("localhost"))
492 .tcp_port(url.port().unwrap_or(3306));
493
494 if !url.username().is_empty() {
495 builder = builder.user(Some(url.username()));
496 }
497 if let Some(pass) = opts.effective_password(url) {
499 builder = builder.pass(Some(pass.expose_secret()));
500 }
501 let db = url.database();
502 if !db.is_empty() {
503 builder = builder.db_name(Some(db));
504 }
505
506 if opts.insecure {
507 let ssl_opts = mysql_async::SslOpts::default()
508 .with_danger_accept_invalid_certs(true)
509 .with_danger_skip_domain_validation(true);
510 builder = builder.ssl_opts(Some(ssl_opts));
511 }
512
513 if let Some(ssl_mode) = url.params().get("ssl-mode") {
514 match ssl_mode.as_str() {
515 "disabled" | "disable" => {
516 let ssl_opts =
517 mysql_async::SslOpts::default().with_danger_accept_invalid_certs(true);
518 builder = builder.ssl_opts(Some(ssl_opts));
519 }
520 "preferred" => {
521 }
523 "required" => {
524 let ssl_opts =
525 mysql_async::SslOpts::default().with_danger_accept_invalid_certs(false);
526 builder = builder.ssl_opts(Some(ssl_opts));
527 }
528 "verify-ca" | "verify-identity" => {
529 let ssl_opts = mysql_async::SslOpts::default()
530 .with_danger_accept_invalid_certs(false)
531 .with_danger_skip_domain_validation(false);
532 builder = builder.ssl_opts(Some(ssl_opts));
533 }
534 _ => {}
535 }
536 }
537
538 let conn_opts: mysql_async::Opts = builder.into();
539 let conn = mysql_async::Conn::new(conn_opts)
540 .await
541 .map_err(|e| SqlError::ConnectionFailed(e.to_string()))?;
542
543 Ok(MySqlConnection { conn })
544}
545
546fn mysql_to_value(
547 value: mysql_async::Value,
548 column_type: mysql_async::consts::ColumnType,
549 column_length: u32,
550) -> Value {
551 use mysql_async::consts::ColumnType as CT;
552
553 match value {
554 mysql_async::Value::NULL => Value::Null,
555 mysql_async::Value::Bytes(b) => match column_type {
556 CT::MYSQL_TYPE_JSON => serde_json::from_slice(&b)
557 .map(Value::Json)
558 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned())),
559 CT::MYSQL_TYPE_DECIMAL | CT::MYSQL_TYPE_NEWDECIMAL => {
560 Value::Decimal(String::from_utf8_lossy(&b).into_owned())
561 }
562 CT::MYSQL_TYPE_TINY_BLOB
563 | CT::MYSQL_TYPE_MEDIUM_BLOB
564 | CT::MYSQL_TYPE_LONG_BLOB
565 | CT::MYSQL_TYPE_BLOB => Value::Bytes(b),
566 CT::MYSQL_TYPE_TINY => {
567 let s = String::from_utf8_lossy(&b);
568 if column_length == 1 {
569 Value::Bool(s != "0")
570 } else {
571 s.parse::<i64>()
572 .map(Value::Int64)
573 .unwrap_or_else(|_| Value::String(s.into_owned()))
574 }
575 }
576 CT::MYSQL_TYPE_SHORT
577 | CT::MYSQL_TYPE_LONG
578 | CT::MYSQL_TYPE_INT24
579 | CT::MYSQL_TYPE_LONGLONG
580 | CT::MYSQL_TYPE_YEAR => String::from_utf8_lossy(&b)
581 .parse::<i64>()
582 .map(Value::Int64)
583 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned())),
584 CT::MYSQL_TYPE_FLOAT | CT::MYSQL_TYPE_DOUBLE => String::from_utf8_lossy(&b)
585 .parse::<f64>()
586 .map(Value::Float64)
587 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned())),
588 CT::MYSQL_TYPE_DATE => {
589 NaiveDate::parse_from_str(&String::from_utf8_lossy(&b), "%Y-%m-%d")
590 .map(Value::Date)
591 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned()))
592 }
593 CT::MYSQL_TYPE_TIME => {
594 NaiveTime::parse_from_str(&String::from_utf8_lossy(&b), "%H:%M:%S")
595 .or_else(|_| {
596 NaiveTime::parse_from_str(&String::from_utf8_lossy(&b), "%H:%M:%S%.f")
597 })
598 .map(Value::Time)
599 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&b).into_owned()))
600 }
601 CT::MYSQL_TYPE_DATETIME | CT::MYSQL_TYPE_DATETIME2 => {
602 parse_naive_datetime(&String::from_utf8_lossy(&b))
603 .map(Value::DateTime)
604 .unwrap_or_else(|| Value::String(String::from_utf8_lossy(&b).into_owned()))
605 }
606 CT::MYSQL_TYPE_TIMESTAMP | CT::MYSQL_TYPE_TIMESTAMP2 => {
607 parse_naive_datetime(&String::from_utf8_lossy(&b))
608 .and_then(|dt| dt.and_local_timezone(Utc).single())
609 .map(Value::DateTimeTz)
610 .unwrap_or_else(|| Value::String(String::from_utf8_lossy(&b).into_owned()))
611 }
612 _ => String::from_utf8(b)
613 .map(Value::String)
614 .unwrap_or_else(|e| Value::Bytes(e.into_bytes())),
615 },
616 mysql_async::Value::Int(i) => {
617 if column_type == CT::MYSQL_TYPE_TINY && column_length == 1 {
618 Value::Bool(i != 0)
619 } else {
620 Value::Int64(i)
621 }
622 }
623 mysql_async::Value::UInt(u) => {
624 if column_type == CT::MYSQL_TYPE_TINY && column_length == 1 {
625 Value::Bool(u != 0)
626 } else {
627 Value::Int64(u as i64)
628 }
629 }
630 mysql_async::Value::Float(f) => Value::Float64(f64::from(f)),
631 mysql_async::Value::Double(d) => Value::Float64(d),
632 mysql_async::Value::Date(year, month, day, hour, min, sec, usec) => match column_type {
633 CT::MYSQL_TYPE_DATE => NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32)
634 .map(Value::Date)
635 .unwrap_or_else(|| Value::String(format!("{:04}-{:02}-{:02}", year, month, day))),
636 CT::MYSQL_TYPE_TIMESTAMP | CT::MYSQL_TYPE_TIMESTAMP2 => {
637 NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32)
638 .and_then(|d| d.and_hms_micro_opt(hour as u32, min as u32, sec as u32, usec))
639 .and_then(|dt| dt.and_local_timezone(Utc).single())
640 .map(Value::DateTimeTz)
641 .unwrap_or_else(|| {
642 Value::String(format!(
643 "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
644 year, month, day, hour, min, sec
645 ))
646 })
647 }
648 _ => NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32)
649 .and_then(|d| d.and_hms_micro_opt(hour as u32, min as u32, sec as u32, usec))
650 .map(Value::DateTime)
651 .unwrap_or_else(|| {
652 Value::String(format!(
653 "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
654 year, month, day, hour, min, sec
655 ))
656 }),
657 },
658 mysql_async::Value::Time(neg, days, hours, minutes, seconds, _usec) => {
659 let total_hours = days * 24 + u32::from(hours);
660 Value::String(format!(
661 "{}{:02}:{:02}:{:02}",
662 if neg { "-" } else { "" },
663 total_hours,
664 minutes,
665 seconds
666 ))
667 }
668 }
669}
670
671fn parse_naive_datetime(s: &str) -> Option<NaiveDateTime> {
672 NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")
673 .ok()
674 .or_else(|| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").ok())
675}
676
677async fn current_database(conn: &mut MySqlConnection) -> Result<String, SqlError> {
678 let result = conn.query("SELECT DATABASE()").await?;
679 Ok(result
680 .rows
681 .into_iter()
682 .next()
683 .and_then(|row| row.into_iter().next())
684 .and_then(|v| match v {
685 Value::String(s) => Some(s),
686 _ => None,
687 })
688 .unwrap_or_default())
689}
690
691fn escape_mysql_identifier(name: &str) -> String {
692 name.replace('`', "``")
693}
694
695fn escape_mysql_string(s: &str) -> String {
696 s.replace("'", "''")
697}
698
699mod my_load_data {
718 use crate::error::SqlError;
719 use crate::value::{TypeHint, Value};
720 use bytes::Bytes;
721
722 pub fn encode_row(row: &[Value], hints: &[TypeHint]) -> Result<Bytes, SqlError> {
724 let mut buf: Vec<u8> = Vec::with_capacity(row.len() * 16 + 1);
725 for (i, value) in row.iter().enumerate() {
726 if i > 0 {
727 buf.push(b'\t');
728 }
729 let hint = hints.get(i).copied().unwrap_or(TypeHint::Other);
730 encode_value(&mut buf, value, hint)?;
731 }
732 buf.push(b'\n');
733 Ok(Bytes::from(buf))
734 }
735
736 fn encode_value(out: &mut Vec<u8>, v: &Value, hint: TypeHint) -> Result<(), SqlError> {
737 match v {
738 Value::Null => out.extend_from_slice(b"\\N"),
739 Value::Bool(b) => out.push(if *b { b'1' } else { b'0' }),
740 Value::Int64(n) => out.extend_from_slice(n.to_string().as_bytes()),
741 Value::Float64(f) => {
742 if f.is_nan() {
743 out.extend_from_slice(b"\\N");
747 } else if f.is_infinite() {
748 out.extend_from_slice(b"\\N");
750 } else {
751 out.extend_from_slice(f.to_string().as_bytes());
752 }
753 }
754 Value::Decimal(s) => push_escaped(out, s.as_bytes()),
755 Value::String(s) => push_escaped(out, s.as_bytes()),
756 Value::Bytes(b) => push_escaped(out, b),
757 Value::Date(d) => out.extend_from_slice(d.to_string().as_bytes()),
758 Value::Time(t) => out.extend_from_slice(t.to_string().as_bytes()),
759 Value::DateTime(dt) => {
760 out.extend_from_slice(dt.to_string().as_bytes());
763 }
764 Value::DateTimeTz(dt) => {
765 let naive = dt.naive_utc();
769 out.extend_from_slice(naive.to_string().as_bytes());
770 }
771 Value::Json(j) => {
772 let rendered = serde_json::to_string(j).map_err(|e| {
773 SqlError::QueryFailed(format!("MySQL bulk: JSON serialize: {e}"))
774 })?;
775 push_escaped(out, rendered.as_bytes());
776 }
777 Value::Uuid(s) => push_escaped(out, s.as_bytes()),
778 Value::Array(a) => {
779 let _ = hint;
781 let rendered = serde_json::to_string(a).map_err(|e| {
782 SqlError::QueryFailed(format!("MySQL bulk: array serialize: {e}"))
783 })?;
784 push_escaped(out, rendered.as_bytes());
785 }
786 }
787 Ok(())
788 }
789
790 fn push_escaped(out: &mut Vec<u8>, bytes: &[u8]) {
794 for &b in bytes {
795 match b {
796 b'\\' => out.extend_from_slice(b"\\\\"),
797 b'\t' => out.extend_from_slice(b"\\t"),
798 b'\n' => out.extend_from_slice(b"\\n"),
799 b'\r' => out.extend_from_slice(b"\\r"),
806 b'\0' => out.extend_from_slice(b"\\0"),
807 other => out.push(other),
808 }
809 }
810 }
811
812 pub fn backtick_quote(s: &str) -> String {
815 format!("`{}`", s.replace('`', "``"))
816 }
817
818 pub fn classify_load_error(e: mysql_async::Error) -> SqlError {
824 match &e {
825 mysql_async::Error::Server(srv) => {
826 if matches!(srv.code, 1148 | 3948 | 3950) {
832 return SqlError::BulkUnavailable(format!(
833 "MySQL server rejected LOAD DATA LOCAL INFILE \
834 (error {}: {}). Enable `local_infile=ON` server-side, \
835 or pass `--bulk-native=off` to use the generic path.",
836 srv.code, srv.message
837 ));
838 }
839 SqlError::QueryFailed(format!("MySQL bulk LOAD DATA: {srv}"))
840 }
841 _ => SqlError::QueryFailed(format!("MySQL bulk LOAD DATA: {e}")),
842 }
843 }
844
845 #[cfg(test)]
846 mod tests {
847 use super::*;
848 use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
849
850 fn enc1(v: Value, hint: TypeHint) -> Vec<u8> {
851 let bytes = encode_row(&[v], &[hint]).expect("encode_row");
852 assert_eq!(bytes.last().copied(), Some(b'\n'));
854 bytes[..bytes.len() - 1].to_vec()
855 }
856
857 fn enc1_str(v: Value, hint: TypeHint) -> String {
858 String::from_utf8(enc1(v, hint)).expect("UTF-8")
859 }
860
861 #[test]
862 fn encode_null_is_backslash_n() {
863 assert_eq!(enc1_str(Value::Null, TypeHint::Null), "\\N");
864 }
865
866 #[test]
867 fn encode_bool_is_one_or_zero() {
868 assert_eq!(enc1_str(Value::Bool(true), TypeHint::Bool), "1");
869 assert_eq!(enc1_str(Value::Bool(false), TypeHint::Bool), "0");
870 }
871
872 #[test]
873 fn encode_int_and_float() {
874 assert_eq!(enc1_str(Value::Int64(42), TypeHint::Int64), "42");
875 assert_eq!(enc1_str(Value::Int64(-7), TypeHint::Int64), "-7");
876 assert_eq!(enc1_str(Value::Float64(1.5), TypeHint::Float64), "1.5");
877 }
878
879 #[test]
880 fn encode_float_nan_and_inf_become_null() {
881 assert_eq!(enc1_str(Value::Float64(f64::NAN), TypeHint::Float64), "\\N");
885 assert_eq!(
886 enc1_str(Value::Float64(f64::INFINITY), TypeHint::Float64),
887 "\\N"
888 );
889 }
890
891 #[test]
892 fn encode_string_escapes_backslash_first() {
893 assert_eq!(
900 enc1_str(Value::String("a\\b".into()), TypeHint::String),
901 "a\\\\b"
902 );
903 assert_eq!(
904 enc1_str(Value::String("a\tb".into()), TypeHint::String),
905 "a\\tb"
906 );
907 assert_eq!(
908 enc1_str(Value::String("a\nb".into()), TypeHint::String),
909 "a\\nb"
910 );
911 assert_eq!(
916 enc1_str(Value::String("a\rb".into()), TypeHint::String),
917 "a\\rb"
918 );
919 assert_eq!(
920 enc1_str(Value::String("a\r\nb".into()), TypeHint::String),
921 "a\\r\\nb"
922 );
923 }
924
925 #[test]
926 fn encode_string_escapes_nul_byte() {
927 let out = enc1(Value::String("a\0b".into()), TypeHint::String);
932 assert_eq!(out, b"a\\0b");
933 }
934
935 #[test]
936 fn encode_bytes_preserves_arbitrary_payload() {
937 let raw = vec![0x01u8, b'\t', 0xFF, b'\\', b'\n', 0x00, b'Z'];
941 let out = enc1(Value::Bytes(raw), TypeHint::Bytes);
942 assert_eq!(
943 out,
944 vec![
945 0x01u8, b'\\', b't', 0xFF, b'\\', b'\\', b'\\', b'n', b'\\', b'0', b'Z'
946 ]
947 );
948 }
949
950 #[test]
951 fn encode_date_time_datetime() {
952 let d = NaiveDate::from_ymd_opt(2026, 5, 14).unwrap();
953 let t = NaiveTime::from_hms_opt(12, 34, 56).unwrap();
954 let dt = NaiveDateTime::new(d, t);
955 assert_eq!(enc1_str(Value::Date(d), TypeHint::Date), "2026-05-14");
956 assert_eq!(enc1_str(Value::Time(t), TypeHint::Time), "12:34:56");
957 assert_eq!(
958 enc1_str(Value::DateTime(dt), TypeHint::DateTime),
959 "2026-05-14 12:34:56"
960 );
961 }
962
963 #[test]
964 fn encode_datetimetz_converts_to_utc_naive() {
965 let dt = Utc.with_ymd_and_hms(2026, 5, 14, 12, 34, 56).unwrap();
966 assert_eq!(
967 enc1_str(Value::DateTimeTz(dt), TypeHint::DateTimeTz),
968 "2026-05-14 12:34:56"
969 );
970 }
971
972 #[test]
973 fn encode_json_is_compact_then_escaped() {
974 let j = serde_json::json!({"role": "admin"});
975 let s = enc1_str(Value::Json(j), TypeHint::Json);
976 assert!(s.contains("\"role\":\"admin\""));
980 assert!(!s.contains(' '));
981 }
982
983 #[test]
984 fn encode_array_is_compact_json() {
985 let a = Value::Array(vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)]);
986 assert_eq!(enc1_str(a, TypeHint::Array), "[1,2,3]");
987 }
988
989 #[test]
990 fn encode_uuid_passes_through() {
991 assert_eq!(
992 enc1_str(
993 Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
994 TypeHint::Uuid
995 ),
996 "550e8400-e29b-41d4-a716-446655440000"
997 );
998 }
999
1000 #[test]
1001 fn encode_row_with_multiple_cells_uses_tab_separator() {
1002 let row = vec![
1003 Value::Int64(1),
1004 Value::String("Alice".into()),
1005 Value::Null,
1006 Value::Bool(true),
1007 ];
1008 let hints = vec![
1009 TypeHint::Int64,
1010 TypeHint::String,
1011 TypeHint::Null,
1012 TypeHint::Bool,
1013 ];
1014 let bytes = encode_row(&row, &hints).unwrap();
1015 assert_eq!(&bytes[..], b"1\tAlice\t\\N\t1\n");
1016 }
1017
1018 #[test]
1019 fn backtick_quote_doubles_embedded_backticks() {
1020 assert_eq!(backtick_quote("plain"), "`plain`");
1021 assert_eq!(backtick_quote("with`tick"), "`with``tick`");
1022 }
1023 }
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028 use super::*;
1029 use crate::url::DatabaseUrl;
1030
1031 const TEST_MYSQL_URL: &str = "mysql://root:ferrule@127.0.0.1:13306/ferrule";
1032
1033 fn try_connect() -> Option<Box<dyn crate::Connection>> {
1034 let url = DatabaseUrl::parse(TEST_MYSQL_URL).ok()?;
1035 let conn = crate::connect(&url, &ConnectOptions::default(), None).ok()?;
1036 Some(conn)
1037 }
1038
1039 #[test]
1040 fn test_mysql_ping() {
1041 let Some(mut conn) = try_connect() else {
1042 eprintln!("MySQL test container not available, skipping test_mysql_ping");
1043 return;
1044 };
1045 conn.ping().expect("ping should succeed");
1046 }
1047
1048 #[test]
1049 fn test_mysql_query() {
1050 let Some(mut conn) = try_connect() else {
1051 eprintln!("MySQL test container not available, skipping test_mysql_query");
1052 return;
1053 };
1054 let result = conn
1055 .query("SELECT * FROM test_users")
1056 .expect("query should succeed");
1057 assert!(!result.columns.is_empty(), "should have columns");
1058 assert!(!result.rows.is_empty(), "should have rows");
1059 }
1060
1061 #[test]
1062 fn test_mysql_execute() {
1063 let Some(mut conn) = try_connect() else {
1064 eprintln!("MySQL test container not available, skipping test_mysql_execute");
1065 return;
1066 };
1067 let summary = conn
1068 .execute("INSERT INTO test_users (name, age) VALUES ('TestUser', 99)")
1069 .expect("execute should succeed");
1070 assert!(
1071 summary.rows_affected.is_some_and(|n| n > 0),
1072 "should have affected rows"
1073 );
1074 }
1075
1076 #[test]
1077 fn test_mysql_list_tables() {
1078 let Some(mut conn) = try_connect() else {
1079 eprintln!("MySQL test container not available, skipping test_mysql_list_tables");
1080 return;
1081 };
1082 let tables = conn.list_tables(None).expect("list_tables should succeed");
1083 assert!(
1084 tables.contains(&"test_users".to_string()),
1085 "should contain test_users"
1086 );
1087 }
1088
1089 #[test]
1090 fn test_mysql_list_schemas() {
1091 let Some(mut conn) = try_connect() else {
1092 eprintln!("MySQL test container not available, skipping test_mysql_list_schemas");
1093 return;
1094 };
1095 let schemas = conn.list_schemas().expect("list_schemas should succeed");
1096 assert!(
1099 schemas.iter().any(|s| s.name == "ferrule"),
1100 "should contain the seeded `ferrule` database, got: {schemas:?}"
1101 );
1102 assert!(
1103 schemas.iter().filter(|s| s.is_default).count() <= 1,
1104 "at most one schema should be flagged is_default, got: {schemas:?}"
1105 );
1106 }
1107
1108 #[test]
1109 fn test_mysql_describe_table() {
1110 let Some(mut conn) = try_connect() else {
1111 eprintln!("MySQL test container not available, skipping test_mysql_describe_table");
1112 return;
1113 };
1114 let result = conn
1115 .describe_table(None, "test_users")
1116 .expect("describe_table should succeed");
1117 assert_eq!(result.columns.len(), 6, "should return 6 metadata columns");
1118 let col_names: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
1119 assert_eq!(
1120 col_names,
1121 vec![
1122 "column_name",
1123 "data_type",
1124 "is_nullable",
1125 "column_default",
1126 "numeric_precision",
1127 "numeric_scale"
1128 ]
1129 );
1130 }
1131
1132 #[test]
1133 fn test_mysql_execute_multi() {
1134 let Some(mut conn) = try_connect() else {
1135 eprintln!("MySQL test container not available, skipping test_mysql_execute_multi");
1136 return;
1137 };
1138 let _ = conn.execute("DELETE FROM test_users WHERE name = 'MultiUser'");
1140 let results = conn
1141 .execute_multi("INSERT INTO test_users (name, age) VALUES ('MultiUser', 42); SELECT COUNT(*) FROM test_users;")
1142 .expect("execute_multi should succeed");
1143 assert_eq!(results.len(), 2, "should have two result sets");
1144 assert!(
1146 matches!(&results[0], StatementResult::Summary(s) if s.rows_affected.is_some_and(|n| n > 0)),
1147 "first result should be a DML summary with affected rows"
1148 );
1149 assert!(
1151 matches!(&results[1], StatementResult::Query(_)),
1152 "second result should be a Query"
1153 );
1154 }
1155
1156 #[test]
1157 fn test_mysql_type_mapping() {
1158 let Some(mut conn) = try_connect() else {
1159 eprintln!("MySQL test container not available, skipping test_mysql_type_mapping");
1160 return;
1161 };
1162 let result = conn
1163 .query("SELECT name, age, score, active, meta FROM test_users WHERE name = 'Alice'")
1164 .expect("query should succeed");
1165 assert_eq!(result.rows.len(), 1);
1166 let row = &result.rows[0];
1167 assert!(matches!(row[0], Value::String(_)), "name should be String");
1168 assert!(matches!(row[1], Value::Int64(_)), "age should be Int64");
1169 assert!(
1170 matches!(row[2], Value::Float64(_) | Value::Decimal(_)),
1171 "score should be Float64 or Decimal"
1172 );
1173 assert!(
1174 matches!(row[3], Value::Int64(_) | Value::Bool(_)),
1175 "active should be Int64 or Bool"
1176 );
1177 assert!(
1178 matches!(row[4], Value::Json(_) | Value::String(_)),
1179 "meta should be Json or String"
1180 );
1181 }
1182
1183 #[test]
1188 fn test_mysql_bulk_insert_rows_round_trip() {
1189 let Some(mut conn) = try_connect() else {
1190 eprintln!(
1191 "MySQL test container not available, skipping test_mysql_bulk_insert_rows_round_trip"
1192 );
1193 return;
1194 };
1195
1196 if conn.execute("SET GLOBAL local_infile = ON").is_err() {
1205 eprintln!(
1206 "MySQL test container does not allow toggling local_infile; \
1207 skipping test_mysql_bulk_insert_rows_round_trip"
1208 );
1209 return;
1210 }
1211
1212 let pid = std::process::id();
1213 let table = format!("ferrule_bulk_test_{pid}");
1214 let _ = conn.execute(&format!("DROP TABLE IF EXISTS {table}"));
1215 conn.execute(&format!(
1216 "CREATE TABLE {table} (\
1217 id BIGINT NOT NULL, \
1218 name VARCHAR(255) NULL, \
1219 active TINYINT(1) NULL, \
1220 blob_data BLOB NULL, \
1221 meta JSON NULL, \
1222 tricky TEXT NULL\
1223 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
1224 ))
1225 .expect("CREATE TABLE");
1226
1227 let columns = vec![
1228 ColumnInfo {
1229 name: "id".into(),
1230 type_hint: TypeHint::Int64,
1231 nullable: false,
1232 },
1233 ColumnInfo {
1234 name: "name".into(),
1235 type_hint: TypeHint::String,
1236 nullable: true,
1237 },
1238 ColumnInfo {
1239 name: "active".into(),
1240 type_hint: TypeHint::Bool,
1241 nullable: true,
1242 },
1243 ColumnInfo {
1244 name: "blob_data".into(),
1245 type_hint: TypeHint::Bytes,
1246 nullable: true,
1247 },
1248 ColumnInfo {
1249 name: "meta".into(),
1250 type_hint: TypeHint::Json,
1251 nullable: true,
1252 },
1253 ColumnInfo {
1254 name: "tricky".into(),
1255 type_hint: TypeHint::String,
1256 nullable: true,
1257 },
1258 ];
1259
1260 let rows: Vec<Row> = vec![
1261 vec![
1262 Value::Int64(1),
1263 Value::String("Alice".into()),
1264 Value::Bool(true),
1265 Value::Bytes(vec![0xDE, 0xAD, 0xBE, 0xEF]),
1266 Value::Json(serde_json::json!({"role": "admin"})),
1267 Value::String("plain".into()),
1268 ],
1269 vec![
1270 Value::Int64(2),
1271 Value::String("Esc\\\t\nape".into()),
1272 Value::Bool(false),
1273 Value::Bytes(vec![0x00, 0x09, 0x0A, 0xFF]),
1274 Value::Json(serde_json::Value::Null),
1275 Value::String("\\.".into()),
1276 ],
1277 vec![
1278 Value::Int64(3),
1279 Value::Null,
1280 Value::Null,
1281 Value::Null,
1282 Value::Null,
1283 Value::Null,
1284 ],
1285 ];
1286
1287 let n = conn
1288 .bulk_insert_rows(BulkInsert {
1289 table: &table,
1290 columns: &columns,
1291 rows: &rows,
1292 copy_format: crate::copy::CopyFormat::Text,
1293 })
1294 .expect("bulk_insert_rows");
1295 assert_eq!(n, 3);
1296
1297 let result = conn
1298 .query(&format!(
1299 "SELECT id, name, active, blob_data, tricky FROM {table} ORDER BY id"
1300 ))
1301 .unwrap();
1302 assert_eq!(result.rows.len(), 3);
1303
1304 if let Value::String(s) = &result.rows[1][1] {
1306 assert_eq!(
1307 s, "Esc\\\t\nape",
1308 "row 2 name should preserve backslash/tab/nl"
1309 );
1310 } else {
1311 panic!("row 2 name should be String, got {:?}", result.rows[1][1]);
1312 }
1313 if let Value::Bytes(b) = &result.rows[1][3] {
1314 assert_eq!(b.as_slice(), &[0x00u8, 0x09, 0x0A, 0xFF]);
1315 } else {
1316 panic!(
1317 "row 2 blob_data should be Bytes, got {:?}",
1318 result.rows[1][3]
1319 );
1320 }
1321 if let Value::String(s) = &result.rows[1][4] {
1322 assert_eq!(s, "\\.", "row 2 tricky should be literal backslash-dot");
1323 } else {
1324 panic!("row 2 tricky should be String, got {:?}", result.rows[1][4]);
1325 }
1326
1327 assert!(matches!(&result.rows[2][1], Value::Null));
1329 assert!(matches!(&result.rows[2][2], Value::Null));
1330 assert!(matches!(&result.rows[2][3], Value::Null));
1331
1332 conn.execute(&format!("DROP TABLE {table}"))
1334 .expect("DROP TABLE");
1335 }
1336
1337 #[test]
1344 fn test_mysql_load_data_without_bulk_in_progress_rejected() {
1345 let Some(mut conn) = try_connect() else {
1346 eprintln!(
1347 "MySQL test container not available, skipping test_mysql_load_data_without_bulk_in_progress_rejected"
1348 );
1349 return;
1350 };
1351
1352 if conn.execute("SET GLOBAL local_infile = ON").is_err() {
1356 eprintln!(
1357 "MySQL test container does not allow toggling local_infile; \
1358 skipping test_mysql_load_data_without_bulk_in_progress_rejected"
1359 );
1360 return;
1361 }
1362
1363 let pid = std::process::id();
1364 let table = format!("ferrule_bulk_security_test_{pid}");
1365 let _ = conn.execute(&format!("DROP TABLE IF EXISTS {table}"));
1366 conn.execute(&format!(
1367 "CREATE TABLE {table} (id INT, line TEXT) ENGINE=InnoDB"
1368 ))
1369 .expect("CREATE TABLE");
1370
1371 let result = conn.execute(&format!(
1373 "LOAD DATA LOCAL INFILE '/etc/passwd' INTO TABLE {table} \
1374 FIELDS TERMINATED BY ':' (id, line)"
1375 ));
1376
1377 let err = result
1381 .expect_err("LOAD DATA LOCAL INFILE without bulk_insert_rows in progress must fail");
1382 let msg = err.to_string();
1383 assert!(
1386 msg.to_lowercase().contains("handler")
1387 || msg.to_lowercase().contains("local_infile")
1388 || msg.to_lowercase().contains("infile"),
1389 "expected handler/infile rejection, got: {msg}"
1390 );
1391
1392 let count = conn
1394 .query(&format!("SELECT COUNT(*) FROM {table}"))
1395 .unwrap();
1396 match &count.rows[0][0] {
1397 Value::Int64(n) => assert_eq!(*n, 0, "no rows should have been inserted"),
1398 other => panic!("unexpected count shape: {other:?}"),
1399 }
1400
1401 let _ = conn.execute(&format!("DROP TABLE {table}"));
1402 }
1403
1404 #[test]
1405 fn test_mysql_primary_key() {
1406 let Some(mut conn) = try_connect() else {
1407 eprintln!("MySQL test container not available, skipping test_mysql_primary_key");
1408 return;
1409 };
1410 let pk = conn.primary_key(None, "test_users").expect("primary_key");
1411 assert_eq!(pk, vec!["id".to_string()]);
1412 }
1413
1414 #[test]
1415 fn test_mysql_list_foreign_keys() {
1416 let Some(mut conn) = try_connect() else {
1417 eprintln!("MySQL test container not available, skipping test_mysql_list_foreign_keys");
1418 return;
1419 };
1420 let pid = std::process::id();
1421 let child = format!("ferrule_fk_test_orders_{pid}");
1422 let _ = conn.execute(&format!("DROP TABLE IF EXISTS {child}"));
1423 conn.execute(&format!(
1424 "CREATE TABLE {child} (\
1425 id INT AUTO_INCREMENT PRIMARY KEY, \
1426 user_id INT, \
1427 FOREIGN KEY (user_id) REFERENCES test_users(id) ON DELETE CASCADE\
1428 )"
1429 ))
1430 .expect("CREATE TABLE");
1431
1432 let fks = conn.list_foreign_keys(None).expect("list_foreign_keys");
1433 let matching: Vec<_> = fks.iter().filter(|fk| fk.child_table == child).collect();
1434 assert_eq!(matching.len(), 1, "expected 1 FK from {child}, got {fks:?}");
1435 let fk = matching[0];
1436 assert_eq!(fk.child_columns, vec!["user_id".to_string()]);
1437 assert_eq!(fk.parent_table, "test_users");
1438 assert_eq!(fk.parent_columns, vec!["id".to_string()]);
1439 assert_eq!(fk.on_delete.as_deref(), Some("CASCADE"));
1440
1441 let _ = conn.execute(&format!("DROP TABLE {child}"));
1442 }
1443
1444 #[test]
1447 fn test_mysql_copy_skip_then_upsert() {
1448 use crate::backend::Backend;
1449 use crate::copy::{CopyOptions, CopySource, IfExists, copy_rows};
1450
1451 let (Some(mut src), Some(mut dst)) = (try_connect(), try_connect()) else {
1452 eprintln!(
1453 "MySQL test container not available, skipping test_mysql_copy_skip_then_upsert"
1454 );
1455 return;
1456 };
1457
1458 let pid = std::process::id();
1459 let src_table = format!("ferrule_my_skip_src_{pid}");
1460 let dst_table = format!("ferrule_my_skip_dst_{pid}");
1461 let _ = src.execute(&format!("DROP TABLE IF EXISTS {src_table}"));
1462 let _ = dst.execute(&format!("DROP TABLE IF EXISTS {dst_table}"));
1463 src.execute(&format!(
1464 "CREATE TABLE {src_table} (id INT PRIMARY KEY, name VARCHAR(64), val INT)"
1465 ))
1466 .expect("CREATE src");
1467 dst.execute(&format!(
1468 "CREATE TABLE {dst_table} (id INT PRIMARY KEY, name VARCHAR(64), val INT)"
1469 ))
1470 .expect("CREATE dst");
1471 src.execute(&format!(
1472 "INSERT INTO {src_table} VALUES (1, 'new-1', 10), (2, 'new-2', 20)"
1473 ))
1474 .expect("seed src");
1475 dst.execute(&format!("INSERT INTO {dst_table} VALUES (1, 'old-1', 99)"))
1476 .expect("seed dst");
1477
1478 let opts = CopyOptions {
1480 source: CopySource::Query {
1481 sql: format!("SELECT * FROM {src_table} ORDER BY id"),
1482 into: dst_table.clone(),
1483 },
1484 if_exists: IfExists::Skip,
1485 ..Default::default()
1486 };
1487 copy_rows(&mut src, Backend::MySql, &mut dst, Backend::MySql, &opts)
1488 .expect("copy_rows skip");
1489
1490 let out = dst
1491 .query(&format!(
1492 "SELECT id, name, val FROM {dst_table} ORDER BY id"
1493 ))
1494 .expect("verify skip");
1495 assert_eq!(out.rows.len(), 2);
1496 assert!(matches!(&out.rows[0][1], Value::String(s) if s == "old-1"));
1497 assert!(matches!(&out.rows[1][1], Value::String(s) if s == "new-2"));
1498
1499 let opts = CopyOptions {
1501 source: CopySource::Query {
1502 sql: format!("SELECT * FROM {src_table} ORDER BY id"),
1503 into: dst_table.clone(),
1504 },
1505 if_exists: IfExists::Upsert,
1506 ..Default::default()
1507 };
1508 copy_rows(&mut src, Backend::MySql, &mut dst, Backend::MySql, &opts)
1509 .expect("copy_rows upsert");
1510
1511 let out = dst
1512 .query(&format!(
1513 "SELECT id, name, val FROM {dst_table} ORDER BY id"
1514 ))
1515 .expect("verify upsert");
1516 assert_eq!(out.rows.len(), 2);
1517 assert!(matches!(&out.rows[0][1], Value::String(s) if s == "new-1"));
1518 assert!(matches!(&out.rows[0][2], Value::Int64(10)));
1519 assert!(matches!(&out.rows[1][1], Value::String(s) if s == "new-2"));
1520
1521 let _ = src.execute(&format!("DROP TABLE {src_table}"));
1522 let _ = dst.execute(&format!("DROP TABLE {dst_table}"));
1523 }
1524
1525 #[test]
1530 fn test_mysql_cursor_streams_in_bounded_batches() {
1531 let Some(mut conn) = try_connect() else {
1532 eprintln!(
1533 "MySQL test container not available, skipping test_mysql_cursor_streams_in_bounded_batches"
1534 );
1535 return;
1536 };
1537 let _ = conn.execute("DROP TABLE IF EXISTS ferrule_stream_src");
1539 conn.execute("CREATE TABLE ferrule_stream_src (i INT PRIMARY KEY)")
1540 .expect("create src");
1541 conn.execute("SET SESSION cte_max_recursion_depth = 100000")
1544 .expect("raise cte depth");
1545 conn.execute(
1546 "INSERT INTO ferrule_stream_src (i) \
1547 WITH RECURSIVE seq(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM seq WHERE i < 5000) \
1548 SELECT i FROM seq",
1549 )
1550 .expect("seed src");
1551 const BATCH: usize = 128;
1552 let mut cursor = conn
1553 .query_cursor("SELECT i, i * 2 AS doubled FROM ferrule_stream_src ORDER BY i")
1554 .expect("open mysql cursor");
1555 let mut total = 0u64;
1556 loop {
1557 let batch = cursor.next_batch(BATCH).expect("pull mysql batch");
1558 if batch.is_empty() {
1559 break;
1560 }
1561 assert!(batch.len() <= BATCH);
1562 total += batch.len() as u64;
1563 }
1564 assert_eq!(total, 5000);
1565 drop(cursor);
1566 let _ = conn.execute("DROP TABLE ferrule_stream_src");
1567 }
1568
1569 #[test]
1572 fn test_mysql_write_rows_round_trip() {
1573 let Some(mut conn) = try_connect() else {
1574 eprintln!(
1575 "MySQL test container not available, skipping test_mysql_write_rows_round_trip"
1576 );
1577 return;
1578 };
1579 let _ = conn.execute("DROP TABLE IF EXISTS ferrule_write_test");
1580 conn.execute("CREATE TABLE ferrule_write_test (id INT PRIMARY KEY, name VARCHAR(64))")
1581 .expect("create write table");
1582 let columns = vec![
1583 crate::value::ColumnInfo {
1584 name: "id".into(),
1585 type_hint: TypeHint::Int64,
1586 nullable: false,
1587 },
1588 crate::value::ColumnInfo {
1589 name: "name".into(),
1590 type_hint: TypeHint::String,
1591 nullable: true,
1592 },
1593 ];
1594 let rows: Vec<crate::value::Row> = (1..=2000)
1595 .map(|i| vec![Value::Int64(i), Value::String(format!("n{i}"))])
1596 .collect();
1597 let opts = crate::write::WriteOptions {
1598 batch_size: 250,
1599 ..Default::default()
1600 };
1601 let report = crate::write::write_rows(
1602 &mut *conn,
1603 crate::Backend::MySql,
1604 "ferrule_write_test",
1605 &columns,
1606 rows,
1607 &opts,
1608 )
1609 .expect("write_rows");
1610 assert_eq!(report.rows_written, 2000);
1611 assert!(report.is_complete());
1612 let back = conn
1613 .query("SELECT COUNT(*) FROM ferrule_write_test")
1614 .expect("count");
1615 assert!(matches!(back.rows[0][0], Value::Int64(2000)));
1616 let _ = conn.execute("DROP TABLE ferrule_write_test");
1617 }
1618
1619 #[test]
1622 fn test_mysql_write_rows_upsert() {
1623 let Some(mut conn) = try_connect() else {
1624 eprintln!("MySQL test container not available, skipping test_mysql_write_rows_upsert");
1625 return;
1626 };
1627 let _ = conn.execute("DROP TABLE IF EXISTS ferrule_write_up");
1628 conn.execute("CREATE TABLE ferrule_write_up (id INT PRIMARY KEY, v VARCHAR(32))")
1629 .expect("create");
1630 conn.execute("INSERT INTO ferrule_write_up VALUES (1, 'old')")
1631 .expect("seed");
1632 let columns = vec![
1633 crate::value::ColumnInfo {
1634 name: "id".into(),
1635 type_hint: TypeHint::Int64,
1636 nullable: false,
1637 },
1638 crate::value::ColumnInfo {
1639 name: "v".into(),
1640 type_hint: TypeHint::String,
1641 nullable: true,
1642 },
1643 ];
1644 let rows: Vec<crate::value::Row> = vec![
1645 vec![Value::Int64(1), Value::String("new".into())],
1646 vec![Value::Int64(2), Value::String("two".into())],
1647 ];
1648 let opts = crate::write::WriteOptions {
1649 mode: crate::write::WriteMode::Upsert,
1650 key_columns: vec!["id".into()],
1651 ..Default::default()
1652 };
1653 let report = crate::write::write_rows(
1654 &mut *conn,
1655 crate::Backend::MySql,
1656 "ferrule_write_up",
1657 &columns,
1658 rows,
1659 &opts,
1660 )
1661 .expect("write_rows upsert");
1662 assert!(report.is_complete());
1663 let v1 = conn
1664 .query("SELECT v FROM ferrule_write_up WHERE id = 1")
1665 .expect("read back");
1666 assert!(matches!(&v1.rows[0][0], Value::String(s) if s == "new"));
1667 let _ = conn.execute("DROP TABLE ferrule_write_up");
1668 }
1669}