1mod arrow_convert;
22mod proxy;
23
24use std::sync::Arc;
25
26use arrow::datatypes::Schema;
27use mysql::prelude::*;
28use mysql::{Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, SslOpts};
29
30use crate::config::{SourceType, TlsConfig, TlsMode};
31use crate::error::Result;
32use crate::source::batch_controller::{
33 AdaptiveBatchController, DEFAULT_BATCH_TARGET_MB, PROBE_BATCH_SIZE,
34};
35use crate::source::query::build_export_query;
36use crate::tuning::SourceTuning;
37use crate::types::ColumnOverrides;
38
39use arrow_convert::{
40 mysql_native_type_name, mysql_schema_and_arrow_types, mysql_type_to_rivet,
41 rows_to_record_batch_typed,
42};
43#[cfg(test)]
46use arrow_convert::bit_bytes_to_u64;
47use proxy::{detect_mysql_proxy_kind, warn_proxy_kind};
48
49pub use proxy::MysqlProxyKind;
53
54pub struct MysqlSource {
55 pool: Pool,
56 proxy_kind: MysqlProxyKind,
57}
58
59fn lean_pool_opts() -> PoolOpts {
63 PoolOpts::default()
64 .with_constraints(PoolConstraints::new(1, 100).expect("valid pool constraints"))
65}
66
67fn mysql_sample_innodb_log_waits(pool: &Pool) -> Option<u64> {
70 let mut conn = pool.get_conn().ok()?;
71 conn.query_first::<(String, u64), _>("SHOW GLOBAL STATUS LIKE 'Innodb_log_waits'")
72 .ok()
73 .flatten()
74 .map(|(_, v)| v)
75}
76
77impl MysqlSource {
78 pub fn from_pool(pool: Pool) -> Self {
83 let proxy_kind = detect_mysql_proxy_kind(&pool);
84 warn_proxy_kind(proxy_kind);
85 Self { pool, proxy_kind }
86 }
87
88 pub fn connect(url: &str) -> Result<Self> {
90 let opts =
91 Opts::from(OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()));
92 Ok(Self::from_pool(Pool::new(opts)?))
93 }
94
95 pub fn connect_with_tls(url: &str, tls: Option<&TlsConfig>) -> Result<Self> {
97 match tls {
98 Some(cfg) if cfg.mode.is_enforced() => {
99 let base = Opts::from_url(url)?;
100 let ssl = build_mysql_ssl_opts(cfg);
101 let opts = Opts::from(
102 OptsBuilder::from_opts(base)
103 .ssl_opts(Some(ssl))
104 .pool_opts(lean_pool_opts()),
105 );
106 Ok(Self::from_pool(Pool::new(opts)?))
107 }
108 _ => Self::connect(url),
109 }
110 }
111
112 #[allow(dead_code)]
120 pub fn proxy_kind(&self) -> MysqlProxyKind {
121 self.proxy_kind
122 }
123}
124
125pub(crate) fn connect_pool(url: &str, tls: Option<&TlsConfig>) -> Result<Pool> {
130 match tls {
131 Some(cfg) if cfg.mode.is_enforced() => {
132 let base = Opts::from_url(url)?;
133 let ssl = build_mysql_ssl_opts(cfg);
134 let opts = Opts::from(
135 OptsBuilder::from_opts(base)
136 .ssl_opts(Some(ssl))
137 .pool_opts(lean_pool_opts()),
138 );
139 Ok(Pool::new(opts)?)
140 }
141 _ => {
142 let opts = Opts::from(
143 OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()),
144 );
145 Ok(Pool::new(opts)?)
146 }
147 }
148}
149
/// Per-row byte estimates strictly above this value (8 KiB) are treated as inflated.
// NOTE(review): the name suggests the inflation comes from InnoDB BLOB/TEXT
// overflow storage — the exact origin of this threshold should be confirmed.
const INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES: i64 = 8_192;

