Skip to main content

faucet_sink_mysql/
config.rs

//! MySQL sink configuration: connection URL, target table, column mapping, batching and pool size.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// How to map JSON records to table columns.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "snake_case")]
10pub enum MysqlColumnMapping {
11    /// Insert each record as a single JSON column. The column name
12    /// defaults to `"data"` but can be overridden.
13    Json { column: String },
14    /// Map top-level JSON keys directly to table columns.
15    /// Only keys that match existing columns are inserted; extra keys are ignored.
16    AutoMap,
17}
18
19impl Default for MysqlColumnMapping {
20    fn default() -> Self {
21        Self::Json {
22            column: "data".into(),
23        }
24    }
25}
26
27/// Configuration for the MySQL sink.
28#[derive(Clone, Serialize, Deserialize, JsonSchema)]
29pub struct MysqlSinkConfig {
30    /// MySQL connection URL (e.g. `mysql://user:pass@host/db`).
31    pub connection_url: String,
32    /// Target table name.
33    pub table_name: String,
34    /// How to map JSON records to columns.
35    pub column_mapping: MysqlColumnMapping,
36    /// Maximum rows per multi-row `INSERT` statement. Defaults to
37    /// [`DEFAULT_BATCH_SIZE`].
38    ///
39    /// When the upstream `StreamPage` carries more records than `batch_size`,
40    /// the sink slices the page into `batch_size`-row chunks and issues one
41    /// multi-row `INSERT INTO ... VALUES (...), (...), ...` statement per
42    /// chunk. When `batch_size = 0`, the entire upstream page is forwarded
43    /// in a single multi-row `INSERT` — useful when the source already
44    /// chunks to a size tuned for MySQL.
45    ///
46    /// `batch_size = 0` is the "no batching" sentinel: the full upstream
47    /// page is forwarded as one `INSERT`, subject to MySQL's
48    /// `max_allowed_packet` limit (default 64MB). Keep the default unless
49    /// the upstream `StreamPage` size is already tuned for MySQL.
50    #[serde(default = "default_batch_size")]
51    pub batch_size: usize,
52    /// Maximum number of connections in the pool. Defaults to 5.
53    pub max_connections: u32,
54}
55
56fn default_batch_size() -> usize {
57    DEFAULT_BATCH_SIZE
58}
59
60impl std::fmt::Debug for MysqlSinkConfig {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("MysqlSinkConfig")
63            .field("connection_url", &"***")
64            .field("table_name", &self.table_name)
65            .field("column_mapping", &self.column_mapping)
66            .field("batch_size", &self.batch_size)
67            .field("max_connections", &self.max_connections)
68            .finish()
69    }
70}
71
72impl MysqlSinkConfig {
73    /// Create a new config with required fields and sensible defaults.
74    pub fn new(connection_url: impl Into<String>, table_name: impl Into<String>) -> Self {
75        Self {
76            connection_url: connection_url.into(),
77            table_name: table_name.into(),
78            column_mapping: MysqlColumnMapping::default(),
79            batch_size: DEFAULT_BATCH_SIZE,
80            max_connections: 5,
81        }
82    }
83
84    /// Set the column mapping strategy.
85    pub fn column_mapping(mut self, mapping: MysqlColumnMapping) -> Self {
86        self.column_mapping = mapping;
87        self
88    }
89
90    /// Set the per-statement row count for the multi-row `INSERT`.
91    ///
92    /// Pass `0` to opt out of re-chunking — the sink forwards each upstream
93    /// [`StreamPage`](faucet_core::StreamPage) as a single multi-row
94    /// `INSERT`. MySQL's multi-row insert sweet spot is ~1000 rows.
95    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
96        self.batch_size = batch_size;
97        self
98    }
99
100    /// Set the maximum number of connections in the pool.
101    pub fn max_connections(mut self, n: u32) -> Self {
102        self.max_connections = n;
103        self
104    }
105}
106
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = MysqlSinkConfig::new("mysql://localhost/test", "events");
        assert_eq!(config.connection_url, "mysql://localhost/test");
        assert_eq!(config.table_name, "events");
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(config.max_connections, 5);
        assert!(matches!(
            config.column_mapping,
            MysqlColumnMapping::Json { ref column } if column == "data"
        ));
    }

    #[test]
    fn builder_methods() {
        let config = MysqlSinkConfig::new("mysql://localhost/test", "events")
            .column_mapping(MysqlColumnMapping::AutoMap)
            .with_batch_size(100)
            .max_connections(10);
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.max_connections, 10);
        assert!(matches!(config.column_mapping, MysqlColumnMapping::AutoMap));
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = MysqlSinkConfig::new("mysql://localhost/test", "events").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn json_custom_column() {
        let config = MysqlSinkConfig::new("mysql://localhost/test", "events").column_mapping(
            MysqlColumnMapping::Json {
                column: "payload".into(),
            },
        );
        assert!(matches!(
            config.column_mapping,
            MysqlColumnMapping::Json { ref column } if column == "payload"
        ));
    }

    #[test]
    fn debug_masks_connection_url() {
        let config = MysqlSinkConfig::new("mysql://secret:pass@host/db", "events");
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret"));
        assert!(!debug.contains("pass"));
        // Non-sensitive fields are still rendered.
        assert!(debug.contains("events"));
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = MysqlSinkConfig::new("mysql://localhost/test", "events").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = MysqlSinkConfig::new("mysql://localhost/test", "events")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_url": "mysql://localhost/test",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "batch_size": 250,
            "max_connections": 5
        }"#;
        let config: MysqlSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "connection_url": "mysql://localhost/test",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "max_connections": 5
        }"#;
        let config: MysqlSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn auto_map_deserializes_from_snake_case_string() {
        // Unit variants are externally tagged as plain strings, renamed to snake_case.
        let json = r#"{
            "connection_url": "mysql://localhost/test",
            "table_name": "events",
            "column_mapping": "auto_map",
            "max_connections": 5
        }"#;
        let config: MysqlSinkConfig = serde_json::from_str(json).unwrap();
        assert!(matches!(config.column_mapping, MysqlColumnMapping::AutoMap));
    }

    #[test]
    fn with_batch_size_chaining() {
        let config = MysqlSinkConfig::new("mysql://localhost/test", "events")
            .with_batch_size(100)
            .with_batch_size(2_000);
        assert_eq!(config.batch_size, 2_000);
    }
}