Skip to main content

rivet/source/mysql/
mod.rs

1//! MySQL `Source` implementation.
2//!
3//! Module layout (mirrors `postgres/`):
4//!
5//! - `mod.rs` (this file) — `MysqlSource` struct + connect/TLS path, the
6//!   `InnoDB_log_waits` sampler, the `lean_pool_opts` / `connect_pool` /
7//!   `build_mysql_ssl_opts` helpers, `introspect_mysql_table_for_chunking`
8//!   together with the InnoDB `AVG_ROW_LENGTH` correction, the cursor-bound
9//!   `exec_iter` export loop (`mysql_run_export`), and the `Source` trait impl.
10//! - [`arrow_convert`] — the entire row → Arrow `RecordBatch` pipeline:
11//!   `mysql_type_to_rivet` + `mysql_native_type_name`,
12//!   `mysql_schema_and_arrow_types`, BIT / TIME / DECIMAL decoders, and the
13//!   array builders. Kept in a sibling because it is the largest
14//!   single-purpose cluster in this driver (~510 LoC) and has zero reverse
15//!   dependency back into the connection / pool / cursor layer.
16//! - [`proxy`] — `MysqlProxyKind` enum, the pure `classify_mysql_proxy`
17//!   classifier, the I/O wrapper `detect_mysql_proxy_kind`, and
18//!   `warn_proxy_kind`. Detection runs once at connect time; the classifier
19//!   is exhaustively unit-tested in isolation (no live MySQL needed).
20
21mod arrow_convert;
22mod proxy;
23
24use std::sync::Arc;
25
26use arrow::datatypes::Schema;
27use mysql::prelude::*;
28use mysql::{Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, SslOpts};
29
30use crate::config::{SourceType, TlsConfig, TlsMode};
31use crate::error::Result;
32use crate::source::query::build_export_query;
33use crate::tuning::{ADAPTIVE_SAMPLE_INTERVAL, SourceTuning, next_adaptive_batch_size};
34use crate::types::ColumnOverrides;
35
36use arrow_convert::{
37    mysql_native_type_name, mysql_schema_and_arrow_types, mysql_type_to_rivet,
38    rows_to_record_batch_typed,
39};
40// `bit_bytes_to_u64` is only referenced by the `tests` module below — gate the
41// re-import on `cfg(test)` so non-test builds don't see an unused-import warning.
42#[cfg(test)]
43use arrow_convert::bit_bytes_to_u64;
44use proxy::{detect_mysql_proxy_kind, warn_proxy_kind};
45
46// Re-exported so external code (`tests/live_pool_safety.rs`) can still write
47// `use rivet::source::mysql::MysqlProxyKind` after the proxy block moved to
48// the `proxy` submodule.
49pub use proxy::MysqlProxyKind;
50
51pub struct MysqlSource {
52    pool: Pool,
53    proxy_kind: MysqlProxyKind,
54}
55
56/// Pool options that prevent eager pre-connection. The default mysql::Pool
57/// opens `min=10` connections immediately, which overflows MySQL's
58/// max_connections when many parallel exports run simultaneously.
59fn lean_pool_opts() -> PoolOpts {
60    PoolOpts::default()
61        .with_constraints(PoolConstraints::new(1, 100).expect("valid pool constraints"))
62}
63
64/// Sample the global `Innodb_log_waits` counter — increments when InnoDB has to
65/// wait for redo-log buffer space, indicating write pressure.
66fn mysql_sample_innodb_log_waits(pool: &Pool) -> Option<u64> {
67    let mut conn = pool.get_conn().ok()?;
68    conn.query_first::<(String, u64), _>("SHOW GLOBAL STATUS LIKE 'Innodb_log_waits'")
69        .ok()
70        .flatten()
71        .map(|(_, v)| v)
72}
73
74impl MysqlSource {
75    /// Build a source from an existing pool. Useful in tests that need to
76    /// share the pool with post-export state inspection.
77    #[allow(dead_code)]
78    pub fn from_pool(pool: Pool) -> Self {
79        let proxy_kind = detect_mysql_proxy_kind(&pool);
80        warn_proxy_kind(proxy_kind);
81        Self { pool, proxy_kind }
82    }
83
84    /// Connect with no transport security (legacy path).
85    pub fn connect(url: &str) -> Result<Self> {
86        let opts =
87            Opts::from(OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()));
88        let pool = Pool::new(opts)?;
89        let proxy_kind = detect_mysql_proxy_kind(&pool);
90        warn_proxy_kind(proxy_kind);
91        Ok(Self { pool, proxy_kind })
92    }
93
94    /// Connect honoring the user's [`TlsConfig`].
95    pub fn connect_with_tls(url: &str, tls: Option<&TlsConfig>) -> Result<Self> {
96        match tls {
97            Some(cfg) if cfg.mode.is_enforced() => {
98                let base = Opts::from_url(url)?;
99                let ssl = build_mysql_ssl_opts(cfg);
100                let opts = Opts::from(
101                    OptsBuilder::from_opts(base)
102                        .ssl_opts(Some(ssl))
103                        .pool_opts(lean_pool_opts()),
104                );
105                let pool = Pool::new(opts)?;
106                let proxy_kind = detect_mysql_proxy_kind(&pool);
107                warn_proxy_kind(proxy_kind);
108                Ok(Self { pool, proxy_kind })
109            }
110            _ => Self::connect(url),
111        }
112    }
113
114    /// Expose the proxy classification for diagnostic tools (preflight,
115    /// integration tests). Not part of the public Source trait — same
116    /// internal-may-change contract as the rest of `rivet::source::mysql::*`.
117    ///
118    /// `#[allow(dead_code)]` covers the binary compilation unit; the lib +
119    /// integration tests reference this through the `rivet::source::mysql`
120    /// public surface.
121    #[allow(dead_code)]
122    pub fn proxy_kind(&self) -> MysqlProxyKind {
123        self.proxy_kind
124    }
125}
126
127/// Build a MySQL connection pool honoring the configured TLS policy.
128///
129/// Shared by preflight, doctor, init, and anywhere else we need a pool outside
130/// the `Source` trait. `tls = None` falls back to plaintext (legacy behavior).
131pub(crate) fn connect_pool(url: &str, tls: Option<&TlsConfig>) -> Result<Pool> {
132    match tls {
133        Some(cfg) if cfg.mode.is_enforced() => {
134            let base = Opts::from_url(url)?;
135            let ssl = build_mysql_ssl_opts(cfg);
136            let opts = Opts::from(
137                OptsBuilder::from_opts(base)
138                    .ssl_opts(Some(ssl))
139                    .pool_opts(lean_pool_opts()),
140            );
141            Ok(Pool::new(opts)?)
142        }
143        _ => {
144            let opts = Opts::from(
145                OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()),
146            );
147            Ok(Pool::new(opts)?)
148        }
149    }
150}
151
/// `AVG_ROW_LENGTH` values strictly above this are treated as inflated by
/// InnoDB BLOB overflow pages. Rows up to 8 KB fit inline (no overflow), so
/// the raw figure is trusted there.
const INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES: i64 = 8 * 1024;

