use faucet_common_mssql::MssqlConnectionConfig;
use faucet_core::{DEFAULT_BATCH_SIZE, FaucetError, validate_batch_size};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Serde default for `MssqlSourceConfig::max_connections`; also used by `MssqlSourceConfig::new`.
fn default_max_connections() -> u32 {
    const MAX_CONNECTIONS: u32 = 10;
    MAX_CONNECTIONS
}
/// Serde default for `MssqlSourceConfig::batch_size`: the `DEFAULT_BATCH_SIZE` constant imported
/// from `faucet_core`. Also used by `MssqlSourceConfig::new`.
fn default_batch_size() -> usize {
DEFAULT_BATCH_SIZE
}
/// Serde default for `MssqlSourceConfig::statement_timeout_secs`; also used by
/// `MssqlSourceConfig::new`.
fn default_statement_timeout_secs() -> u64 {
    const STATEMENT_TIMEOUT_SECS: u64 = 300;
    STATEMENT_TIMEOUT_SECS
}
/// Replication mode selected in an MSSQL source config.
///
/// Serialized as an internally tagged object: the variant name goes in a `type` field in
/// snake_case (`"full"` / `"incremental"`), with the variant's own fields alongside it.
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MssqlReplication {
/// The default mode; carries no extra settings.
#[default]
Full,
/// Incremental mode, configured by a column name and an initial value.
Incremental {
/// Column name. `MssqlSourceConfig::validate` rejects a value that is empty or only whitespace.
// NOTE(review): presumably the cursor column used to select new rows; confirm in the reader.
column: String,
/// Initial value, accepted as arbitrary JSON.
// NOTE(review): presumably the starting bound used before any state exists; confirm in the reader.
initial_value: Value,
},
}
/// Configuration for the MSSQL source.
///
/// Only `query` (plus whatever the flattened connection settings require) must be supplied when
/// deserializing; every other field has a serde default.
// `Debug` is implemented by hand further down in this file (not derived) so that the connection
// settings are printed as `***`.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct MssqlSourceConfig {
/// Connection settings, flattened so their fields sit at the top level of the config object.
#[serde(flatten)]
pub connection: MssqlConnectionConfig,
/// Query text. Required.
// NOTE(review): presumably the SQL statement the reader executes; execution is outside this file.
pub query: String,
/// Parameter values as arbitrary JSON. Defaults to an empty list.
// NOTE(review): presumably bound positionally to `query`; confirm in the reader.
#[serde(default)]
pub params: Vec<Value>,
/// Maximum number of connections. Defaults to 10.
// NOTE(review): presumably the connection pool size cap; confirm where the pool is built.
#[serde(default = "default_max_connections")]
pub max_connections: u32,
/// Batch size. Defaults to `faucet_core::DEFAULT_BATCH_SIZE` and is checked by
/// `faucet_core::validate_batch_size` in `validate`.
#[serde(default = "default_batch_size")]
pub batch_size: usize,
/// Statement timeout. Defaults to 300.
// NOTE(review): unit is seconds going by the field name; confirm where the timeout is applied.
#[serde(default = "default_statement_timeout_secs")]
pub statement_timeout_secs: u64,
/// Replication mode. Defaults to `MssqlReplication::Full`.
#[serde(default)]
pub replication: MssqlReplication,
/// Optional state key. Defaults to `None` and is left out of the serialized output when `None`.
// NOTE(review): presumably the key under which incremental state is stored; confirm against caller.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state_key: Option<String>,
}
// Hand-written `Debug`: every field is printed as-is except `connection`, which is replaced by
// the literal `***` so connection settings never reach logs.
impl std::fmt::Debug for MssqlSourceConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct becomes a compile error here,
        // forcing a decision about whether it is safe to print.
        let Self {
            connection: _,
            query,
            params,
            max_connections,
            batch_size,
            statement_timeout_secs,
            replication,
            state_key,
        } = self;

        let mut out = f.debug_struct("MssqlSourceConfig");
        out.field("connection", &"***");
        out.field("query", query);
        out.field("params", params);
        out.field("max_connections", max_connections);
        out.field("batch_size", batch_size);
        out.field("statement_timeout_secs", statement_timeout_secs);
        out.field("replication", replication);
        out.field("state_key", state_key);
        out.finish()
    }
}
impl MssqlSourceConfig {
    /// Builds a config from a connection URL and a query, leaving everything else at its default.
    ///
    /// The URL is stored in `connection.connection_url`; the remaining connection fields come
    /// from `MssqlConnectionConfig::default()`. `params` starts empty, `replication` is
    /// `MssqlReplication::Full`, `state_key` is `None`, and the numeric settings use the same
    /// default functions serde uses when the fields are omitted.
    ///
    /// No validation happens here; call [`validate`](Self::validate) before using the config.
    pub fn new(connection_url: impl Into<String>, query: impl Into<String>) -> Self {
        Self {
            connection: MssqlConnectionConfig {
                connection_url: Some(connection_url.into()),
                ..Default::default()
            },
            query: query.into(),
            params: Vec::new(),
            max_connections: default_max_connections(),
            batch_size: default_batch_size(),
            statement_timeout_secs: default_statement_timeout_secs(),
            replication: MssqlReplication::Full,
            state_key: None,
        }
    }

