rivet/source/mysql/mod.rs
1//! MySQL `Source` implementation.
2//!
3//! Module layout (mirrors `postgres/`):
4//!
5//! - `mod.rs` (this file) — `MysqlSource` struct + connect/TLS path, the
6//! extraction-pressure sampler, the `lean_pool_opts` / `connect_pool` /
7//! `build_mysql_ssl_opts` helpers, `introspect_mysql_table_for_chunking`
8//! together with the InnoDB `AVG_ROW_LENGTH` correction, the cursor-bound
9//! `exec_iter` export loop (`mysql_run_export`), and the `Source` trait impl.
10//! - [`arrow_convert`] — the entire row → Arrow `RecordBatch` pipeline:
11//! `mysql_type_to_rivet` + `mysql_native_type_name`,
12//! `mysql_schema_and_arrow_types`, BIT / TIME / DECIMAL decoders, and the
13//! array builders. Kept in a sibling because it is the largest
14//! single-purpose cluster in this driver (~510 LoC) and has zero reverse
15//! dependency back into the connection / pool / cursor layer.
16//! - [`proxy`] — `MysqlProxyKind` enum, the pure `classify_mysql_proxy`
17//! classifier, the I/O wrapper `detect_mysql_proxy_kind`, and
18//! `warn_proxy_kind`. Detection runs once at connect time; the classifier
19//! is exhaustively unit-tested in isolation (no live MySQL needed).
20
21mod arrow_convert;
22mod proxy;
23
24use std::sync::Arc;
25
26use arrow::datatypes::Schema;
27use mysql::prelude::*;
28use mysql::{Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, SslOpts};
29
30use crate::config::{SourceType, TlsConfig, TlsMode};
31use crate::error::Result;
32use crate::source::batch_controller::{
33 AdaptiveBatchController, DEFAULT_BATCH_TARGET_MB, PROBE_BATCH_SIZE,
34};
35use crate::source::query::build_export_query;
36use crate::tuning::SourceTuning;
37use crate::types::ColumnOverrides;
38
39use arrow_convert::{
40 mysql_native_type_name, mysql_schema_and_arrow_types, mysql_type_to_rivet,
41 rows_to_record_batch_typed,
42};
43// `bit_bytes_to_u64` is only referenced by the `tests` module below — gate the
44// re-import on `cfg(test)` so non-test builds don't see an unused-import warning.
45#[cfg(test)]
46use arrow_convert::bit_bytes_to_u64;
47use proxy::{detect_mysql_proxy_kind, warn_proxy_kind};
48
49// Re-exported so external code (`tests/live_pool_safety.rs`) can still write
50// `use rivet::source::mysql::MysqlProxyKind` after the proxy block moved to
51// the `proxy` submodule.
52pub use proxy::MysqlProxyKind;
53
/// MySQL-backed source: a connection pool plus the proxy classification that
/// was detected for it when the source was built (see `from_pool`).
pub struct MysqlSource {
    /// Pool the `Source` trait methods check their connections out of.
    pool: Pool,
    /// Value returned by `detect_mysql_proxy_kind` for `pool`; exposed via
    /// `proxy_kind()` and used as a label in the export debug log.
    proxy_kind: MysqlProxyKind,
}
58
59/// Pool options that prevent eager pre-connection. The default mysql::Pool
60/// opens `min=10` connections immediately, which overflows MySQL's
61/// max_connections when many parallel exports run simultaneously.
62fn lean_pool_opts() -> PoolOpts {
63 PoolOpts::default()
64 .with_constraints(PoolConstraints::new(1, 100).expect("valid pool constraints"))
65}
66
67/// Sample an **extraction-pressure** proxy (Epic 18 C1) — the MySQL analogue of
68/// PG's `temp_bytes`. Sums two monotonic global counters:
69///
70/// - `Created_tmp_disk_tables` — a query spilled an internal temp table to disk
71/// (a `GROUP BY` / `DISTINCT` / `ORDER BY` that exceeded `tmp_table_size`).
72/// - `Innodb_buffer_pool_wait_free` — InnoDB had to wait for a free buffer-pool
73/// page, i.e. the read is evicting pages under memory pressure.
74///
75/// Either moving means "my extraction is stressing the source"; their sum is
76/// monotonic, so the governor's `cur > prev` comparison works unchanged. The
77/// sum is robust to MySQL 8.0's `TempTable` engine, where a spill may not bump
78/// `Created_tmp_disk_tables` — `Innodb_buffer_pool_wait_free` carries the signal
79/// then (and `Created_tmp_disk_tables` adds it on 5.7 / MariaDB). This replaces
80/// the old `Innodb_log_waits`, which is redo-**write** pressure and barely moves
81/// during a read-only export.
82fn mysql_sample_extraction_pressure(pool: &Pool) -> Option<u64> {
83 let mut conn = pool.get_conn().ok()?;
84 let rows: Vec<(String, u64)> = conn
85 .query(
86 "SHOW GLOBAL STATUS WHERE Variable_name IN \
87 ('Created_tmp_disk_tables', 'Innodb_buffer_pool_wait_free')",
88 )
89 .ok()?;
90 if rows.is_empty() {
91 return None;
92 }
93 Some(rows.iter().map(|(_, v)| *v).sum())
94}
95
96/// Snapshot the broader source-harm counters from `SHOW GLOBAL STATUS` — a
97/// superset of the governor's [`mysql_sample_extraction_pressure`]. Returns
98/// `(metric, cumulative_value)` pairs the pipeline deltas around the export and
99/// stores in `export_harm`. `SHOW GLOBAL STATUS` needs **no special privilege**.
100/// These are global counters, so concurrent load inflates the delta (accurate on
101/// a quiet pilot box). `Innodb_rows_read` is the read-amplification signal the
102/// 0.12 harm A/B keyed on. `None` on connect/query failure — never blocks the
103/// export.
104pub(crate) fn sample_harm_counters(
105 url: &str,
106 tls: Option<&TlsConfig>,
107) -> Option<Vec<(String, i64)>> {
108 let pool = connect_pool(url, tls).ok()?;
109 let mut conn = pool.get_conn().ok()?;
110 let rows: Vec<(String, i64)> = conn
111 .query(
112 "SHOW GLOBAL STATUS WHERE Variable_name IN \
113 ('Innodb_rows_read', 'Innodb_buffer_pool_reads', 'Created_tmp_disk_tables', \
114 'Handler_read_rnd_next', 'Innodb_row_lock_waits', 'Innodb_row_lock_time')",
115 )
116 .ok()?;
117 if rows.is_empty() {
118 return None;
119 }
120 Some(
121 rows.into_iter()
122 .map(|(k, v)| (format!("mysql_{}", k.to_lowercase()), v))
123 .collect(),
124 )
125}
126
127impl MysqlSource {
128 /// Build a source from an existing pool: the single place that detects the
129 /// proxy kind, warns once, and wraps the pool. The `connect*` entry points
130 /// all funnel through here (also handy in tests that share the pool for
131 /// post-export state inspection).
132 pub fn from_pool(pool: Pool) -> Self {
133 let proxy_kind = detect_mysql_proxy_kind(&pool);
134 warn_proxy_kind(proxy_kind);
135 Self { pool, proxy_kind }
136 }
137
138 /// Connect with no transport security (legacy path).
139 pub fn connect(url: &str) -> Result<Self> {
140 let opts =
141 Opts::from(OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()));
142 Ok(Self::from_pool(Pool::new(opts)?))
143 }
144
145 /// Connect honoring the user's [`TlsConfig`].
146 pub fn connect_with_tls(url: &str, tls: Option<&TlsConfig>) -> Result<Self> {
147 // Refuse remote plaintext (no `tls:` block) before any dial (CWE-319).
148 crate::source::require_tls_or_loopback(url, tls)?;
149 match tls {
150 Some(cfg) if cfg.mode.is_enforced() => {
151 let base = Opts::from_url(url)?;
152 let ssl = build_mysql_ssl_opts(cfg);
153 let opts = Opts::from(
154 OptsBuilder::from_opts(base)
155 .ssl_opts(Some(ssl))
156 .pool_opts(lean_pool_opts()),
157 );
158 Ok(Self::from_pool(Pool::new(opts)?))
159 }
160 _ => Self::connect(url),
161 }
162 }
163
164 /// Expose the proxy classification for diagnostic tools (preflight,
165 /// integration tests). Not part of the public Source trait — same
166 /// internal-may-change contract as the rest of `rivet::source::mysql::*`.
167 ///
168 /// `#[allow(dead_code)]` covers the binary compilation unit; the lib +
169 /// integration tests reference this through the `rivet::source::mysql`
170 /// public surface.
171 #[allow(dead_code)]
172 pub fn proxy_kind(&self) -> MysqlProxyKind {
173 self.proxy_kind
174 }
175}
176
177/// Build a MySQL connection pool honoring the configured TLS policy.
178///
179/// Shared by preflight, doctor, init, and anywhere else we need a pool outside
180/// the `Source` trait. `tls = None` falls back to plaintext (legacy behavior).
181pub(crate) fn connect_pool(url: &str, tls: Option<&TlsConfig>) -> Result<Pool> {
182 // Refuse remote plaintext (no `tls:` block) before any dial (CWE-319).
183 crate::source::require_tls_or_loopback(url, tls)?;
184 match tls {
185 Some(cfg) if cfg.mode.is_enforced() => {
186 let base = Opts::from_url(url)?;
187 let ssl = build_mysql_ssl_opts(cfg);
188 let opts = Opts::from(
189 OptsBuilder::from_opts(base)
190 .ssl_opts(Some(ssl))
191 .pool_opts(lean_pool_opts()),
192 );
193 Ok(Pool::new(opts)?)
194 }
195 _ => {
196 let opts = Opts::from(
197 OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()),
198 );
199 Ok(Pool::new(opts)?)
200 }
201 }
202}
203
/// Row size (bytes) above which `AVG_ROW_LENGTH` is assumed to be inflated by
/// InnoDB BLOB overflow pages and is scaled down. Rows of up to 8 KB fit inline
/// (no overflow), so their raw figure is taken as accurate.
const INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES: i64 = 8 * 1024;

