Skip to main content

rivet/source/mysql/
mod.rs

1//! MySQL `Source` implementation.
2//!
3//! Module layout (mirrors `postgres/`):
4//!
5//! - `mod.rs` (this file) — `MysqlSource` struct + connect/TLS path, the
6//!   extraction-pressure sampler, the `lean_pool_opts` / `connect_pool` /
7//!   `build_mysql_ssl_opts` helpers, `introspect_mysql_table_for_chunking`
8//!   together with the InnoDB `AVG_ROW_LENGTH` correction, the cursor-bound
9//!   `exec_iter` export loop (`mysql_run_export`), and the `Source` trait impl.
10//! - [`arrow_convert`] — the entire row → Arrow `RecordBatch` pipeline:
11//!   `mysql_type_to_rivet` + `mysql_native_type_name`,
12//!   `mysql_schema_and_arrow_types`, BIT / TIME / DECIMAL decoders, and the
13//!   array builders. Kept in a sibling because it is the largest
14//!   single-purpose cluster in this driver (~510 LoC) and has zero reverse
15//!   dependency back into the connection / pool / cursor layer.
16//! - [`proxy`] — `MysqlProxyKind` enum, the pure `classify_mysql_proxy`
17//!   classifier, the I/O wrapper `detect_mysql_proxy_kind`, and
18//!   `warn_proxy_kind`. Detection runs once at connect time; the classifier
19//!   is exhaustively unit-tested in isolation (no live MySQL needed).
20
21mod arrow_convert;
22pub(crate) mod cdc;
23mod proxy;
24
25use std::sync::Arc;
26
27use arrow::datatypes::Schema;
28use mysql::prelude::*;
29use mysql::{Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, SslOpts};
30
31use crate::config::{SourceType, TlsConfig, TlsMode};
32use crate::error::Result;
33use crate::source::batch_controller::{
34    AdaptiveBatchController, DEFAULT_BATCH_TARGET_MB, PROBE_BATCH_SIZE,
35};
36use crate::source::query::build_export_query;
37use crate::tuning::SourceTuning;
38use crate::types::ColumnOverrides;
39
40use arrow_convert::{
41    mysql_native_type_name, mysql_schema_and_arrow_types, mysql_type_to_rivet,
42    rows_to_record_batch_typed,
43};
44// `bit_bytes_to_u64` is only referenced by the `tests` module below — gate the
45// re-import on `cfg(test)` so non-test builds don't see an unused-import warning.
46#[cfg(test)]
47use arrow_convert::bit_bytes_to_u64;
48use proxy::{detect_mysql_proxy_kind, warn_proxy_kind};
49
50// Re-exported so external code (`tests/live_pool_safety.rs`) can still write
51// `use rivet::source::mysql::MysqlProxyKind` after the proxy block moved to
52// the `proxy` submodule.
53pub use proxy::MysqlProxyKind;
54
55pub struct MysqlSource {
56    pool: Pool,
57    proxy_kind: MysqlProxyKind,
58}
59
60/// Pool options that prevent eager pre-connection. The default mysql::Pool
61/// opens `min=10` connections immediately, which overflows MySQL's
62/// max_connections when many parallel exports run simultaneously.
63fn lean_pool_opts() -> PoolOpts {
64    PoolOpts::default()
65        .with_constraints(PoolConstraints::new(1, 100).expect("valid pool constraints"))
66}
67
68/// Sample an **extraction-pressure** proxy (Epic 18 C1) — the MySQL analogue of
69/// PG's `temp_bytes`. Sums two monotonic global counters:
70///
71/// - `Created_tmp_disk_tables` — a query spilled an internal temp table to disk
72///   (a `GROUP BY` / `DISTINCT` / `ORDER BY` that exceeded `tmp_table_size`).
73/// - `Innodb_buffer_pool_wait_free` — InnoDB had to wait for a free buffer-pool
74///   page, i.e. the read is evicting pages under memory pressure.
75///
76/// Either moving means "my extraction is stressing the source"; their sum is
77/// monotonic, so the governor's `cur > prev` comparison works unchanged. The
78/// sum is robust to MySQL 8.0's `TempTable` engine, where a spill may not bump
79/// `Created_tmp_disk_tables` — `Innodb_buffer_pool_wait_free` carries the signal
80/// then (and `Created_tmp_disk_tables` adds it on 5.7 / MariaDB). This replaces
81/// the old `Innodb_log_waits`, which is redo-**write** pressure and barely moves
82/// during a read-only export.
83fn mysql_sample_extraction_pressure(pool: &Pool) -> Option<u64> {
84    let mut conn = pool.get_conn().ok()?;
85    let rows: Vec<(String, u64)> = conn
86        .query(
87            "SHOW GLOBAL STATUS WHERE Variable_name IN \
88             ('Created_tmp_disk_tables', 'Innodb_buffer_pool_wait_free')",
89        )
90        .ok()?;
91    if rows.is_empty() {
92        return None;
93    }
94    Some(rows.iter().map(|(_, v)| *v).sum())
95}
96
97/// Snapshot the broader source-harm counters from `SHOW GLOBAL STATUS` — a
98/// superset of the governor's [`mysql_sample_extraction_pressure`]. Returns
99/// `(metric, cumulative_value)` pairs the pipeline deltas around the export and
100/// stores in `export_harm`. `SHOW GLOBAL STATUS` needs **no special privilege**.
101/// These are global counters, so concurrent load inflates the delta (accurate on
102/// a quiet pilot box). `Innodb_rows_read` is the read-amplification signal the
103/// 0.12 harm A/B keyed on. `None` on connect/query failure — never blocks the
104/// export.
105pub(crate) fn sample_harm_counters(
106    url: &str,
107    tls: Option<&TlsConfig>,
108) -> Option<Vec<(String, i64)>> {
109    let pool = connect_pool(url, tls).ok()?;
110    let mut conn = pool.get_conn().ok()?;
111    let rows: Vec<(String, i64)> = conn
112        .query(
113            "SHOW GLOBAL STATUS WHERE Variable_name IN \
114             ('Innodb_rows_read', 'Innodb_buffer_pool_reads', 'Created_tmp_disk_tables', \
115              'Handler_read_rnd_next', 'Innodb_row_lock_waits', 'Innodb_row_lock_time')",
116        )
117        .ok()?;
118    if rows.is_empty() {
119        return None;
120    }
121    Some(
122        rows.into_iter()
123            .map(|(k, v)| (format!("mysql_{}", k.to_lowercase()), v))
124            .collect(),
125    )
126}
127
128impl MysqlSource {
129    /// Build a source from an existing pool: the single place that detects the
130    /// proxy kind, warns once, and wraps the pool. The `connect*` entry points
131    /// all funnel through here (also handy in tests that share the pool for
132    /// post-export state inspection).
133    pub fn from_pool(pool: Pool) -> Self {
134        let proxy_kind = detect_mysql_proxy_kind(&pool);
135        warn_proxy_kind(proxy_kind);
136        Self { pool, proxy_kind }
137    }
138
139    /// Connect with no transport security (legacy path).
140    pub fn connect(url: &str) -> Result<Self> {
141        let opts =
142            Opts::from(OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()));
143        Ok(Self::from_pool(Pool::new(opts)?))
144    }
145
146    /// Connect honoring the user's [`TlsConfig`].
147    pub fn connect_with_tls(url: &str, tls: Option<&TlsConfig>) -> Result<Self> {
148        // Refuse remote plaintext (no `tls:` block) before any dial (CWE-319).
149        crate::source::require_tls_or_loopback(url, tls)?;
150        match tls {
151            Some(cfg) if cfg.mode.is_enforced() => {
152                let base = Opts::from_url(url)?;
153                let ssl = build_mysql_ssl_opts(cfg);
154                let opts = Opts::from(
155                    OptsBuilder::from_opts(base)
156                        .ssl_opts(Some(ssl))
157                        .pool_opts(lean_pool_opts()),
158                );
159                Ok(Self::from_pool(Pool::new(opts)?))
160            }
161            _ => Self::connect(url),
162        }
163    }
164
165    /// Expose the proxy classification for diagnostic tools (preflight,
166    /// integration tests). Not part of the public Source trait — same
167    /// internal-may-change contract as the rest of `rivet::source::mysql::*`.
168    ///
169    /// `#[allow(dead_code)]` covers the binary compilation unit; the lib +
170    /// integration tests reference this through the `rivet::source::mysql`
171    /// public surface.
172    #[allow(dead_code)]
173    pub fn proxy_kind(&self) -> MysqlProxyKind {
174        self.proxy_kind
175    }
176}
177
178/// Build a MySQL connection pool honoring the configured TLS policy.
179///
180/// Shared by preflight, doctor, init, and anywhere else we need a pool outside
181/// the `Source` trait. `tls = None` falls back to plaintext (legacy behavior).
182pub(crate) fn connect_pool(url: &str, tls: Option<&TlsConfig>) -> Result<Pool> {
183    // Refuse remote plaintext (no `tls:` block) before any dial (CWE-319).
184    crate::source::require_tls_or_loopback(url, tls)?;
185    match tls {
186        Some(cfg) if cfg.mode.is_enforced() => {
187            let base = Opts::from_url(url)?;
188            let ssl = build_mysql_ssl_opts(cfg);
189            let opts = Opts::from(
190                OptsBuilder::from_opts(base)
191                    .ssl_opts(Some(ssl))
192                    .pool_opts(lean_pool_opts()),
193            );
194            Ok(Pool::new(opts)?)
195        }
196        _ => {
197            let opts = Opts::from(
198                OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()),
199            );
200            Ok(Pool::new(opts)?)
201        }
202    }
203}
204
/// Raw `AVG_ROW_LENGTH` values at or below this many bytes are trusted as-is:
/// rows this small fit inline, with no BLOB overflow pages to inflate them.
const INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES: i64 = 8 * 1024;

