faucet_sink_mssql/
config.rs1use faucet_common_mssql::MssqlConnectionConfig;
4use faucet_core::{FaucetError, validate_batch_size};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7
8fn default_batch_size() -> usize {
9 500
10}
11fn default_max_connections() -> u32 {
12 5
13}
14fn default_statement_timeout_secs() -> u64 {
15 300
16}
17fn default_true() -> bool {
18 true
19}
20
21#[derive(Clone, Copy, Debug, Serialize, Deserialize, JsonSchema, Default, PartialEq, Eq)]
23#[serde(rename_all = "snake_case")]
24pub enum OnUnknownField {
25 #[default]
27 Warn,
28 Drop,
30 Error,
32}
33
34#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, PartialEq)]
39#[serde(tag = "type", rename_all = "snake_case")]
40pub enum MssqlColumnMapping {
41 AutoColumns {
44 #[serde(default)]
45 on_unknown_field: OnUnknownField,
46 },
47 JsonColumn { column: String },
50}
51
52impl Default for MssqlColumnMapping {
53 fn default() -> Self {
54 Self::JsonColumn {
55 column: "data".into(),
56 }
57 }
58}
59
60#[derive(Clone, Serialize, Deserialize, JsonSchema)]
62pub struct MssqlSinkConfig {
63 #[serde(flatten)]
65 pub connection: MssqlConnectionConfig,
66 pub table: String,
68 #[serde(default)]
70 pub column_mapping: MssqlColumnMapping,
71 #[serde(default = "default_batch_size")]
75 pub batch_size: usize,
76 #[serde(default = "default_max_connections")]
78 pub max_connections: u32,
79 #[serde(default = "default_true")]
81 pub transaction_per_batch: bool,
82 #[serde(default = "default_true")]
86 pub isolate_row_failures: bool,
87 #[serde(default = "default_statement_timeout_secs")]
89 pub statement_timeout_secs: u64,
90 #[serde(default)]
94 pub create_table: bool,
95}
96
97impl std::fmt::Debug for MssqlSinkConfig {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("MssqlSinkConfig")
100 .field("connection", &"***")
101 .field("table", &self.table)
102 .field("column_mapping", &self.column_mapping)
103 .field("batch_size", &self.batch_size)
104 .field("max_connections", &self.max_connections)
105 .field("transaction_per_batch", &self.transaction_per_batch)
106 .field("isolate_row_failures", &self.isolate_row_failures)
107 .field("statement_timeout_secs", &self.statement_timeout_secs)
108 .field("create_table", &self.create_table)
109 .finish()
110 }
111}
112
113impl MssqlSinkConfig {
114 pub fn new(connection_url: impl Into<String>, table: impl Into<String>) -> Self {
116 Self {
117 connection: MssqlConnectionConfig {
118 connection_url: Some(connection_url.into()),
119 ..Default::default()
120 },
121 table: table.into(),
122 column_mapping: MssqlColumnMapping::default(),
123 batch_size: default_batch_size(),
124 max_connections: default_max_connections(),
125 transaction_per_batch: true,
126 isolate_row_failures: true,
127 statement_timeout_secs: default_statement_timeout_secs(),
128 create_table: false,
129 }
130 }
131
132 pub fn validate(&self) -> Result<(), FaucetError> {
134 self.connection.validate()?;
135 validate_batch_size(self.batch_size)?;
136 if self.table.trim().is_empty() {
137 return Err(FaucetError::Config("MSSQL sink requires a `table`".into()));
138 }
139 if self.create_table
140 && matches!(self.column_mapping, MssqlColumnMapping::AutoColumns { .. })
141 {
142 return Err(FaucetError::Config(
143 "MSSQL sink `create_table` is only supported with `json_column` mode \
144 (schema inference for auto_columns is unsafe — create the table first)"
145 .into(),
146 ));
147 }
148 Ok(())
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use super::*;
155 use serde_json::json;
156
157 #[test]
158 fn json_column_is_default() {
159 assert_eq!(
160 MssqlColumnMapping::default(),
161 MssqlColumnMapping::JsonColumn {
162 column: "data".into()
163 }
164 );
165 }
166
167 #[test]
168 fn column_mapping_round_trips() {
169 let auto: MssqlColumnMapping =
170 serde_json::from_value(json!({"type": "auto_columns", "on_unknown_field": "error"}))
171 .unwrap();
172 assert_eq!(
173 auto,
174 MssqlColumnMapping::AutoColumns {
175 on_unknown_field: OnUnknownField::Error
176 }
177 );
178
179 let jc: MssqlColumnMapping =
180 serde_json::from_value(json!({"type": "json_column", "column": "payload"})).unwrap();
181 assert_eq!(
182 jc,
183 MssqlColumnMapping::JsonColumn {
184 column: "payload".into()
185 }
186 );
187 }
188
189 #[test]
190 fn auto_columns_defaults_unknown_field_to_warn() {
191 let auto: MssqlColumnMapping =
192 serde_json::from_value(json!({"type": "auto_columns"})).unwrap();
193 assert_eq!(
194 auto,
195 MssqlColumnMapping::AutoColumns {
196 on_unknown_field: OnUnknownField::Warn
197 }
198 );
199 }
200
201 #[test]
202 fn config_defaults() {
203 let cfg: MssqlSinkConfig = serde_json::from_value(json!({
204 "connection_url": "mssql://sa:pw@h/db",
205 "table": "dbo.events",
206 }))
207 .unwrap();
208 assert_eq!(cfg.batch_size, 500);
209 assert_eq!(cfg.max_connections, 5);
210 assert!(cfg.transaction_per_batch);
211 assert!(cfg.isolate_row_failures);
212 assert_eq!(cfg.statement_timeout_secs, 300);
213 assert!(!cfg.create_table);
214 }
215
216 #[test]
217 fn validate_rejects_auto_columns_with_create_table() {
218 let cfg = MssqlSinkConfig {
219 column_mapping: MssqlColumnMapping::AutoColumns {
220 on_unknown_field: OnUnknownField::Warn,
221 },
222 create_table: true,
223 ..MssqlSinkConfig::new("mssql://sa:pw@h/db", "dbo.events")
224 };
225 assert!(cfg.validate().is_err());
226 }
227
228 #[test]
229 fn validate_rejects_empty_table() {
230 let cfg = MssqlSinkConfig::new("mssql://sa:pw@h/db", " ");
231 assert!(cfg.validate().is_err());
232 }
233
234 #[test]
235 fn debug_masks_connection() {
236 let cfg = MssqlSinkConfig::new("mssql://sa:secret@h/db", "dbo.t");
237 let dbg = format!("{cfg:?}");
238 assert!(dbg.contains("***"));
239 assert!(!dbg.contains("secret"));
240 }
241}