Skip to main content

faucet_source_mysql/
config.rs

1//! MySQL source configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// Configuration for the MySQL query source.
8#[derive(Clone, Serialize, Deserialize, JsonSchema)]
9pub struct MysqlSourceConfig {
10    /// MySQL connection URL (e.g. `mysql://user:pass@host/db`).
11    pub connection_url: String,
12    /// SQL query to execute.
13    pub query: String,
14    /// Maximum number of connections in the pool. Defaults to 10.
15    #[serde(default = "default_max_connections")]
16    pub max_connections: u32,
17    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). Rows are
18    /// drained from the sqlx cursor and yielded whenever the buffer reaches
19    /// this size. Defaults to [`DEFAULT_BATCH_SIZE`].
20    ///
21    /// `batch_size = 0` is the "no batching" sentinel: the cursor is fully
22    /// drained and the entire result set is emitted in a single page. Useful
23    /// for small lookup tables or for sinks (e.g. SQL `COPY`, BigQuery load
24    /// jobs) that prefer one large request to many small ones.
25    #[serde(default = "default_batch_size")]
26    pub batch_size: usize,
27}
28
29fn default_max_connections() -> u32 {
30    10
31}
32
33fn default_batch_size() -> usize {
34    DEFAULT_BATCH_SIZE
35}
36
37impl std::fmt::Debug for MysqlSourceConfig {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("MysqlSourceConfig")
40            .field("connection_url", &"***")
41            .field("query", &self.query)
42            .field("max_connections", &self.max_connections)
43            .field("batch_size", &self.batch_size)
44            .finish()
45    }
46}
47
48impl MysqlSourceConfig {
49    /// Create a new config with the required connection URL and query.
50    pub fn new(connection_url: impl Into<String>, query: impl Into<String>) -> Self {
51        Self {
52            connection_url: connection_url.into(),
53            query: query.into(),
54            max_connections: 10,
55            batch_size: DEFAULT_BATCH_SIZE,
56        }
57    }
58
59    /// Set the maximum number of connections in the pool.
60    pub fn with_max_connections(mut self, max_connections: u32) -> Self {
61        self.max_connections = max_connections;
62        self
63    }
64
65    /// Set the per-page row count for [`Source::stream_pages`](faucet_core::Source::stream_pages).
66    ///
67    /// Pass `0` to opt out of batching — the entire result set is emitted in
68    /// a single [`StreamPage`](faucet_core::StreamPage).
69    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
70        self.batch_size = batch_size;
71        self
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use super::*;
78
79    #[test]
80    fn default_config() {
81        let config = MysqlSourceConfig::new("mysql://localhost/test", "SELECT * FROM events");
82        assert_eq!(config.query, "SELECT * FROM events");
83    }
84
85    #[test]
86    fn debug_masks_connection_url() {
87        let config = MysqlSourceConfig::new("mysql://secret:pass@host/db", "SELECT 1");
88        let debug = format!("{config:?}");
89        assert!(debug.contains("***"));
90        assert!(!debug.contains("secret"));
91        assert!(!debug.contains("pass"));
92    }
93
94    #[test]
95    fn batch_size_defaults_to_default_batch_size() {
96        let config = MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1");
97        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
98    }
99
100    #[test]
101    fn with_batch_size_overrides_default() {
102        let config =
103            MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1").with_batch_size(500);
104        assert_eq!(config.batch_size, 500);
105    }
106
107    #[test]
108    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
109        let config =
110            MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1").with_batch_size(0);
111        assert_eq!(config.batch_size, 0);
112        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
113    }
114
115    #[test]
116    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
117        let config = MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1")
118            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
119        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
120    }
121
122    #[test]
123    fn batch_size_deserializes_from_json() {
124        let json = r#"{
125            "connection_url": "mysql://localhost/test",
126            "query": "SELECT 1",
127            "batch_size": 250
128        }"#;
129        let config: MysqlSourceConfig = serde_json::from_str(json).unwrap();
130        assert_eq!(config.batch_size, 250);
131    }
132}