1mod arrow_convert;
22mod proxy;
23
24use std::sync::Arc;
25
26use arrow::datatypes::Schema;
27use mysql::prelude::*;
28use mysql::{Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, SslOpts};
29
30use crate::config::{SourceType, TlsConfig, TlsMode};
31use crate::error::Result;
32use crate::source::query::build_export_query;
33use crate::tuning::{ADAPTIVE_SAMPLE_INTERVAL, SourceTuning, next_adaptive_batch_size};
34use crate::types::ColumnOverrides;
35
36use arrow_convert::{
37 mysql_native_type_name, mysql_schema_and_arrow_types, mysql_type_to_rivet,
38 rows_to_record_batch_typed,
39};
40#[cfg(test)]
43use arrow_convert::bit_bytes_to_u64;
44use proxy::{detect_mysql_proxy_kind, warn_proxy_kind};
45
46pub use proxy::MysqlProxyKind;
50
/// MySQL-backed export source.
pub struct MysqlSource {
    /// Connection pool that every operation on this source draws from.
    pool: Pool,
    /// Proxy classification detected when the source was constructed; it is
    /// returned by `proxy_kind()` and included in the export debug log line.
    proxy_kind: MysqlProxyKind,
}
55
56fn lean_pool_opts() -> PoolOpts {
60 PoolOpts::default()
61 .with_constraints(PoolConstraints::new(1, 100).expect("valid pool constraints"))
62}
63
64fn mysql_sample_innodb_log_waits(pool: &Pool) -> Option<u64> {
67 let mut conn = pool.get_conn().ok()?;
68 conn.query_first::<(String, u64), _>("SHOW GLOBAL STATUS LIKE 'Innodb_log_waits'")
69 .ok()
70 .flatten()
71 .map(|(_, v)| v)
72}
73
74impl MysqlSource {
75 #[allow(dead_code)]
78 pub fn from_pool(pool: Pool) -> Self {
79 let proxy_kind = detect_mysql_proxy_kind(&pool);
80 warn_proxy_kind(proxy_kind);
81 Self { pool, proxy_kind }
82 }
83
84 pub fn connect(url: &str) -> Result<Self> {
86 let opts =
87 Opts::from(OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()));
88 let pool = Pool::new(opts)?;
89 let proxy_kind = detect_mysql_proxy_kind(&pool);
90 warn_proxy_kind(proxy_kind);
91 Ok(Self { pool, proxy_kind })
92 }
93
94 pub fn connect_with_tls(url: &str, tls: Option<&TlsConfig>) -> Result<Self> {
96 match tls {
97 Some(cfg) if cfg.mode.is_enforced() => {
98 let base = Opts::from_url(url)?;
99 let ssl = build_mysql_ssl_opts(cfg);
100 let opts = Opts::from(
101 OptsBuilder::from_opts(base)
102 .ssl_opts(Some(ssl))
103 .pool_opts(lean_pool_opts()),
104 );
105 let pool = Pool::new(opts)?;
106 let proxy_kind = detect_mysql_proxy_kind(&pool);
107 warn_proxy_kind(proxy_kind);
108 Ok(Self { pool, proxy_kind })
109 }
110 _ => Self::connect(url),
111 }
112 }
113
114 #[allow(dead_code)]
122 pub fn proxy_kind(&self) -> MysqlProxyKind {
123 self.proxy_kind
124 }
125}
126
127pub(crate) fn connect_pool(url: &str, tls: Option<&TlsConfig>) -> Result<Pool> {
132 match tls {
133 Some(cfg) if cfg.mode.is_enforced() => {
134 let base = Opts::from_url(url)?;
135 let ssl = build_mysql_ssl_opts(cfg);
136 let opts = Opts::from(
137 OptsBuilder::from_opts(base)
138 .ssl_opts(Some(ssl))
139 .pool_opts(lean_pool_opts()),
140 );
141 Ok(Pool::new(opts)?)
142 }
143 _ => {
144 let opts = Opts::from(
145 OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()),
146 );
147 Ok(Pool::new(opts)?)
148 }
149 }
150}
151
/// Per-row byte estimates strictly above this value are scaled down by
/// [`correct_innodb_avg_row_length`]; values at or below it pass through.
// NOTE(review): the name suggests this targets rows whose estimate is inflated
// by off-page BLOB storage — confirm against the original rationale.
const INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES: i64 = 8 * 1024;

