Skip to main content

faucet_sink_mysql/
sink.rs

1//! MySQL sink implementation.
2
3use crate::config::{MysqlColumnMapping, MysqlSinkConfig};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use serde_json::Value;
7use sqlx::mysql::MySqlPoolOptions;
8use sqlx::{MySqlPool, Row};
9
10/// A sink that writes JSON records to a MySQL table.
11pub struct MysqlSink {
12    config: MysqlSinkConfig,
13    pool: MySqlPool,
14}
15
/// Quote a MySQL identifier with backticks.
///
/// The result is `name` wrapped in a leading and trailing backtick. Every
/// backtick inside `name` is doubled, which is MySQL's escape for a literal
/// backtick inside a quoted identifier. No other characters are altered.
fn quote_ident_mysql(name: &str) -> String {
    // Two extra bytes for the surrounding backticks; escapes may grow it further.
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('`');
    for ch in name.chars() {
        if ch == '`' {
            quoted.push_str("``");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('`');
    quoted
}
23
24impl MysqlSink {
25    /// Create a new MySQL sink. Establishes a connection pool.
26    pub async fn new(config: MysqlSinkConfig) -> Result<Self, FaucetError> {
27        let pool = MySqlPoolOptions::new()
28            .max_connections(config.max_connections)
29            .connect(&config.connection_url)
30            .await
31            .map_err(|e| FaucetError::Sink(format!("MySQL connection failed: {e}")))?;
32
33        Ok(Self { config, pool })
34    }
35
36    /// Insert a batch of records using JSON column mode.
37    /// Uses a single multi-row INSERT for efficiency.
38    async fn insert_json(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
39        if records.is_empty() {
40            return Ok(0);
41        }
42
43        // Build multi-row INSERT: INSERT INTO t (col) VALUES (?), (?), ...
44        let placeholders: Vec<&str> = records.iter().map(|_| "(?)").collect();
45        let insert_sql = format!(
46            "INSERT INTO {} ({}) VALUES {}",
47            quote_ident_mysql(&self.config.table_name),
48            quote_ident_mysql(column),
49            placeholders.join(", ")
50        );
51
52        let mut q = sqlx::query(&insert_sql);
53        for record in records {
54            let json_str = serde_json::to_string(record)
55                .map_err(|e| FaucetError::Sink(format!("failed to serialize record: {e}")))?;
56            q = q.bind(json_str);
57        }
58
59        q.execute(&self.pool)
60            .await
61            .map_err(|e| FaucetError::Sink(format!("MySQL insert failed: {e}")))?;
62
63        Ok(records.len())
64    }
65
66    /// Insert a batch of records using auto-mapped columns.
67    ///
68    /// Discovers column names from INFORMATION_SCHEMA and maps
69    /// top-level JSON fields to columns. Uses a single multi-row INSERT.
70    async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
71        if records.is_empty() {
72            return Ok(0);
73        }
74
75        // Get column names from the table.
76        let columns: Vec<String> = sqlx::query(
77            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA = DATABASE() ORDER BY ORDINAL_POSITION"
78        )
79        .bind(&self.config.table_name)
80        .fetch_all(&self.pool)
81        .await
82        .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
83        .iter()
84        .map(|row| row.get::<String, _>("COLUMN_NAME"))
85        .collect();
86
87        if columns.is_empty() {
88            return Err(FaucetError::Sink(format!(
89                "table '{}' has no columns or does not exist",
90                self.config.table_name
91            )));
92        }
93
94        // Pre-validate all records and collect matched column values. The
95        // INSERT column set is the UNION of table columns present in ANY record
96        // (in declared table order), not just the first record's keys —
97        // otherwise a field present only in a later record of the batch would be
98        // silently dropped (audit #146 H1). A row missing a unioned column binds
99        // SQL NULL.
100        let mut matched_rows: Vec<Vec<(&String, &Value)>> = Vec::with_capacity(records.len());
101        let mut used: std::collections::HashSet<&str> = std::collections::HashSet::new();
102
103        for record in records {
104            let obj = record
105                .as_object()
106                .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
107
108            let matching: Vec<(&String, &Value)> = columns
109                .iter()
110                .filter_map(|col| obj.get(col).map(|v| (col, v)))
111                .collect();
112
113            if matching.is_empty() {
114                tracing::warn!(
115                    record_keys = ?obj.keys().collect::<Vec<_>>(),
116                    table_columns = ?columns,
117                    "record has no keys matching table columns, skipping"
118                );
119                continue;
120            }
121
122            for (c, _) in &matching {
123                used.insert(c.as_str());
124            }
125            matched_rows.push(matching);
126        }
127
128        if matched_rows.is_empty() {
129            return Ok(0);
130        }
131
132        // Table columns (in declared order) that appear in at least one record.
133        let insert_columns: Vec<String> = columns
134            .iter()
135            .filter(|c| used.contains(c.as_str()))
136            .cloned()
137            .collect();
138
139        let num_cols = insert_columns.len();
140        let num_rows = matched_rows.len();
141        let col_names: Vec<String> = insert_columns
142            .iter()
143            .map(|c| quote_ident_mysql(c))
144            .collect();
145
146        // MySQL caps prepared-statement placeholders at 65535. A multi-row
147        // INSERT binds `rows × num_cols`, so a wide table at a large batch_size
148        // overflows and fails at runtime; split into sub-INSERTs of at most
149        // floor(MAX / num_cols) rows (audit #146 H14 — postgres/sqlite/mssql
150        // already sub-chunk this way).
151        const MAX_MYSQL_PARAMS: usize = 65535;
152        let max_rows_per_insert = (MAX_MYSQL_PARAMS / num_cols).max(1);
153
154        for sub in matched_rows.chunks(max_rows_per_insert) {
155            // Build multi-row VALUES clause: (?, ?), (?, ?), ...
156            let row_placeholder = format!("({})", vec!["?"; num_cols].join(", "));
157            let value_tuples: Vec<&str> =
158                (0..sub.len()).map(|_| row_placeholder.as_str()).collect();
159
160            let query = format!(
161                "INSERT INTO {} ({}) VALUES {}",
162                quote_ident_mysql(&self.config.table_name),
163                col_names.join(", "),
164                value_tuples.join(", ")
165            );
166
167            let mut q = sqlx::query(&query);
168            for matched in sub {
169                for col in &insert_columns {
170                    let val = matched.iter().find(|(c, _)| *c == col).map(|(_, v)| *v);
171                    // Bind native MySQL types. Binding every value as a JSON string
172                    // (the old behaviour) stored `"Bob"` with embedded quotes,
173                    // turned `true` into the text "true", and bound the literal
174                    // text "null" for absent columns instead of SQL NULL (#78/#4).
175                    q = match val {
176                        None | Some(Value::Null) => q.bind(None::<String>),
177                        Some(Value::Bool(b)) => q.bind(*b),
178                        Some(Value::Number(n)) => {
179                            if let Some(i) = n.as_i64() {
180                                q.bind(i)
181                            } else if let Some(f) = n.as_f64() {
182                                q.bind(f)
183                            } else {
184                                // u64 above i64::MAX — preserve exact text.
185                                q.bind(n.to_string())
186                            }
187                        }
188                        Some(Value::String(s)) => q.bind(s.clone()),
189                        // Arrays/objects have no scalar SQL representation — store
190                        // their JSON text (suitable for TEXT / JSON columns).
191                        Some(v) => q.bind(v.to_string()),
192                    };
193                }
194            }
195
196            q.execute(&self.pool)
197                .await
198                .map_err(|e| FaucetError::Sink(format!("MySQL insert failed: {e}")))?;
199        }
200
201        Ok(num_rows)
202    }
203}
204
205#[async_trait]
206impl faucet_core::Sink for MysqlSink {
207    fn config_schema(&self) -> serde_json::Value {
208        serde_json::to_value(faucet_core::schema_for!(MysqlSinkConfig))
209            .expect("schema serialization")
210    }
211
212    /// Preflight connectivity probe (`faucet doctor`).
213    ///
214    /// Acquires a connection from the existing pool and runs `SELECT 1`. This
215    /// is non-mutating and idempotent — it validates that the database is
216    /// reachable and the credentials are accepted without writing anything.
217    async fn check(
218        &self,
219        ctx: &faucet_core::check::CheckContext,
220    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
221        use faucet_core::check::{CheckReport, Probe};
222
223        let started = std::time::Instant::now();
224        let probe =
225            match tokio::time::timeout(ctx.timeout, sqlx::query("SELECT 1").execute(&self.pool))
226                .await
227            {
228                Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
229                Ok(Err(e)) => Probe::fail_hint(
230                    "auth",
231                    started.elapsed(),
232                    e.to_string(),
233                    "check connection_url / credentials / that the database is reachable",
234                ),
235                Err(_) => Probe::fail_hint(
236                    "auth",
237                    started.elapsed(),
238                    "timed out",
239                    "check connection_url / credentials / that the database is reachable",
240                ),
241            };
242        Ok(CheckReport::single(probe))
243    }
244
245    /// Write records to MySQL.
246    ///
247    /// When `config.batch_size > 0` and the input slice is larger than
248    /// `batch_size`, the slice is split into chunks of `batch_size` rows and
249    /// each chunk is sent as a separate multi-row `INSERT`. When
250    /// `config.batch_size == 0`, the entire slice is sent in a single
251    /// multi-row `INSERT` — useful when upstream `StreamPage`s are already
252    /// sized for MySQL's `max_allowed_packet` limit.
253    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
254        if records.is_empty() {
255            return Ok(0);
256        }
257
258        let chunks: Vec<&[Value]> = if self.config.batch_size == 0 {
259            // Sentinel: pass the entire upstream page through in a single
260            // multi-row INSERT. Subject to MySQL's max_allowed_packet
261            // (default 64MB).
262            vec![records]
263        } else {
264            records.chunks(self.config.batch_size).collect()
265        };
266
267        let mut total = 0;
268        for chunk in chunks {
269            total += match &self.config.column_mapping {
270                MysqlColumnMapping::Json { column } => self.insert_json(chunk, column).await?,
271                MysqlColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
272            };
273        }
274
275        tracing::info!(
276            table = %self.config.table_name,
277            rows = total,
278            "MySQL write complete"
279        );
280        Ok(total)
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that quoting `input` yields exactly `expected`.
    fn assert_quoted(input: &str, expected: &str) {
        assert_eq!(quote_ident_mysql(input), expected, "quoting {input:?}");
    }

    #[test]
    fn quote_ident_mysql_simple() {
        assert_quoted("my_table", "`my_table`");
    }

    #[test]
    fn quote_ident_mysql_with_backtick() {
        // An embedded backtick is escaped by doubling it.
        assert_quoted("has`tick", "`has``tick`");
    }

    #[test]
    fn quote_ident_mysql_empty() {
        assert_quoted("", "``");
    }

    #[test]
    fn quote_ident_mysql_special_chars() {
        // Quoting neutralises SQL metacharacters; they stay inside the identifier.
        assert_quoted("table; DROP", "`table; DROP`");
    }
}