// faucet_sink_mysql/config.rs
use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "snake_case")]
10pub enum MysqlColumnMapping {
11 Json { column: String },
14 AutoMap,
17}
18
19impl Default for MysqlColumnMapping {
20 fn default() -> Self {
21 Self::Json {
22 column: "data".into(),
23 }
24 }
25}
26
27#[derive(Clone, Serialize, Deserialize, JsonSchema)]
29pub struct MysqlSinkConfig {
30 pub connection_url: String,
32 pub table_name: String,
34 pub column_mapping: MysqlColumnMapping,
36 #[serde(default = "default_batch_size")]
51 pub batch_size: usize,
52 pub max_connections: u32,
54}
55
56fn default_batch_size() -> usize {
57 DEFAULT_BATCH_SIZE
58}
59
60impl std::fmt::Debug for MysqlSinkConfig {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("MysqlSinkConfig")
63 .field("connection_url", &"***")
64 .field("table_name", &self.table_name)
65 .field("column_mapping", &self.column_mapping)
66 .field("batch_size", &self.batch_size)
67 .field("max_connections", &self.max_connections)
68 .finish()
69 }
70}
71
72impl MysqlSinkConfig {
73 pub fn new(connection_url: impl Into<String>, table_name: impl Into<String>) -> Self {
75 Self {
76 connection_url: connection_url.into(),
77 table_name: table_name.into(),
78 column_mapping: MysqlColumnMapping::default(),
79 batch_size: DEFAULT_BATCH_SIZE,
80 max_connections: 5,
81 }
82 }
83
84 pub fn column_mapping(mut self, mapping: MysqlColumnMapping) -> Self {
86 self.column_mapping = mapping;
87 self
88 }
89
90 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
96 self.batch_size = batch_size;
97 self
98 }
99
100 pub fn max_connections(mut self, n: u32) -> Self {
102 self.max_connections = n;
103 self
104 }
105}
106
#[cfg(test)]
mod tests {
    use super::*;

    /// Connection URL used by tests that do not care about its contents.
    const URL: &str = "mysql://localhost/test";
    /// Table name used throughout the tests.
    const TABLE: &str = "events";

    /// Baseline config; each test overrides only the field it exercises.
    fn base_config() -> MysqlSinkConfig {
        MysqlSinkConfig::new(URL, TABLE)
    }

    #[test]
    fn default_config() {
        let config = base_config();
        assert_eq!(config.connection_url, URL);
        assert_eq!(config.table_name, TABLE);
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(config.max_connections, 5);
        assert!(matches!(
            config.column_mapping,
            MysqlColumnMapping::Json { ref column } if column == "data"
        ));
    }

    #[test]
    fn builder_methods() {
        let config = base_config()
            .column_mapping(MysqlColumnMapping::AutoMap)
            .with_batch_size(100)
            .max_connections(12);
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.max_connections, 12);
        assert!(matches!(config.column_mapping, MysqlColumnMapping::AutoMap));
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = base_config().with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn json_custom_column() {
        let config = base_config().column_mapping(MysqlColumnMapping::Json {
            column: "payload".into(),
        });
        assert!(matches!(
            config.column_mapping,
            MysqlColumnMapping::Json { ref column } if column == "payload"
        ));
    }

    #[test]
    fn debug_masks_connection_url() {
        let config = MysqlSinkConfig::new("mysql://secret:pass@host/db", TABLE);
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret"));
        assert!(!debug.contains("pass"));
        // Non-sensitive fields must still be visible.
        assert!(debug.contains(TABLE));
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = base_config().with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = base_config().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_url": "mysql://localhost/test",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "batch_size": 250,
            "max_connections": 5
        }"#;
        let config: MysqlSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
        assert_eq!(config.max_connections, 5);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "connection_url": "mysql://localhost/test",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "max_connections": 5
        }"#;
        let config: MysqlSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_chaining() {
        let config = base_config().with_batch_size(100).with_batch_size(2_000);
        assert_eq!(config.batch_size, 2_000);
    }
}