faucet_source_mssql/
config.rs1use faucet_common_mssql::MssqlConnectionConfig;
4use faucet_core::{DEFAULT_BATCH_SIZE, FaucetError, validate_batch_size};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9fn default_max_connections() -> u32 {
10 10
11}
12fn default_batch_size() -> usize {
13 DEFAULT_BATCH_SIZE
14}
15fn default_statement_timeout_secs() -> u64 {
16 300
17}
18
19#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
24#[serde(tag = "type", rename_all = "snake_case")]
25pub enum MssqlReplication {
26 #[default]
28 Full,
29 Incremental {
37 column: String,
39 initial_value: Value,
41 },
42}
43
44#[derive(Clone, Serialize, Deserialize, JsonSchema)]
46pub struct MssqlSourceConfig {
47 #[serde(flatten)]
49 pub connection: MssqlConnectionConfig,
50 pub query: String,
53 #[serde(default)]
55 pub params: Vec<Value>,
56 #[serde(default = "default_max_connections")]
58 pub max_connections: u32,
59 #[serde(default = "default_batch_size")]
62 pub batch_size: usize,
63 #[serde(default = "default_statement_timeout_secs")]
65 pub statement_timeout_secs: u64,
66 #[serde(default)]
68 pub replication: MssqlReplication,
69 #[serde(default, skip_serializing_if = "Option::is_none")]
72 pub state_key: Option<String>,
73}
74
75impl std::fmt::Debug for MssqlSourceConfig {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("MssqlSourceConfig")
78 .field("connection", &"***")
79 .field("query", &self.query)
80 .field("params", &self.params)
81 .field("max_connections", &self.max_connections)
82 .field("batch_size", &self.batch_size)
83 .field("statement_timeout_secs", &self.statement_timeout_secs)
84 .field("replication", &self.replication)
85 .field("state_key", &self.state_key)
86 .finish()
87 }
88}
89
90impl MssqlSourceConfig {
91 pub fn new(connection_url: impl Into<String>, query: impl Into<String>) -> Self {
93 Self {
94 connection: MssqlConnectionConfig {
95 connection_url: Some(connection_url.into()),
96 ..Default::default()
97 },
98 query: query.into(),
99 params: Vec::new(),
100 max_connections: default_max_connections(),
101 batch_size: default_batch_size(),
102 statement_timeout_secs: default_statement_timeout_secs(),
103 replication: MssqlReplication::Full,
104 state_key: None,
105 }
106 }
107
108 pub fn validate(&self) -> Result<(), FaucetError> {
110 self.connection.validate()?;
111 validate_batch_size(self.batch_size)?;
112 if let MssqlReplication::Incremental { column, .. } = &self.replication
113 && column.trim().is_empty()
114 {
115 return Err(FaucetError::Config(
116 "MSSQL incremental replication requires a non-empty `column`".into(),
117 ));
118 }
119 Ok(())
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126 use serde_json::json;
127
128 #[test]
129 fn replication_full_is_default_and_round_trips() {
130 let r: MssqlReplication = serde_json::from_value(json!({"type": "full"})).unwrap();
131 assert_eq!(r, MssqlReplication::Full);
132 assert_eq!(MssqlReplication::default(), MssqlReplication::Full);
133 }
134
135 #[test]
136 fn replication_incremental_parses_column_and_initial_value() {
137 let r: MssqlReplication = serde_json::from_value(json!({
138 "type": "incremental",
139 "column": "updated_at",
140 "initial_value": "1970-01-01T00:00:00Z"
141 }))
142 .unwrap();
143 assert_eq!(
144 r,
145 MssqlReplication::Incremental {
146 column: "updated_at".into(),
147 initial_value: json!("1970-01-01T00:00:00Z"),
148 }
149 );
150 }
151
152 #[test]
153 fn config_flattens_connection_fields() {
154 let cfg: MssqlSourceConfig = serde_json::from_value(json!({
155 "connection_url": "mssql://sa:pw@host:1433/db",
156 "query": "SELECT 1",
157 }))
158 .unwrap();
159 assert_eq!(
160 cfg.connection.connection_url.as_deref(),
161 Some("mssql://sa:pw@host:1433/db")
162 );
163 assert_eq!(cfg.batch_size, DEFAULT_BATCH_SIZE);
164 assert_eq!(cfg.max_connections, 10);
165 assert_eq!(cfg.statement_timeout_secs, 300);
166 }
167
168 #[test]
169 fn validate_rejects_incremental_without_column() {
170 let cfg = MssqlSourceConfig {
171 replication: MssqlReplication::Incremental {
172 column: " ".into(),
173 initial_value: json!(0),
174 },
175 ..MssqlSourceConfig::new("mssql://sa:pw@h/db", "SELECT 1")
176 };
177 assert!(cfg.validate().is_err());
178 }
179
180 #[test]
181 fn validate_rejects_bad_batch_size() {
182 let cfg = MssqlSourceConfig {
183 batch_size: faucet_core::MAX_BATCH_SIZE + 1,
184 ..MssqlSourceConfig::new("mssql://sa:pw@h/db", "SELECT 1")
185 };
186 assert!(cfg.validate().is_err());
187 }
188
189 #[test]
190 fn debug_masks_connection() {
191 let cfg = MssqlSourceConfig::new("mssql://sa:secret@h/db", "SELECT 1");
192 let dbg = format!("{cfg:?}");
193 assert!(dbg.contains("***"));
194 assert!(!dbg.contains("secret"));
195 }
196}