Skip to main content

faucet_sink_mssql/
config.rs

1//! Configuration for the MSSQL sink.
2
3use faucet_common_mssql::MssqlConnectionConfig;
4use faucet_core::{FaucetError, validate_batch_size};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7
/// Serde default for `batch_size`: rows per multi-row `INSERT`.
fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 500;
    DEFAULT_BATCH_SIZE
}
/// Serde default for `max_connections`: the connection pool's upper bound.
fn default_max_connections() -> u32 {
    const DEFAULT_MAX_CONNECTIONS: u32 = 5;
    DEFAULT_MAX_CONNECTIONS
}
/// Serde default for `statement_timeout_secs`: five minutes, expressed in seconds.
fn default_statement_timeout_secs() -> u64 {
    const DEFAULT_STATEMENT_TIMEOUT_SECS: u64 = 300;
    DEFAULT_STATEMENT_TIMEOUT_SECS
}
/// Serde default for boolean settings that are switched on unless configured otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
20
21/// What to do with record keys that don't match a table column (`auto_columns`).
22#[derive(Clone, Copy, Debug, Serialize, Deserialize, JsonSchema, Default, PartialEq, Eq)]
23#[serde(rename_all = "snake_case")]
24pub enum OnUnknownField {
25    /// Log a one-shot warning and drop the unknown keys (default).
26    #[default]
27    Warn,
28    /// Silently drop the unknown keys.
29    Drop,
30    /// Fail the write with [`FaucetError::Sink`].
31    Error,
32}
33
34/// How records map onto table columns.
35///
36/// Serializes as `{ type: json_column, column: "data" }` (default) or
37/// `{ type: auto_columns, on_unknown_field: warn }`.
38#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, PartialEq)]
39#[serde(tag = "type", rename_all = "snake_case")]
40pub enum MssqlColumnMapping {
41    /// Map top-level JSON keys to same-named table columns. IDENTITY columns are
42    /// skipped (the server generates them).
43    AutoColumns {
44        #[serde(default)]
45        on_unknown_field: OnUnknownField,
46    },
47    /// Serialize each record to a JSON string inserted into a single
48    /// `NVARCHAR(MAX)` (or native `JSON`) column.
49    JsonColumn { column: String },
50}
51
52impl Default for MssqlColumnMapping {
53    fn default() -> Self {
54        Self::JsonColumn {
55            column: "data".into(),
56        }
57    }
58}
59
60/// Configuration for [`MssqlSink`](crate::MssqlSink).
61#[derive(Clone, Serialize, Deserialize, JsonSchema)]
62pub struct MssqlSinkConfig {
63    /// Connection + TLS settings (`connection_url` or `connection_string`).
64    #[serde(flatten)]
65    pub connection: MssqlConnectionConfig,
66    /// Target table, optionally schema-qualified (e.g. `dbo.events`).
67    pub table: String,
68    /// How records map onto columns. Defaults to a single `data` JSON column.
69    #[serde(default)]
70    pub column_mapping: MssqlColumnMapping,
71    /// Rows per multi-row `INSERT`. Auto-split further so `rows * columns` stays
72    /// within MSSQL's 2100-parameter limit. `0` sends the whole page (still
73    /// param-split). Defaults to 500.
74    #[serde(default = "default_batch_size")]
75    pub batch_size: usize,
76    /// Maximum pooled connections. Defaults to 5.
77    #[serde(default = "default_max_connections")]
78    pub max_connections: u32,
79    /// Wrap each batch's INSERTs in `BEGIN TRAN` / `COMMIT TRAN`. Defaults to true.
80    #[serde(default = "default_true")]
81    pub transaction_per_batch: bool,
82    /// On a batch failure, retry row-by-row to isolate the offender so good rows
83    /// still land and only the bad row is DLQ-routed. When false, one bad row
84    /// fails the whole batch (fewer round-trips). Defaults to true.
85    #[serde(default = "default_true")]
86    pub isolate_row_failures: bool,
87    /// Per-statement timeout in seconds (`0` disables). Defaults to 300.
88    #[serde(default = "default_statement_timeout_secs")]
89    pub statement_timeout_secs: u64,
90    /// In `json_column` mode only, create the table if absent as
91    /// `(id BIGINT IDENTITY PRIMARY KEY, <column> NVARCHAR(MAX))`. Rejected with
92    /// `auto_columns` (schema inference is unsafe for MSSQL types). Defaults to false.
93    #[serde(default)]
94    pub create_table: bool,
95}
96
97impl std::fmt::Debug for MssqlSinkConfig {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("MssqlSinkConfig")
100            .field("connection", &"***")
101            .field("table", &self.table)
102            .field("column_mapping", &self.column_mapping)
103            .field("batch_size", &self.batch_size)
104            .field("max_connections", &self.max_connections)
105            .field("transaction_per_batch", &self.transaction_per_batch)
106            .field("isolate_row_failures", &self.isolate_row_failures)
107            .field("statement_timeout_secs", &self.statement_timeout_secs)
108            .field("create_table", &self.create_table)
109            .finish()
110    }
111}
112
113impl MssqlSinkConfig {
114    /// Build a config from a connection URL and table, with defaults elsewhere.
115    pub fn new(connection_url: impl Into<String>, table: impl Into<String>) -> Self {
116        Self {
117            connection: MssqlConnectionConfig {
118                connection_url: Some(connection_url.into()),
119                ..Default::default()
120            },
121            table: table.into(),
122            column_mapping: MssqlColumnMapping::default(),
123            batch_size: default_batch_size(),
124            max_connections: default_max_connections(),
125            transaction_per_batch: true,
126            isolate_row_failures: true,
127            statement_timeout_secs: default_statement_timeout_secs(),
128            create_table: false,
129        }
130    }
131
132    /// Validate connection source, batch size, table, and mode combination.
133    pub fn validate(&self) -> Result<(), FaucetError> {
134        self.connection.validate()?;
135        validate_batch_size(self.batch_size)?;
136        if self.table.trim().is_empty() {
137            return Err(FaucetError::Config("MSSQL sink requires a `table`".into()));
138        }
139        if self.create_table
140            && matches!(self.column_mapping, MssqlColumnMapping::AutoColumns { .. })
141        {
142            return Err(FaucetError::Config(
143                "MSSQL sink `create_table` is only supported with `json_column` mode \
144                 (schema inference for auto_columns is unsafe — create the table first)"
145                    .into(),
146            ));
147        }
148        Ok(())
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Connection URL shared by tests that only need a plausible value.
    const URL: &str = "mssql://sa:pw@h/db";

    /// Deserialize a column mapping from JSON, panicking on failure.
    fn parse_mapping(value: serde_json::Value) -> MssqlColumnMapping {
        serde_json::from_value(value).unwrap()
    }

    #[test]
    fn json_column_is_default() {
        let expected = MssqlColumnMapping::JsonColumn {
            column: "data".into(),
        };
        assert_eq!(MssqlColumnMapping::default(), expected);
    }

    #[test]
    fn column_mapping_round_trips() {
        let parsed_auto =
            parse_mapping(json!({"type": "auto_columns", "on_unknown_field": "error"}));
        let expected_auto = MssqlColumnMapping::AutoColumns {
            on_unknown_field: OnUnknownField::Error,
        };
        assert_eq!(parsed_auto, expected_auto);

        let parsed_json = parse_mapping(json!({"type": "json_column", "column": "payload"}));
        let expected_json = MssqlColumnMapping::JsonColumn {
            column: "payload".into(),
        };
        assert_eq!(parsed_json, expected_json);
    }

    #[test]
    fn auto_columns_defaults_unknown_field_to_warn() {
        let parsed = parse_mapping(json!({"type": "auto_columns"}));
        let expected = MssqlColumnMapping::AutoColumns {
            on_unknown_field: OnUnknownField::Warn,
        };
        assert_eq!(parsed, expected);
    }

    #[test]
    fn config_defaults() {
        let raw = json!({
            "connection_url": URL,
            "table": "dbo.events",
        });
        let parsed: MssqlSinkConfig = serde_json::from_value(raw).unwrap();

        assert_eq!(parsed.batch_size, 500);
        assert_eq!(parsed.max_connections, 5);
        assert!(parsed.transaction_per_batch);
        assert!(parsed.isolate_row_failures);
        assert_eq!(parsed.statement_timeout_secs, 300);
        assert!(!parsed.create_table);
    }

    #[test]
    fn validate_rejects_auto_columns_with_create_table() {
        let mut cfg = MssqlSinkConfig::new(URL, "dbo.events");
        cfg.column_mapping = MssqlColumnMapping::AutoColumns {
            on_unknown_field: OnUnknownField::Warn,
        };
        cfg.create_table = true;

        assert!(cfg.validate().is_err());
    }

    #[test]
    fn validate_rejects_empty_table() {
        let blank_table = MssqlSinkConfig::new(URL, "  ");
        assert!(blank_table.validate().is_err());
    }

    #[test]
    fn debug_masks_connection() {
        let rendered = format!(
            "{:?}",
            MssqlSinkConfig::new("mssql://sa:secret@h/db", "dbo.t")
        );
        assert!(rendered.contains("***"));
        assert!(!rendered.contains("secret"));
    }
}