/// Empirical divisor for InnoDB BLOB-page inflation. A wide-text row that
/// allocates eight 16 KB overflow pages reports ~128 KB in `AVG_ROW_LENGTH`
/// while the actual wire content is ~40 KB → factor of ~3.
const INNODB_BLOB_OVERFLOW_DIVISOR: i64 = 3;

/// Correct a raw InnoDB `AVG_ROW_LENGTH` for BLOB-overflow inflation.
///
/// Kept pure so it can be unit-tested without a server. The live
/// introspection path applies it to the figure read from
/// `information_schema.TABLES`.
///
/// - `raw_bytes <= 8 KiB` (including zero or negative): returned unchanged.
/// - `raw_bytes > 8 KiB`: `raw_bytes / 3` (integer division), floored at half
///   the threshold (4 KiB) so the estimate never undershoots too far.
///
/// NOTE(review): because the floor is half the threshold, the mapping steps
/// *down* across the boundary (8192 → 8192 but 8193 → 4096), and it stays at
/// 4096 until `raw_bytes` exceeds 12 290. Keep this in mind if callers ever
/// assume the estimate is monotonic in the raw value.
fn correct_innodb_avg_row_length(raw_bytes: i64) -> i64 {
    if raw_bytes <= INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES {
        return raw_bytes;
    }
    let floor = INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES / 2;
    std::cmp::max(raw_bytes / INNODB_BLOB_OVERFLOW_DIVISOR, floor)
}
228
229/// Probe `information_schema` for stats chunked-mode planning needs.
230///
231/// MySQL analogue of [`crate::source::postgres::introspect_pg_table_for_chunking`]:
232/// returns the same source-neutral [`crate::source::TableIntrospection`] so
233/// `plan/build.rs` can dispatch on `source_type` and reuse the same downstream
234/// logic for chunk-column / chunk_size derivation.
235///
236/// Two queries per call, both against `information_schema` (no extra grants
237/// required for a normal app user):
238/// - `TABLES.AVG_ROW_LENGTH` + `TABLE_ROWS` for the row-size and row-count estimate.
239///   These come from `mysql.innodb_table_stats` and are only as fresh as the
240///   last `ANALYZE TABLE` / autostat run. Empty / unanalysed → zero.
241/// - `STATISTICS` filtered to `INDEX_NAME='PRIMARY'` with `SEQ_IN_INDEX=1` and a
242///   second probe ensuring no `SEQ_IN_INDEX=2` row exists — single-column PK only.
243///
244/// `qualified_table` is `<schema>.<table>` or bare `<table>` (resolved under the
245/// current database for the connection). Same strict ident rules as the YAML
246/// `table:` shortcut so the SQL stays trivially safe.
247pub(crate) fn introspect_mysql_table_for_chunking(
248    url: &str,
249    tls: Option<&TlsConfig>,
250    qualified_table: &str,
251) -> Result<crate::source::TableIntrospection> {
252    let pool = connect_pool(url, tls)?;
253    let mut conn = pool.get_conn()?;
254    let default_db: Option<String> = conn.query_first("SELECT DATABASE()")?;
255    let default_db = default_db.unwrap_or_default();
256
257    let (schema, table) = match qualified_table.split_once('.') {
258        Some((s, t)) => (s.to_string(), t.to_string()),
259        None => (default_db, qualified_table.to_string()),
260    };
261
262    // (1) Row count + avg row bytes. AVG_ROW_LENGTH already accounts for
263    // overflow pages on InnoDB, so we use it directly rather than dividing
264    // DATA_LENGTH by TABLE_ROWS (which under-counts for tables with TOAST-like
265    // overflow). Fall back to division when AVG_ROW_LENGTH is 0.
266    let row_stats: Option<(i64, i64, i64)> = conn.exec_first(
267        "SELECT CAST(IFNULL(TABLE_ROWS, 0) AS SIGNED), \
268                CAST(IFNULL(AVG_ROW_LENGTH, 0) AS SIGNED), \
269                CAST(IFNULL(DATA_LENGTH, 0) AS SIGNED) \
270         FROM information_schema.TABLES \
271         WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
272        (&schema, &table),
273    )?;
274    let (row_estimate, avg_row_bytes) = match row_stats {
275        Some((rows, avg, data_len)) => {
276            let row_count = rows.max(0);
277            let raw_per_row = if avg > 0 {
278                Some(avg)
279            } else if row_count > 0 {
280                Some(data_len / row_count)
281            } else {
282                None
283            };
284            // InnoDB stores TEXT/BLOB > ~768 B off-page in 16 KB BLOB pages,
285            // and `AVG_ROW_LENGTH` counts the allocated page bytes — not the
286            // actual content. On wide-text workloads (CMS bodies, JSON logs,
287            // audit trails) this inflates the per-row estimate 3-5× compared
288            // to what the client driver actually buffers over the wire.
289            //
290            // We empirically divide by 3 above an 8 KB threshold. Below 8 KB
291            // a row fits inline with no overflow, so the raw figure is
292            // accurate. Above it, dividing by 3 brings content_items' 41 KB
293            // estimate down to ~14 KB — still conservative vs the ~10 KB the
294            // PG side reports for the same payload via `pg_total_relation_size`.
295            //
296            // Pilots who want exact control can set `chunk_size:` explicitly
297            // (it always wins over the budget-derived size).
298            let per_row = raw_per_row.map(correct_innodb_avg_row_length);
299            (row_count, per_row.filter(|b| *b > 0))
300        }
301        None => (0, None),
302    };
303
304    // (2) Single-column int PK probe. STATISTICS has one row per (column,
305    // index) so we filter to PRIMARY + SEQ_IN_INDEX=1 and then check that
306    // the PRIMARY index has no SEQ_IN_INDEX=2 row (composite).
307    let pk_first: Option<(String,)> = conn.exec_first(
308        "SELECT COLUMN_NAME \
309         FROM information_schema.STATISTICS \
310         WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 1",
311        (&schema, &table),
312    )?;
313    let single_int_pk = if let Some((col,)) = pk_first {
314        let composite: Option<(String,)> = conn.exec_first(
315            "SELECT COLUMN_NAME FROM information_schema.STATISTICS \
316             WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 2 \
317             LIMIT 1",
318            (&schema, &table),
319        )?;
320        if composite.is_some() {
321            log::debug!(
322                "introspect_mysql_table: composite PK on {schema}.{table} — skipping auto-resolve"
323            );
324            None
325        } else {
326            // Column type must be integer-family for safe range chunking.
327            let type_row: Option<(String,)> = conn.exec_first(
328                "SELECT DATA_TYPE FROM information_schema.COLUMNS \
329                 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
330                (&schema, &table, &col),
331            )?;
332            match type_row.map(|(t,)| t.to_ascii_lowercase()) {
333                Some(t)
334                    if matches!(
335                        t.as_str(),
336                        "tinyint" | "smallint" | "mediumint" | "int" | "bigint"
337                    ) =>
338                {
339                    Some(col)
340                }
341                Some(t) => {
342                    log::debug!(
343                        "introspect_mysql_table: PK '{col}' on {schema}.{table} has non-int type '{t}' — skipping auto-resolve"
344                    );
345                    None
346                }
347                None => None,
348            }
349        }
350    } else {
351        None
352    };
353
354    // (3) Keyset keys (OPT-4): single-column, NOT NULL, UNIQUE index columns —
355    // usable as a seek-pagination key. NON_UNIQUE=0 filters to unique indexes
356    // (PRIMARY included); SEQ_IN_INDEX=1 with no SEQ_IN_INDEX=2 row keeps only
357    // single-column indexes; IS_NULLABLE='NO' guarantees `> last` never has to
358    // reason about NULL ordering. Index-backed by definition, so keyset's
359    // `ORDER BY key LIMIT n` is a range scan, not a filesort.
360    let keyset_rows: Vec<(String, String, String)> = conn.exec(
361        "SELECT s.COLUMN_NAME, s.INDEX_NAME, c.IS_NULLABLE \
362         FROM information_schema.STATISTICS s \
363         JOIN information_schema.COLUMNS c \
364           ON c.TABLE_SCHEMA = s.TABLE_SCHEMA AND c.TABLE_NAME = s.TABLE_NAME \
365              AND c.COLUMN_NAME = s.COLUMN_NAME \
366         WHERE s.TABLE_SCHEMA = ? AND s.TABLE_NAME = ? AND s.NON_UNIQUE = 0 \
367           AND s.SEQ_IN_INDEX = 1 \
368           AND NOT EXISTS ( \
369             SELECT 1 FROM information_schema.STATISTICS s2 \
370             WHERE s2.TABLE_SCHEMA = s.TABLE_SCHEMA AND s2.TABLE_NAME = s.TABLE_NAME \
371               AND s2.INDEX_NAME = s.INDEX_NAME AND s2.SEQ_IN_INDEX = 2)",
372        (&schema, &table),
373    )?;
374    let mut keyset_keys: Vec<String> = Vec::new();
375    // PRIMARY first (most efficient — clustered), then other unique indexes.
376    for primary in [true, false] {
377        for (col, index_name, is_nullable) in &keyset_rows {
378            let is_primary = index_name == "PRIMARY";
379            if is_primary == primary
380                && is_nullable.eq_ignore_ascii_case("NO")
381                && !keyset_keys.contains(col)
382            {
383                keyset_keys.push(col.clone());
384            }
385        }
386    }
387
388    Ok(crate::source::TableIntrospection {
389        single_int_pk,
390        keyset_keys,
391        row_estimate,
392        avg_row_bytes,
393    })
394}
395
396fn build_mysql_ssl_opts(cfg: &TlsConfig) -> SslOpts {
397    let mut ssl = SslOpts::default();
398    if let Some(path) = &cfg.ca_file {
399        ssl = ssl.with_root_cert_path(Some(std::path::PathBuf::from(path)));
400    }
401    match cfg.mode {
402        TlsMode::Require => {
403            ssl = ssl
404                .with_danger_accept_invalid_certs(true)
405                .with_danger_skip_domain_validation(true);
406        }
407        TlsMode::VerifyCa => {
408            ssl = ssl.with_danger_skip_domain_validation(true);
409        }
410        TlsMode::VerifyFull => {
411            // Strict: verify chain + hostname.
412        }
413        TlsMode::Disable => {
414            // Never invoked: gated in connect_with_tls.
415        }
416    }
417    if cfg.accept_invalid_certs {
418        ssl = ssl.with_danger_accept_invalid_certs(true);
419    }
420    if cfg.accept_invalid_hostnames {
421        ssl = ssl.with_danger_skip_domain_validation(true);
422    }
423    ssl
424}
425
426/// RAII reset of the per-connection session state the export mutates
427/// (`time_zone`, optionally `max_execution_time`) — the MySQL analogue of
428/// `postgres::PgTxnGuard` (Epic 18 B1).
429///
430/// MySQL hands connections back to the `mysql` crate's pool on drop, and may sit
431/// behind ProxySQL / MaxScale that reuse a physical backend across logical
432/// connections. The previous end-of-`export()` reset covered success and the
433/// `Err` return (no `?`), but **not** a panic mid-export, nor an early `?` on
434/// the `SET max_execution_time` itself (MariaDB spells it `max_statement_time`,
435/// so that SET errors — and `time_zone`, already set, would leak). Arming the
436/// reset on `Drop` closes both: whatever exit path the export takes, the
437/// connection is clean before it returns to the pool.
438struct MysqlSessionGuard<'a> {
439    conn: &'a mut mysql::PooledConn,
440    reset_max_exec: bool,
441}
442
443impl<'a> MysqlSessionGuard<'a> {
444    /// Apply the session SETs and arm the reset. `time_zone` is always set (UTC
445    /// normalisation so Parquet writes `isAdjustedToUTC=true`); the guard is
446    /// constructed *immediately* after it, so if the later `max_execution_time`
447    /// SET fails (or anything panics), `Drop` still resets `time_zone`.
448    fn apply(conn: &'a mut mysql::PooledConn, max_exec_ms: Option<u64>) -> Result<Self> {
449        conn.query_drop("SET time_zone = '+00:00'")?;
450        let mut guard = Self {
451            conn,
452            reset_max_exec: false,
453        };
454        if let Some(ms) = max_exec_ms {
455            guard
456                .conn
457                .query_drop(format!("SET SESSION max_execution_time = {ms}"))?;
458            guard.reset_max_exec = true;
459        }
460        Ok(guard)
461    }
462
463    fn conn(&mut self) -> &mut mysql::PooledConn {
464        self.conn
465    }
466}
467
468impl Drop for MysqlSessionGuard<'_> {
469    fn drop(&mut self) {
470        // Best-effort; the connection is about to return to the pool either way.
471        let _ = self.conn.query_drop("SET time_zone = @@global.time_zone");
472        if self.reset_max_exec {
473            let _ = self.conn.query_drop("SET SESSION max_execution_time = 0");
474        }
475    }
476}
477
478/// Execute the MySQL query and stream results to sink.
479///
480/// Session-state cleanup (`time_zone`, `max_execution_time`) is handled by the
481/// caller's [`MysqlSessionGuard`], which resets it on `Drop` regardless of how
482/// this function exits (success, `Err`, or panic).
483///
484/// `sample_pool`: when `tuning.adaptive` is true, a clone of the source pool used
485/// to obtain a second connection for extraction-pressure sampling without interfering
486/// with the streaming result set on `conn`.
487fn mysql_run_export(
488    conn: &mut mysql::PooledConn,
489    sample_pool: Option<Pool>,
490    sql: &str,
491    cursor_param: Option<&str>,
492    tuning: &SourceTuning,
493    column_overrides: &ColumnOverrides,
494    sink: &mut dyn super::BatchSink,
495) -> Result<usize> {
496    // SecOps: cursor value is bound via exec_iter rather than string-interpolated.
497    // Using exec_iter uniformly (even with empty params) keeps match arms
498    // type-compatible — query_iter returns a Text-protocol result, exec_iter Binary.
499    let mut result = match cursor_param {
500        Some(val) => conn.exec_iter(sql, (val,))?,
501        None => conn.exec_iter(sql, ())?,
502    };
503    let columns = result.columns().as_ref().to_vec();
504
505    // Compute TypeMappings once; derive both the Arrow schema and the
506    // per-column DataType vec from the same source so they can never diverge.
507    let (schema, arrow_types) = mysql_schema_and_arrow_types(&columns, column_overrides)?;
508    let schema = Arc::new(schema);
509
510    sink.on_schema(schema.clone())?;
511
512    // PG path uses `work_mem × 0.7 / row_bytes` for FETCH N — the analogous
513    // bottleneck on MySQL is *our* `row_buf` accumulator. The mysql crate
514    // streams rows from the wire one-at-a-time, but we pile up `effective_bs`
515    // of them in a `Vec<Row>` before flushing to Arrow → for `batch_size: 50000`
516    // (fast profile) on content_items that's ~650 MB just for the row_buf,
517    // plus another ~650 MB for the Arrow batch it feeds — RSS scales with
518    // `batch_size`, not chunk size.
519    //
520    // Fix: start with a small probe (`PROBE_BATCH_SIZE`), measure the actual
521    // Arrow bytes per row after the first batch, then cap `effective_bs` so
522    // each flush fits in roughly `MYSQL_BATCH_TARGET_MB` of Arrow memory.
523    // Caller's `batch_size_memory_mb` wins when set; the default is 64 MB —
524    // chosen to keep peak RSS well under 200 MB on wide-row tables while
525    // keeping batches large enough to be efficient for the parquet writer.
526    let configured_batch_size = tuning.effective_batch_size(Some(&schema));
527    // Shared batch-size state machine (probe → memory-cap → adaptive → throttle);
528    // MySQL provides only the row source + the target-MB cap formula below.
529    let mut ctl = AdaptiveBatchController::new(tuning, configured_batch_size);
530    ctl.seed_pressure(if tuning.adaptive {
531        sample_pool
532            .as_ref()
533            .and_then(mysql_sample_extraction_pressure)
534    } else {
535        None
536    });
537    let row_set = result
538        .iter()
539        .ok_or_else(|| anyhow::anyhow!("no result set"))?;
540    let mut row_buf: Vec<mysql::Row> = Vec::with_capacity(ctl.target());
541    let mut total_rows: usize = 0;
542    let mut memory_cap_applied = false;
543    // Per-value ceiling (MB→bytes; `0`/None disables), enforced pre-allocation
544    // inside the batch builder so an oversized cell bails before Arrow reserves
545    // the buffer. Same source of truth as the sink's backstop guard.
546    let max_value_bytes = tuning.max_value_bytes();
547
548    for row_result in row_set {
549        let row = row_result?;
550        row_buf.push(row);
551
552        if row_buf.len() >= ctl.target() {
553            total_rows += row_buf.len();
554            let batch =
555                rows_to_record_batch_typed(&schema, &arrow_types, &row_buf, max_value_bytes)?;
556            let batch_rows = row_buf.len();
557            row_buf.clear();
558
559            // After the first (probe-sized) batch we know how many bytes per
560            // row Arrow actually uses. Cap subsequent flushes to a memory
561            // target. The controller clamps it to the configured `batch_size`.
562            if !memory_cap_applied && batch_rows > 0 {
563                let arrow_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
564                let arrow_per_row = (arrow_bytes / batch_rows).max(64);
565                let target_mb = tuning
566                    .batch_size_memory_mb
567                    .unwrap_or(DEFAULT_BATCH_TARGET_MB);
568                let safe = ((target_mb * 1024 * 1024) / arrow_per_row).max(PROBE_BATCH_SIZE);
569                if let Some(new) = ctl.apply_memory_cap(safe) {
570                    log::info!(
571                        "MySQL row_buf cap: arrow≈{} B/row, target={} MB → batch_size → {} (configured={})",
572                        arrow_per_row,
573                        target_mb,
574                        new,
575                        configured_batch_size
576                    );
577                    row_buf.reserve(new.saturating_sub(row_buf.capacity()));
578                }
579                memory_cap_applied = true;
580            }
581
582            sink.on_batch(&batch)?;
583
584            if let Some((new, under_pressure)) = ctl.after_batch(|| {
585                sample_pool
586                    .as_ref()
587                    .and_then(mysql_sample_extraction_pressure)
588            }) {
589                log::info!(
590                    "adaptive batch size → {} ({})",
591                    new,
592                    if under_pressure {
593                        "pressure"
594                    } else {
595                        "recovery"
596                    }
597                );
598            }
599
600            log::info!("fetched {} rows so far...", total_rows);
601            ctl.throttle(batch.num_rows());
602        }
603    }
604
605    if !row_buf.is_empty() {
606        total_rows += row_buf.len();
607        let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf, max_value_bytes)?;
608        sink.on_batch(&batch)?;
609    }
610
611    drop(result);
612    Ok(total_rows)
613}
614
615impl super::Source for MysqlSource {
616    fn export(
617        &mut self,
618        request: &super::ExportRequest<'_>,
619        sink: &mut dyn super::BatchSink,
620    ) -> Result<()> {
621        let built = build_export_query(request, SourceType::Mysql);
622        log::debug!(
623            "executing query (connection={}): {}",
624            self.proxy_kind.log_label(),
625            built.sql
626        );
627
628        let mut conn = self.pool.get_conn()?;
629
630        // Per-connection session state, reset on `Drop` (Epic 18 B1) so a pooled
631        // connection — returned to the mysql-crate pool or reused behind
632        // ProxySQL/MaxScale — never carries our settings into the next checkout,
633        // even on a panic or an early return. `time_zone` normalises TIMESTAMP to
634        // UTC (Parquet `isAdjustedToUTC=true`); `max_execution_time` bounds the
635        // statement when a timeout is configured.
636        let max_exec_ms = (request.tuning.statement_timeout_s > 0)
637            .then(|| request.tuning.statement_timeout_s * 1000);
638        let mut guard = MysqlSessionGuard::apply(&mut conn, max_exec_ms)?;
639
640        let sample_pool = if request.tuning.adaptive {
641            Some(self.pool.clone())
642        } else {
643            None
644        };
645        let result = mysql_run_export(
646            guard.conn(),
647            sample_pool,
648            &built.sql,
649            built.cursor_param.as_deref(),
650            request.tuning,
651            request.column_overrides,
652            sink,
653        );
654        // Reset now (success or `Err`); the `Drop` impl is the backstop for a
655        // panic or early return inside `mysql_run_export`.
656        drop(guard);
657
658        // The empty-result fallback to `Schema::empty()` lives here for
659        // parity with the PG implementation, even though `exec_iter` always
660        // returns the column metadata before yielding any rows so
661        // mysql_run_export's `on_schema` already fired.
662        let total_rows = result?;
663        if total_rows == 0 {
664            sink.on_schema(Arc::new(Schema::empty()))?;
665        }
666        log::info!("total: {} rows", total_rows);
667        Ok(())
668    }
669
670    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>> {
671        let mut conn = self.pool.get_conn()?;
672        let row: Option<mysql::Row> = conn.query_first(sql)?;
673        match row {
674            Some(r) => {
675                let val: Option<mysql::Value> = r.get(0);
676                match val {
677                    Some(mysql::Value::Bytes(b)) => {
678                        Ok(Some(String::from_utf8_lossy(&b).into_owned()))
679                    }
680                    Some(mysql::Value::Int(v)) => Ok(Some(v.to_string())),
681                    Some(mysql::Value::UInt(v)) => Ok(Some(v.to_string())),
682                    Some(mysql::Value::Float(v)) => Ok(Some(v.to_string())),
683                    Some(mysql::Value::Double(v)) => Ok(Some(v.to_string())),
684                    _ => Ok(None),
685                }
686            }
687            None => Ok(None),
688        }
689    }
690
691    fn type_mappings(
692        &mut self,
693        query: &str,
694        column_overrides: &ColumnOverrides,
695    ) -> Result<Vec<crate::types::TypeMapping>> {
696        let wrapped = format!("SELECT * FROM ({}) AS _rivet_type_probe LIMIT 0", query);
697        let mut conn = self.pool.get_conn()?;
698        let result = conn.exec_iter(&wrapped, ())?;
699        let columns = result.columns().as_ref().to_vec();
700        drop(result);
701        let mappings = columns
702            .iter()
703            .map(|col| {
704                let rivet =
705                    crate::types::resolve_or(column_overrides, col.name_str().as_ref(), || {
706                        mysql_type_to_rivet(col)
707                    });
708                let source = crate::types::SourceColumn::simple(
709                    col.name_str().as_ref(),
710                    mysql_native_type_name(col),
711                    true,
712                );
713                crate::types::TypeMapping::from_source(&source, rivet)
714            })
715            .collect();
716        Ok(mappings)
717    }
718
719    /// Governor pressure proxy (Epic 18 C1): the same monotonic
720    /// extraction-pressure sum the adaptive batch loop samples
721    /// (`Created_tmp_disk_tables` + `Innodb_buffer_pool_wait_free`). Rising
722    /// between samples means the extraction is spilling a temp table to disk or
723    /// stalling on buffer-pool memory — the MySQL analogue of PG `temp_bytes`.
724    fn sample_pressure(&mut self) -> Option<u64> {
725        mysql_sample_extraction_pressure(&self.pool)
726    }
727}
728
#[cfg(test)]
mod tests {
    use super::{bit_bytes_to_u64, correct_innodb_avg_row_length};