/// Empirical divisor for InnoDB BLOB-page inflation. A wide-text row that
/// allocates eight 16 KB overflow pages reports ~128 KB in `AVG_ROW_LENGTH`
/// while the actual wire content is ~40 KB → factor of ~3.
const INNODB_BLOB_OVERFLOW_DIVISOR: i64 = 3;

/// Apply the InnoDB BLOB-overflow correction to a raw `AVG_ROW_LENGTH` value.
///
/// This is a pure function so it can be unit-tested. The live introspection
/// helper calls it on the figure returned by `information_schema.TABLES`.
///
/// - `raw_bytes <= 8 KB`: returned unchanged (no overflow pages involved).
/// - `raw_bytes > 8 KB`: divided by 3, but never below `threshold / 2`
///   (4 KB), so the estimate cannot undershoot too far.
///
/// Note the mapping is not monotonic at the threshold: `8192 → 8192` but
/// `8193 → 4096`. The estimate only returns to 8 KB at a raw value of 24 KB.
fn correct_innodb_avg_row_length(raw_bytes: i64) -> i64 {
    if raw_bytes <= INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES {
        return raw_bytes;
    }
    let floor = INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES / 2;
    let scaled = raw_bytes / INNODB_BLOB_OVERFLOW_DIVISOR;
    std::cmp::max(scaled, floor)
}
175
176/// Probe `information_schema` for stats chunked-mode planning needs.
177///
178/// MySQL analogue of [`crate::source::postgres::introspect_pg_table_for_chunking`]:
179/// returns the same source-neutral [`crate::source::TableIntrospection`] so
180/// `plan/build.rs` can dispatch on `source_type` and reuse the same downstream
181/// logic for chunk-column / chunk_size derivation.
182///
183/// Two queries per call, both against `information_schema` (no extra grants
184/// required for a normal app user):
185/// - `TABLES.AVG_ROW_LENGTH` + `TABLE_ROWS` for the row-size and row-count estimate.
186///   These come from `mysql.innodb_table_stats` and are only as fresh as the
187///   last `ANALYZE TABLE` / autostat run. Empty / unanalysed → zero.
188/// - `STATISTICS` filtered to `INDEX_NAME='PRIMARY'` with `SEQ_IN_INDEX=1` and a
189///   second probe ensuring no `SEQ_IN_INDEX=2` row exists — single-column PK only.
190///
191/// `qualified_table` is `<schema>.<table>` or bare `<table>` (resolved under the
192/// current database for the connection). Same strict ident rules as the YAML
193/// `table:` shortcut so the SQL stays trivially safe.
194pub(crate) fn introspect_mysql_table_for_chunking(
195    url: &str,
196    tls: Option<&TlsConfig>,
197    qualified_table: &str,
198) -> Result<crate::source::TableIntrospection> {
199    let pool = connect_pool(url, tls)?;
200    let mut conn = pool.get_conn()?;
201    let default_db: Option<String> = conn.query_first("SELECT DATABASE()")?;
202    let default_db = default_db.unwrap_or_default();
203
204    let (schema, table) = match qualified_table.split_once('.') {
205        Some((s, t)) => (s.to_string(), t.to_string()),
206        None => (default_db, qualified_table.to_string()),
207    };
208
209    // (1) Row count + avg row bytes. AVG_ROW_LENGTH already accounts for
210    // overflow pages on InnoDB, so we use it directly rather than dividing
211    // DATA_LENGTH by TABLE_ROWS (which under-counts for tables with TOAST-like
212    // overflow). Fall back to division when AVG_ROW_LENGTH is 0.
213    let row_stats: Option<(i64, i64, i64)> = conn.exec_first(
214        "SELECT CAST(IFNULL(TABLE_ROWS, 0) AS SIGNED), \
215                CAST(IFNULL(AVG_ROW_LENGTH, 0) AS SIGNED), \
216                CAST(IFNULL(DATA_LENGTH, 0) AS SIGNED) \
217         FROM information_schema.TABLES \
218         WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
219        (&schema, &table),
220    )?;
221    let (row_estimate, avg_row_bytes) = match row_stats {
222        Some((rows, avg, data_len)) => {
223            let row_count = rows.max(0);
224            let raw_per_row = if avg > 0 {
225                Some(avg)
226            } else if row_count > 0 {
227                Some(data_len / row_count)
228            } else {
229                None
230            };
231            // InnoDB stores TEXT/BLOB > ~768 B off-page in 16 KB BLOB pages,
232            // and `AVG_ROW_LENGTH` counts the allocated page bytes — not the
233            // actual content. On wide-text workloads (CMS bodies, JSON logs,
234            // audit trails) this inflates the per-row estimate 3-5× compared
235            // to what the client driver actually buffers over the wire.
236            //
237            // We empirically divide by 3 above an 8 KB threshold. Below 8 KB
238            // a row fits inline with no overflow, so the raw figure is
239            // accurate. Above it, dividing by 3 brings content_items' 41 KB
240            // estimate down to ~14 KB — still conservative vs the ~10 KB the
241            // PG side reports for the same payload via `pg_total_relation_size`.
242            //
243            // Pilots who want exact control can set `chunk_size:` explicitly
244            // (it always wins over the budget-derived size).
245            let per_row = raw_per_row.map(correct_innodb_avg_row_length);
246            (row_count, per_row.filter(|b| *b > 0))
247        }
248        None => (0, None),
249    };
250
251    // (2) Single-column int PK probe. STATISTICS has one row per (column,
252    // index) so we filter to PRIMARY + SEQ_IN_INDEX=1 and then check that
253    // the PRIMARY index has no SEQ_IN_INDEX=2 row (composite).
254    let pk_first: Option<(String,)> = conn.exec_first(
255        "SELECT COLUMN_NAME \
256         FROM information_schema.STATISTICS \
257         WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 1",
258        (&schema, &table),
259    )?;
260    let single_int_pk = if let Some((col,)) = pk_first {
261        let composite: Option<(String,)> = conn.exec_first(
262            "SELECT COLUMN_NAME FROM information_schema.STATISTICS \
263             WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 2 \
264             LIMIT 1",
265            (&schema, &table),
266        )?;
267        if composite.is_some() {
268            log::debug!(
269                "introspect_mysql_table: composite PK on {schema}.{table} — skipping auto-resolve"
270            );
271            None
272        } else {
273            // Column type must be integer-family for safe range chunking.
274            let type_row: Option<(String,)> = conn.exec_first(
275                "SELECT DATA_TYPE FROM information_schema.COLUMNS \
276                 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
277                (&schema, &table, &col),
278            )?;
279            match type_row.map(|(t,)| t.to_ascii_lowercase()) {
280                Some(t)
281                    if matches!(
282                        t.as_str(),
283                        "tinyint" | "smallint" | "mediumint" | "int" | "bigint"
284                    ) =>
285                {
286                    Some(col)
287                }
288                Some(t) => {
289                    log::debug!(
290                        "introspect_mysql_table: PK '{col}' on {schema}.{table} has non-int type '{t}' — skipping auto-resolve"
291                    );
292                    None
293                }
294                None => None,
295            }
296        }
297    } else {
298        None
299    };
300
301    // (3) Keyset keys (OPT-4): single-column, NOT NULL, UNIQUE index columns —
302    // usable as a seek-pagination key. NON_UNIQUE=0 filters to unique indexes
303    // (PRIMARY included); SEQ_IN_INDEX=1 with no SEQ_IN_INDEX=2 row keeps only
304    // single-column indexes; IS_NULLABLE='NO' guarantees `> last` never has to
305    // reason about NULL ordering. Index-backed by definition, so keyset's
306    // `ORDER BY key LIMIT n` is a range scan, not a filesort.
307    let keyset_rows: Vec<(String, String, String)> = conn.exec(
308        "SELECT s.COLUMN_NAME, s.INDEX_NAME, c.IS_NULLABLE \
309         FROM information_schema.STATISTICS s \
310         JOIN information_schema.COLUMNS c \
311           ON c.TABLE_SCHEMA = s.TABLE_SCHEMA AND c.TABLE_NAME = s.TABLE_NAME \
312              AND c.COLUMN_NAME = s.COLUMN_NAME \
313         WHERE s.TABLE_SCHEMA = ? AND s.TABLE_NAME = ? AND s.NON_UNIQUE = 0 \
314           AND s.SEQ_IN_INDEX = 1 \
315           AND NOT EXISTS ( \
316             SELECT 1 FROM information_schema.STATISTICS s2 \
317             WHERE s2.TABLE_SCHEMA = s.TABLE_SCHEMA AND s2.TABLE_NAME = s.TABLE_NAME \
318               AND s2.INDEX_NAME = s.INDEX_NAME AND s2.SEQ_IN_INDEX = 2)",
319        (&schema, &table),
320    )?;
321    let mut keyset_keys: Vec<String> = Vec::new();
322    // PRIMARY first (most efficient — clustered), then other unique indexes.
323    for primary in [true, false] {
324        for (col, index_name, is_nullable) in &keyset_rows {
325            let is_primary = index_name == "PRIMARY";
326            if is_primary == primary
327                && is_nullable.eq_ignore_ascii_case("NO")
328                && !keyset_keys.contains(col)
329            {
330                keyset_keys.push(col.clone());
331            }
332        }
333    }
334
335    Ok(crate::source::TableIntrospection {
336        single_int_pk,
337        keyset_keys,
338        row_estimate,
339        avg_row_bytes,
340    })
341}
342
343fn build_mysql_ssl_opts(cfg: &TlsConfig) -> SslOpts {
344    let mut ssl = SslOpts::default();
345    if let Some(path) = &cfg.ca_file {
346        ssl = ssl.with_root_cert_path(Some(std::path::PathBuf::from(path)));
347    }
348    match cfg.mode {
349        TlsMode::Require => {
350            ssl = ssl
351                .with_danger_accept_invalid_certs(true)
352                .with_danger_skip_domain_validation(true);
353        }
354        TlsMode::VerifyCa => {
355            ssl = ssl.with_danger_skip_domain_validation(true);
356        }
357        TlsMode::VerifyFull => {
358            // Strict: verify chain + hostname.
359        }
360        TlsMode::Disable => {
361            // Never invoked: gated in connect_with_tls.
362        }
363    }
364    if cfg.accept_invalid_certs {
365        ssl = ssl.with_danger_accept_invalid_certs(true);
366    }
367    if cfg.accept_invalid_hostnames {
368        ssl = ssl.with_danger_skip_domain_validation(true);
369    }
370    ssl
371}
372
373/// Execute the MySQL query and stream results to sink.
374///
375/// Separated from export() so session-state cleanup (time_zone, max_execution_time)
376/// can run unconditionally in the caller regardless of success or failure.
377///
378/// `sample_pool`: when `tuning.adaptive` is true, a clone of the source pool used
379/// to obtain a second connection for `Innodb_log_waits` sampling without interfering
380/// with the streaming result set on `conn`.
381fn mysql_run_export(
382    conn: &mut mysql::PooledConn,
383    sample_pool: Option<Pool>,
384    sql: &str,
385    cursor_param: Option<&str>,
386    tuning: &SourceTuning,
387    column_overrides: &ColumnOverrides,
388    sink: &mut dyn super::BatchSink,
389) -> Result<usize> {
390    // SecOps: cursor value is bound via exec_iter rather than string-interpolated.
391    // Using exec_iter uniformly (even with empty params) keeps match arms
392    // type-compatible — query_iter returns a Text-protocol result, exec_iter Binary.
393    let mut result = match cursor_param {
394        Some(val) => conn.exec_iter(sql, (val,))?,
395        None => conn.exec_iter(sql, ())?,
396    };
397    let columns = result.columns().as_ref().to_vec();
398
399    // Compute TypeMappings once; derive both the Arrow schema and the
400    // per-column DataType vec from the same source so they can never diverge.
401    let (schema, arrow_types) = mysql_schema_and_arrow_types(&columns, column_overrides)?;
402    let schema = Arc::new(schema);
403
404    sink.on_schema(schema.clone())?;
405
406    // PG path uses `work_mem × 0.7 / row_bytes` for FETCH N — the analogous
407    // bottleneck on MySQL is *our* `row_buf` accumulator. The mysql crate
408    // streams rows from the wire one-at-a-time, but we pile up `effective_bs`
409    // of them in a `Vec<Row>` before flushing to Arrow → for `batch_size: 50000`
410    // (fast profile) on content_items that's ~650 MB just for the row_buf,
411    // plus another ~650 MB for the Arrow batch it feeds — RSS scales with
412    // `batch_size`, not chunk size.
413    //
414    // Fix: start with a small probe (`PROBE_BATCH_SIZE`), measure the actual
415    // Arrow bytes per row after the first batch, then cap `effective_bs` so
416    // each flush fits in roughly `MYSQL_BATCH_TARGET_MB` of Arrow memory.
417    // Caller's `batch_size_memory_mb` wins when set; the default is 64 MB —
418    // chosen to keep peak RSS well under 200 MB on wide-row tables while
419    // keeping batches large enough to be efficient for the parquet writer.
420    const PROBE_BATCH_SIZE: usize = 500;
421    const MYSQL_BATCH_TARGET_MB_DEFAULT: usize = 64;
422
423    let configured_batch_size = tuning.effective_batch_size(Some(&schema));
424    let mut effective_bs = configured_batch_size.min(PROBE_BATCH_SIZE);
425    let mut base_fetch_size = effective_bs;
426    let mut adaptive_last_waits: Option<u64> = if tuning.adaptive {
427        sample_pool.as_ref().and_then(mysql_sample_innodb_log_waits)
428    } else {
429        None
430    };
431    let mut batch_count: usize = 0;
432    let row_set = result
433        .iter()
434        .ok_or_else(|| anyhow::anyhow!("no result set"))?;
435    let mut row_buf: Vec<mysql::Row> = Vec::with_capacity(effective_bs);
436    let mut total_rows: usize = 0;
437    let mut memory_cap_applied = false;
438
439    for row_result in row_set {
440        let row = row_result?;
441        row_buf.push(row);
442
443        if row_buf.len() >= effective_bs {
444            total_rows += row_buf.len();
445            batch_count += 1;
446            let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
447            let batch_rows = row_buf.len();
448            row_buf.clear();
449
450            // After the first (probe-sized) batch we know how many bytes per
451            // row Arrow actually uses. Cap subsequent flushes to a memory
452            // target. Never exceed the user's configured `batch_size`.
453            if !memory_cap_applied && batch_rows > 0 {
454                let arrow_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
455                let arrow_per_row = (arrow_bytes / batch_rows).max(64);
456                let target_mb = tuning
457                    .batch_size_memory_mb
458                    .unwrap_or(MYSQL_BATCH_TARGET_MB_DEFAULT);
459                let safe = ((target_mb * 1024 * 1024) / arrow_per_row).max(PROBE_BATCH_SIZE);
460                let target = safe.min(configured_batch_size);
461                if target != effective_bs {
462                    log::info!(
463                        "MySQL row_buf cap: arrow≈{} B/row, target={} MB → batch_size {} → {} (configured={})",
464                        arrow_per_row,
465                        target_mb,
466                        effective_bs,
467                        target,
468                        configured_batch_size
469                    );
470                    effective_bs = target;
471                    base_fetch_size = effective_bs;
472                    row_buf.reserve(effective_bs.saturating_sub(row_buf.capacity()));
473                }
474                memory_cap_applied = true;
475            }
476
477            sink.on_batch(&batch)?;
478
479            if tuning.adaptive
480                && batch_count.is_multiple_of(ADAPTIVE_SAMPLE_INTERVAL)
481                && let Some(ref pool) = sample_pool
482                && let Some(cur) = mysql_sample_innodb_log_waits(pool)
483            {
484                let under_pressure = adaptive_last_waits.is_some_and(|prev| cur > prev);
485                adaptive_last_waits = Some(cur);
486                let next = next_adaptive_batch_size(effective_bs, base_fetch_size, under_pressure);
487                if next != effective_bs {
488                    effective_bs = next;
489                    log::info!(
490                        "adaptive batch size → {} ({})",
491                        effective_bs,
492                        if under_pressure {
493                            "pressure"
494                        } else {
495                            "recovery"
496                        }
497                    );
498                }
499            }
500
501            log::info!("fetched {} rows so far...", total_rows);
502
503            if tuning.throttle_ms > 0 {
504                std::thread::sleep(std::time::Duration::from_millis(tuning.throttle_ms));
505            }
506        }
507    }
508
509    if !row_buf.is_empty() {
510        total_rows += row_buf.len();
511        let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
512        sink.on_batch(&batch)?;
513    }
514
515    drop(result);
516    Ok(total_rows)
517}
518
519impl super::Source for MysqlSource {
520    fn export(
521        &mut self,
522        request: &super::ExportRequest<'_>,
523        sink: &mut dyn super::BatchSink,
524    ) -> Result<()> {
525        let built = build_export_query(request, SourceType::Mysql);
526        log::debug!(
527            "executing query (connection={}): {}",
528            self.proxy_kind.log_label(),
529            built.sql
530        );
531
532        let mut conn = self.pool.get_conn()?;
533
534        // Roadmap §13: normalize TIMESTAMP columns to UTC so Parquet writes
535        // isAdjustedToUTC=true. SET per-connection (not global) to avoid side-effects.
536        conn.query_drop("SET time_zone = '+00:00'")?;
537
538        if request.tuning.statement_timeout_s > 0 {
539            conn.query_drop(format!(
540                "SET SESSION max_execution_time = {}",
541                request.tuning.statement_timeout_s * 1000
542            ))?;
543        }
544
545        let sample_pool = if request.tuning.adaptive {
546            Some(self.pool.clone())
547        } else {
548            None
549        };
550        let result = mysql_run_export(
551            &mut conn,
552            sample_pool,
553            &built.sql,
554            built.cursor_param.as_deref(),
555            request.tuning,
556            request.column_overrides,
557            sink,
558        );
559
560        // Always reset session state before connection returns to pool,
561        // regardless of whether the export succeeded or failed.
562        let _ = conn.query_drop("SET time_zone = @@global.time_zone");
563        if request.tuning.statement_timeout_s > 0 {
564            let _ = conn.query_drop("SET SESSION max_execution_time = 0");
565        }
566
567        // The empty-result fallback to `Schema::empty()` lives here for
568        // parity with the PG implementation, even though `exec_iter` always
569        // returns the column metadata before yielding any rows so
570        // mysql_run_export's `on_schema` already fired.
571        let total_rows = result?;
572        if total_rows == 0 {
573            sink.on_schema(Arc::new(Schema::empty()))?;
574        }
575        log::info!("total: {} rows", total_rows);
576        Ok(())
577    }
578
579    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>> {
580        let mut conn = self.pool.get_conn()?;
581        let row: Option<mysql::Row> = conn.query_first(sql)?;
582        match row {
583            Some(r) => {
584                let val: Option<mysql::Value> = r.get(0);
585                match val {
586                    Some(mysql::Value::Bytes(b)) => {
587                        Ok(Some(String::from_utf8_lossy(&b).into_owned()))
588                    }
589                    Some(mysql::Value::Int(v)) => Ok(Some(v.to_string())),
590                    Some(mysql::Value::UInt(v)) => Ok(Some(v.to_string())),
591                    Some(mysql::Value::Float(v)) => Ok(Some(v.to_string())),
592                    Some(mysql::Value::Double(v)) => Ok(Some(v.to_string())),
593                    _ => Ok(None),
594                }
595            }
596            None => Ok(None),
597        }
598    }
599
600    fn type_mappings(
601        &mut self,
602        query: &str,
603        column_overrides: &ColumnOverrides,
604    ) -> Result<Vec<crate::types::TypeMapping>> {
605        let wrapped = format!("SELECT * FROM ({}) AS _rivet_type_probe LIMIT 0", query);
606        let mut conn = self.pool.get_conn()?;
607        let result = conn.exec_iter(&wrapped, ())?;
608        let columns = result.columns().as_ref().to_vec();
609        drop(result);
610        let mappings = columns
611            .iter()
612            .map(|col| {
613                let rivet =
614                    crate::types::resolve_or(column_overrides, col.name_str().as_ref(), || {
615                        mysql_type_to_rivet(col)
616                    });
617                let source = crate::types::SourceColumn::simple(
618                    col.name_str().as_ref(),
619                    mysql_native_type_name(col),
620                    true,
621                );
622                crate::types::TypeMapping::from_source(&source, rivet)
623            })
624            .collect();
625        Ok(mappings)
626    }
627
628    /// Governor pressure proxy: global `Innodb_log_waits` — the same monotonic
629    /// counter the adaptive batch loop samples. Rising between samples means
630    /// InnoDB is stalling on redo-log buffer space under write pressure.
631    fn sample_pressure(&mut self) -> Option<u64> {
632        mysql_sample_innodb_log_waits(&self.pool)
633    }
634}
635
#[cfg(test)]
mod tests {
    use super::{bit_bytes_to_u64, correct_innodb_avg_row_length};