/// Divisor applied to inflated per-row estimates (a heuristic factor).
const INNODB_BLOB_OVERFLOW_DIVISOR: i64 = 3;

/// Corrects an `information_schema` average-row-length estimate.
///
/// Values at or below `INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES` (including zero and
/// negatives) are returned unchanged. Larger values are divided by
/// `INNODB_BLOB_OVERFLOW_DIVISOR`, but never reported below half the threshold
/// (4 KiB).
fn correct_innodb_avg_row_length(raw_bytes: i64) -> i64 {
    if raw_bytes <= INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES {
        return raw_bytes;
    }
    let floor = INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES / 2;
    std::cmp::max(raw_bytes / INNODB_BLOB_OVERFLOW_DIVISOR, floor)
}
173
174pub(crate) fn introspect_mysql_table_for_chunking(
193 url: &str,
194 tls: Option<&TlsConfig>,
195 qualified_table: &str,
196) -> Result<crate::source::TableIntrospection> {
197 let pool = connect_pool(url, tls)?;
198 let mut conn = pool.get_conn()?;
199 let default_db: Option<String> = conn.query_first("SELECT DATABASE()")?;
200 let default_db = default_db.unwrap_or_default();
201
202 let (schema, table) = match qualified_table.split_once('.') {
203 Some((s, t)) => (s.to_string(), t.to_string()),
204 None => (default_db, qualified_table.to_string()),
205 };
206
207 let row_stats: Option<(i64, i64, i64)> = conn.exec_first(
212 "SELECT CAST(IFNULL(TABLE_ROWS, 0) AS SIGNED), \
213 CAST(IFNULL(AVG_ROW_LENGTH, 0) AS SIGNED), \
214 CAST(IFNULL(DATA_LENGTH, 0) AS SIGNED) \
215 FROM information_schema.TABLES \
216 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
217 (&schema, &table),
218 )?;
219 let (row_estimate, avg_row_bytes) = match row_stats {
220 Some((rows, avg, data_len)) => {
221 let row_count = rows.max(0);
222 let raw_per_row = if avg > 0 {
223 Some(avg)
224 } else if row_count > 0 {
225 Some(data_len / row_count)
226 } else {
227 None
228 };
229 let per_row = raw_per_row.map(correct_innodb_avg_row_length);
244 (row_count, per_row.filter(|b| *b > 0))
245 }
246 None => (0, None),
247 };
248
249 let pk_first: Option<(String,)> = conn.exec_first(
253 "SELECT COLUMN_NAME \
254 FROM information_schema.STATISTICS \
255 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 1",
256 (&schema, &table),
257 )?;
258 let single_int_pk = if let Some((col,)) = pk_first {
259 let composite: Option<(String,)> = conn.exec_first(
260 "SELECT COLUMN_NAME FROM information_schema.STATISTICS \
261 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 2 \
262 LIMIT 1",
263 (&schema, &table),
264 )?;
265 if composite.is_some() {
266 log::debug!(
267 "introspect_mysql_table: composite PK on {schema}.{table} — skipping auto-resolve"
268 );
269 None
270 } else {
271 let type_row: Option<(String,)> = conn.exec_first(
273 "SELECT DATA_TYPE FROM information_schema.COLUMNS \
274 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
275 (&schema, &table, &col),
276 )?;
277 match type_row.map(|(t,)| t.to_ascii_lowercase()) {
278 Some(t)
279 if matches!(
280 t.as_str(),
281 "tinyint" | "smallint" | "mediumint" | "int" | "bigint"
282 ) =>
283 {
284 Some(col)
285 }
286 Some(t) => {
287 log::debug!(
288 "introspect_mysql_table: PK '{col}' on {schema}.{table} has non-int type '{t}' — skipping auto-resolve"
289 );
290 None
291 }
292 None => None,
293 }
294 }
295 } else {
296 None
297 };
298
299 let keyset_rows: Vec<(String, String, String)> = conn.exec(
306 "SELECT s.COLUMN_NAME, s.INDEX_NAME, c.IS_NULLABLE \
307 FROM information_schema.STATISTICS s \
308 JOIN information_schema.COLUMNS c \
309 ON c.TABLE_SCHEMA = s.TABLE_SCHEMA AND c.TABLE_NAME = s.TABLE_NAME \
310 AND c.COLUMN_NAME = s.COLUMN_NAME \
311 WHERE s.TABLE_SCHEMA = ? AND s.TABLE_NAME = ? AND s.NON_UNIQUE = 0 \
312 AND s.SEQ_IN_INDEX = 1 \
313 AND NOT EXISTS ( \
314 SELECT 1 FROM information_schema.STATISTICS s2 \
315 WHERE s2.TABLE_SCHEMA = s.TABLE_SCHEMA AND s2.TABLE_NAME = s.TABLE_NAME \
316 AND s2.INDEX_NAME = s.INDEX_NAME AND s2.SEQ_IN_INDEX = 2)",
317 (&schema, &table),
318 )?;
319 let mut keyset_keys: Vec<String> = Vec::new();
320 for primary in [true, false] {
322 for (col, index_name, is_nullable) in &keyset_rows {
323 let is_primary = index_name == "PRIMARY";
324 if is_primary == primary
325 && is_nullable.eq_ignore_ascii_case("NO")
326 && !keyset_keys.contains(col)
327 {
328 keyset_keys.push(col.clone());
329 }
330 }
331 }
332
333 Ok(crate::source::TableIntrospection {
334 single_int_pk,
335 keyset_keys,
336 row_estimate,
337 avg_row_bytes,
338 })
339}
340
341fn build_mysql_ssl_opts(cfg: &TlsConfig) -> SslOpts {
342 let mut ssl = SslOpts::default();
343 if let Some(path) = &cfg.ca_file {
344 ssl = ssl.with_root_cert_path(Some(std::path::PathBuf::from(path)));
345 }
346 match cfg.mode {
347 TlsMode::Require => {
348 ssl = ssl
349 .with_danger_accept_invalid_certs(true)
350 .with_danger_skip_domain_validation(true);
351 }
352 TlsMode::VerifyCa => {
353 ssl = ssl.with_danger_skip_domain_validation(true);
354 }
355 TlsMode::VerifyFull => {
356 }
358 TlsMode::Disable => {
359 }
361 }
362 if cfg.accept_invalid_certs {
363 ssl = ssl.with_danger_accept_invalid_certs(true);
364 }
365 if cfg.accept_invalid_hostnames {
366 ssl = ssl.with_danger_skip_domain_validation(true);
367 }
368 ssl
369}
370
371fn mysql_run_export(
380 conn: &mut mysql::PooledConn,
381 sample_pool: Option<Pool>,
382 sql: &str,
383 cursor_param: Option<&str>,
384 tuning: &SourceTuning,
385 column_overrides: &ColumnOverrides,
386 sink: &mut dyn super::BatchSink,
387) -> Result<usize> {
388 let mut result = match cursor_param {
392 Some(val) => conn.exec_iter(sql, (val,))?,
393 None => conn.exec_iter(sql, ())?,
394 };
395 let columns = result.columns().as_ref().to_vec();
396
397 let (schema, arrow_types) = mysql_schema_and_arrow_types(&columns, column_overrides)?;
400 let schema = Arc::new(schema);
401
402 sink.on_schema(schema.clone())?;
403
404 let configured_batch_size = tuning.effective_batch_size(Some(&schema));
419 let mut ctl = AdaptiveBatchController::new(tuning, configured_batch_size);
422 ctl.seed_pressure(if tuning.adaptive {
423 sample_pool.as_ref().and_then(mysql_sample_innodb_log_waits)
424 } else {
425 None
426 });
427 let row_set = result
428 .iter()
429 .ok_or_else(|| anyhow::anyhow!("no result set"))?;
430 let mut row_buf: Vec<mysql::Row> = Vec::with_capacity(ctl.target());
431 let mut total_rows: usize = 0;
432 let mut memory_cap_applied = false;
433
434 for row_result in row_set {
435 let row = row_result?;
436 row_buf.push(row);
437
438 if row_buf.len() >= ctl.target() {
439 total_rows += row_buf.len();
440 let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
441 let batch_rows = row_buf.len();
442 row_buf.clear();
443
444 if !memory_cap_applied && batch_rows > 0 {
448 let arrow_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
449 let arrow_per_row = (arrow_bytes / batch_rows).max(64);
450 let target_mb = tuning
451 .batch_size_memory_mb
452 .unwrap_or(DEFAULT_BATCH_TARGET_MB);
453 let safe = ((target_mb * 1024 * 1024) / arrow_per_row).max(PROBE_BATCH_SIZE);
454 if let Some(new) = ctl.apply_memory_cap(safe) {
455 log::info!(
456 "MySQL row_buf cap: arrow≈{} B/row, target={} MB → batch_size → {} (configured={})",
457 arrow_per_row,
458 target_mb,
459 new,
460 configured_batch_size
461 );
462 row_buf.reserve(new.saturating_sub(row_buf.capacity()));
463 }
464 memory_cap_applied = true;
465 }
466
467 sink.on_batch(&batch)?;
468
469 if let Some((new, under_pressure)) =
470 ctl.after_batch(|| sample_pool.as_ref().and_then(mysql_sample_innodb_log_waits))
471 {
472 log::info!(
473 "adaptive batch size → {} ({})",
474 new,
475 if under_pressure {
476 "pressure"
477 } else {
478 "recovery"
479 }
480 );
481 }
482
483 log::info!("fetched {} rows so far...", total_rows);
484 ctl.throttle();
485 }
486 }
487
488 if !row_buf.is_empty() {
489 total_rows += row_buf.len();
490 let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
491 sink.on_batch(&batch)?;
492 }
493
494 drop(result);
495 Ok(total_rows)
496}
497
498impl super::Source for MysqlSource {
499 fn export(
500 &mut self,
501 request: &super::ExportRequest<'_>,
502 sink: &mut dyn super::BatchSink,
503 ) -> Result<()> {
504 let built = build_export_query(request, SourceType::Mysql);
505 log::debug!(
506 "executing query (connection={}): {}",
507 self.proxy_kind.log_label(),
508 built.sql
509 );
510
511 let mut conn = self.pool.get_conn()?;
512
513 conn.query_drop("SET time_zone = '+00:00'")?;
516
517 if request.tuning.statement_timeout_s > 0 {
518 conn.query_drop(format!(
519 "SET SESSION max_execution_time = {}",
520 request.tuning.statement_timeout_s * 1000
521 ))?;
522 }
523
524 let sample_pool = if request.tuning.adaptive {
525 Some(self.pool.clone())
526 } else {
527 None
528 };
529 let result = mysql_run_export(
530 &mut conn,
531 sample_pool,
532 &built.sql,
533 built.cursor_param.as_deref(),
534 request.tuning,
535 request.column_overrides,
536 sink,
537 );
538
539 let _ = conn.query_drop("SET time_zone = @@global.time_zone");
542 if request.tuning.statement_timeout_s > 0 {
543 let _ = conn.query_drop("SET SESSION max_execution_time = 0");
544 }
545
546 let total_rows = result?;
551 if total_rows == 0 {
552 sink.on_schema(Arc::new(Schema::empty()))?;
553 }
554 log::info!("total: {} rows", total_rows);
555 Ok(())
556 }
557
558 fn query_scalar(&mut self, sql: &str) -> Result<Option<String>> {
559 let mut conn = self.pool.get_conn()?;
560 let row: Option<mysql::Row> = conn.query_first(sql)?;
561 match row {
562 Some(r) => {
563 let val: Option<mysql::Value> = r.get(0);
564 match val {
565 Some(mysql::Value::Bytes(b)) => {
566 Ok(Some(String::from_utf8_lossy(&b).into_owned()))
567 }
568 Some(mysql::Value::Int(v)) => Ok(Some(v.to_string())),
569 Some(mysql::Value::UInt(v)) => Ok(Some(v.to_string())),
570 Some(mysql::Value::Float(v)) => Ok(Some(v.to_string())),
571 Some(mysql::Value::Double(v)) => Ok(Some(v.to_string())),
572 _ => Ok(None),
573 }
574 }
575 None => Ok(None),
576 }
577 }
578
579 fn type_mappings(
580 &mut self,
581 query: &str,
582 column_overrides: &ColumnOverrides,
583 ) -> Result<Vec<crate::types::TypeMapping>> {
584 let wrapped = format!("SELECT * FROM ({}) AS _rivet_type_probe LIMIT 0", query);
585 let mut conn = self.pool.get_conn()?;
586 let result = conn.exec_iter(&wrapped, ())?;
587 let columns = result.columns().as_ref().to_vec();
588 drop(result);
589 let mappings = columns
590 .iter()
591 .map(|col| {
592 let rivet =
593 crate::types::resolve_or(column_overrides, col.name_str().as_ref(), || {
594 mysql_type_to_rivet(col)
595 });
596 let source = crate::types::SourceColumn::simple(
597 col.name_str().as_ref(),
598 mysql_native_type_name(col),
599 true,
600 );
601 crate::types::TypeMapping::from_source(&source, rivet)
602 })
603 .collect();
604 Ok(mappings)
605 }
606
607 fn sample_pressure(&mut self) -> Option<u64> {
611 mysql_sample_innodb_log_waits(&self.pool)
612 }
613}
614
#[cfg(test)]
mod tests {
    use super::{bit_bytes_to_u64, correct_innodb_avg_row_length};