    // Proxy classifier tests live in `proxy.rs` alongside the classifier.

    /// Assert that `bytes` decodes to `expected`, echoing the input on failure.
    fn assert_bits(bytes: &[u8], expected: u64) {
        assert_eq!(bit_bytes_to_u64(bytes), expected, "decoding {bytes:02X?}");
    }

    // ── bit_bytes_to_u64 (lives in arrow_convert.rs, exported pub(super)) ──

    #[test]
    fn bit_bytes_single_byte() {
        for (byte, expected) in [(0x00_u8, 0_u64), (0x01, 1), (0xFF, 255)] {
            assert_bits(&[byte], expected);
        }
    }

    #[test]
    fn bit_bytes_multi_byte() {
        assert_bits(&[0x01, 0x02], 258);
        assert_bits(&[0xFF; 8], u64::MAX);
    }

    #[test]
    fn bit_bytes_empty() {
        assert_bits(&[], 0);
    }

    #[test]
    fn bit_bytes_ascii_digit_bytes_are_bits_not_text() {
        // Regression (mysql-bit): BIT bytes that happen to be ASCII digits are
        // still big-endian bits — never decimal text.
        assert_bits(&[0x39], 57); // "9" as text, BIT(8) 57
        assert_bits(&[0x31, 0x32], 0x3132); // b"12" → 12594
        assert_bits(&[0x31, 0xFF], 12799); // digit head, non-digit tail
    }

    // ── InnoDB AVG_ROW_LENGTH correction ────────────────────────────────

    #[test]
    fn innodb_correction_below_threshold_is_identity() {
        for raw in [82, 314, 2_048, 8 * 1024] {
            assert_eq!(correct_innodb_avg_row_length(raw), raw, "raw = {raw}");
        }
    }

    #[test]
    fn innodb_correction_above_threshold_divides_by_three() {
        for (raw, expected) in [(40_978, 40_978 / 3), (120_000, 40_000)] {
            assert_eq!(correct_innodb_avg_row_length(raw), expected, "raw = {raw}");
        }
    }

    #[test]
    fn innodb_correction_does_not_undershoot_floor() {
        let just_above = 8 * 1024 + 1;
        let divided = correct_innodb_avg_row_length(just_above);
        assert!(divided >= 4 * 1024, "got {divided}");
    }
}