/// Divisor applied to estimates that exceed the threshold.
const INNODB_BLOB_OVERFLOW_DIVISOR: i64 = 3;

/// Adjusts a raw average-row-length estimate (in bytes).
///
/// Estimates up to and including the threshold are returned unchanged. Larger
/// estimates are divided by [`INNODB_BLOB_OVERFLOW_DIVISOR`] (integer
/// division), but never reduced below half of the threshold.
fn correct_innodb_avg_row_length(raw_bytes: i64) -> i64 {
    if raw_bytes <= INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES {
        return raw_bytes;
    }
    let floor = INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES / 2;
    let scaled = raw_bytes / INNODB_BLOB_OVERFLOW_DIVISOR;
    std::cmp::max(scaled, floor)
}
175
176pub(crate) fn introspect_mysql_table_for_chunking(
195 url: &str,
196 tls: Option<&TlsConfig>,
197 qualified_table: &str,
198) -> Result<crate::source::TableIntrospection> {
199 let pool = connect_pool(url, tls)?;
200 let mut conn = pool.get_conn()?;
201 let default_db: Option<String> = conn.query_first("SELECT DATABASE()")?;
202 let default_db = default_db.unwrap_or_default();
203
204 let (schema, table) = match qualified_table.split_once('.') {
205 Some((s, t)) => (s.to_string(), t.to_string()),
206 None => (default_db, qualified_table.to_string()),
207 };
208
209 let row_stats: Option<(i64, i64, i64)> = conn.exec_first(
214 "SELECT CAST(IFNULL(TABLE_ROWS, 0) AS SIGNED), \
215 CAST(IFNULL(AVG_ROW_LENGTH, 0) AS SIGNED), \
216 CAST(IFNULL(DATA_LENGTH, 0) AS SIGNED) \
217 FROM information_schema.TABLES \
218 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
219 (&schema, &table),
220 )?;
221 let (row_estimate, avg_row_bytes) = match row_stats {
222 Some((rows, avg, data_len)) => {
223 let row_count = rows.max(0);
224 let raw_per_row = if avg > 0 {
225 Some(avg)
226 } else if row_count > 0 {
227 Some(data_len / row_count)
228 } else {
229 None
230 };
231 let per_row = raw_per_row.map(correct_innodb_avg_row_length);
246 (row_count, per_row.filter(|b| *b > 0))
247 }
248 None => (0, None),
249 };
250
251 let pk_first: Option<(String,)> = conn.exec_first(
255 "SELECT COLUMN_NAME \
256 FROM information_schema.STATISTICS \
257 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 1",
258 (&schema, &table),
259 )?;
260 let single_int_pk = if let Some((col,)) = pk_first {
261 let composite: Option<(String,)> = conn.exec_first(
262 "SELECT COLUMN_NAME FROM information_schema.STATISTICS \
263 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 2 \
264 LIMIT 1",
265 (&schema, &table),
266 )?;
267 if composite.is_some() {
268 log::debug!(
269 "introspect_mysql_table: composite PK on {schema}.{table} — skipping auto-resolve"
270 );
271 None
272 } else {
273 let type_row: Option<(String,)> = conn.exec_first(
275 "SELECT DATA_TYPE FROM information_schema.COLUMNS \
276 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
277 (&schema, &table, &col),
278 )?;
279 match type_row.map(|(t,)| t.to_ascii_lowercase()) {
280 Some(t)
281 if matches!(
282 t.as_str(),
283 "tinyint" | "smallint" | "mediumint" | "int" | "bigint"
284 ) =>
285 {
286 Some(col)
287 }
288 Some(t) => {
289 log::debug!(
290 "introspect_mysql_table: PK '{col}' on {schema}.{table} has non-int type '{t}' — skipping auto-resolve"
291 );
292 None
293 }
294 None => None,
295 }
296 }
297 } else {
298 None
299 };
300
301 let keyset_rows: Vec<(String, String, String)> = conn.exec(
308 "SELECT s.COLUMN_NAME, s.INDEX_NAME, c.IS_NULLABLE \
309 FROM information_schema.STATISTICS s \
310 JOIN information_schema.COLUMNS c \
311 ON c.TABLE_SCHEMA = s.TABLE_SCHEMA AND c.TABLE_NAME = s.TABLE_NAME \
312 AND c.COLUMN_NAME = s.COLUMN_NAME \
313 WHERE s.TABLE_SCHEMA = ? AND s.TABLE_NAME = ? AND s.NON_UNIQUE = 0 \
314 AND s.SEQ_IN_INDEX = 1 \
315 AND NOT EXISTS ( \
316 SELECT 1 FROM information_schema.STATISTICS s2 \
317 WHERE s2.TABLE_SCHEMA = s.TABLE_SCHEMA AND s2.TABLE_NAME = s.TABLE_NAME \
318 AND s2.INDEX_NAME = s.INDEX_NAME AND s2.SEQ_IN_INDEX = 2)",
319 (&schema, &table),
320 )?;
321 let mut keyset_keys: Vec<String> = Vec::new();
322 for primary in [true, false] {
324 for (col, index_name, is_nullable) in &keyset_rows {
325 let is_primary = index_name == "PRIMARY";
326 if is_primary == primary
327 && is_nullable.eq_ignore_ascii_case("NO")
328 && !keyset_keys.contains(col)
329 {
330 keyset_keys.push(col.clone());
331 }
332 }
333 }
334
335 Ok(crate::source::TableIntrospection {
336 single_int_pk,
337 keyset_keys,
338 row_estimate,
339 avg_row_bytes,
340 })
341}
342
343fn build_mysql_ssl_opts(cfg: &TlsConfig) -> SslOpts {
344 let mut ssl = SslOpts::default();
345 if let Some(path) = &cfg.ca_file {
346 ssl = ssl.with_root_cert_path(Some(std::path::PathBuf::from(path)));
347 }
348 match cfg.mode {
349 TlsMode::Require => {
350 ssl = ssl
351 .with_danger_accept_invalid_certs(true)
352 .with_danger_skip_domain_validation(true);
353 }
354 TlsMode::VerifyCa => {
355 ssl = ssl.with_danger_skip_domain_validation(true);
356 }
357 TlsMode::VerifyFull => {
358 }
360 TlsMode::Disable => {
361 }
363 }
364 if cfg.accept_invalid_certs {
365 ssl = ssl.with_danger_accept_invalid_certs(true);
366 }
367 if cfg.accept_invalid_hostnames {
368 ssl = ssl.with_danger_skip_domain_validation(true);
369 }
370 ssl
371}
372
373fn mysql_run_export(
382 conn: &mut mysql::PooledConn,
383 sample_pool: Option<Pool>,
384 sql: &str,
385 cursor_param: Option<&str>,
386 tuning: &SourceTuning,
387 column_overrides: &ColumnOverrides,
388 sink: &mut dyn super::BatchSink,
389) -> Result<usize> {
390 let mut result = match cursor_param {
394 Some(val) => conn.exec_iter(sql, (val,))?,
395 None => conn.exec_iter(sql, ())?,
396 };
397 let columns = result.columns().as_ref().to_vec();
398
399 let (schema, arrow_types) = mysql_schema_and_arrow_types(&columns, column_overrides)?;
402 let schema = Arc::new(schema);
403
404 sink.on_schema(schema.clone())?;
405
406 const PROBE_BATCH_SIZE: usize = 500;
421 const MYSQL_BATCH_TARGET_MB_DEFAULT: usize = 64;
422
423 let configured_batch_size = tuning.effective_batch_size(Some(&schema));
424 let mut effective_bs = configured_batch_size.min(PROBE_BATCH_SIZE);
425 let mut base_fetch_size = effective_bs;
426 let mut adaptive_last_waits: Option<u64> = if tuning.adaptive {
427 sample_pool.as_ref().and_then(mysql_sample_innodb_log_waits)
428 } else {
429 None
430 };
431 let mut batch_count: usize = 0;
432 let row_set = result
433 .iter()
434 .ok_or_else(|| anyhow::anyhow!("no result set"))?;
435 let mut row_buf: Vec<mysql::Row> = Vec::with_capacity(effective_bs);
436 let mut total_rows: usize = 0;
437 let mut memory_cap_applied = false;
438
439 for row_result in row_set {
440 let row = row_result?;
441 row_buf.push(row);
442
443 if row_buf.len() >= effective_bs {
444 total_rows += row_buf.len();
445 batch_count += 1;
446 let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
447 let batch_rows = row_buf.len();
448 row_buf.clear();
449
450 if !memory_cap_applied && batch_rows > 0 {
454 let arrow_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
455 let arrow_per_row = (arrow_bytes / batch_rows).max(64);
456 let target_mb = tuning
457 .batch_size_memory_mb
458 .unwrap_or(MYSQL_BATCH_TARGET_MB_DEFAULT);
459 let safe = ((target_mb * 1024 * 1024) / arrow_per_row).max(PROBE_BATCH_SIZE);
460 let target = safe.min(configured_batch_size);
461 if target != effective_bs {
462 log::info!(
463 "MySQL row_buf cap: arrow≈{} B/row, target={} MB → batch_size {} → {} (configured={})",
464 arrow_per_row,
465 target_mb,
466 effective_bs,
467 target,
468 configured_batch_size
469 );
470 effective_bs = target;
471 base_fetch_size = effective_bs;
472 row_buf.reserve(effective_bs.saturating_sub(row_buf.capacity()));
473 }
474 memory_cap_applied = true;
475 }
476
477 sink.on_batch(&batch)?;
478
479 if tuning.adaptive
480 && batch_count.is_multiple_of(ADAPTIVE_SAMPLE_INTERVAL)
481 && let Some(ref pool) = sample_pool
482 && let Some(cur) = mysql_sample_innodb_log_waits(pool)
483 {
484 let under_pressure = adaptive_last_waits.is_some_and(|prev| cur > prev);
485 adaptive_last_waits = Some(cur);
486 let next = next_adaptive_batch_size(effective_bs, base_fetch_size, under_pressure);
487 if next != effective_bs {
488 effective_bs = next;
489 log::info!(
490 "adaptive batch size → {} ({})",
491 effective_bs,
492 if under_pressure {
493 "pressure"
494 } else {
495 "recovery"
496 }
497 );
498 }
499 }
500
501 log::info!("fetched {} rows so far...", total_rows);
502
503 if tuning.throttle_ms > 0 {
504 std::thread::sleep(std::time::Duration::from_millis(tuning.throttle_ms));
505 }
506 }
507 }
508
509 if !row_buf.is_empty() {
510 total_rows += row_buf.len();
511 let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
512 sink.on_batch(&batch)?;
513 }
514
515 drop(result);
516 Ok(total_rows)
517}
518
519impl super::Source for MysqlSource {
520 fn export(
521 &mut self,
522 request: &super::ExportRequest<'_>,
523 sink: &mut dyn super::BatchSink,
524 ) -> Result<()> {
525 let built = build_export_query(request, SourceType::Mysql);
526 log::debug!(
527 "executing query (connection={}): {}",
528 self.proxy_kind.log_label(),
529 built.sql
530 );
531
532 let mut conn = self.pool.get_conn()?;
533
534 conn.query_drop("SET time_zone = '+00:00'")?;
537
538 if request.tuning.statement_timeout_s > 0 {
539 conn.query_drop(format!(
540 "SET SESSION max_execution_time = {}",
541 request.tuning.statement_timeout_s * 1000
542 ))?;
543 }
544
545 let sample_pool = if request.tuning.adaptive {
546 Some(self.pool.clone())
547 } else {
548 None
549 };
550 let result = mysql_run_export(
551 &mut conn,
552 sample_pool,
553 &built.sql,
554 built.cursor_param.as_deref(),
555 request.tuning,
556 request.column_overrides,
557 sink,
558 );
559
560 let _ = conn.query_drop("SET time_zone = @@global.time_zone");
563 if request.tuning.statement_timeout_s > 0 {
564 let _ = conn.query_drop("SET SESSION max_execution_time = 0");
565 }
566
567 let total_rows = result?;
572 if total_rows == 0 {
573 sink.on_schema(Arc::new(Schema::empty()))?;
574 }
575 log::info!("total: {} rows", total_rows);
576 Ok(())
577 }
578
579 fn query_scalar(&mut self, sql: &str) -> Result<Option<String>> {
580 let mut conn = self.pool.get_conn()?;
581 let row: Option<mysql::Row> = conn.query_first(sql)?;
582 match row {
583 Some(r) => {
584 let val: Option<mysql::Value> = r.get(0);
585 match val {
586 Some(mysql::Value::Bytes(b)) => {
587 Ok(Some(String::from_utf8_lossy(&b).into_owned()))
588 }
589 Some(mysql::Value::Int(v)) => Ok(Some(v.to_string())),
590 Some(mysql::Value::UInt(v)) => Ok(Some(v.to_string())),
591 Some(mysql::Value::Float(v)) => Ok(Some(v.to_string())),
592 Some(mysql::Value::Double(v)) => Ok(Some(v.to_string())),
593 _ => Ok(None),
594 }
595 }
596 None => Ok(None),
597 }
598 }
599
600 fn type_mappings(
601 &mut self,
602 query: &str,
603 column_overrides: &ColumnOverrides,
604 ) -> Result<Vec<crate::types::TypeMapping>> {
605 let wrapped = format!("SELECT * FROM ({}) AS _rivet_type_probe LIMIT 0", query);
606 let mut conn = self.pool.get_conn()?;
607 let result = conn.exec_iter(&wrapped, ())?;
608 let columns = result.columns().as_ref().to_vec();
609 drop(result);
610 let mappings = columns
611 .iter()
612 .map(|col| {
613 let rivet = column_overrides
614 .get(col.name_str().as_ref())
615 .cloned()
616 .unwrap_or_else(|| mysql_type_to_rivet(col));
617 let source = crate::types::SourceColumn::simple(
618 col.name_str().as_ref(),
619 mysql_native_type_name(col),
620 true,
621 );
622 crate::types::TypeMapping::from_source(&source, rivet)
623 })
624 .collect();
625 Ok(mappings)
626 }
627
628 fn sample_pressure(&mut self) -> Option<u64> {
632 mysql_sample_innodb_log_waits(&self.pool)
633 }
634}
635
#[cfg(test)]
mod tests {
    use super::{bit_bytes_to_u64, correct_innodb_avg_row_length};

