// faucet_sink_mysql/sink.rs
use std::collections::HashSet;

use async_trait::async_trait;
use faucet_core::FaucetError;
use serde_json::Value;
use sqlx::mysql::MySqlPoolOptions;
use sqlx::{MySqlPool, Row};

use crate::config::{MysqlColumnMapping, MysqlSinkConfig};

10pub struct MysqlSink {
12 config: MysqlSinkConfig,
13 pool: MySqlPool,
14}
15
/// Quotes `name` as a MySQL identifier.
///
/// The result is wrapped in backticks, and every embedded backtick is doubled
/// so the name cannot break out of the quoted identifier.
fn quote_ident_mysql(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('`');
    for ch in name.chars() {
        if ch == '`' {
            // A literal backtick inside a quoted identifier is written twice.
            quoted.push('`');
        }
        quoted.push(ch);
    }
    quoted.push('`');
    quoted
}

24impl MysqlSink {
25 pub async fn new(config: MysqlSinkConfig) -> Result<Self, FaucetError> {
27 let pool = MySqlPoolOptions::new()
28 .max_connections(config.max_connections)
29 .connect(&config.connection_url)
30 .await
31 .map_err(|e| FaucetError::Sink(format!("MySQL connection failed: {e}")))?;
32
33 Ok(Self { config, pool })
34 }
35
36 async fn insert_json(&self, records: &[Value], column: &str) -> Result<usize, FaucetError> {
39 if records.is_empty() {
40 return Ok(0);
41 }
42
43 let placeholders: Vec<&str> = records.iter().map(|_| "(?)").collect();
45 let insert_sql = format!(
46 "INSERT INTO {} ({}) VALUES {}",
47 quote_ident_mysql(&self.config.table_name),
48 quote_ident_mysql(column),
49 placeholders.join(", ")
50 );
51
52 let mut q = sqlx::query(&insert_sql);
53 for record in records {
54 let json_str = serde_json::to_string(record)
55 .map_err(|e| FaucetError::Sink(format!("failed to serialize record: {e}")))?;
56 q = q.bind(json_str);
57 }
58
59 q.execute(&self.pool)
60 .await
61 .map_err(|e| FaucetError::Sink(format!("MySQL insert failed: {e}")))?;
62
63 Ok(records.len())
64 }
65
66 async fn insert_auto_map(&self, records: &[Value]) -> Result<usize, FaucetError> {
71 if records.is_empty() {
72 return Ok(0);
73 }
74
75 let columns: Vec<String> = sqlx::query(
77 "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA = DATABASE() ORDER BY ORDINAL_POSITION"
78 )
79 .bind(&self.config.table_name)
80 .fetch_all(&self.pool)
81 .await
82 .map_err(|e| FaucetError::Sink(format!("failed to query table columns: {e}")))?
83 .iter()
84 .map(|row| row.get::<String, _>("COLUMN_NAME"))
85 .collect();
86
87 if columns.is_empty() {
88 return Err(FaucetError::Sink(format!(
89 "table '{}' has no columns or does not exist",
90 self.config.table_name
91 )));
92 }
93
94 let mut matched_rows: Vec<Vec<(&String, &Value)>> = Vec::with_capacity(records.len());
101 let mut used: std::collections::HashSet<&str> = std::collections::HashSet::new();
102
103 for record in records {
104 let obj = record
105 .as_object()
106 .ok_or_else(|| FaucetError::Sink("AutoMap requires JSON object records".into()))?;
107
108 let matching: Vec<(&String, &Value)> = columns
109 .iter()
110 .filter_map(|col| obj.get(col).map(|v| (col, v)))
111 .collect();
112
113 if matching.is_empty() {
114 tracing::warn!(
115 record_keys = ?obj.keys().collect::<Vec<_>>(),
116 table_columns = ?columns,
117 "record has no keys matching table columns, skipping"
118 );
119 continue;
120 }
121
122 for (c, _) in &matching {
123 used.insert(c.as_str());
124 }
125 matched_rows.push(matching);
126 }
127
128 if matched_rows.is_empty() {
129 return Ok(0);
130 }
131
132 let insert_columns: Vec<String> = columns
134 .iter()
135 .filter(|c| used.contains(c.as_str()))
136 .cloned()
137 .collect();
138
139 let num_cols = insert_columns.len();
140 let num_rows = matched_rows.len();
141 let col_names: Vec<String> = insert_columns
142 .iter()
143 .map(|c| quote_ident_mysql(c))
144 .collect();
145
146 const MAX_MYSQL_PARAMS: usize = 65535;
152 let max_rows_per_insert = (MAX_MYSQL_PARAMS / num_cols).max(1);
153
154 for sub in matched_rows.chunks(max_rows_per_insert) {
155 let row_placeholder = format!("({})", vec!["?"; num_cols].join(", "));
157 let value_tuples: Vec<&str> =
158 (0..sub.len()).map(|_| row_placeholder.as_str()).collect();
159
160 let query = format!(
161 "INSERT INTO {} ({}) VALUES {}",
162 quote_ident_mysql(&self.config.table_name),
163 col_names.join(", "),
164 value_tuples.join(", ")
165 );
166
167 let mut q = sqlx::query(&query);
168 for matched in sub {
169 for col in &insert_columns {
170 let val = matched.iter().find(|(c, _)| *c == col).map(|(_, v)| *v);
171 q = match val {
176 None | Some(Value::Null) => q.bind(None::<String>),
177 Some(Value::Bool(b)) => q.bind(*b),
178 Some(Value::Number(n)) => {
179 if let Some(i) = n.as_i64() {
180 q.bind(i)
181 } else if let Some(f) = n.as_f64() {
182 q.bind(f)
183 } else {
184 q.bind(n.to_string())
186 }
187 }
188 Some(Value::String(s)) => q.bind(s.clone()),
189 Some(v) => q.bind(v.to_string()),
192 };
193 }
194 }
195
196 q.execute(&self.pool)
197 .await
198 .map_err(|e| FaucetError::Sink(format!("MySQL insert failed: {e}")))?;
199 }
200
201 Ok(num_rows)
202 }
203}
204
205#[async_trait]
206impl faucet_core::Sink for MysqlSink {
207 fn config_schema(&self) -> serde_json::Value {
208 serde_json::to_value(faucet_core::schema_for!(MysqlSinkConfig))
209 .expect("schema serialization")
210 }
211
212 async fn check(
218 &self,
219 ctx: &faucet_core::check::CheckContext,
220 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
221 use faucet_core::check::{CheckReport, Probe};
222
223 let started = std::time::Instant::now();
224 let probe =
225 match tokio::time::timeout(ctx.timeout, sqlx::query("SELECT 1").execute(&self.pool))
226 .await
227 {
228 Ok(Ok(_)) => Probe::pass("auth", started.elapsed()),
229 Ok(Err(e)) => Probe::fail_hint(
230 "auth",
231 started.elapsed(),
232 e.to_string(),
233 "check connection_url / credentials / that the database is reachable",
234 ),
235 Err(_) => Probe::fail_hint(
236 "auth",
237 started.elapsed(),
238 "timed out",
239 "check connection_url / credentials / that the database is reachable",
240 ),
241 };
242 Ok(CheckReport::single(probe))
243 }
244
245 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
254 if records.is_empty() {
255 return Ok(0);
256 }
257
258 let chunks: Vec<&[Value]> = if self.config.batch_size == 0 {
259 vec![records]
263 } else {
264 records.chunks(self.config.batch_size).collect()
265 };
266
267 let mut total = 0;
268 for chunk in chunks {
269 total += match &self.config.column_mapping {
270 MysqlColumnMapping::Json { column } => self.insert_json(chunk, column).await?,
271 MysqlColumnMapping::AutoMap => self.insert_auto_map(chunk).await?,
272 };
273 }
274
275 tracing::info!(
276 table = %self.config.table_name,
277 rows = total,
278 "MySQL write complete"
279 );
280 Ok(total)
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that quoting `input` yields exactly `expected`.
    fn assert_quoted(input: &str, expected: &str) {
        let quoted = quote_ident_mysql(input);
        assert_eq!(quoted, expected);
    }

    #[test]
    fn quote_ident_mysql_simple() {
        assert_quoted("my_table", "`my_table`");
    }

    #[test]
    fn quote_ident_mysql_with_backtick() {
        assert_quoted("has`tick", "`has``tick`");
    }

    #[test]
    fn quote_ident_mysql_empty() {
        assert_quoted("", "``");
    }

    #[test]
    fn quote_ident_mysql_special_chars() {
        assert_quoted("table; DROP", "`table; DROP`");
    }
}