Skip to main content

rivet/source/mysql/
mod.rs

1//! MySQL `Source` implementation.
2//!
3//! Module layout (mirrors `postgres/`):
4//!
5//! - `mod.rs` (this file) — `MysqlSource` struct + connect/TLS path, the
6//!   `InnoDB_log_waits` sampler, the `lean_pool_opts` / `connect_pool` /
7//!   `build_mysql_ssl_opts` helpers, `introspect_mysql_table_for_chunking`
8//!   together with the InnoDB `AVG_ROW_LENGTH` correction, the cursor-bound
9//!   `exec_iter` export loop (`mysql_run_export`), and the `Source` trait impl.
10//! - [`arrow_convert`] — the entire row → Arrow `RecordBatch` pipeline:
11//!   `mysql_type_to_rivet` + `mysql_native_type_name`,
12//!   `mysql_schema_and_arrow_types`, BIT / TIME / DECIMAL decoders, and the
13//!   array builders. Kept in a sibling because it is the largest
14//!   single-purpose cluster in this driver (~510 LoC) and has zero reverse
15//!   dependency back into the connection / pool / cursor layer.
16//! - [`proxy`] — `MysqlProxyKind` enum, the pure `classify_mysql_proxy`
17//!   classifier, the I/O wrapper `detect_mysql_proxy_kind`, and
18//!   `warn_proxy_kind`. Detection runs once at connect time; the classifier
19//!   is exhaustively unit-tested in isolation (no live MySQL needed).
20
21mod arrow_convert;
22mod proxy;
23
24use std::sync::Arc;
25
26use arrow::datatypes::Schema;
27use mysql::prelude::*;
28use mysql::{Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, SslOpts};
29
30use crate::config::{SourceType, TlsConfig, TlsMode};
31use crate::error::Result;
32use crate::source::batch_controller::{
33    AdaptiveBatchController, DEFAULT_BATCH_TARGET_MB, PROBE_BATCH_SIZE,
34};
35use crate::source::query::build_export_query;
36use crate::tuning::SourceTuning;
37use crate::types::ColumnOverrides;
38
39use arrow_convert::{
40    mysql_native_type_name, mysql_schema_and_arrow_types, mysql_type_to_rivet,
41    rows_to_record_batch_typed,
42};
43// `bit_bytes_to_u64` is only referenced by the `tests` module below — gate the
44// re-import on `cfg(test)` so non-test builds don't see an unused-import warning.
45#[cfg(test)]
46use arrow_convert::bit_bytes_to_u64;
47use proxy::{detect_mysql_proxy_kind, warn_proxy_kind};
48
49// Re-exported so external code (`tests/live_pool_safety.rs`) can still write
50// `use rivet::source::mysql::MysqlProxyKind` after the proxy block moved to
51// the `proxy` submodule.
52pub use proxy::MysqlProxyKind;
53
54pub struct MysqlSource {
55    pool: Pool,
56    proxy_kind: MysqlProxyKind,
57}
58
59/// Pool options that prevent eager pre-connection. The default mysql::Pool
60/// opens `min=10` connections immediately, which overflows MySQL's
61/// max_connections when many parallel exports run simultaneously.
62fn lean_pool_opts() -> PoolOpts {
63    PoolOpts::default()
64        .with_constraints(PoolConstraints::new(1, 100).expect("valid pool constraints"))
65}
66
67/// Sample the global `Innodb_log_waits` counter — increments when InnoDB has to
68/// wait for redo-log buffer space, indicating write pressure.
69fn mysql_sample_innodb_log_waits(pool: &Pool) -> Option<u64> {
70    let mut conn = pool.get_conn().ok()?;
71    conn.query_first::<(String, u64), _>("SHOW GLOBAL STATUS LIKE 'Innodb_log_waits'")
72        .ok()
73        .flatten()
74        .map(|(_, v)| v)
75}
76
77impl MysqlSource {
78    /// Build a source from an existing pool: the single place that detects the
79    /// proxy kind, warns once, and wraps the pool. The `connect*` entry points
80    /// all funnel through here (also handy in tests that share the pool for
81    /// post-export state inspection).
82    pub fn from_pool(pool: Pool) -> Self {
83        let proxy_kind = detect_mysql_proxy_kind(&pool);
84        warn_proxy_kind(proxy_kind);
85        Self { pool, proxy_kind }
86    }
87
88    /// Connect with no transport security (legacy path).
89    pub fn connect(url: &str) -> Result<Self> {
90        let opts =
91            Opts::from(OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()));
92        Ok(Self::from_pool(Pool::new(opts)?))
93    }
94
95    /// Connect honoring the user's [`TlsConfig`].
96    pub fn connect_with_tls(url: &str, tls: Option<&TlsConfig>) -> Result<Self> {
97        match tls {
98            Some(cfg) if cfg.mode.is_enforced() => {
99                let base = Opts::from_url(url)?;
100                let ssl = build_mysql_ssl_opts(cfg);
101                let opts = Opts::from(
102                    OptsBuilder::from_opts(base)
103                        .ssl_opts(Some(ssl))
104                        .pool_opts(lean_pool_opts()),
105                );
106                Ok(Self::from_pool(Pool::new(opts)?))
107            }
108            _ => Self::connect(url),
109        }
110    }
111
112    /// Expose the proxy classification for diagnostic tools (preflight,
113    /// integration tests). Not part of the public Source trait — same
114    /// internal-may-change contract as the rest of `rivet::source::mysql::*`.
115    ///
116    /// `#[allow(dead_code)]` covers the binary compilation unit; the lib +
117    /// integration tests reference this through the `rivet::source::mysql`
118    /// public surface.
119    #[allow(dead_code)]
120    pub fn proxy_kind(&self) -> MysqlProxyKind {
121        self.proxy_kind
122    }
123}
124
125/// Build a MySQL connection pool honoring the configured TLS policy.
126///
127/// Shared by preflight, doctor, init, and anywhere else we need a pool outside
128/// the `Source` trait. `tls = None` falls back to plaintext (legacy behavior).
129pub(crate) fn connect_pool(url: &str, tls: Option<&TlsConfig>) -> Result<Pool> {
130    match tls {
131        Some(cfg) if cfg.mode.is_enforced() => {
132            let base = Opts::from_url(url)?;
133            let ssl = build_mysql_ssl_opts(cfg);
134            let opts = Opts::from(
135                OptsBuilder::from_opts(base)
136                    .ssl_opts(Some(ssl))
137                    .pool_opts(lean_pool_opts()),
138            );
139            Ok(Pool::new(opts)?)
140        }
141        _ => {
142            let opts = Opts::from(
143                OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()),
144            );
145            Ok(Pool::new(opts)?)
146        }
147    }
148}
149
/// `AVG_ROW_LENGTH` values up to this many bytes are trusted verbatim: rows
/// under 8 KB fit inline (no BLOB overflow pages). Above it the figure is
/// treated as inflated by overflow-page allocation and divided down.
const INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES: i64 = 8 * 1024;