    /// A single byte decodes to its own numeric value.
    #[test]
    fn bit_bytes_single_byte() {
        for (byte, expected) in [(0x00u8, 0u64), (0x01, 1), (0xFF, 255)] {
            assert_eq!(bit_bytes_to_u64(&[byte]), expected);
        }
    }

    /// Multiple bytes are combined with the first byte most significant.
    #[test]
    fn bit_bytes_multi_byte() {
        let two_bytes = [0x01, 0x02];
        assert_eq!(bit_bytes_to_u64(&two_bytes), 258);

        let all_ones = [0xFF; 8];
        assert_eq!(bit_bytes_to_u64(&all_ones), u64::MAX);
    }

    /// An empty slice decodes to zero.
    #[test]
    fn bit_bytes_empty() {
        assert_eq!(bit_bytes_to_u64(&[]), 0);
    }

    /// Values up to and including the 8 KiB threshold are left untouched.
    #[test]
    fn innodb_correction_below_threshold_is_identity() {
        for raw in [82, 314, 2_048, 8 * 1024] {
            assert_eq!(correct_innodb_avg_row_length(raw), raw);
        }
    }

    /// Values well above the threshold are divided by three.
    #[test]
    fn innodb_correction_above_threshold_divides_by_three() {
        for (raw, expected) in [(40_978, 40_978 / 3), (120_000, 40_000)] {
            assert_eq!(correct_innodb_avg_row_length(raw), expected);
        }
    }

    /// Just above the threshold the result is held at the 4 KiB floor.
    #[test]
    fn innodb_correction_does_not_undershoot_floor() {
        let divided = correct_innodb_avg_row_length(8 * 1024 + 1);
        assert!(divided >= 4 * 1024, "got {divided}");
    }
}