Skip to main content

rivet/source/mysql/
mod.rs

//! MySQL `Source` implementation.
//!
//! Module layout (mirrors `postgres/`):
//!
//! - `mod.rs` (this file) — `MysqlSource` struct + connect/TLS path, the
//!   `Innodb_log_waits` sampler, the `lean_pool_opts` / `connect_pool` /
//!   `build_mysql_ssl_opts` helpers, `introspect_mysql_table_for_chunking`
//!   together with the InnoDB `AVG_ROW_LENGTH` correction, the cursor-bound
//!   `exec_iter` export loop (`mysql_run_export`), and the `Source` trait impl.
//! - [`arrow_convert`] — the entire row → Arrow `RecordBatch` pipeline:
//!   `mysql_type_to_rivet` + `mysql_native_type_name`,
//!   `mysql_schema_and_arrow_types`, BIT / TIME / DECIMAL decoders, and the
//!   array builders. Kept in a sibling because it is the largest
//!   single-purpose cluster in this driver (~510 LoC) and has zero reverse
//!   dependency back into the connection / pool / cursor layer.
//! - [`proxy`] — `MysqlProxyKind` enum, the pure `classify_mysql_proxy`
//!   classifier, the I/O wrapper `detect_mysql_proxy_kind`, and
//!   `warn_proxy_kind`. Detection runs once at connect time; the classifier
//!   is exhaustively unit-tested in isolation (no live MySQL needed).
20
21mod arrow_convert;
22mod proxy;
23
24use std::sync::Arc;
25
26use arrow::datatypes::Schema;
27use mysql::prelude::*;
28use mysql::{Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, SslOpts};
29
30use crate::config::{SourceType, TlsConfig, TlsMode};
31use crate::error::Result;
32use crate::source::query::build_incremental_query;
33use crate::tuning::{ADAPTIVE_SAMPLE_INTERVAL, SourceTuning, next_adaptive_batch_size};
34use crate::types::ColumnOverrides;
35
36use arrow_convert::{
37    mysql_native_type_name, mysql_schema_and_arrow_types, mysql_type_to_rivet,
38    rows_to_record_batch_typed,
39};
40// `bit_bytes_to_u64` is only referenced by the `tests` module below — gate the
41// re-import on `cfg(test)` so non-test builds don't see an unused-import warning.
42#[cfg(test)]
43use arrow_convert::bit_bytes_to_u64;
44use proxy::{detect_mysql_proxy_kind, warn_proxy_kind};
45
46// Re-exported so external code (`tests/live_pool_safety.rs`) can still write
47// `use rivet::source::mysql::MysqlProxyKind` after the proxy block moved to
48// the `proxy` submodule.
49pub use proxy::MysqlProxyKind;
50
51pub struct MysqlSource {
52    pool: Pool,
53    proxy_kind: MysqlProxyKind,
54}
55
56/// Pool options that prevent eager pre-connection. The default mysql::Pool
57/// opens `min=10` connections immediately, which overflows MySQL's
58/// max_connections when many parallel exports run simultaneously.
59fn lean_pool_opts() -> PoolOpts {
60    PoolOpts::default()
61        .with_constraints(PoolConstraints::new(1, 100).expect("valid pool constraints"))
62}
63
64/// Sample the global `Innodb_log_waits` counter — increments when InnoDB has to
65/// wait for redo-log buffer space, indicating write pressure.
66fn mysql_sample_innodb_log_waits(pool: &Pool) -> Option<u64> {
67    let mut conn = pool.get_conn().ok()?;
68    conn.query_first::<(String, u64), _>("SHOW GLOBAL STATUS LIKE 'Innodb_log_waits'")
69        .ok()
70        .flatten()
71        .map(|(_, v)| v)
72}
73
74impl MysqlSource {
75    /// Build a source from an existing pool. Useful in tests that need to
76    /// share the pool with post-export state inspection.
77    #[allow(dead_code)]
78    pub fn from_pool(pool: Pool) -> Self {
79        let proxy_kind = detect_mysql_proxy_kind(&pool);
80        warn_proxy_kind(proxy_kind);
81        Self { pool, proxy_kind }
82    }
83
84    /// Connect with no transport security (legacy path).
85    pub fn connect(url: &str) -> Result<Self> {
86        let opts =
87            Opts::from(OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()));
88        let pool = Pool::new(opts)?;
89        let proxy_kind = detect_mysql_proxy_kind(&pool);
90        warn_proxy_kind(proxy_kind);
91        Ok(Self { pool, proxy_kind })
92    }
93
94    /// Connect honoring the user's [`TlsConfig`].
95    pub fn connect_with_tls(url: &str, tls: Option<&TlsConfig>) -> Result<Self> {
96        match tls {
97            Some(cfg) if cfg.mode.is_enforced() => {
98                let base = Opts::from_url(url)?;
99                let ssl = build_mysql_ssl_opts(cfg);
100                let opts = Opts::from(
101                    OptsBuilder::from_opts(base)
102                        .ssl_opts(Some(ssl))
103                        .pool_opts(lean_pool_opts()),
104                );
105                let pool = Pool::new(opts)?;
106                let proxy_kind = detect_mysql_proxy_kind(&pool);
107                warn_proxy_kind(proxy_kind);
108                Ok(Self { pool, proxy_kind })
109            }
110            _ => Self::connect(url),
111        }
112    }
113
114    /// Expose the proxy classification for diagnostic tools (preflight,
115    /// integration tests). Not part of the public Source trait — same
116    /// internal-may-change contract as the rest of `rivet::source::mysql::*`.
117    ///
118    /// `#[allow(dead_code)]` covers the binary compilation unit; the lib +
119    /// integration tests reference this through the `rivet::source::mysql`
120    /// public surface.
121    #[allow(dead_code)]
122    pub fn proxy_kind(&self) -> MysqlProxyKind {
123        self.proxy_kind
124    }
125}
126
127/// Build a MySQL connection pool honoring the configured TLS policy.
128///
129/// Shared by preflight, doctor, init, and anywhere else we need a pool outside
130/// the `Source` trait. `tls = None` falls back to plaintext (legacy behavior).
131pub(crate) fn connect_pool(url: &str, tls: Option<&TlsConfig>) -> Result<Pool> {
132    match tls {
133        Some(cfg) if cfg.mode.is_enforced() => {
134            let base = Opts::from_url(url)?;
135            let ssl = build_mysql_ssl_opts(cfg);
136            let opts = Opts::from(
137                OptsBuilder::from_opts(base)
138                    .ssl_opts(Some(ssl))
139                    .pool_opts(lean_pool_opts()),
140            );
141            Ok(Pool::new(opts)?)
142        }
143        _ => {
144            let opts = Opts::from(
145                OptsBuilder::from_opts(Opts::from_url(url)?).pool_opts(lean_pool_opts()),
146            );
147            Ok(Pool::new(opts)?)
148        }
149    }
150}
151
/// Cut-off (in bytes) above which `AVG_ROW_LENGTH` is considered inflated by
/// InnoDB BLOB overflow pages. Rows of at most 8 KB fit inline, so their raw
/// figure is trusted as-is; larger figures get scaled down.
const INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES: i64 = 8 * 1024;

