Skip to main content

faucet_source_mysql/
stream.rs

1//! MySQL source implementation.
2
3use crate::config::MysqlSourceConfig;
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use futures::TryStreamExt;
7use serde_json::Value;
8use sqlx::mysql::MySqlPoolOptions;
9use sqlx::{Column, MySqlPool, Row};
10use std::pin::Pin;
11
12/// A source that executes a SQL query against MySQL and returns rows as JSON.
13pub struct MysqlSource {
14    config: MysqlSourceConfig,
15    pool: MySqlPool,
16}
17
18impl MysqlSource {
19    /// Create a new MySQL source. Establishes a connection pool.
20    pub async fn new(config: MysqlSourceConfig) -> Result<Self, FaucetError> {
21        faucet_core::validate_batch_size(config.batch_size)?;
22
23        let pool = MySqlPoolOptions::new()
24            .max_connections(config.max_connections)
25            .connect(&config.connection_url)
26            .await
27            .map_err(|e| FaucetError::Config(format!("MySQL connection failed: {e}")))?;
28
29        Ok(Self { config, pool })
30    }
31}
32
33/// Convert a MySQL row column value to a `serde_json::Value`.
34///
35/// Attempts common types in order of likelihood. Falls back to `Value::Null`
36/// for unsupported or null columns.
37fn mysql_value_to_json(row: &sqlx::mysql::MySqlRow, col_name: &str) -> Value {
38    // Try JSON first
39    if let Ok(v) = row.try_get::<Value, _>(col_name) {
40        return v;
41    }
42
43    // Try common scalar types
44    if let Ok(v) = row.try_get::<String, _>(col_name) {
45        return Value::String(v);
46    }
47    if let Ok(v) = row.try_get::<i64, _>(col_name) {
48        return Value::Number(v.into());
49    }
50    if let Ok(v) = row.try_get::<i32, _>(col_name) {
51        return Value::Number(v.into());
52    }
53    if let Ok(v) = row.try_get::<i16, _>(col_name) {
54        return Value::Number(v.into());
55    }
56    if let Ok(v) = row.try_get::<f64, _>(col_name) {
57        return serde_json::Number::from_f64(v)
58            .map(Value::Number)
59            .unwrap_or(Value::Null);
60    }
61    if let Ok(v) = row.try_get::<f32, _>(col_name) {
62        return serde_json::Number::from_f64(v as f64)
63            .map(Value::Number)
64            .unwrap_or(Value::Null);
65    }
66    if let Ok(v) = row.try_get::<bool, _>(col_name) {
67        return Value::Bool(v);
68    }
69
70    // Richer types that would otherwise silently decode to Null (#78/#43).
71    if let Ok(v) =
72        row.try_get::<sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>, _>(col_name)
73    {
74        return Value::String(v.to_rfc3339());
75    }
76    if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveDateTime, _>(col_name) {
77        return Value::String(v.to_string());
78    }
79    if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveDate, _>(col_name) {
80        return Value::String(v.to_string());
81    }
82    if let Ok(v) = row.try_get::<sqlx::types::chrono::NaiveTime, _>(col_name) {
83        return Value::String(v.to_string());
84    }
85    // DECIMAL → string, preserving exact precision.
86    if let Ok(v) = row.try_get::<sqlx::types::BigDecimal, _>(col_name) {
87        return Value::String(v.to_string());
88    }
89    // BLOB / BINARY → base64.
90    if let Ok(v) = row.try_get::<Vec<u8>, _>(col_name) {
91        use base64::Engine as _;
92        return Value::String(base64::engine::general_purpose::STANDARD.encode(v));
93    }
94
95    Value::Null
96}
97
98/// Build the effective SQL query and ordered context-bind values for a given
99/// parent context. Returns the literal query when there is no context.
100fn resolve_query(
101    config: &MysqlSourceConfig,
102    context: &std::collections::HashMap<String, Value>,
103) -> (String, Vec<Value>) {
104    if context.is_empty() {
105        (config.query.clone(), Vec::new())
106    } else {
107        faucet_core::util::substitute_context_bind_params(&config.query, context, 1, |_| {
108            "?".to_string()
109        })
110    }
111}
112
113/// Apply context-derived bind values onto a sqlx query.
114fn bind_params<'q>(
115    mut query: sqlx::query::Query<'q, sqlx::MySql, sqlx::mysql::MySqlArguments>,
116    bind_values: &'q [Value],
117) -> sqlx::query::Query<'q, sqlx::MySql, sqlx::mysql::MySqlArguments> {
118    for value in bind_values {
119        query = match value {
120            Value::String(s) => query.bind(s.clone()),
121            Value::Number(n) if n.is_i64() => query.bind(n.as_i64().unwrap()),
122            Value::Number(n) => query.bind(n.as_f64().unwrap_or(0.0)),
123            Value::Bool(b) => query.bind(*b),
124            Value::Null => query.bind(None::<String>),
125            _ => query.bind(value.to_string()),
126        };
127    }
128    query
129}
130
131/// Convert a single `MySqlRow` into a JSON object whose keys are the row's
132/// column names.
133fn row_to_json(row: &sqlx::mysql::MySqlRow) -> Value {
134    let mut map = serde_json::Map::new();
135    for col in row.columns() {
136        let name = col.name().to_string();
137        let value = mysql_value_to_json(row, &name);
138        map.insert(name, value);
139    }
140    Value::Object(map)
141}
142
143#[async_trait]
144impl faucet_core::Source for MysqlSource {
145    async fn fetch_with_context(
146        &self,
147        context: &std::collections::HashMap<String, serde_json::Value>,
148    ) -> Result<Vec<Value>, FaucetError> {
149        let (query_str, bind_values) = resolve_query(&self.config, context);
150        let query = bind_params(sqlx::query(&query_str), &bind_values);
151
152        let rows = query
153            .fetch_all(&self.pool)
154            .await
155            .map_err(|e| FaucetError::Config(format!("MySQL query failed: {e}")))?;
156
157        let records: Vec<Value> = rows.iter().map(row_to_json).collect();
158        tracing::info!(rows = records.len(), query = %self.config.query, "MySQL source fetch complete");
159        Ok(records)
160    }
161
162    /// Stream rows from the underlying sqlx cursor without buffering the full
163    /// result set. Each emitted [`StreamPage`] holds up to
164    /// [`MysqlSourceConfig::batch_size`] rows.
165    ///
166    /// The trait-level `batch_size` argument is ignored in favour of the
167    /// config field — the config is the user-facing knob the README
168    /// documents, and routing the pipeline-supplied hint through it would
169    /// silently override an explicit config value.
170    ///
171    /// `batch_size = 0` drains the entire cursor into a single page. The
172    /// mysql query source has no incremental-replication mode today, so
173    /// every emitted page carries `bookmark: None`.
174    fn stream_pages<'a>(
175        &'a self,
176        context: &'a std::collections::HashMap<String, Value>,
177        _batch_size: usize,
178    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
179        let batch_size = self.config.batch_size;
180
181        Box::pin(async_stream::try_stream! {
182            let (query_str, bind_values) = resolve_query(&self.config, context);
183            let query = bind_params(sqlx::query(&query_str), &bind_values);
184
185            let mut rows = query.fetch(&self.pool);
186            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
187            let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
188            let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
189            let mut total = 0usize;
190
191            while let Some(row) = rows
192                .try_next()
193                .await
194                .map_err(|e| FaucetError::Config(format!("MySQL query failed: {e}")))?
195            {
196                buffer.push(row_to_json(&row));
197                if buffer.len() >= chunk {
198                    let page = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
199                    total += page.len();
200                    yield StreamPage { records: page, bookmark: None };
201                }
202            }
203            if !buffer.is_empty() {
204                total += buffer.len();
205                yield StreamPage { records: buffer, bookmark: None };
206            }
207
208            tracing::info!(
209                rows = total,
210                batch_size,
211                query = %self.config.query,
212                "MySQL source stream complete",
213            );
214        })
215    }
216
217    fn config_schema(&self) -> serde_json::Value {
218        serde_json::to_value(faucet_core::schema_for!(MysqlSourceConfig))
219            .expect("schema serialization")
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// `MysqlSource::new` validates `batch_size` before it attempts to
    /// connect, so this test does not need a running MySQL server.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let outcome = MysqlSource::new(config).await;
        if let Err(faucet_core::FaucetError::Config(m)) = outcome {
            assert!(m.contains("batch_size"), "got: {m}");
        } else {
            panic!("expected a batch_size Config error");
        }
    }
}