/// Empirical divisor for InnoDB BLOB-page inflation. A wide-text row that
/// allocates eight 16 KB overflow pages reports ~128 KB in `AVG_ROW_LENGTH`
/// while the actual wire content is ~40 KB, which is a factor of ~3.
const INNODB_BLOB_OVERFLOW_DIVISOR: i64 = 3;

/// Apply the InnoDB BLOB-overflow correction to a raw `AVG_ROW_LENGTH` value.
///
/// Pure function for unit testability; the live introspection helper calls it
/// on the figure read from `information_schema.TABLES`.
///
/// - At or below the 8 KB threshold, the raw value is returned unchanged.
/// - Above it, the value is divided by 3 and floored at half the threshold
///   (4 KB) so the estimate never undershoots too far.
fn correct_innodb_avg_row_length(raw_bytes: i64) -> i64 {
    if raw_bytes <= INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES {
        return raw_bytes;
    }
    let floor = INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES / 2;
    let divided = raw_bytes / INNODB_BLOB_OVERFLOW_DIVISOR;
    std::cmp::max(divided, floor)
}
173
174/// Probe `information_schema` for stats chunked-mode planning needs.
175///
176/// MySQL analogue of [`crate::source::postgres::introspect_pg_table_for_chunking`]:
177/// returns the same source-neutral [`crate::source::TableIntrospection`] so
178/// `plan/build.rs` can dispatch on `source_type` and reuse the same downstream
179/// logic for chunk-column / chunk_size derivation.
180///
181/// Two queries per call, both against `information_schema` (no extra grants
182/// required for a normal app user):
183/// - `TABLES.AVG_ROW_LENGTH` + `TABLE_ROWS` for the row-size and row-count estimate.
184///   These come from `mysql.innodb_table_stats` and are only as fresh as the
185///   last `ANALYZE TABLE` / autostat run. Empty / unanalysed → zero.
186/// - `STATISTICS` filtered to `INDEX_NAME='PRIMARY'` with `SEQ_IN_INDEX=1` and a
187///   second probe ensuring no `SEQ_IN_INDEX=2` row exists — single-column PK only.
188///
189/// `qualified_table` is `<schema>.<table>` or bare `<table>` (resolved under the
190/// current database for the connection). Same strict ident rules as the YAML
191/// `table:` shortcut so the SQL stays trivially safe.
192pub(crate) fn introspect_mysql_table_for_chunking(
193    url: &str,
194    tls: Option<&TlsConfig>,
195    qualified_table: &str,
196) -> Result<crate::source::TableIntrospection> {
197    let pool = connect_pool(url, tls)?;
198    let mut conn = pool.get_conn()?;
199    let default_db: Option<String> = conn.query_first("SELECT DATABASE()")?;
200    let default_db = default_db.unwrap_or_default();
201
202    let (schema, table) = match qualified_table.split_once('.') {
203        Some((s, t)) => (s.to_string(), t.to_string()),
204        None => (default_db, qualified_table.to_string()),
205    };
206
207    // (1) Row count + avg row bytes. AVG_ROW_LENGTH already accounts for
208    // overflow pages on InnoDB, so we use it directly rather than dividing
209    // DATA_LENGTH by TABLE_ROWS (which under-counts for tables with TOAST-like
210    // overflow). Fall back to division when AVG_ROW_LENGTH is 0.
211    let row_stats: Option<(i64, i64, i64)> = conn.exec_first(
212        "SELECT CAST(IFNULL(TABLE_ROWS, 0) AS SIGNED), \
213                CAST(IFNULL(AVG_ROW_LENGTH, 0) AS SIGNED), \
214                CAST(IFNULL(DATA_LENGTH, 0) AS SIGNED) \
215         FROM information_schema.TABLES \
216         WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
217        (&schema, &table),
218    )?;
219    let (row_estimate, avg_row_bytes) = match row_stats {
220        Some((rows, avg, data_len)) => {
221            let row_count = rows.max(0);
222            let raw_per_row = if avg > 0 {
223                Some(avg)
224            } else if row_count > 0 {
225                Some(data_len / row_count)
226            } else {
227                None
228            };
229            // InnoDB stores TEXT/BLOB > ~768 B off-page in 16 KB BLOB pages,
230            // and `AVG_ROW_LENGTH` counts the allocated page bytes — not the
231            // actual content. On wide-text workloads (CMS bodies, JSON logs,
232            // audit trails) this inflates the per-row estimate 3-5× compared
233            // to what the client driver actually buffers over the wire.
234            //
235            // We empirically divide by 3 above an 8 KB threshold. Below 8 KB
236            // a row fits inline with no overflow, so the raw figure is
237            // accurate. Above it, dividing by 3 brings content_items' 41 KB
238            // estimate down to ~14 KB — still conservative vs the ~10 KB the
239            // PG side reports for the same payload via `pg_total_relation_size`.
240            //
241            // Pilots who want exact control can set `chunk_size:` explicitly
242            // (it always wins over the budget-derived size).
243            let per_row = raw_per_row.map(correct_innodb_avg_row_length);
244            (row_count, per_row.filter(|b| *b > 0))
245        }
246        None => (0, None),
247    };
248
249    // (2) Single-column int PK probe. STATISTICS has one row per (column,
250    // index) so we filter to PRIMARY + SEQ_IN_INDEX=1 and then check that
251    // the PRIMARY index has no SEQ_IN_INDEX=2 row (composite).
252    let pk_first: Option<(String,)> = conn.exec_first(
253        "SELECT COLUMN_NAME \
254         FROM information_schema.STATISTICS \
255         WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 1",
256        (&schema, &table),
257    )?;
258    let single_int_pk = if let Some((col,)) = pk_first {
259        let composite: Option<(String,)> = conn.exec_first(
260            "SELECT COLUMN_NAME FROM information_schema.STATISTICS \
261             WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 2 \
262             LIMIT 1",
263            (&schema, &table),
264        )?;
265        if composite.is_some() {
266            log::debug!(
267                "introspect_mysql_table: composite PK on {schema}.{table} — skipping auto-resolve"
268            );
269            None
270        } else {
271            // Column type must be integer-family for safe range chunking.
272            let type_row: Option<(String,)> = conn.exec_first(
273                "SELECT DATA_TYPE FROM information_schema.COLUMNS \
274                 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
275                (&schema, &table, &col),
276            )?;
277            match type_row.map(|(t,)| t.to_ascii_lowercase()) {
278                Some(t)
279                    if matches!(
280                        t.as_str(),
281                        "tinyint" | "smallint" | "mediumint" | "int" | "bigint"
282                    ) =>
283                {
284                    Some(col)
285                }
286                Some(t) => {
287                    log::debug!(
288                        "introspect_mysql_table: PK '{col}' on {schema}.{table} has non-int type '{t}' — skipping auto-resolve"
289                    );
290                    None
291                }
292                None => None,
293            }
294        }
295    } else {
296        None
297    };
298
299    // (3) Keyset keys (OPT-4): single-column, NOT NULL, UNIQUE index columns —
300    // usable as a seek-pagination key. NON_UNIQUE=0 filters to unique indexes
301    // (PRIMARY included); SEQ_IN_INDEX=1 with no SEQ_IN_INDEX=2 row keeps only
302    // single-column indexes; IS_NULLABLE='NO' guarantees `> last` never has to
303    // reason about NULL ordering. Index-backed by definition, so keyset's
304    // `ORDER BY key LIMIT n` is a range scan, not a filesort.
305    let keyset_rows: Vec<(String, String, String)> = conn.exec(
306        "SELECT s.COLUMN_NAME, s.INDEX_NAME, c.IS_NULLABLE \
307         FROM information_schema.STATISTICS s \
308         JOIN information_schema.COLUMNS c \
309           ON c.TABLE_SCHEMA = s.TABLE_SCHEMA AND c.TABLE_NAME = s.TABLE_NAME \
310              AND c.COLUMN_NAME = s.COLUMN_NAME \
311         WHERE s.TABLE_SCHEMA = ? AND s.TABLE_NAME = ? AND s.NON_UNIQUE = 0 \
312           AND s.SEQ_IN_INDEX = 1 \
313           AND NOT EXISTS ( \
314             SELECT 1 FROM information_schema.STATISTICS s2 \
315             WHERE s2.TABLE_SCHEMA = s.TABLE_SCHEMA AND s2.TABLE_NAME = s.TABLE_NAME \
316               AND s2.INDEX_NAME = s.INDEX_NAME AND s2.SEQ_IN_INDEX = 2)",
317        (&schema, &table),
318    )?;
319    let mut keyset_keys: Vec<String> = Vec::new();
320    // PRIMARY first (most efficient — clustered), then other unique indexes.
321    for primary in [true, false] {
322        for (col, index_name, is_nullable) in &keyset_rows {
323            let is_primary = index_name == "PRIMARY";
324            if is_primary == primary
325                && is_nullable.eq_ignore_ascii_case("NO")
326                && !keyset_keys.contains(col)
327            {
328                keyset_keys.push(col.clone());
329            }
330        }
331    }
332
333    Ok(crate::source::TableIntrospection {
334        single_int_pk,
335        keyset_keys,
336        row_estimate,
337        avg_row_bytes,
338    })
339}
340
341fn build_mysql_ssl_opts(cfg: &TlsConfig) -> SslOpts {
342    let mut ssl = SslOpts::default();
343    if let Some(path) = &cfg.ca_file {
344        ssl = ssl.with_root_cert_path(Some(std::path::PathBuf::from(path)));
345    }
346    match cfg.mode {
347        TlsMode::Require => {
348            ssl = ssl
349                .with_danger_accept_invalid_certs(true)
350                .with_danger_skip_domain_validation(true);
351        }
352        TlsMode::VerifyCa => {
353            ssl = ssl.with_danger_skip_domain_validation(true);
354        }
355        TlsMode::VerifyFull => {
356            // Strict: verify chain + hostname.
357        }
358        TlsMode::Disable => {
359            // Never invoked: gated in connect_with_tls.
360        }
361    }
362    if cfg.accept_invalid_certs {
363        ssl = ssl.with_danger_accept_invalid_certs(true);
364    }
365    if cfg.accept_invalid_hostnames {
366        ssl = ssl.with_danger_skip_domain_validation(true);
367    }
368    ssl
369}
370
371/// Execute the MySQL query and stream results to sink.
372///
373/// Separated from export() so session-state cleanup (time_zone, max_execution_time)
374/// can run unconditionally in the caller regardless of success or failure.
375///
376/// `sample_pool`: when `tuning.adaptive` is true, a clone of the source pool used
377/// to obtain a second connection for `Innodb_log_waits` sampling without interfering
378/// with the streaming result set on `conn`.
379fn mysql_run_export(
380    conn: &mut mysql::PooledConn,
381    sample_pool: Option<Pool>,
382    sql: &str,
383    cursor_param: Option<&str>,
384    tuning: &SourceTuning,
385    column_overrides: &ColumnOverrides,
386    sink: &mut dyn super::BatchSink,
387) -> Result<usize> {
388    // SecOps: cursor value is bound via exec_iter rather than string-interpolated.
389    // Using exec_iter uniformly (even with empty params) keeps match arms
390    // type-compatible — query_iter returns a Text-protocol result, exec_iter Binary.
391    let mut result = match cursor_param {
392        Some(val) => conn.exec_iter(sql, (val,))?,
393        None => conn.exec_iter(sql, ())?,
394    };
395    let columns = result.columns().as_ref().to_vec();
396
397    // Compute TypeMappings once; derive both the Arrow schema and the
398    // per-column DataType vec from the same source so they can never diverge.
399    let (schema, arrow_types) = mysql_schema_and_arrow_types(&columns, column_overrides)?;
400    let schema = Arc::new(schema);
401
402    sink.on_schema(schema.clone())?;
403
404    // PG path uses `work_mem × 0.7 / row_bytes` for FETCH N — the analogous
405    // bottleneck on MySQL is *our* `row_buf` accumulator. The mysql crate
406    // streams rows from the wire one-at-a-time, but we pile up `effective_bs`
407    // of them in a `Vec<Row>` before flushing to Arrow → for `batch_size: 50000`
408    // (fast profile) on content_items that's ~650 MB just for the row_buf,
409    // plus another ~650 MB for the Arrow batch it feeds — RSS scales with
410    // `batch_size`, not chunk size.
411    //
412    // Fix: start with a small probe (`PROBE_BATCH_SIZE`), measure the actual
413    // Arrow bytes per row after the first batch, then cap `effective_bs` so
414    // each flush fits in roughly `MYSQL_BATCH_TARGET_MB` of Arrow memory.
415    // Caller's `batch_size_memory_mb` wins when set; the default is 64 MB —
416    // chosen to keep peak RSS well under 200 MB on wide-row tables while
417    // keeping batches large enough to be efficient for the parquet writer.
418    let configured_batch_size = tuning.effective_batch_size(Some(&schema));
419    // Shared batch-size state machine (probe → memory-cap → adaptive → throttle);
420    // MySQL provides only the row source + the target-MB cap formula below.
421    let mut ctl = AdaptiveBatchController::new(tuning, configured_batch_size);
422    ctl.seed_pressure(if tuning.adaptive {
423        sample_pool.as_ref().and_then(mysql_sample_innodb_log_waits)
424    } else {
425        None
426    });
427    let row_set = result
428        .iter()
429        .ok_or_else(|| anyhow::anyhow!("no result set"))?;
430    let mut row_buf: Vec<mysql::Row> = Vec::with_capacity(ctl.target());
431    let mut total_rows: usize = 0;
432    let mut memory_cap_applied = false;
433
434    for row_result in row_set {
435        let row = row_result?;
436        row_buf.push(row);
437
438        if row_buf.len() >= ctl.target() {
439            total_rows += row_buf.len();
440            let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
441            let batch_rows = row_buf.len();
442            row_buf.clear();
443
444            // After the first (probe-sized) batch we know how many bytes per
445            // row Arrow actually uses. Cap subsequent flushes to a memory
446            // target. The controller clamps it to the configured `batch_size`.
447            if !memory_cap_applied && batch_rows > 0 {
448                let arrow_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
449                let arrow_per_row = (arrow_bytes / batch_rows).max(64);
450                let target_mb = tuning
451                    .batch_size_memory_mb
452                    .unwrap_or(DEFAULT_BATCH_TARGET_MB);
453                let safe = ((target_mb * 1024 * 1024) / arrow_per_row).max(PROBE_BATCH_SIZE);
454                if let Some(new) = ctl.apply_memory_cap(safe) {
455                    log::info!(
456                        "MySQL row_buf cap: arrow≈{} B/row, target={} MB → batch_size → {} (configured={})",
457                        arrow_per_row,
458                        target_mb,
459                        new,
460                        configured_batch_size
461                    );
462                    row_buf.reserve(new.saturating_sub(row_buf.capacity()));
463                }
464                memory_cap_applied = true;
465            }
466
467            sink.on_batch(&batch)?;
468
469            if let Some((new, under_pressure)) =
470                ctl.after_batch(|| sample_pool.as_ref().and_then(mysql_sample_innodb_log_waits))
471            {
472                log::info!(
473                    "adaptive batch size → {} ({})",
474                    new,
475                    if under_pressure {
476                        "pressure"
477                    } else {
478                        "recovery"
479                    }
480                );
481            }
482
483            log::info!("fetched {} rows so far...", total_rows);
484            ctl.throttle();
485        }
486    }
487
488    if !row_buf.is_empty() {
489        total_rows += row_buf.len();
490        let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
491        sink.on_batch(&batch)?;
492    }
493
494    drop(result);
495    Ok(total_rows)
496}
497
498impl super::Source for MysqlSource {
499    fn export(
500        &mut self,
501        request: &super::ExportRequest<'_>,
502        sink: &mut dyn super::BatchSink,
503    ) -> Result<()> {
504        let built = build_export_query(request, SourceType::Mysql);
505        log::debug!(
506            "executing query (connection={}): {}",
507            self.proxy_kind.log_label(),
508            built.sql
509        );
510
511        let mut conn = self.pool.get_conn()?;
512
513        // Roadmap §13: normalize TIMESTAMP columns to UTC so Parquet writes
514        // isAdjustedToUTC=true. SET per-connection (not global) to avoid side-effects.
515        conn.query_drop("SET time_zone = '+00:00'")?;
516
517        if request.tuning.statement_timeout_s > 0 {
518            conn.query_drop(format!(
519                "SET SESSION max_execution_time = {}",
520                request.tuning.statement_timeout_s * 1000
521            ))?;
522        }
523
524        let sample_pool = if request.tuning.adaptive {
525            Some(self.pool.clone())
526        } else {
527            None
528        };
529        let result = mysql_run_export(
530            &mut conn,
531            sample_pool,
532            &built.sql,
533            built.cursor_param.as_deref(),
534            request.tuning,
535            request.column_overrides,
536            sink,
537        );
538
539        // Always reset session state before connection returns to pool,
540        // regardless of whether the export succeeded or failed.
541        let _ = conn.query_drop("SET time_zone = @@global.time_zone");
542        if request.tuning.statement_timeout_s > 0 {
543            let _ = conn.query_drop("SET SESSION max_execution_time = 0");
544        }
545
546        // The empty-result fallback to `Schema::empty()` lives here for
547        // parity with the PG implementation, even though `exec_iter` always
548        // returns the column metadata before yielding any rows so
549        // mysql_run_export's `on_schema` already fired.
550        let total_rows = result?;
551        if total_rows == 0 {
552            sink.on_schema(Arc::new(Schema::empty()))?;
553        }
554        log::info!("total: {} rows", total_rows);
555        Ok(())
556    }
557
558    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>> {
559        let mut conn = self.pool.get_conn()?;
560        let row: Option<mysql::Row> = conn.query_first(sql)?;
561        match row {
562            Some(r) => {
563                let val: Option<mysql::Value> = r.get(0);
564                match val {
565                    Some(mysql::Value::Bytes(b)) => {
566                        Ok(Some(String::from_utf8_lossy(&b).into_owned()))
567                    }
568                    Some(mysql::Value::Int(v)) => Ok(Some(v.to_string())),
569                    Some(mysql::Value::UInt(v)) => Ok(Some(v.to_string())),
570                    Some(mysql::Value::Float(v)) => Ok(Some(v.to_string())),
571                    Some(mysql::Value::Double(v)) => Ok(Some(v.to_string())),
572                    _ => Ok(None),
573                }
574            }
575            None => Ok(None),
576        }
577    }
578
579    fn type_mappings(
580        &mut self,
581        query: &str,
582        column_overrides: &ColumnOverrides,
583    ) -> Result<Vec<crate::types::TypeMapping>> {
584        let wrapped = format!("SELECT * FROM ({}) AS _rivet_type_probe LIMIT 0", query);
585        let mut conn = self.pool.get_conn()?;
586        let result = conn.exec_iter(&wrapped, ())?;
587        let columns = result.columns().as_ref().to_vec();
588        drop(result);
589        let mappings = columns
590            .iter()
591            .map(|col| {
592                let rivet =
593                    crate::types::resolve_or(column_overrides, col.name_str().as_ref(), || {
594                        mysql_type_to_rivet(col)
595                    });
596                let source = crate::types::SourceColumn::simple(
597                    col.name_str().as_ref(),
598                    mysql_native_type_name(col),
599                    true,
600                );
601                crate::types::TypeMapping::from_source(&source, rivet)
602            })
603            .collect();
604        Ok(mappings)
605    }
606
607    /// Governor pressure proxy: global `Innodb_log_waits` — the same monotonic
608    /// counter the adaptive batch loop samples. Rising between samples means
609    /// InnoDB is stalling on redo-log buffer space under write pressure.
610    fn sample_pressure(&mut self) -> Option<u64> {
611        mysql_sample_innodb_log_waits(&self.pool)
612    }
613}
614
#[cfg(test)]
mod tests {
    use super::{bit_bytes_to_u64, correct_innodb_avg_row_length};

