faucet_source_postgres_cdc/
config.rs1use faucet_core::{DEFAULT_BATCH_SIZE, FaucetError};
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use std::time::Duration;
7
/// Serde default for boolean options that are enabled unless configured off.
fn default_true() -> bool {
    true
}
/// Serde default for `proto_version`; this is also the only value `validate`
/// accepts.
fn default_proto_version() -> u32 {
    const PROTO_V1: u32 = 1;
    PROTO_V1
}
/// Serde default for `idle_timeout`: thirty seconds.
fn default_idle_timeout() -> Duration {
    Duration::new(30, 0)
}
/// Serde default for `status_update_interval`: ten seconds, which is below
/// the default `idle_timeout`, as `validate` requires.
fn default_status_update_interval() -> Duration {
    Duration::new(10, 0)
}
/// Serde default for `tcp_keepalive`: sixty seconds.
fn default_tcp_keepalive() -> Duration {
    Duration::new(60, 0)
}
23fn default_batch_size() -> usize {
24 DEFAULT_BATCH_SIZE
25}
/// Serde default for `slot_acquire_retries`.
fn default_slot_acquire_retries() -> u32 {
    const RETRIES: u32 = 10;
    RETRIES
}
29
30#[derive(Clone, Serialize, Deserialize, JsonSchema)]
32pub struct PostgresCdcSourceConfig {
33 pub connection_url: String,
37
38 pub slot_name: String,
41
42 pub publication_name: String,
46
47 #[serde(default = "default_true")]
50 pub create_slot_if_missing: bool,
51
52 #[serde(default)]
63 pub slot_type: SlotType,
64
65 #[serde(default)]
70 pub tls: CdcTls,
71
72 #[serde(default)]
77 pub start_lsn: Option<String>,
78
79 #[serde(default = "default_proto_version")]
83 pub proto_version: u32,
84
85 #[serde(
88 default = "default_idle_timeout",
89 with = "faucet_core::config::duration_secs"
90 )]
91 #[schemars(with = "u64")]
92 pub idle_timeout: Duration,
93
94 #[serde(default)]
104 pub max_messages: Option<usize>,
105
106 #[serde(default)]
124 pub max_staged_records: Option<usize>,
125
126 #[serde(
130 default = "default_status_update_interval",
131 with = "faucet_core::config::duration_secs"
132 )]
133 #[schemars(with = "u64")]
134 pub status_update_interval: Duration,
135
136 #[serde(
138 default = "default_tcp_keepalive",
139 with = "faucet_core::config::duration_secs"
140 )]
141 #[schemars(with = "u64")]
142 pub tcp_keepalive: Duration,
143
144 #[serde(default = "default_batch_size")]
159 pub batch_size: usize,
160
161 #[serde(default = "default_slot_acquire_retries")]
170 pub slot_acquire_retries: u32,
171}
172
173#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
175#[serde(rename_all = "snake_case")]
176pub enum SlotType {
177 #[default]
179 Permanent,
180 Temporary,
182}
183
184#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
186#[serde(tag = "mode", rename_all = "snake_case")]
187pub enum CdcTls {
188 #[default]
190 Disable,
191 Require,
193 VerifyCa {
196 #[serde(default, skip_serializing_if = "Option::is_none")]
197 ca_path: Option<String>,
198 },
199 VerifyFull {
201 #[serde(default, skip_serializing_if = "Option::is_none")]
202 ca_path: Option<String>,
203 },
204}
205
206impl std::fmt::Debug for PostgresCdcSourceConfig {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 f.debug_struct("PostgresCdcSourceConfig")
209 .field("connection_url", &"***")
210 .field("slot_name", &self.slot_name)
211 .field("publication_name", &self.publication_name)
212 .field("create_slot_if_missing", &self.create_slot_if_missing)
213 .field("slot_type", &self.slot_type)
214 .field("tls", &self.tls)
215 .field("start_lsn", &self.start_lsn)
216 .field("proto_version", &self.proto_version)
217 .field("idle_timeout", &self.idle_timeout)
218 .field("max_messages", &self.max_messages)
219 .field("max_staged_records", &self.max_staged_records)
220 .field("status_update_interval", &self.status_update_interval)
221 .field("tcp_keepalive", &self.tcp_keepalive)
222 .field("batch_size", &self.batch_size)
223 .field("slot_acquire_retries", &self.slot_acquire_retries)
224 .finish()
225 }
226}
227
228impl PostgresCdcSourceConfig {
229 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
237 self.batch_size = batch_size;
238 self
239 }
240
241 pub fn validate(&self) -> Result<(), FaucetError> {
243 if self.connection_url.trim().is_empty() {
244 return Err(FaucetError::Config(
245 "postgres-cdc: connection_url must not be empty".into(),
246 ));
247 }
248 validate_slot_name(&self.slot_name)?;
249 if self.publication_name.is_empty() {
250 return Err(FaucetError::Config(
251 "postgres-cdc: publication_name must not be empty".into(),
252 ));
253 }
254 if self.proto_version != 1 {
255 return Err(FaucetError::Config(format!(
256 "postgres-cdc: proto_version must be 1 (v2 streaming-transaction \
257 support is not yet available via pgwire-replication), got {}",
258 self.proto_version
259 )));
260 }
261 if self.idle_timeout.is_zero() {
262 return Err(FaucetError::Config(
263 "postgres-cdc: idle_timeout must be > 0".into(),
264 ));
265 }
266 if self.status_update_interval >= self.idle_timeout {
267 return Err(FaucetError::Config(format!(
268 "postgres-cdc: status_update_interval ({}s) must be \
269 strictly less than idle_timeout ({}s)",
270 self.status_update_interval.as_secs(),
271 self.idle_timeout.as_secs()
272 )));
273 }
274 Ok(())
275 }
276}
277
278fn validate_slot_name(name: &str) -> Result<(), FaucetError> {
279 if name.is_empty() {
280 return Err(FaucetError::Config(
281 "postgres-cdc: slot_name must not be empty".into(),
282 ));
283 }
284 if name.len() > 63 {
285 return Err(FaucetError::Config(format!(
286 "postgres-cdc: slot_name '{name}' exceeds Postgres' 63-char limit"
287 )));
288 }
289 if !name
290 .chars()
291 .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
292 {
293 return Err(FaucetError::Config(format!(
294 "postgres-cdc: slot_name '{name}' must contain only \
295 [a-z0-9_]"
296 )));
297 }
298 Ok(())
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// A config with only the required fields set and every other field at
    /// its documented default.
    fn minimal() -> PostgresCdcSourceConfig {
        PostgresCdcSourceConfig {
            connection_url: "postgres://u:p@localhost/db".into(),
            slot_name: "faucet_slot".into(),
            publication_name: "faucet_pub".into(),
            create_slot_if_missing: true,
            slot_type: SlotType::Permanent,
            tls: CdcTls::Disable,
            start_lsn: None,
            proto_version: 1,
            idle_timeout: std::time::Duration::from_secs(30),
            max_messages: None,
            max_staged_records: None,
            status_update_interval: std::time::Duration::from_secs(10),
            tcp_keepalive: std::time::Duration::from_secs(60),
            batch_size: DEFAULT_BATCH_SIZE,
            slot_acquire_retries: default_slot_acquire_retries(),
        }
    }

    /// Deserializes a config from just the three required fields.
    fn from_required_fields_only() -> PostgresCdcSourceConfig {
        serde_json::from_value(serde_json::json!({
            "connection_url": "postgres://u:p@localhost/db",
            "slot_name": "faucet_slot",
            "publication_name": "faucet_pub",
        }))
        .unwrap()
    }

    #[test]
    fn defaults_via_serde() {
        let value = from_required_fields_only();
        assert!(value.create_slot_if_missing);
        assert_eq!(value.slot_type, SlotType::Permanent);
        assert_eq!(value.tls, CdcTls::Disable);
        assert_eq!(value.proto_version, 1);
        assert_eq!(value.idle_timeout.as_secs(), 30);
        assert_eq!(value.status_update_interval.as_secs(), 10);
        assert_eq!(value.tcp_keepalive.as_secs(), 60);
        assert!(value.start_lsn.is_none());
        assert!(value.max_messages.is_none());
        assert!(value.max_staged_records.is_none());
        assert_eq!(value.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(value.slot_acquire_retries, 10);
    }

    #[test]
    fn serde_defaults_pass_validation() {
        assert!(from_required_fields_only().validate().is_ok());
    }

    #[test]
    fn minimal_config_is_valid() {
        assert!(minimal().validate().is_ok());
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let c = minimal();
        assert_eq!(c.batch_size, DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let c = minimal().with_batch_size(64);
        assert_eq!(c.batch_size, 64);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let c = minimal().with_batch_size(0);
        assert_eq!(c.batch_size, 0);
        assert!(faucet_core::validate_batch_size(c.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let c = minimal().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(c.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let v: PostgresCdcSourceConfig = serde_json::from_value(serde_json::json!({
            "connection_url": "postgres://u:p@localhost/db",
            "slot_name": "faucet_slot",
            "publication_name": "faucet_pub",
            "batch_size": 256,
        }))
        .unwrap();
        assert_eq!(v.batch_size, 256);
    }

    #[test]
    fn slot_type_and_tls_deserialize_from_json() {
        let v: PostgresCdcSourceConfig = serde_json::from_value(serde_json::json!({
            "connection_url": "postgres://u:p@localhost/db",
            "slot_name": "faucet_slot",
            "publication_name": "faucet_pub",
            "slot_type": "temporary",
            "tls": { "mode": "verify_full", "ca_path": "/etc/ssl/ca.pem" },
        }))
        .unwrap();
        assert_eq!(v.slot_type, SlotType::Temporary);
        assert_eq!(
            v.tls,
            CdcTls::VerifyFull {
                ca_path: Some("/etc/ssl/ca.pem".into())
            }
        );
    }

    #[test]
    fn tls_ca_path_defaults_to_none() {
        let tls: CdcTls =
            serde_json::from_value(serde_json::json!({ "mode": "verify_ca" })).unwrap();
        assert_eq!(tls, CdcTls::VerifyCa { ca_path: None });
    }

    #[test]
    fn tls_serialization_omits_absent_ca_path() {
        let json = serde_json::to_value(CdcTls::VerifyFull { ca_path: None }).unwrap();
        assert_eq!(json, serde_json::json!({ "mode": "verify_full" }));
    }

    #[test]
    fn rejects_empty_slot_name() {
        let mut c = minimal();
        c.slot_name = String::new();
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_invalid_slot_name_chars() {
        let mut c = minimal();
        c.slot_name = "Faucet-Slot".into();
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_non_ascii_slot_name() {
        // Non-ASCII letters are rejected even when they are lowercase.
        let mut c = minimal();
        c.slot_name = "sl\u{f6}t".into();
        assert!(c.validate().is_err());
    }

    #[test]
    fn accepts_slot_name_of_exactly_63_chars() {
        let mut c = minimal();
        c.slot_name = "a".repeat(63);
        assert!(c.validate().is_ok());
    }

    #[test]
    fn rejects_slot_name_over_63_chars() {
        let mut c = minimal();
        c.slot_name = "a".repeat(64);
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_empty_publication_name() {
        let mut c = minimal();
        c.publication_name = String::new();
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_zero_idle_timeout() {
        let mut c = minimal();
        c.idle_timeout = std::time::Duration::from_secs(0);
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_status_update_interval_longer_than_idle_timeout() {
        let mut c = minimal();
        c.status_update_interval = std::time::Duration::from_secs(60);
        c.idle_timeout = std::time::Duration::from_secs(30);
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_status_update_interval_equal_to_idle_timeout() {
        // The interval must be strictly less than the timeout.
        let mut c = minimal();
        c.status_update_interval = std::time::Duration::from_secs(30);
        c.idle_timeout = std::time::Duration::from_secs(30);
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_invalid_proto_version() {
        let mut c = minimal();
        for bad in [0, 2, 3] {
            c.proto_version = bad;
            assert!(c.validate().is_err(), "proto_version {bad} should be rejected");
        }
    }

    #[test]
    fn accepts_proto_version_one() {
        let mut c = minimal();
        c.proto_version = 1;
        assert!(c.validate().is_ok());
    }

    #[test]
    fn rejects_empty_connection_url() {
        let mut c = minimal();
        c.connection_url = String::new();
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_whitespace_connection_url() {
        let mut c = minimal();
        c.connection_url = "   ".into();
        assert!(c.validate().is_err());
    }

    #[test]
    fn debug_redacts_connection_url() {
        let cfg = minimal();
        let dbg = format!("{cfg:?}");
        assert!(dbg.contains("connection_url: \"***\""));
        assert!(!dbg.contains("u:p@localhost"));
    }

    #[test]
    fn schema_for_config_includes_required_fields() {
        let schema = schemars::schema_for!(PostgresCdcSourceConfig);
        let json = serde_json::to_value(&schema).unwrap();
        let required = json["required"].as_array().expect("required array");
        let names: Vec<_> = required.iter().filter_map(|v| v.as_str()).collect();
        assert!(names.contains(&"connection_url"));
        assert!(names.contains(&"slot_name"));
        assert!(names.contains(&"publication_name"));
    }
}