    /// Checks the config for values that can never work.
    ///
    /// # Errors
    ///
    /// Returns the first failure, checked in this order:
    /// 1. whatever `MssqlConnectionConfig::validate` reports for `connection`;
    /// 2. whatever `faucet_core::validate_batch_size` reports for `batch_size`;
    /// 3. `FaucetError::Config` if `replication` is incremental and `column` is empty or only
    ///    whitespace;
    /// 4. `FaucetError::Config` if `query` is empty or only whitespace;
    /// 5. `FaucetError::Config` if `max_connections` is zero.
    pub fn validate(&self) -> Result<(), FaucetError> {
        self.connection.validate()?;
        validate_batch_size(self.batch_size)?;

        if let MssqlReplication::Incremental { column, .. } = &self.replication
            && column.trim().is_empty()
        {
            return Err(FaucetError::Config(
                "MSSQL incremental replication requires a non-empty `column`".into(),
            ));
        }

        // The checks below come after the pre-existing ones so that configs which already
        // failed validation keep reporting the same error.

        // A blank query gives the source nothing to read.
        if self.query.trim().is_empty() {
            return Err(FaucetError::Config(
                "MSSQL source requires a non-empty `query`".into(),
            ));
        }

        // With a limit of zero no connection could ever be handed out.
        if self.max_connections == 0 {
            return Err(FaucetError::Config(
                "MSSQL source `max_connections` must be at least 1".into(),
            ));
        }

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Starting point for the validation tests, which then override a single field.
    fn base_config() -> MssqlSourceConfig {
        MssqlSourceConfig::new("mssql://sa:pw@h/db", "SELECT 1")
    }

    #[test]
    fn replication_full_is_default_and_round_trips() {
        let payload = json!({"type": "full"});
        let parsed: MssqlReplication = serde_json::from_value(payload).unwrap();

        assert_eq!(parsed, MssqlReplication::Full);
        assert_eq!(MssqlReplication::default(), MssqlReplication::Full);
    }

    #[test]
    fn replication_incremental_parses_column_and_initial_value() {
        let payload = json!({
            "type": "incremental",
            "column": "updated_at",
            "initial_value": "1970-01-01T00:00:00Z"
        });
        let expected = MssqlReplication::Incremental {
            column: "updated_at".into(),
            initial_value: json!("1970-01-01T00:00:00Z"),
        };

        let parsed: MssqlReplication = serde_json::from_value(payload).unwrap();

        assert_eq!(parsed, expected);
    }

    #[test]
    fn config_flattens_connection_fields() {
        // `connection_url` sits at the top level because `connection` is flattened.
        let payload = json!({
            "connection_url": "mssql://sa:pw@host:1433/db",
            "query": "SELECT 1",
        });

        let config: MssqlSourceConfig = serde_json::from_value(payload).unwrap();

        assert_eq!(
            config.connection.connection_url.as_deref(),
            Some("mssql://sa:pw@host:1433/db")
        );
        // Omitted fields fall back to their serde defaults.
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(config.max_connections, 10);
        assert_eq!(config.statement_timeout_secs, 300);
    }

    #[test]
    fn validate_rejects_incremental_without_column() {
        let mut config = base_config();
        // Whitespace-only column names count as missing.
        config.replication = MssqlReplication::Incremental {
            column: " ".into(),
            initial_value: json!(0),
        };

        assert!(config.validate().is_err());
    }

    #[test]
    fn validate_rejects_bad_batch_size() {
        let mut config = base_config();
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        assert!(config.validate().is_err());
    }

    #[test]
    fn debug_masks_connection() {
        let config = MssqlSourceConfig::new("mssql://sa:secret@h/db", "SELECT 1");

        let rendered = format!("{config:?}");

        assert!(rendered.contains("***"));
        assert!(!rendered.contains("secret"));
    }
}