faucet_source_mysql/
stream.rs1use crate::config::MysqlSourceConfig;
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use futures::TryStreamExt;
7use serde_json::Value;
8use sqlx::mysql::MySqlPoolOptions;
9use sqlx::{Column, MySqlPool, Row};
10use std::pin::Pin;
11
12pub struct MysqlSource {
14 config: MysqlSourceConfig,
15 pool: MySqlPool,
16}
17
18impl MysqlSource {
19 pub async fn new(config: MysqlSourceConfig) -> Result<Self, FaucetError> {
21 faucet_core::validate_batch_size(config.batch_size)?;
22
23 let pool = MySqlPoolOptions::new()
24 .max_connections(config.max_connections)
25 .connect(&config.connection_url)
26 .await
27 .map_err(|e| FaucetError::Config(format!("MySQL connection failed: {e}")))?;
28
29 Ok(Self { config, pool })
30 }
31}
32
33fn mysql_value_to_json(row: &sqlx::mysql::MySqlRow, col_name: &str) -> Value {
38 if let Ok(v) = row.try_get::<Value, _>(col_name) {
40 return v;
41 }
42
43 if let Ok(v) = row.try_get::<String, _>(col_name) {
45 return Value::String(v);
46 }
47 if let Ok(v) = row.try_get::<i64, _>(col_name) {
48 return Value::Number(v.into());
49 }
50 if let Ok(v) = row.try_get::<i32, _>(col_name) {
51 return Value::Number(v.into());
52 }
53 if let Ok(v) = row.try_get::<i16, _>(col_name) {
54 return Value::Number(v.into());
55 }
56 if let Ok(v) = row.try_get::<f64, _>(col_name) {
57 return serde_json::Number::from_f64(v)
58 .map(Value::Number)
59 .unwrap_or(Value::Null);
60 }
61 if let Ok(v) = row.try_get::<f32, _>(col_name) {
62 return serde_json::Number::from_f64(v as f64)
63 .map(Value::Number)
64 .unwrap_or(Value::Null);
65 }
66 if let Ok(v) = row.try_get::<bool, _>(col_name) {
67 return Value::Bool(v);
68 }
69
70 if let Ok(v) =
72 row.try_get::<sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>, _>(col_name)
73 {
74 return Value::String(v.to_rfc3339());
75 }
76 if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveDateTime, _>(col_name) {
77 return Value::String(v.to_string());
78 }
79 if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveDate, _>(col_name) {
80 return Value::String(v.to_string());
81 }
82 if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveTime, _>(col_name) {
83 return Value::String(v.to_string());
84 }
85 if let Ok(v) = row.try_get::<sqlx::types::BigDecimal, _>(col_name) {
87 return Value::String(v.to_string());
88 }
89 if let Ok(v) = row.try_get::<Vec<u8>, _>(col_name) {
91 use base64::Engine as _;
92 return Value::String(base64::engine::general_purpose::STANDARD.encode(v));
93 }
94
95 Value::Null
96}
97
98fn resolve_query(
101 config: &MysqlSourceConfig,
102 context: &std::collections::HashMap<String, Value>,
103) -> (String, Vec<Value>) {
104 if context.is_empty() {
105 (config.query.clone(), Vec::new())
106 } else {
107 faucet_core::util::substitute_context_bind_params(&config.query, context, 1, |_| {
108 "?".to_string()
109 })
110 }
111}
112
113fn bind_params<'q>(
115 mut query: sqlx::query::Query<'q, sqlx::MySql, sqlx::mysql::MySqlArguments>,
116 bind_values: &'q [Value],
117) -> sqlx::query::Query<'q, sqlx::MySql, sqlx::mysql::MySqlArguments> {
118 for value in bind_values {
119 query = match value {
120 Value::String(s) => query.bind(s.clone()),
121 Value::Number(n) if n.is_i64() => query.bind(n.as_i64().unwrap()),
122 Value::Number(n) => query.bind(n.as_f64().unwrap_or(0.0)),
123 Value::Bool(b) => query.bind(*b),
124 Value::Null => query.bind(None::<String>),
125 _ => query.bind(value.to_string()),
126 };
127 }
128 query
129}
130
131fn row_to_json(row: &sqlx::mysql::MySqlRow) -> Value {
134 let mut map = serde_json::Map::new();
135 for col in row.columns() {
136 let name = col.name().to_string();
137 let value = mysql_value_to_json(row, &name);
138 map.insert(name, value);
139 }
140 Value::Object(map)
141}
142
143#[async_trait]
144impl faucet_core::Source for MysqlSource {
145 async fn fetch_with_context(
146 &self,
147 context: &std::collections::HashMap<String, serde_json::Value>,
148 ) -> Result<Vec<Value>, FaucetError> {
149 let (query_str, bind_values) = resolve_query(&self.config, context);
150 let query = bind_params(sqlx::query(&query_str), &bind_values);
151
152 let rows = query
153 .fetch_all(&self.pool)
154 .await
155 .map_err(|e| FaucetError::Config(format!("MySQL query failed: {e}")))?;
156
157 let records: Vec<Value> = rows.iter().map(row_to_json).collect();
158 tracing::info!(rows = records.len(), query = %self.config.query, "MySQL source fetch complete");
159 Ok(records)
160 }
161
162 fn stream_pages<'a>(
175 &'a self,
176 context: &'a std::collections::HashMap<String, Value>,
177 _batch_size: usize,
178 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
179 let batch_size = self.config.batch_size;
180
181 Box::pin(async_stream::try_stream! {
182 let (query_str, bind_values) = resolve_query(&self.config, context);
183 let query = bind_params(sqlx::query(&query_str), &bind_values);
184
185 let mut rows = query.fetch(&self.pool);
186 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
187 let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
188 let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
189 let mut total = 0usize;
190
191 while let Some(row) = rows
192 .try_next()
193 .await
194 .map_err(|e| FaucetError::Config(format!("MySQL query failed: {e}")))?
195 {
196 buffer.push(row_to_json(&row));
197 if buffer.len() >= chunk {
198 let page = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
199 total += page.len();
200 yield StreamPage { records: page, bookmark: None };
201 }
202 }
203 if !buffer.is_empty() {
204 total += buffer.len();
205 yield StreamPage { records: buffer, bookmark: None };
206 }
207
208 tracing::info!(
209 rows = total,
210 batch_size,
211 query = %self.config.query,
212 "MySQL source stream complete",
213 );
214 })
215 }
216
217 fn config_schema(&self) -> serde_json::Value {
218 serde_json::to_value(faucet_core::schema_for!(MysqlSourceConfig))
219 .expect("schema serialization")
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// A batch size one above `faucet_core::MAX_BATCH_SIZE` must be rejected
    /// with a `Config` error that mentions `batch_size`.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut cfg = MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1");
        cfg.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let outcome = MysqlSource::new(cfg).await;
        if let Err(faucet_core::FaucetError::Config(m)) = outcome {
            assert!(m.contains("batch_size"), "got: {m}");
        } else {
            panic!("expected a batch_size Config error");
        }
    }
}
238}