    // Proxy classifier tests live in `proxy.rs` alongside the classifier.

    // ── bit_bytes_to_u64 (defined in arrow_convert.rs, visible here as pub(super)) ──

    #[test]
    fn bit_bytes_single_byte() {
        let cases: [(u8, u64); 3] = [(0x00, 0), (0x01, 1), (0xFF, 255)];
        for (byte, expected) in cases {
            assert_eq!(bit_bytes_to_u64(&[byte]), expected, "input byte {byte:#04x}");
        }
    }

    #[test]
    fn bit_bytes_multi_byte() {
        // The first byte is the most significant: [0x01, 0x02] == 0x0102.
        let cases: [(&[u8], u64); 2] = [
            (&[0x01_u8, 0x02][..], 258),
            (&[0xFF_u8; 8][..], u64::MAX),
        ];
        for (bytes, expected) in cases {
            assert_eq!(bit_bytes_to_u64(bytes), expected, "input {bytes:?}");
        }
    }

    #[test]
    fn bit_bytes_empty() {
        let nothing: &[u8] = &[];
        assert_eq!(bit_bytes_to_u64(nothing), 0);
    }

    // ── InnoDB AVG_ROW_LENGTH correction ────────────────────────────────

    #[test]
    fn innodb_correction_below_threshold_is_identity() {
        for raw in [82_i64, 314, 2_048, 8 * 1024] {
            assert_eq!(correct_innodb_avg_row_length(raw), raw, "raw={raw}");
        }
    }

    #[test]
    fn innodb_correction_above_threshold_divides_by_three() {
        for raw in [40_978_i64, 120_000] {
            assert_eq!(correct_innodb_avg_row_length(raw), raw / 3, "raw={raw}");
        }
    }

    #[test]
    fn innodb_correction_does_not_undershoot_floor() {
        let divided = correct_innodb_avg_row_length(8 * 1024 + 1);
        assert!(divided >= 4 * 1024, "got {divided}");
    }
}