/// Empirical scale-down factor for BLOB-page inflation. A wide-text row that
/// allocates eight 16 KB overflow pages reports ~128 KB in `AVG_ROW_LENGTH`
/// while the actual wire content is ~40 KB, i.e. roughly a factor of 3.
const INNODB_BLOB_OVERFLOW_DIVISOR: i64 = 3;

/// Apply the InnoDB BLOB-overflow correction to a raw `AVG_ROW_LENGTH` value.
///
/// Pure (no I/O) so it can be unit-tested; the live introspection helper
/// feeds it the figure read from `information_schema.TABLES`.
///
/// - `raw_bytes` at or below the 8 KB threshold is returned unchanged.
/// - Anything larger is divided by the divisor, but never reported below
///   half the threshold so the estimate cannot undershoot too far.
fn correct_innodb_avg_row_length(raw_bytes: i64) -> i64 {
    // Guard clause: inline rows need no correction.
    if raw_bytes <= INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES {
        return raw_bytes;
    }

    let floor = INNODB_BLOB_OVERFLOW_THRESHOLD_BYTES / 2;
    let scaled = raw_bytes / INNODB_BLOB_OVERFLOW_DIVISOR;
    std::cmp::max(scaled, floor)
}
175
176/// Probe `information_schema` for stats chunked-mode planning needs.
177///
178/// MySQL analogue of [`crate::source::postgres::introspect_pg_table_for_chunking`]:
179/// returns the same source-neutral [`crate::source::TableIntrospection`] so
180/// `plan/build.rs` can dispatch on `source_type` and reuse the same downstream
181/// logic for chunk-column / chunk_size derivation.
182///
183/// Two queries per call, both against `information_schema` (no extra grants
184/// required for a normal app user):
185/// - `TABLES.AVG_ROW_LENGTH` + `TABLE_ROWS` for the row-size and row-count estimate.
186///   These come from `mysql.innodb_table_stats` and are only as fresh as the
187///   last `ANALYZE TABLE` / autostat run. Empty / unanalysed → zero.
188/// - `STATISTICS` filtered to `INDEX_NAME='PRIMARY'` with `SEQ_IN_INDEX=1` and a
189///   second probe ensuring no `SEQ_IN_INDEX=2` row exists — single-column PK only.
190///
191/// `qualified_table` is `<schema>.<table>` or bare `<table>` (resolved under the
192/// current database for the connection). Same strict ident rules as the YAML
193/// `table:` shortcut so the SQL stays trivially safe.
194pub(crate) fn introspect_mysql_table_for_chunking(
195    url: &str,
196    tls: Option<&TlsConfig>,
197    qualified_table: &str,
198) -> Result<crate::source::TableIntrospection> {
199    let pool = connect_pool(url, tls)?;
200    let mut conn = pool.get_conn()?;
201    let default_db: Option<String> = conn.query_first("SELECT DATABASE()")?;
202    let default_db = default_db.unwrap_or_default();
203
204    let (schema, table) = match qualified_table.split_once('.') {
205        Some((s, t)) => (s.to_string(), t.to_string()),
206        None => (default_db, qualified_table.to_string()),
207    };
208
209    // (1) Row count + avg row bytes. AVG_ROW_LENGTH already accounts for
210    // overflow pages on InnoDB, so we use it directly rather than dividing
211    // DATA_LENGTH by TABLE_ROWS (which under-counts for tables with TOAST-like
212    // overflow). Fall back to division when AVG_ROW_LENGTH is 0.
213    let row_stats: Option<(i64, i64, i64)> = conn.exec_first(
214        "SELECT CAST(IFNULL(TABLE_ROWS, 0) AS SIGNED), \
215                CAST(IFNULL(AVG_ROW_LENGTH, 0) AS SIGNED), \
216                CAST(IFNULL(DATA_LENGTH, 0) AS SIGNED) \
217         FROM information_schema.TABLES \
218         WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
219        (&schema, &table),
220    )?;
221    let (row_estimate, avg_row_bytes) = match row_stats {
222        Some((rows, avg, data_len)) => {
223            let row_count = rows.max(0);
224            let raw_per_row = if avg > 0 {
225                Some(avg)
226            } else if row_count > 0 {
227                Some(data_len / row_count)
228            } else {
229                None
230            };
231            // InnoDB stores TEXT/BLOB > ~768 B off-page in 16 KB BLOB pages,
232            // and `AVG_ROW_LENGTH` counts the allocated page bytes — not the
233            // actual content. On wide-text workloads (CMS bodies, JSON logs,
234            // audit trails) this inflates the per-row estimate 3-5× compared
235            // to what the client driver actually buffers over the wire.
236            //
237            // We empirically divide by 3 above an 8 KB threshold. Below 8 KB
238            // a row fits inline with no overflow, so the raw figure is
239            // accurate. Above it, dividing by 3 brings content_items' 41 KB
240            // estimate down to ~14 KB — still conservative vs the ~10 KB the
241            // PG side reports for the same payload via `pg_total_relation_size`.
242            //
243            // Pilots who want exact control can set `chunk_size:` explicitly
244            // (it always wins over the budget-derived size).
245            let per_row = raw_per_row.map(correct_innodb_avg_row_length);
246            (row_count, per_row.filter(|b| *b > 0))
247        }
248        None => (0, None),
249    };
250
251    // (2) Single-column int PK probe. STATISTICS has one row per (column,
252    // index) so we filter to PRIMARY + SEQ_IN_INDEX=1 and then check that
253    // the PRIMARY index has no SEQ_IN_INDEX=2 row (composite).
254    let pk_first: Option<(String,)> = conn.exec_first(
255        "SELECT COLUMN_NAME \
256         FROM information_schema.STATISTICS \
257         WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 1",
258        (&schema, &table),
259    )?;
260    let single_int_pk = if let Some((col,)) = pk_first {
261        let composite: Option<(String,)> = conn.exec_first(
262            "SELECT COLUMN_NAME FROM information_schema.STATISTICS \
263             WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND INDEX_NAME = 'PRIMARY' AND SEQ_IN_INDEX = 2 \
264             LIMIT 1",
265            (&schema, &table),
266        )?;
267        if composite.is_some() {
268            log::debug!(
269                "introspect_mysql_table: composite PK on {schema}.{table} — skipping auto-resolve"
270            );
271            None
272        } else {
273            // Column type must be integer-family for safe range chunking.
274            let type_row: Option<(String,)> = conn.exec_first(
275                "SELECT DATA_TYPE FROM information_schema.COLUMNS \
276                 WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
277                (&schema, &table, &col),
278            )?;
279            match type_row.map(|(t,)| t.to_ascii_lowercase()) {
280                Some(t)
281                    if matches!(
282                        t.as_str(),
283                        "tinyint" | "smallint" | "mediumint" | "int" | "bigint"
284                    ) =>
285                {
286                    Some(col)
287                }
288                Some(t) => {
289                    log::debug!(
290                        "introspect_mysql_table: PK '{col}' on {schema}.{table} has non-int type '{t}' — skipping auto-resolve"
291                    );
292                    None
293                }
294                None => None,
295            }
296        }
297    } else {
298        None
299    };
300
301    Ok(crate::source::TableIntrospection {
302        single_int_pk,
303        row_estimate,
304        avg_row_bytes,
305    })
306}
307
308fn build_mysql_ssl_opts(cfg: &TlsConfig) -> SslOpts {
309    let mut ssl = SslOpts::default();
310    if let Some(path) = &cfg.ca_file {
311        ssl = ssl.with_root_cert_path(Some(std::path::PathBuf::from(path)));
312    }
313    match cfg.mode {
314        TlsMode::Require => {
315            ssl = ssl
316                .with_danger_accept_invalid_certs(true)
317                .with_danger_skip_domain_validation(true);
318        }
319        TlsMode::VerifyCa => {
320            ssl = ssl.with_danger_skip_domain_validation(true);
321        }
322        TlsMode::VerifyFull => {
323            // Strict: verify chain + hostname.
324        }
325        TlsMode::Disable => {
326            // Never invoked: gated in connect_with_tls.
327        }
328    }
329    if cfg.accept_invalid_certs {
330        ssl = ssl.with_danger_accept_invalid_certs(true);
331    }
332    if cfg.accept_invalid_hostnames {
333        ssl = ssl.with_danger_skip_domain_validation(true);
334    }
335    ssl
336}
337
338/// Execute the MySQL query and stream results to sink.
339///
340/// Separated from export() so session-state cleanup (time_zone, max_execution_time)
341/// can run unconditionally in the caller regardless of success or failure.
342///
343/// `sample_pool`: when `tuning.adaptive` is true, a clone of the source pool used
344/// to obtain a second connection for `Innodb_log_waits` sampling without interfering
345/// with the streaming result set on `conn`.
346fn mysql_run_export(
347    conn: &mut mysql::PooledConn,
348    sample_pool: Option<Pool>,
349    sql: &str,
350    cursor_param: Option<&str>,
351    tuning: &SourceTuning,
352    column_overrides: &ColumnOverrides,
353    sink: &mut dyn super::BatchSink,
354) -> Result<usize> {
355    // SecOps: cursor value is bound via exec_iter rather than string-interpolated.
356    // Using exec_iter uniformly (even with empty params) keeps match arms
357    // type-compatible — query_iter returns a Text-protocol result, exec_iter Binary.
358    let mut result = match cursor_param {
359        Some(val) => conn.exec_iter(sql, (val,))?,
360        None => conn.exec_iter(sql, ())?,
361    };
362    let columns = result.columns().as_ref().to_vec();
363
364    // Compute TypeMappings once; derive both the Arrow schema and the
365    // per-column DataType vec from the same source so they can never diverge.
366    let (schema, arrow_types) = mysql_schema_and_arrow_types(&columns, column_overrides)?;
367    let schema = Arc::new(schema);
368
369    sink.on_schema(schema.clone())?;
370
371    // PG path uses `work_mem × 0.7 / row_bytes` for FETCH N — the analogous
372    // bottleneck on MySQL is *our* `row_buf` accumulator. The mysql crate
373    // streams rows from the wire one-at-a-time, but we pile up `effective_bs`
374    // of them in a `Vec<Row>` before flushing to Arrow → for `batch_size: 50000`
375    // (fast profile) on content_items that's ~650 MB just for the row_buf,
376    // plus another ~650 MB for the Arrow batch it feeds — RSS scales with
377    // `batch_size`, not chunk size.
378    //
379    // Fix: start with a small probe (`PROBE_BATCH_SIZE`), measure the actual
380    // Arrow bytes per row after the first batch, then cap `effective_bs` so
381    // each flush fits in roughly `MYSQL_BATCH_TARGET_MB` of Arrow memory.
382    // Caller's `batch_size_memory_mb` wins when set; the default is 64 MB —
383    // chosen to keep peak RSS well under 200 MB on wide-row tables while
384    // keeping batches large enough to be efficient for the parquet writer.
385    const PROBE_BATCH_SIZE: usize = 500;
386    const MYSQL_BATCH_TARGET_MB_DEFAULT: usize = 64;
387
388    let configured_batch_size = tuning.effective_batch_size(Some(&schema));
389    let mut effective_bs = configured_batch_size.min(PROBE_BATCH_SIZE);
390    let mut base_fetch_size = effective_bs;
391    let mut adaptive_last_waits: Option<u64> = if tuning.adaptive {
392        sample_pool.as_ref().and_then(mysql_sample_innodb_log_waits)
393    } else {
394        None
395    };
396    let mut batch_count: usize = 0;
397    let row_set = result
398        .iter()
399        .ok_or_else(|| anyhow::anyhow!("no result set"))?;
400    let mut row_buf: Vec<mysql::Row> = Vec::with_capacity(effective_bs);
401    let mut total_rows: usize = 0;
402    let mut memory_cap_applied = false;
403
404    for row_result in row_set {
405        let row = row_result?;
406        row_buf.push(row);
407
408        if row_buf.len() >= effective_bs {
409            total_rows += row_buf.len();
410            batch_count += 1;
411            let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
412            let batch_rows = row_buf.len();
413            row_buf.clear();
414
415            // After the first (probe-sized) batch we know how many bytes per
416            // row Arrow actually uses. Cap subsequent flushes to a memory
417            // target. Never exceed the user's configured `batch_size`.
418            if !memory_cap_applied && batch_rows > 0 {
419                let arrow_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
420                let arrow_per_row = (arrow_bytes / batch_rows).max(64);
421                let target_mb = tuning
422                    .batch_size_memory_mb
423                    .unwrap_or(MYSQL_BATCH_TARGET_MB_DEFAULT);
424                let safe = ((target_mb * 1024 * 1024) / arrow_per_row).max(PROBE_BATCH_SIZE);
425                let target = safe.min(configured_batch_size);
426                if target != effective_bs {
427                    log::info!(
428                        "MySQL row_buf cap: arrow≈{} B/row, target={} MB → batch_size {} → {} (configured={})",
429                        arrow_per_row,
430                        target_mb,
431                        effective_bs,
432                        target,
433                        configured_batch_size
434                    );
435                    effective_bs = target;
436                    base_fetch_size = effective_bs;
437                    row_buf.reserve(effective_bs.saturating_sub(row_buf.capacity()));
438                }
439                memory_cap_applied = true;
440            }
441
442            sink.on_batch(&batch)?;
443
444            if tuning.adaptive
445                && batch_count.is_multiple_of(ADAPTIVE_SAMPLE_INTERVAL)
446                && let Some(ref pool) = sample_pool
447                && let Some(cur) = mysql_sample_innodb_log_waits(pool)
448            {
449                let under_pressure = adaptive_last_waits.is_some_and(|prev| cur > prev);
450                adaptive_last_waits = Some(cur);
451                let next = next_adaptive_batch_size(effective_bs, base_fetch_size, under_pressure);
452                if next != effective_bs {
453                    effective_bs = next;
454                    log::info!(
455                        "adaptive batch size → {} ({})",
456                        effective_bs,
457                        if under_pressure {
458                            "pressure"
459                        } else {
460                            "recovery"
461                        }
462                    );
463                }
464            }
465
466            log::info!("fetched {} rows so far...", total_rows);
467
468            if tuning.throttle_ms > 0 {
469                std::thread::sleep(std::time::Duration::from_millis(tuning.throttle_ms));
470            }
471        }
472    }
473
474    if !row_buf.is_empty() {
475        total_rows += row_buf.len();
476        let batch = rows_to_record_batch_typed(&schema, &arrow_types, &row_buf)?;
477        sink.on_batch(&batch)?;
478    }
479
480    drop(result);
481    Ok(total_rows)
482}
483
484impl super::Source for MysqlSource {
485    fn export(
486        &mut self,
487        request: &super::ExportRequest<'_>,
488        sink: &mut dyn super::BatchSink,
489    ) -> Result<()> {
490        let built = build_incremental_query(
491            request.query,
492            request.incremental,
493            request.cursor,
494            SourceType::Mysql,
495        );
496        log::debug!(
497            "executing query (connection={}): {}",
498            self.proxy_kind.log_label(),
499            built.sql
500        );
501
502        let mut conn = self.pool.get_conn()?;
503
504        // Roadmap §13: normalize TIMESTAMP columns to UTC so Parquet writes
505        // isAdjustedToUTC=true. SET per-connection (not global) to avoid side-effects.
506        conn.query_drop("SET time_zone = '+00:00'")?;
507
508        if request.tuning.statement_timeout_s > 0 {
509            conn.query_drop(format!(
510                "SET SESSION max_execution_time = {}",
511                request.tuning.statement_timeout_s * 1000
512            ))?;
513        }
514
515        let sample_pool = if request.tuning.adaptive {
516            Some(self.pool.clone())
517        } else {
518            None
519        };
520        let result = mysql_run_export(
521            &mut conn,
522            sample_pool,
523            &built.sql,
524            built.cursor_param.as_deref(),
525            request.tuning,
526            request.column_overrides,
527            sink,
528        );
529
530        // Always reset session state before connection returns to pool,
531        // regardless of whether the export succeeded or failed.
532        let _ = conn.query_drop("SET time_zone = @@global.time_zone");
533        if request.tuning.statement_timeout_s > 0 {
534            let _ = conn.query_drop("SET SESSION max_execution_time = 0");
535        }
536
537        // The empty-result fallback to `Schema::empty()` lives here for
538        // parity with the PG implementation, even though `exec_iter` always
539        // returns the column metadata before yielding any rows so
540        // mysql_run_export's `on_schema` already fired.
541        let total_rows = result?;
542        if total_rows == 0 {
543            sink.on_schema(Arc::new(Schema::empty()))?;
544        }
545        log::info!("total: {} rows", total_rows);
546        Ok(())
547    }
548
549    fn query_scalar(&mut self, sql: &str) -> Result<Option<String>> {
550        let mut conn = self.pool.get_conn()?;
551        let row: Option<mysql::Row> = conn.query_first(sql)?;
552        match row {
553            Some(r) => {
554                let val: Option<mysql::Value> = r.get(0);
555                match val {
556                    Some(mysql::Value::Bytes(b)) => {
557                        Ok(Some(String::from_utf8_lossy(&b).into_owned()))
558                    }
559                    Some(mysql::Value::Int(v)) => Ok(Some(v.to_string())),
560                    Some(mysql::Value::UInt(v)) => Ok(Some(v.to_string())),
561                    Some(mysql::Value::Float(v)) => Ok(Some(v.to_string())),
562                    Some(mysql::Value::Double(v)) => Ok(Some(v.to_string())),
563                    _ => Ok(None),
564                }
565            }
566            None => Ok(None),
567        }
568    }
569
570    fn type_mappings(
571        &mut self,
572        query: &str,
573        column_overrides: &ColumnOverrides,
574    ) -> Result<Vec<crate::types::TypeMapping>> {
575        let wrapped = format!("SELECT * FROM ({}) AS _rivet_type_probe LIMIT 0", query);
576        let mut conn = self.pool.get_conn()?;
577        let result = conn.exec_iter(&wrapped, ())?;
578        let columns = result.columns().as_ref().to_vec();
579        drop(result);
580        let mappings = columns
581            .iter()
582            .map(|col| {
583                let rivet = column_overrides
584                    .get(col.name_str().as_ref())
585                    .cloned()
586                    .unwrap_or_else(|| mysql_type_to_rivet(col));
587                let source = crate::types::SourceColumn::simple(
588                    col.name_str().as_ref(),
589                    mysql_native_type_name(col),
590                    true,
591                );
592                crate::types::TypeMapping::from_source(&source, rivet)
593            })
594            .collect();
595        Ok(mappings)
596    }
597}
598
#[cfg(test)]
mod tests {
    use super::{bit_bytes_to_u64, correct_innodb_avg_row_length};