/// Empirical divisor for InnoDB BLOB-page inflation. A wide-text row that
/// allocates eight 16 KB overflow pages reports ~128 KB in `AVG_ROW_LENGTH`
/// while the actual wire content is ~40 KB → factor of ~3.
const INNODB_BLOB_OVERFLOW_DIVISOR: i64 = 3;

/// Apply the InnoDB BLOB-overflow correction to a raw `AVG_ROW_LENGTH` value.
///
/// Kept as a pure function so it can be unit-tested; the live introspection
/// helper feeds it the figure read from `information_schema.TABLES`.
///
/// - At or below the 8 KB threshold the input is returned unchanged.
/// - Above it the input is divided by 3, but never reported lower than half
///   the threshold, so the estimate cannot undershoot too far.
fn correct_innodb_avg_row_length(raw_bytes: i64) -> i64 {
    if raw_bytes <= INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES {
        return raw_bytes;
    }
    let floor = INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES / 2;
    std::cmp::max(raw_bytes / INNODB_BLOB_OVERFLOW_DIVISOR, floor)
}
227
228/// Probe `information_schema` for stats chunked-mode planning needs.
229///
230/// MySQL analogue of [`crate::source::postgres::introspect_pg_table_for_chunking`]:
231/// returns the same source-neutral [`crate::source::TableIntrospection`] so
232/// `plan/build.rs` can dispatch on `source_type` and reuse the same downstream
233/// logic for chunk-column / chunk_size derivation.
234///
235/// Two queries per call, both against `information_schema` (no extra grants
236/// required for a normal app user):
237/// - `TABLES.AVG_ROW_LENGTH` + `TABLE_ROWS` for the row-size and row-count estimate.
238/// These come from `mysql.innodb_table_stats` and are only as fresh as the
239/// last `ANALYZE TABLE` / autostat run. Empty / unanalysed → zero.
240/// - `STATISTICS` filtered to `INDEX_NAME='PRIMARY'` with `SEQ_IN_INDEX=1` and a
241/// second probe ensuring no `SEQ_IN_INDEX=2` row exists — single-column PK only.
242///
243/// `qualified_table` is `<schema>.<table>` or bare `<table>` (resolved under the
244/// current database for the connection). Same strict ident rules as the YAML
245/// `table:` shortcut so the SQL stays trivially safe.
246pub(crate) fn introspect_mysql_table_for_chunking(
247 url: &str,
248 tls: Option<&TlsConfig>,
249 qualified_table: &str,
250) -> Result<crate::source::TableIntrospection> {
251 let pool = connect_pool(url, tls)?;
252 let mut conn = pool.get_conn()?;
253 let default_db: Option<String> = conn.query_first("SELECT DATABASE()")?;
254 let default_db = default_db.unwrap_or_default();
255
256 let (schema, table) = match qualified_table.split_once('.') {
257 Some((s, t)) => (s.to_string(), t.to_string()),
258 None => (default_db, qualified_table.to_string()),
259 };
260
261 // (1) Row count + avg row bytes. AVG_ROW_LENGTH already accounts for
262 // overflow pages on InnoDB, so we use it directly rather than dividing
263 // DATA_LENGTH by TABLE_ROWS (which under-counts for tables with TOAST-like
264 // overflow). Fall back to division when AVG_ROW_LENGTH is 0.
265 let row_stats: Option<(i64, i64, i64)> = conn.exec_first(
266 "SELECT CAST(IFNULL(TABLE_ROWS, 0) AS SIGNED), \
267 CAST(IFNULL(AVG_ROW_LENGTH, 0) AS SIGNED), \
268 CAST(IFNULL(DATA_LENGTH, 0) AS SIGNED) \
269 FROM information_schema.TABLES \
270 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
271 (&schema, &table),
272 )?;
273 let (row_estimate, avg_row_bytes) = match row_stats {
274 Some((rows, avg, data_len)) => {
275 let row_count = rows.max(0);
276 let raw_per_row = if avg > 0 {
277 Some(avg)
278 } else if row_count > 0 {
279 Some(data_len / row_count)
280 } else {
281 None
282 };
283 // InnoDB stores TEXT/BLOB > ~768 B off-page in 16 KB BLOB pages,
284 // and `AVG_ROW_LENGTH` counts the allocated page bytes — not the
285 // actual content. On wide-text workloads (CMS bodies, JSON logs,
286 // audit trails) this inflates the per-row estimate 3-5× compared
287 // to what the client driver actually buffers over the wire.
288 //
289 // We empirically divide by 3 above an 8 KB threshold. Below 8 KB
290 // a row fits inline with no overflow, so the raw figure is
291 // accurate. Above it, dividing by 3 brings content_items' 41 KB
292 // estimate down to ~14 KB — still conservative vs the ~10 KB the
293 // PG side reports for the same payload via `pg_total_relation_size`.
294 //
295 // Pilots who want exact control can set `chunk_size:` explicitly
296 // (it always wins over the budget-derived size).
297 let per_row = raw_per_row.map(correct_innodb_avg_row_length);
298 (row_count, per_row.filter(|b| *b > 0))
299 }
300 None => (0, None),
301 };
302
303 // (2) Single-column int PK probe. STATISTICS has one row per (column,
304 // index) so we filter to PRIMARY + SEQ_IN_INDEX=1 and then check that
305 // the PRIMARY index has no SEQ_IN_INDEX=2 row (composite).
306 let pk_first: Option<(String,)> = conn.exec_first(
307 "SELECT COLUMN_NAME \
308 FROM information_schema.STATISTICS \
309 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 1",
310 (&schema, &table),
311 )?;
312 let single_int_pk = if let Some((col,)) = pk_first {
313 let composite: Option<(String,)> = conn.exec_first(
314 "SELECT COLUMN_NAME FROM information_schema.STATISTICS \
315 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 2 \
316 LIMIT 1",
317 (&schema, &table),
318 )?;
319 if composite.is_some() {
320 log::debug!(
321 "introspect_mysql_table: composite PK on {schema}.{table} — skipping auto-resolve"
322 );
323 None
324 } else {
325 // Column type must be integer-family for safe range chunking.
326 let type_row: Option<(String,)> = conn.exec_first(
327 "SELECT DATA_TYPE FROM information_schema.COLUMNS \
328 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
329 (&schema, &table, &col),
330 )?;
331 match type_row.map(|(t,)| t.to_ascii_lowercase()) {
332 Some(t)
333 if matches!(
334 t.as_str(),
335 "tinyint" | "smallint" | "mediumint" | "int" | "bigint"
336 ) =>
337 {
338 Some(col)
339 }
340 Some(t) => {
341 log::debug!(
342 "introspect_mysql_table: PK '{col}' on {schema}.{table} has non-int type '{t}' — skipping auto-resolve"
343 );
344 None
345 }
346 None => None,
347 }
348 }
349 } else {
350 None
351 };
352
353 // (3) Keyset keys (OPT-4): single-column, NOT NULL, UNIQUE index columns —
354 // usable as a seek-pagination key. NON_UNIQUE=0 filters to unique indexes
355 // (PRIMARY included); SEQ_IN_INDEX=1 with no SEQ_IN_INDEX=2 row keeps only
356 // single-column indexes; IS_NULLABLE='NO' guarantees `> last` never has to
357 // reason about NULL ordering. Index-backed by definition, so keyset's
358 // `ORDER BY key LIMIT n` is a range scan, not a filesort.
359 let keyset_rows: Vec<(String, String, String)> = conn.exec(
360 "SELECT s.COLUMN_NAME, s.INDEX_NAME, c.IS_NULLABLE \
361 FROM information_schema.STATISTICS s \
362 JOIN information_schema.COLUMNS c \
363 ON c.TABLE_SCHEMA = s.TABLE_SCHEMA AND c.TABLE_NAME = s.TABLE_NAME \
364 AND c.COLUMN_NAME = s.COLUMN_NAME \
365 WHERE s.TABLE_SCHEMA = ? AND s.TABLE_NAME = ? AND s.NON_UNIQUE = 0 \
366 AND s.SEQ_IN_INDEX = 1 \
367 AND NOT EXISTS ( \
368 SELECT 1 FROM information_schema.STATISTICS s2 \
369 WHERE s2.TABLE_SCHEMA = s.TABLE_SCHEMA AND s2.TABLE_NAME = s.TABLE_NAME \
370 AND s2.INDEX_NAME = s.INDEX_NAME AND s2.SEQ_IN_INDEX = 2)",
371 (&schema, &table),
372 )?;
373 let mut keyset_keys: Vec<String> = Vec::new();
374 // PRIMARY first (most efficient — clustered), then other unique indexes.
375 for primary in [true, false] {
376 for (col, index_name, is_nullable) in &keyset_rows {
377 let is_primary = index_name == "PRIMARY";
378 if is_primary == primary
379 && is_nullable.eq_ignore_ascii_case("NO")
380 && !keyset_keys.contains(col)
381 {
382 keyset_keys.push(col.clone());
383 }
384 }
385 }
386
387 Ok(crate::source::TableIntrospection {
388 single_int_pk,
389 keyset_keys,
390 row_estimate,
391 avg_row_bytes,
392 })
393}
394
395fn build_mysql_ssl_opts(cfg: &TlsConfig) -> SslOpts {
396 let mut ssl = SslOpts::default();
397 if let Some(path) = &cfg.ca_file {
398 ssl = ssl.with_root_cert_path(Some(std::path::PathBuf::from(path)));
399 }
400 match cfg.mode {
401 TlsMode::Require => {
402 ssl = ssl
403 .with_danger_accept_invalid_certs(true)
404 .with_danger_skip_domain_validation(true);
405 }
406 TlsMode::VerifyCa => {
407 ssl = ssl.with_danger_skip_domain_validation(true);
408 }
409 TlsMode::VerifyFull => {
410 // Strict: verify chain + hostname.
411 }
412 TlsMode::Disable => {
413 // Never invoked: gated in connect_with_tls.
414 }
415 }
416 if cfg.accept_invalid_certs {
417 ssl = ssl.with_danger_accept_invalid_certs(true);
418 }
419 if cfg.accept_invalid_hostnames {
420 ssl = ssl.with_danger_skip_domain_validation(true);
421 }
422 ssl
423}
424
/// RAII reset of the per-connection session state the export mutates
/// (`time_zone`, optionally `max_execution_time`) — the MySQL analogue of
/// `postgres::PgTxnGuard` (Epic 18 B1).
///
/// MySQL hands connections back to the `mysql` crate's pool on drop, and may sit
/// behind ProxySQL / MaxScale that reuse a physical backend across logical
/// connections. The previous end-of-`export()` reset covered success and the
/// `Err` return (no `?`), but **not** a panic mid-export, nor an early `?` on
/// the `SET max_execution_time` itself (MariaDB spells it `max_statement_time`,
/// so that SET errors — and `time_zone`, already set, would leak). Arming the
/// reset on `Drop` closes both: whatever exit path the export takes, the
/// connection is clean before it returns to the pool.
struct MysqlSessionGuard<'a> {
    /// Connection whose session variables were changed; borrowed for the
    /// guard's lifetime and handed to the export through `conn()`.
    conn: &'a mut mysql::PooledConn,
    /// `true` only after `SET SESSION max_execution_time` succeeded, so `Drop`
    /// resets that variable only when this guard actually changed it.
    reset_max_exec: bool,
}
441
442impl<'a> MysqlSessionGuard<'a> {
443 /// Apply the session SETs and arm the reset. `time_zone` is always set (UTC
444 /// normalisation so Parquet writes `isAdjustedToUTC=true`); the guard is
445 /// constructed *immediately* after it, so if the later `max_execution_time`
446 /// SET fails (or anything panics), `Drop` still resets `time_zone`.
447 fn apply(conn: &'a mut mysql::PooledConn, max_exec_ms: Option<u64>) -> Result<Self> {
448 conn.query_drop("SET time_zone = '+00:00'")?;
449 let mut guard = Self {
450 conn,
451 reset_max_exec: false,
452 };
453 if let Some(ms) = max_exec_ms {
454 guard
455 .conn
456 .query_drop(format!("SET SESSION max_execution_time = {ms}"))?;
457 guard.reset_max_exec = true;
458 }
459 Ok(guard)
460 }
461
462 fn conn(&mut self) -> &mut mysql::PooledConn {
463 self.conn
464 }
465}
466
467impl Drop for MysqlSessionGuard<'_> {
468 fn drop(&mut self) {
469 // Best-effort; the connection is about to return to the pool either way.
470 let _ = self.conn.query_drop("SET time_zone = @@global.time_zone");
471 if self.reset_max_exec {
472 let _ = self.conn.query_drop("SET SESSION max_execution_time = 0");
473 }
474 }
475}
476
477/// Execute the MySQL query and stream results to sink.
478///
479/// Session-state cleanup (`time_zone`, `max_execution_time`) is handled by the
480/// caller's [`MysqlSessionGuard`], which resets it on `Drop` regardless of how
481/// this function exits (success, `Err`, or panic).
482///
483/// `sample_pool`: when `tuning.adaptive` is true, a clone of the source pool used
484/// to obtain a second connection for extraction-pressure sampling without interfering
485/// with the streaming result set on `conn`.
486fn mysql_run_export(
487 conn: &mut mysql::PooledConn,
488 sample_pool: Option<Pool>,
489 sql: &str,
490 cursor_param: Option<&str>,
491 tuning: &SourceTuning,
492 column_overrides: &ColumnOverrides,
493 sink: &mut dyn super::BatchSink,
494) -> Result<usize> {
495 // SecOps: cursor value is bound via exec_iter rather than string-interpolated.
496 // Using exec_iter uniformly (even with empty params) keeps match arms
497 // type-compatible — query_iter returns a Text-protocol result, exec_iter Binary.
498 let mut result = match cursor_param {
499 Some(val) => conn.exec_iter(sql, (val,))?,
500 None => conn.exec_iter(sql, ())?,
501 };
502 let columns = result.columns().as_ref().to_vec();
503
504 // Compute TypeMappings once; derive both the Arrow schema and the
505 // per-column DataType vec from the same source so they can never diverge.
506 let (schema, arrow_types) = mysql_schema_and_arrow_types(&columns, column_overrides)?;
507 let schema = Arc::new(schema);
508
509 sink.on_schema(schema.clone())?;
510
511 // PG path uses `work_mem × 0.7 / row_bytes` for FETCH N — the analogous
512 // bottleneck on MySQL is *our* `row_buf` accumulator. The mysql crate
513 // streams rows from the wire one-at-a-time, but we pile up `effective_bs`
514 // of them in a `Vec<Row>` before flushing to Arrow → for `batch_size: 50000`
515 // (fast profile) on content_items that's ~650 MB just for the row_buf,
516 // plus another ~650 MB for the Arrow batch it feeds — RSS scales with
517 // `batch_size`, not chunk size.
518 //
519 // Fix: start with a small probe (`PROBE_BATCH_SIZE`), measure the actual
520 // Arrow bytes per row after the first batch, then cap `effective_bs` so
521 // each flush fits in roughly `MYSQL_BATCH_TARGET_MB` of Arrow memory.
522 // Caller's `batch_size_memory_mb` wins when set; the default is 64 MB —
523 // chosen to keep peak RSS well under 200 MB on wide-row tables while
524 // keeping batches large enough to be efficient for the parquet writer.
525 let configured_batch_size = tuning.effective_batch_size(Some(&schema));
526 // Shared batch-size state machine (probe → memory-cap → adaptive → throttle);
527 // MySQL provides only the row source + the target-MB cap formula below.
528 let mut ctl = AdaptiveBatchController::new(tuning, configured_batch_size);
529 ctl.seed_pressure(if tuning.adaptive {
530 sample_pool
531 .as_ref()
532 .and_then(mysql_sample_extraction_pressure)
533 } else {
534 None
535 });
536 let row_set = result
537 .iter()
538 .ok_or_else(|| anyhow::anyhow!("no result set"))?;
539 let mut row_buf: Vec<mysql::Row> = Vec::with_capacity(ctl.target());
540 let mut total_rows: usize = 0;
541 let mut memory_cap_applied = false;
542 // Per-value ceiling (MB→bytes; `0`/None disables), enforced pre-allocation
543 // inside the batch builder so an oversized cell bails before Arrow reserves
544 // the buffer. Same source of truth as the sink's backstop guard.
545 let max_value_bytes = tuning.max_value_bytes();
546
547 for row_result in row_set {
548 let row = row_result?;
549 row_buf.push(row);
550
551 if row_buf.len() >= ctl.target() {
552 total_rows += row_buf.len();
553 let batch =
554 rows_to_record_batch_typed(&schema, &arrow_types, &row_buf, max_value_bytes)?;
555 let batch_rows = row_buf.len();
556 row_buf.clear();
557
558 // After the first (probe-sized) batch we know how many bytes per
559 // row Arrow actually uses. Cap subsequent flushes to a memory
560 // target. The controller clamps it to the configured `batch_size`.
561 if !memory_cap_applied && batch_rows > 0 {
562 let arrow_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
563 let arrow_per_row = (arrow_bytes / batch_rows).max(64);
564 let target_mb = tuning
565 .batch_size_memory_mb
566 .unwrap_or(DEFAULT_BATCH_TARGET_MB);
567 let safe = ((target_mb * 1024 * 1024) / arrow_per_row).max(PROBE_BATCH_SIZE);
568 if let Some(new) = ctl.apply_memory_cap(safe) {
569 log::info!(
570 "MySQL row_buf cap: arrow≈{} B/row, target={} MB → batch_size → {} (configured={})",
571 arrow_per_row,
572 target_mb,
573 new,
574 configured_batch_size
575 );
576 row_buf.reserve(new.saturating_sub(row_buf.capacity()));
577 }
578 memory_cap_applied = true;
579 }
580
581 sink.on_batch(&batch)?;
582
583 if let Some((new, under_pressure)) = ctl.after_batch(|| {
584 sample_pool
585 .as_ref()
586 .and_then(mysql_sample_extraction_pressure)
587 }) {
588 log::info!(
589 "adaptive batch size → {} ({})",
590 new,
591 if under_pressure {
592 "pressure"
593 } else {
594 "recovery"
595 }
596 );
597 }
598
599 log::info!("fetched {} rows so far...", total_rows);
600 ctl.throttle();
601 }
602 }
603
604 if !row_buf.is_empty() {
605 total_rows += row_buf.len();
606 let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf, max_value_bytes)?;
607 sink.on_batch(&batch)?;
608 }
609
610 drop(result);
611 Ok(total_rows)
612}
613
614impl super::Source for MysqlSource {
615 fn export(
616 &mut self,
617 request: &super::ExportRequest<'_>,
618 sink: &mut dyn super::BatchSink,
619 ) -> Result<()> {
620 let built = build_export_query(request, SourceType::Mysql);
621 log::debug!(
622 "executing query (connection={}): {}",
623 self.proxy_kind.log_label(),
624 built.sql
625 );
626
627 let mut conn = self.pool.get_conn()?;
628
629 // Per-connection session state, reset on `Drop` (Epic 18 B1) so a pooled
630 // connection — returned to the mysql-crate pool or reused behind
631 // ProxySQL/MaxScale — never carries our settings into the next checkout,
632 // even on a panic or an early return. `time_zone` normalises TIMESTAMP to
633 // UTC (Parquet `isAdjustedToUTC=true`); `max_execution_time` bounds the
634 // statement when a timeout is configured.
635 let max_exec_ms = (request.tuning.statement_timeout_s > 0)
636 .then(|| request.tuning.statement_timeout_s * 1000);
637 let mut guard = MysqlSessionGuard::apply(&mut conn, max_exec_ms)?;
638
639 let sample_pool = if request.tuning.adaptive {
640 Some(self.pool.clone())
641 } else {
642 None
643 };
644 let result = mysql_run_export(
645 guard.conn(),
646 sample_pool,
647 &built.sql,
648 built.cursor_param.as_deref(),
649 request.tuning,
650 request.column_overrides,
651 sink,
652 );
653 // Reset now (success or `Err`); the `Drop` impl is the backstop for a
654 // panic or early return inside `mysql_run_export`.
655 drop(guard);
656
657 // The empty-result fallback to `Schema::empty()` lives here for
658 // parity with the PG implementation, even though `exec_iter` always
659 // returns the column metadata before yielding any rows so
660 // mysql_run_export's `on_schema` already fired.
661 let total_rows = result?;
662 if total_rows == 0 {
663 sink.on_schema(Arc::new(Schema::empty()))?;
664 }
665 log::info!("total: {} rows", total_rows);
666 Ok(())
667 }
668
669 fn query_scalar(&mut self, sql: &str) -> Result<Option<String>> {
670 let mut conn = self.pool.get_conn()?;
671 let row: Option<mysql::Row> = conn.query_first(sql)?;
672 match row {
673 Some(r) => {
674 let val: Option<mysql::Value> = r.get(0);
675 match val {
676 Some(mysql::Value::Bytes(b)) => {
677 Ok(Some(String::from_utf8_lossy(&b).into_owned()))
678 }
679 Some(mysql::Value::Int(v)) => Ok(Some(v.to_string())),
680 Some(mysql::Value::UInt(v)) => Ok(Some(v.to_string())),
681 Some(mysql::Value::Float(v)) => Ok(Some(v.to_string())),
682 Some(mysql::Value::Double(v)) => Ok(Some(v.to_string())),
683 _ => Ok(None),
684 }
685 }
686 None => Ok(None),
687 }
688 }
689
690 fn type_mappings(
691 &mut self,
692 query: &str,
693 column_overrides: &ColumnOverrides,
694 ) -> Result<Vec<crate::types::TypeMapping>> {
695 let wrapped = format!("SELECT * FROM ({}) AS _rivet_type_probe LIMIT 0", query);
696 let mut conn = self.pool.get_conn()?;
697 let result = conn.exec_iter(&wrapped, ())?;
698 let columns = result.columns().as_ref().to_vec();
699 drop(result);
700 let mappings = columns
701 .iter()
702 .map(|col| {
703 let rivet =
704 crate::types::resolve_or(column_overrides, col.name_str().as_ref(), || {
705 mysql_type_to_rivet(col)
706 });
707 let source = crate::types::SourceColumn::simple(
708 col.name_str().as_ref(),
709 mysql_native_type_name(col),
710 true,
711 );
712 crate::types::TypeMapping::from_source(&source, rivet)
713 })
714 .collect();
715 Ok(mappings)
716 }
717
718 /// Governor pressure proxy (Epic 18 C1): the same monotonic
719 /// extraction-pressure sum the adaptive batch loop samples
720 /// (`Created_tmp_disk_tables` + `Innodb_buffer_pool_wait_free`). Rising
721 /// between samples means the extraction is spilling a temp table to disk or
722 /// stalling on buffer-pool memory — the MySQL analogue of PG `temp_bytes`.
723 fn sample_pressure(&mut self) -> Option<u64> {
724 mysql_sample_extraction_pressure(&self.pool)
725 }
726}
727
#[cfg(test)]
mod tests {
    use super::{bit_bytes_to_u64, correct_innodb_avg_row_length};

