Skip to main content

faucet_source_mssql/
config.rs

1//! Configuration for the MSSQL query source.
2
3use faucet_common_mssql::MssqlConnectionConfig;
4use faucet_core::{DEFAULT_BATCH_SIZE, FaucetError, validate_batch_size};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
/// Serde fallback for `max_connections` when the key is absent from the config.
fn default_max_connections() -> u32 {
    const POOL_CAP: u32 = 10;
    POOL_CAP
}
12fn default_batch_size() -> usize {
13    DEFAULT_BATCH_SIZE
14}
/// Serde fallback for `statement_timeout_secs`: five minutes, expressed in seconds.
fn default_statement_timeout_secs() -> u64 {
    const MINUTES: u64 = 5;
    const SECS_PER_MINUTE: u64 = 60;
    MINUTES * SECS_PER_MINUTE
}
18
/// How the source replicates rows across runs.
///
/// Serializes as `{ type: full }` or
/// `{ type: incremental, column: "...", initial_value: ... }`.
// NOTE(review): `JsonSchema` is derived below, and schemars is understood to copy
// `///` text into the generated schema's descriptions. The doc comments in this
// block are therefore left untouched; confirm the schema impact before rewording.
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
// Internally tagged: the variant is named by a `type` key whose value is the
// snake_case variant name (`full` / `incremental`); variant fields sit beside it.
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MssqlReplication {
    /// Every run fetches the full result set (default).
    // `#[default]` makes this the value produced by `MssqlReplication::default()`.
    #[default]
    Full,
    /// Only rows whose `column` is strictly greater than the stored bookmark
    /// (or `initial_value` on the first run) are emitted.
    ///
    /// The bookmark is applied two ways: if the query contains the literal
    /// token `@bookmark`, it is bound as a parameter so the server filters
    /// (efficient); the source *also* filters client-side as a correctness
    /// backstop. The new maximum of `column` is persisted on the final page.
    Incremental {
        /// Column whose value is the replication cursor (e.g. `updated_at`).
        column: String,
        /// Lower bound used on the first run, before any bookmark is stored.
        // Untyped JSON: any JSON value deserializes here. How it is compared with
        // `column` is decided by the source implementation, which is not in this file.
        initial_value: Value,
    },
}
43
/// Configuration for [`MssqlSource`](crate::MssqlSource).
// `Debug` is deliberately missing from the derive list: the hand-written
// `impl std::fmt::Debug` in this file prints `connection` as `***`.
// NOTE(review): `JsonSchema` is derived, and schemars is understood to copy `///`
// text into schema descriptions, so the doc comments are kept verbatim.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct MssqlSourceConfig {
    /// Connection + TLS settings (`connection_url` or `connection_string`).
    // Flattened: the connection keys live at the top level of the config,
    // next to `query`, rather than under a nested `connection` object.
    #[serde(flatten)]
    pub connection: MssqlConnectionConfig,
    /// SQL query to run. Use `@P1`, `@P2`, … for [`params`](Self::params), and
    /// the literal `@bookmark` token to bind the incremental cursor server-side.
    // Required key: there is no `#[serde(default)]` on this field.
    pub query: String,
    /// Positional bind parameters for the query (`@P1`…`@Pn`). Defaults to empty.
    #[serde(default)]
    pub params: Vec<Value>,
    /// Maximum pooled connections. Defaults to 10.
    #[serde(default = "default_max_connections")]
    pub max_connections: u32,
    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). `0` emits the
    /// whole result set as a single page. Defaults to [`DEFAULT_BATCH_SIZE`].
    // Range-checked by `validate_batch_size` inside `MssqlSourceConfig::validate`.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
    /// Per-query timeout in seconds (`0` disables). Defaults to 300.
    #[serde(default = "default_statement_timeout_secs")]
    pub statement_timeout_secs: u64,
    /// Replication mode. Defaults to [`MssqlReplication::Full`].
    #[serde(default)]
    pub replication: MssqlReplication,
    /// Explicit state-store key for the bookmark. When unset, a key is derived
    /// from the connection host and a query fingerprint.
    // Omitted from serialized output when `None`; a missing key deserializes to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state_key: Option<String>,
}
74
75impl std::fmt::Debug for MssqlSourceConfig {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("MssqlSourceConfig")
78            .field("connection", &"***")
79            .field("query", &self.query)
80            .field("params", &self.params)
81            .field("max_connections", &self.max_connections)
82            .field("batch_size", &self.batch_size)
83            .field("statement_timeout_secs", &self.statement_timeout_secs)
84            .field("replication", &self.replication)
85            .field("state_key", &self.state_key)
86            .finish()
87    }
88}
89
90impl MssqlSourceConfig {
91    /// Build a config from a connection URL and query, with defaults elsewhere.
92    pub fn new(connection_url: impl Into<String>, query: impl Into<String>) -> Self {
93        Self {
94            connection: MssqlConnectionConfig {
95                connection_url: Some(connection_url.into()),
96                ..Default::default()
97            },
98            query: query.into(),
99            params: Vec::new(),
100            max_connections: default_max_connections(),
101            batch_size: default_batch_size(),
102            statement_timeout_secs: default_statement_timeout_secs(),
103            replication: MssqlReplication::Full,
104            state_key: None,
105        }
106    }
107
108    /// Validate connection source, batch size, and replication settings.
109    pub fn validate(&self) -> Result<(), FaucetError> {
110        self.connection.validate()?;
111        validate_batch_size(self.batch_size)?;
112        if let MssqlReplication::Incremental { column, .. } = &self.replication
113            && column.trim().is_empty()
114        {
115            return Err(FaucetError::Config(
116                "MSSQL incremental replication requires a non-empty `column`".into(),
117            ));
118        }
119        Ok(())
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `Full` parses from `{"type": "full"}`, is the `Default`, and serializes
    /// back to the same JSON.
    #[test]
    fn replication_full_is_default_and_round_trips() {
        let wire = json!({"type": "full"});
        let r: MssqlReplication = serde_json::from_value(wire.clone()).unwrap();
        assert_eq!(r, MssqlReplication::Full);
        assert_eq!(MssqlReplication::default(), MssqlReplication::Full);
        // Serialize back so the round trip promised by the test name is exercised.
        assert_eq!(serde_json::to_value(&r).unwrap(), wire);
    }

    /// The incremental variant carries `column` and `initial_value` through parsing.
    #[test]
    fn replication_incremental_parses_column_and_initial_value() {
        let r: MssqlReplication = serde_json::from_value(json!({
            "type": "incremental",
            "column": "updated_at",
            "initial_value": "1970-01-01T00:00:00Z"
        }))
        .unwrap();
        assert_eq!(
            r,
            MssqlReplication::Incremental {
                column: "updated_at".into(),
                initial_value: json!("1970-01-01T00:00:00Z"),
            }
        );
    }

    /// Connection keys are read from the top level of the config object, and
    /// omitted settings fall back to their defaults.
    #[test]
    fn config_flattens_connection_fields() {
        let cfg: MssqlSourceConfig = serde_json::from_value(json!({
            "connection_url": "mssql://sa:pw@host:1433/db",
            "query": "SELECT 1",
        }))
        .unwrap();
        assert_eq!(
            cfg.connection.connection_url.as_deref(),
            Some("mssql://sa:pw@host:1433/db")
        );
        assert_eq!(cfg.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(cfg.max_connections, 10);
        assert_eq!(cfg.statement_timeout_secs, 300);
    }

    /// A whitespace-only cursor column is rejected by `validate`.
    #[test]
    fn validate_rejects_incremental_without_column() {
        let cfg = MssqlSourceConfig {
            replication: MssqlReplication::Incremental {
                column: "  ".into(),
                initial_value: json!(0),
            },
            ..MssqlSourceConfig::new("mssql://sa:pw@h/db", "SELECT 1")
        };
        assert!(cfg.validate().is_err());
    }

    /// A batch size above `MAX_BATCH_SIZE` is rejected by `validate`.
    #[test]
    fn validate_rejects_bad_batch_size() {
        let cfg = MssqlSourceConfig {
            batch_size: faucet_core::MAX_BATCH_SIZE + 1,
            ..MssqlSourceConfig::new("mssql://sa:pw@h/db", "SELECT 1")
        };
        assert!(cfg.validate().is_err());
    }

    /// `Debug` output shows the mask and never the password from the URL.
    #[test]
    fn debug_masks_connection() {
        let cfg = MssqlSourceConfig::new("mssql://sa:secret@h/db", "SELECT 1");
        let dbg = format!("{cfg:?}");
        assert!(dbg.contains("***"));
        assert!(!dbg.contains("secret"));
    }
}