    // Proxy classifier tests live in `proxy.rs` alongside the classifier.

    // ── bit_bytes_to_u64 (lives in arrow_convert.rs, exported pub(super)) ──

    /// One-byte payloads decode to their plain numeric value.
    #[test]
    fn bit_bytes_single_byte() {
        let cases: [(u8, u64); 3] = [(0x00, 0), (0x01, 1), (0xFF, 255)];
        for (byte, expected) in cases {
            assert_eq!(bit_bytes_to_u64(&[byte]), expected);
        }
    }

    /// Multi-byte payloads are combined with the first byte most significant.
    #[test]
    fn bit_bytes_multi_byte() {
        let two_bytes = [0x01, 0x02];
        assert_eq!(bit_bytes_to_u64(&two_bytes), 258);

        let all_ones = [0xFF; 8];
        assert_eq!(bit_bytes_to_u64(&all_ones), u64::MAX);
    }

    /// An empty payload decodes to zero.
    #[test]
    fn bit_bytes_empty() {
        assert_eq!(bit_bytes_to_u64(&[]), 0);
    }

    // ── InnoDB AVG_ROW_LENGTH correction ────────────────────────────────

    /// Values up to and including the 8 KB threshold pass through untouched.
    #[test]
    fn innodb_correction_below_threshold_is_identity() {
        for raw in [82, 314, 2_048, 8 * 1024] {
            assert_eq!(correct_innodb_avg_row_length(raw), raw);
        }
    }

    /// Values well above the threshold are divided by three.
    #[test]
    fn innodb_correction_above_threshold_divides_by_three() {
        let cases = [(40_978, 40_978 / 3), (120_000, 40_000)];
        for (raw, expected) in cases {
            assert_eq!(correct_innodb_avg_row_length(raw), expected);
        }
    }

    /// Just past the threshold the result is clamped to at least 4 KB.
    #[test]
    fn innodb_correction_does_not_undershoot_floor() {
        let just_above = 8 * 1024 + 1;
        let divided = correct_innodb_avg_row_length(just_above);
        assert!(divided >= 4 * 1024, "got {divided}");
    }
}