    /// A one-byte BIT value decodes to that byte.
    #[test]
    fn bit_bytes_single_byte() {
        let cases: [(u8, u64); 3] = [(0x00, 0), (0x01, 1), (0xFF, 255)];
        for (byte, expected) in cases {
            assert_eq!(bit_bytes_to_u64(&[byte]), expected);
        }
    }

    /// Multi-byte values are read most-significant byte first (0x0102 == 258).
    #[test]
    fn bit_bytes_multi_byte() {
        assert_eq!(bit_bytes_to_u64(&[0x01, 0x02]), 0x0102);
        assert_eq!(bit_bytes_to_u64(&[0xFF; 8]), u64::MAX);
    }

    /// An empty byte string decodes to zero.
    #[test]
    fn bit_bytes_empty() {
        assert_eq!(bit_bytes_to_u64(&[]), 0);
    }

    /// Estimates up to and including 8 KiB pass through unchanged.
    #[test]
    fn innodb_correction_below_threshold_is_identity() {
        for raw in [82_i64, 314, 2_048, 8 * 1024] {
            assert_eq!(correct_innodb_avg_row_length(raw), raw);
        }
    }

    /// Estimates above 8 KiB are divided by three.
    #[test]
    fn innodb_correction_above_threshold_divides_by_three() {
        for (raw, expected) in [(40_978_i64, 40_978 / 3), (120_000, 40_000)] {
            assert_eq!(correct_innodb_avg_row_length(raw), expected);
        }
    }

    /// Just past the threshold, the result is held at the 4 KiB floor.
    #[test]
    fn innodb_correction_does_not_undershoot_floor() {
        let divided = correct_innodb_avg_row_length(8 * 1024 + 1);
        assert!(divided >= 4 * 1024, "got {divided}");
    }
}