    // Proxy classifier tests live in `proxy.rs` alongside the classifier.

    /// Check every `(input bytes, expected integer)` pair against
    /// `bit_bytes_to_u64`.
    fn assert_bit_cases(cases: &[(&[u8], u64)]) {
        for (bytes, expected) in cases {
            assert_eq!(bit_bytes_to_u64(bytes), *expected);
        }
    }

    // ── bit_bytes_to_u64 (lives in arrow_convert.rs, exported pub(super)) ──

    #[test]
    fn bit_bytes_single_byte() {
        assert_bit_cases(&[(&[0x00], 0), (&[0x01], 1), (&[0xFF], 255)]);
    }

    #[test]
    fn bit_bytes_multi_byte() {
        assert_bit_cases(&[(&[0x01, 0x02], 258), (&[0xFF; 8], u64::MAX)]);
    }

    #[test]
    fn bit_bytes_empty() {
        assert_bit_cases(&[(&[], 0)]);
    }

    #[test]
    fn bit_bytes_ascii_digit_bytes_are_bits_not_text() {
        // Regression (mysql-bit): BIT bytes that happen to be ASCII digits are
        // still big-endian bits — never decimal text.
        assert_bit_cases(&[
            (&[0x39], 57),           // "9" as text, BIT(8) 57
            (&[0x31, 0x32], 0x3132), // b"12" → 12594
            (&[0x31, 0xFF], 12799),  // digit head, non-digit tail
        ]);
    }

    // ── InnoDB AVG_ROW_LENGTH correction ────────────────────────────────

    #[test]
    fn innodb_correction_below_threshold_is_identity() {
        for raw in [82, 314, 2_048, 8 * 1024] {
            assert_eq!(correct_innodb_avg_row_length(raw), raw);
        }
    }

    #[test]
    fn innodb_correction_above_threshold_divides_by_three() {
        for (raw, expected) in [(40_978, 40_978 / 3), (120_000, 40_000)] {
            assert_eq!(correct_innodb_avg_row_length(raw), expected);
        }
    }

    #[test]
    fn innodb_correction_does_not_undershoot_floor() {
        let floor = 4 * 1024;
        let divided = correct_innodb_avg_row_length(8 * 1024 + 1);
        assert!(divided >= floor, "got {divided}");
    }
}