faucet_source_mysql/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Clone, Serialize, Deserialize, JsonSchema)]
9pub struct MysqlSourceConfig {
10 pub connection_url: String,
12 pub query: String,
14 #[serde(default = "default_max_connections")]
16 pub max_connections: u32,
17 #[serde(default = "default_batch_size")]
26 pub batch_size: usize,
27}
28
/// Supplies the value serde uses for `max_connections` when the field is
/// absent from the serialized config (wired up through
/// `#[serde(default = "default_max_connections")]`).
fn default_max_connections() -> u32 {
    const FALLBACK: u32 = 10;
    FALLBACK
}
32
33fn default_batch_size() -> usize {
34 DEFAULT_BATCH_SIZE
35}
36
37impl std::fmt::Debug for MysqlSourceConfig {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("MysqlSourceConfig")
40 .field("connection_url", &"***")
41 .field("query", &self.query)
42 .field("max_connections", &self.max_connections)
43 .field("batch_size", &self.batch_size)
44 .finish()
45 }
46}
47
48impl MysqlSourceConfig {
49 pub fn new(connection_url: impl Into<String>, query: impl Into<String>) -> Self {
51 Self {
52 connection_url: connection_url.into(),
53 query: query.into(),
54 max_connections: 10,
55 batch_size: DEFAULT_BATCH_SIZE,
56 }
57 }
58
59 pub fn with_max_connections(mut self, max_connections: u32) -> Self {
61 self.max_connections = max_connections;
62 self
63 }
64
65 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
70 self.batch_size = batch_size;
71 self
72 }
73}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores both arguments and starts from the documented defaults.
    #[test]
    fn default_config() {
        let config = MysqlSourceConfig::new("mysql://localhost/test", "SELECT * FROM events");
        assert_eq!(config.connection_url, "mysql://localhost/test");
        assert_eq!(config.query, "SELECT * FROM events");
        assert_eq!(config.max_connections, 10);
    }

    /// The `Debug` output must never contain any part of the credentials.
    #[test]
    fn debug_masks_connection_url() {
        let config = MysqlSourceConfig::new("mysql://secret:pass@host/db", "SELECT 1");
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret"));
        assert!(!debug.contains("pass"));
    }

    #[test]
    fn with_max_connections_overrides_default() {
        let config =
            MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1").with_max_connections(25);
        assert_eq!(config.max_connections, 25);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = MysqlSourceConfig::new("mysql://localhost/test", "SELECT 1")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_url": "mysql://localhost/test",
            "query": "SELECT 1",
            "batch_size": 250
        }"#;
        let config: MysqlSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    /// Fields carrying `#[serde(default = ...)]` may be left out of the input.
    #[test]
    fn omitted_optional_fields_use_serde_defaults() {
        let json = r#"{
            "connection_url": "mysql://localhost/test",
            "query": "SELECT 1"
        }"#;
        let config: MysqlSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.max_connections, 10);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}