    // Proxy classifier tests live in `proxy.rs`, next to the classifier itself.

    // ── bit_bytes_to_u64 (defined in arrow_convert.rs, pub(super)) ──

    #[test]
    fn bit_bytes_single_byte() {
        let cases: [(u8, u64); 3] = [(0x00, 0), (0x01, 1), (0xFF, 255)];
        for (byte, expected) in cases {
            assert_eq!(bit_bytes_to_u64(&[byte]), expected, "byte {byte:#04x}");
        }
    }

    #[test]
    fn bit_bytes_multi_byte() {
        // Bytes are read big-endian: [0x01, 0x02] == 0x0102 == 258.
        assert_eq!(bit_bytes_to_u64(&[0x01, 0x02]), 258);
        assert_eq!(bit_bytes_to_u64(&[0xFF; 8]), u64::MAX);
    }

    #[test]
    fn bit_bytes_empty() {
        assert_eq!(bit_bytes_to_u64(&[]), 0);
    }

    // ── InnoDB AVG_ROW_LENGTH correction ────────────────────────────────

    #[test]
    fn innodb_correction_below_threshold_is_identity() {
        for raw in [82, 314, 2_048, 8 * 1024] {
            assert_eq!(correct_innodb_avg_row_length(raw), raw, "raw {raw}");
        }
    }

    #[test]
    fn innodb_correction_above_threshold_divides_by_three() {
        let cases = [(40_978, 40_978 / 3), (120_000, 40_000)];
        for (raw, expected) in cases {
            assert_eq!(correct_innodb_avg_row_length(raw), expected, "raw {raw}");
        }
    }

    #[test]
    fn innodb_correction_does_not_undershoot_floor() {
        let divided = correct_innodb_avg_row_length(8 * 1024 + 1);
        assert!(divided >= 4 * 1024, "got {divided}");
    }
}