Skip to main content

faucet_source_postgres_cdc/
config.rs

1//! Configuration for `PostgresCdcSource`.
2
3use faucet_core::{DEFAULT_BATCH_SIZE, FaucetError};
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use std::time::Duration;
7
/// Serde default for opt-out boolean flags such as `create_slot_if_missing`.
fn default_true() -> bool {
    let enabled = true;
    enabled
}
11fn default_proto_version() -> u32 {
12    1
13}
/// Serde default for `idle_timeout`: wait up to 30 seconds for new
/// replication messages before returning the current batch.
fn default_idle_timeout() -> Duration {
    Duration::from_millis(30_000)
}
/// Serde default for `status_update_interval`: 10 seconds, which stays below
/// the 30-second `idle_timeout` default so the defaults pass validation.
fn default_status_update_interval() -> Duration {
    Duration::from_millis(10_000)
}
/// Serde default for `tcp_keepalive` on the replication connection: 60 seconds.
fn default_tcp_keepalive() -> Duration {
    Duration::new(60, 0)
}
23fn default_batch_size() -> usize {
24    DEFAULT_BATCH_SIZE
25}
/// Serde default for `slot_acquire_retries`: retry acquiring an *active*
/// replication slot up to ten times before giving up.
fn default_slot_acquire_retries() -> u32 {
    const RETRIES: u32 = 10;
    RETRIES
}
29
30/// Configuration for [`PostgresCdcSource`](crate::PostgresCdcSource).
31#[derive(Clone, Serialize, Deserialize, JsonSchema)]
32pub struct PostgresCdcSourceConfig {
33    /// Connection URL pointing at the database whose WAL we want to read.
34    /// The crate internally upgrades the connection to `replication=database`
35    /// — callers do **not** need to add it themselves.
36    pub connection_url: String,
37
38    /// Logical replication slot name. Must match the Postgres naming rules:
39    /// 1–63 chars, lowercase letters / digits / underscores only.
40    pub slot_name: String,
41
42    /// Publication name on the server. Must already exist (faucet does not
43    /// create publications — they're a DBA-level concern that determines
44    /// which tables are replicated).
45    pub publication_name: String,
46
47    /// If the slot does not exist, create it as a logical/`pgoutput` slot
48    /// at connection time. Default: `true`.
49    #[serde(default = "default_true")]
50    pub create_slot_if_missing: bool,
51
52    /// Whether a newly-created slot is `permanent` (survives disconnect) or
53    /// `temporary` (auto-dropped when the replication connection closes).
54    ///
55    /// Default `permanent` (back-compatible). **A permanent slot pins WAL on
56    /// the server until it is consumed or dropped** — an abandoned permanent
57    /// slot fills `pg_wal` and can take the whole instance down. Use
58    /// `temporary` for ephemeral / test runs (note: a temporary slot resets on
59    /// reconnect, so bookmark-based resume across runs requires a permanent
60    /// slot). Drop an unused permanent slot explicitly with
61    /// [`PostgresCdcSource::drop_slot`](crate::PostgresCdcSource::drop_slot).
62    #[serde(default)]
63    pub slot_type: SlotType,
64
65    /// TLS settings for the replication connection. Default `disable`
66    /// (plaintext) for back-compatibility, but credentials and all WAL data
67    /// then travel unencrypted — set `require`/`verify_ca`/`verify_full` in
68    /// production.
69    #[serde(default)]
70    pub tls: CdcTls,
71
72    /// Optional starting LSN override (e.g. `"0/16A4F88"`). Ignored when a
73    /// state-store-managed bookmark is present (that bookmark wins).
74    /// When neither is set, replication starts from the slot's
75    /// `confirmed_flush_lsn`.
76    #[serde(default)]
77    pub start_lsn: Option<String>,
78
79    /// pgoutput protocol version. Only `1` is fully exercised in v1; `2` is
80    /// accepted but streaming-transaction messages (S/E/c/A) are not yet
81    /// decoded. Default: `1`.
82    #[serde(default = "default_proto_version")]
83    pub proto_version: u32,
84
85    /// Maximum time to wait for new replication messages before returning
86    /// the current batch. Default: 30 s.
87    #[serde(
88        default = "default_idle_timeout",
89        with = "faucet_core::config::duration_secs"
90    )]
91    #[schemars(with = "u64")]
92    pub idle_timeout: Duration,
93
94    /// Optional cap on the number of change events drained per fetch call.
95    /// Acts as a safety bound — `idle_timeout` is the primary terminator.
96    ///
97    /// **Note:** the cap is checked **after each COMMIT**, never mid-
98    /// transaction. A single transaction larger than `max_messages` will
99    /// still be emitted atomically (the fetch returns only after that
100    /// transaction's COMMIT and may produce more records than `max_messages`).
101    /// To bound the memory a *single* in-progress transaction can consume,
102    /// use [`max_staged_records`](Self::max_staged_records) instead.
103    #[serde(default)]
104    pub max_messages: Option<usize>,
105
106    /// Maximum number of change records buffered in memory for a *single*
107    /// in-progress transaction before it is aborted.
108    ///
109    /// Logical replication requires a transaction to be buffered until its
110    /// COMMIT so it can be emitted atomically (partial transactions must
111    /// never leak downstream). A single bulk `UPDATE`/`DELETE`/`COPY` of
112    /// millions of rows therefore buffers every decoded row as a
113    /// `serde_json::Value` in RAM, which can OOM the process. This bound is a
114    /// safety valve: when an in-progress transaction's staged record count
115    /// exceeds it, the source aborts with a typed
116    /// [`FaucetError::Source`] rather than
117    /// being OOM-killed.
118    ///
119    /// `None` (the default) means unbounded — atomic delivery of arbitrarily
120    /// large transactions at the cost of unbounded memory. Set a value sized
121    /// to your available memory if you replicate tables subject to large
122    /// bulk writes.
123    #[serde(default)]
124    pub max_staged_records: Option<usize>,
125
126    /// Interval at which Standby Status Update keepalives are sent to the
127    /// server. Must be shorter than `idle_timeout` and well under the
128    /// server's `wal_sender_timeout` (default 60 s). Default: 10 s.
129    #[serde(
130        default = "default_status_update_interval",
131        with = "faucet_core::config::duration_secs"
132    )]
133    #[schemars(with = "u64")]
134    pub status_update_interval: Duration,
135
136    /// TCP keepalive for the replication connection. Default: 60 s.
137    #[serde(
138        default = "default_tcp_keepalive",
139        with = "faucet_core::config::duration_secs"
140    )]
141    #[schemars(with = "u64")]
142    pub tcp_keepalive: Duration,
143
144    /// Advisory page size for
145    /// [`Source::stream_pages`](faucet_core::Source::stream_pages). The CDC
146    /// source emits **one `StreamPage` per committed transaction** so the
147    /// pipeline gets per-transaction durability via its per-page bookmark
148    /// persist. Because transactions are atomic units they are never split
149    /// across pages — a single transaction whose record count exceeds
150    /// `batch_size` still emits as one page. Defaults to
151    /// [`DEFAULT_BATCH_SIZE`].
152    ///
153    /// `batch_size = 0` is the "no batching" sentinel: every committed
154    /// transaction during the run window is accumulated into a single page
155    /// that is emitted at the end with `bookmark = max(commit_lsn)`. This
156    /// negates per-transaction durability and is only useful for tests or
157    /// initial-snapshot style runs.
158    #[serde(default = "default_batch_size")]
159    pub batch_size: usize,
160
161    /// Number of times to retry acquiring the replication slot when the server
162    /// reports it is still **active** (held by a not-yet-released prior
163    /// connection). On a rapid restart — a scheduler or `serve` re-running the
164    /// pipeline before the previous backend has dropped the slot — both the
165    /// pre-stream `pg_replication_slot_advance` and `START_REPLICATION` fail
166    /// with *"replication slot … is active for PID …"*. Each retry waits an
167    /// exponentially increasing backoff (250 ms, doubling, capped at 4 s).
168    /// `0` disables retries (fail fast). Defaults to 10.
169    #[serde(default = "default_slot_acquire_retries")]
170    pub slot_acquire_retries: u32,
171}
172
/// Lifetime of a newly-created replication slot.
// Wire format: `rename_all = "snake_case"` makes the variants serialize as
// `"permanent"` / `"temporary"`. The config's `slot_type` field is
// `#[serde(default)]`, so omitting it selects the `#[default]` variant below.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum SlotType {
    /// Survives disconnect; pins WAL until consumed or dropped. Default.
    #[default]
    Permanent,
    /// Auto-dropped by the server when the replication connection closes.
    Temporary,
}
183
/// TLS configuration for the CDC replication connection.
// Wire format: internally tagged on `mode` with snake_case variant names, e.g.
// `{"mode": "require"}` or `{"mode": "verify_full", "ca_path": "..."}`.
// The config's `tls` field is `#[serde(default)]`, so omitting it selects the
// `#[default]` variant (`disable`).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum CdcTls {
    /// No TLS — plaintext (default, back-compatible).
    #[default]
    Disable,
    /// Require TLS but do not verify the server certificate.
    Require,
    /// Require TLS and verify the certificate chain against `ca_path` (or the
    /// system roots when `None`).
    VerifyCa {
        // Optional on input (`default`) and omitted from serialized output
        // when `None` (`skip_serializing_if`).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        ca_path: Option<String>,
    },
    /// Require TLS and verify both the certificate chain and the hostname.
    VerifyFull {
        // Same serde handling as `VerifyCa::ca_path`.
        // NOTE(review): `None` presumably falls back to the system roots as
        // documented for `VerifyCa` — confirm in the connection code.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        ca_path: Option<String>,
    },
}
205
206impl std::fmt::Debug for PostgresCdcSourceConfig {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        f.debug_struct("PostgresCdcSourceConfig")
209            .field("connection_url", &"***")
210            .field("slot_name", &self.slot_name)
211            .field("publication_name", &self.publication_name)
212            .field("create_slot_if_missing", &self.create_slot_if_missing)
213            .field("slot_type", &self.slot_type)
214            .field("tls", &self.tls)
215            .field("start_lsn", &self.start_lsn)
216            .field("proto_version", &self.proto_version)
217            .field("idle_timeout", &self.idle_timeout)
218            .field("max_messages", &self.max_messages)
219            .field("max_staged_records", &self.max_staged_records)
220            .field("status_update_interval", &self.status_update_interval)
221            .field("tcp_keepalive", &self.tcp_keepalive)
222            .field("batch_size", &self.batch_size)
223            .field("slot_acquire_retries", &self.slot_acquire_retries)
224            .finish()
225    }
226}
227
228impl PostgresCdcSourceConfig {
229    /// Override the advisory per-page record count emitted by
230    /// [`Source::stream_pages`](faucet_core::Source::stream_pages).
231    ///
232    /// Pass `0` to disable per-transaction emission — every transaction in
233    /// the run window will be accumulated into a single trailing page with
234    /// `bookmark = max(commit_lsn)`. Transactions are never split regardless
235    /// of `batch_size`.
236    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
237        self.batch_size = batch_size;
238        self
239    }
240
241    /// Validate fail-fast invariants. Called from `PostgresCdcSource::new`.
242    pub fn validate(&self) -> Result<(), FaucetError> {
243        if self.connection_url.trim().is_empty() {
244            return Err(FaucetError::Config(
245                "postgres-cdc: connection_url must not be empty".into(),
246            ));
247        }
248        validate_slot_name(&self.slot_name)?;
249        if self.publication_name.is_empty() {
250            return Err(FaucetError::Config(
251                "postgres-cdc: publication_name must not be empty".into(),
252            ));
253        }
254        if self.proto_version != 1 {
255            return Err(FaucetError::Config(format!(
256                "postgres-cdc: proto_version must be 1 (v2 streaming-transaction \
257                 support is not yet available via pgwire-replication), got {}",
258                self.proto_version
259            )));
260        }
261        if self.idle_timeout.is_zero() {
262            return Err(FaucetError::Config(
263                "postgres-cdc: idle_timeout must be > 0".into(),
264            ));
265        }
266        if self.status_update_interval >= self.idle_timeout {
267            return Err(FaucetError::Config(format!(
268                "postgres-cdc: status_update_interval ({}s) must be \
269                 strictly less than idle_timeout ({}s)",
270                self.status_update_interval.as_secs(),
271                self.idle_timeout.as_secs()
272            )));
273        }
274        Ok(())
275    }
276}
277
278fn validate_slot_name(name: &str) -> Result<(), FaucetError> {
279    if name.is_empty() {
280        return Err(FaucetError::Config(
281            "postgres-cdc: slot_name must not be empty".into(),
282        ));
283    }
284    if name.len() > 63 {
285        return Err(FaucetError::Config(format!(
286            "postgres-cdc: slot_name '{name}' exceeds Postgres' 63-char limit"
287        )));
288    }
289    if !name
290        .chars()
291        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
292    {
293        return Err(FaucetError::Config(format!(
294            "postgres-cdc: slot_name '{name}' must contain only \
295             [a-z0-9_]"
296        )));
297    }
298    Ok(())
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully-populated config that passes `validate()`; individual tests
    /// mutate one field at a time to exercise a single rule.
    fn minimal() -> PostgresCdcSourceConfig {
        PostgresCdcSourceConfig {
            connection_url: "postgres://u:p@localhost/db".into(),
            slot_name: "faucet_slot".into(),
            publication_name: "faucet_pub".into(),
            create_slot_if_missing: true,
            slot_type: SlotType::Permanent,
            tls: CdcTls::Disable,
            start_lsn: None,
            proto_version: 1,
            idle_timeout: std::time::Duration::from_secs(30),
            max_messages: None,
            max_staged_records: None,
            status_update_interval: std::time::Duration::from_secs(10),
            tcp_keepalive: std::time::Duration::from_secs(60),
            batch_size: DEFAULT_BATCH_SIZE,
            slot_acquire_retries: default_slot_acquire_retries(),
        }
    }

    #[test]
    fn minimal_config_is_valid() {
        assert!(minimal().validate().is_ok());
    }

    #[test]
    fn defaults_via_serde() {
        let value: PostgresCdcSourceConfig = serde_json::from_value(serde_json::json!({
            "connection_url": "postgres://u:p@localhost/db",
            "slot_name": "faucet_slot",
            "publication_name": "faucet_pub",
        }))
        .unwrap();
        assert!(value.create_slot_if_missing);
        assert_eq!(value.slot_type, SlotType::Permanent);
        assert_eq!(value.tls, CdcTls::Disable);
        assert_eq!(value.proto_version, 1);
        assert_eq!(value.idle_timeout.as_secs(), 30);
        assert_eq!(value.status_update_interval.as_secs(), 10);
        assert_eq!(value.tcp_keepalive.as_secs(), 60);
        assert!(value.start_lsn.is_none());
        assert!(value.max_messages.is_none());
        assert!(value.max_staged_records.is_none());
        assert_eq!(value.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(value.slot_acquire_retries, 10);
        // The serde defaults must form a valid config on their own.
        assert!(value.validate().is_ok());
    }

    #[test]
    fn slot_type_and_tls_deserialize_from_json() {
        let v: PostgresCdcSourceConfig = serde_json::from_value(serde_json::json!({
            "connection_url": "postgres://u:p@localhost/db",
            "slot_name": "faucet_slot",
            "publication_name": "faucet_pub",
            "slot_type": "temporary",
            "tls": { "mode": "verify_full", "ca_path": "/etc/ssl/ca.pem" },
        }))
        .unwrap();
        assert_eq!(v.slot_type, SlotType::Temporary);
        assert_eq!(
            v.tls,
            CdcTls::VerifyFull {
                ca_path: Some("/etc/ssl/ca.pem".into())
            }
        );
    }

    #[test]
    fn tls_modes_deserialize_without_ca_path() {
        let require: CdcTls =
            serde_json::from_value(serde_json::json!({ "mode": "require" })).unwrap();
        assert_eq!(require, CdcTls::Require);
        let verify_ca: CdcTls =
            serde_json::from_value(serde_json::json!({ "mode": "verify_ca" })).unwrap();
        assert_eq!(verify_ca, CdcTls::VerifyCa { ca_path: None });
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let c = minimal();
        assert_eq!(c.batch_size, DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let c = minimal().with_batch_size(64);
        assert_eq!(c.batch_size, 64);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let c = minimal().with_batch_size(0);
        assert_eq!(c.batch_size, 0);
        assert!(faucet_core::validate_batch_size(c.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let c = minimal().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(c.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let v: PostgresCdcSourceConfig = serde_json::from_value(serde_json::json!({
            "connection_url": "postgres://u:p@localhost/db",
            "slot_name": "faucet_slot",
            "publication_name": "faucet_pub",
            "batch_size": 256,
        }))
        .unwrap();
        assert_eq!(v.batch_size, 256);
    }

    #[test]
    fn rejects_empty_slot_name() {
        let mut c = minimal();
        c.slot_name = String::new();
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_invalid_slot_name_chars() {
        let mut c = minimal();
        c.slot_name = "Faucet-Slot".into(); // uppercase + dash both disallowed
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_slot_name_over_63_chars() {
        let mut c = minimal();
        c.slot_name = "a".repeat(64);
        assert!(c.validate().is_err());
    }

    #[test]
    fn accepts_slot_name_of_exactly_63_chars() {
        let mut c = minimal();
        c.slot_name = "a".repeat(63);
        assert!(c.validate().is_ok());
    }

    #[test]
    fn rejects_empty_publication_name() {
        let mut c = minimal();
        c.publication_name = String::new();
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_zero_idle_timeout() {
        let mut c = minimal();
        c.idle_timeout = std::time::Duration::from_secs(0);
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_status_update_interval_longer_than_idle_timeout() {
        // Keepalives must fire before idle_timeout would terminate the loop.
        let mut c = minimal();
        c.status_update_interval = std::time::Duration::from_secs(60);
        c.idle_timeout = std::time::Duration::from_secs(30);
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_status_update_interval_equal_to_idle_timeout() {
        // The bound is strict: equal values are rejected too.
        let mut c = minimal();
        c.status_update_interval = std::time::Duration::from_secs(30);
        c.idle_timeout = std::time::Duration::from_secs(30);
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_invalid_proto_version() {
        // 0, 2, and 3 are all rejected — only 1 is supported.
        let mut c = minimal();
        c.proto_version = 0;
        assert!(c.validate().is_err());
        c.proto_version = 2;
        assert!(c.validate().is_err());
        c.proto_version = 3;
        assert!(c.validate().is_err());
    }

    #[test]
    fn accepts_proto_version_one() {
        let mut c = minimal();
        c.proto_version = 1;
        assert!(c.validate().is_ok());
    }

    #[test]
    fn rejects_empty_connection_url() {
        let mut c = minimal();
        c.connection_url = String::new();
        assert!(c.validate().is_err());
    }

    #[test]
    fn rejects_whitespace_connection_url() {
        let mut c = minimal();
        c.connection_url = "   ".into();
        assert!(c.validate().is_err());
    }

    #[test]
    fn debug_redacts_connection_url() {
        let cfg = minimal();
        let dbg = format!("{cfg:?}");
        assert!(dbg.contains("connection_url: \"***\""));
        assert!(!dbg.contains("u:p@localhost"));
    }

    #[test]
    fn debug_keeps_non_secret_fields() {
        let dbg = format!("{:?}", minimal());
        assert!(dbg.contains("slot_name: \"faucet_slot\""));
        assert!(dbg.contains("publication_name: \"faucet_pub\""));
    }

    #[test]
    fn schema_for_config_includes_required_fields() {
        let schema = schemars::schema_for!(PostgresCdcSourceConfig);
        let json = serde_json::to_value(&schema).unwrap();
        let required = json["required"].as_array().expect("required array");
        let names: Vec<_> = required.iter().filter_map(|v| v.as_str()).collect();
        assert!(names.contains(&"connection_url"));
        assert!(names.contains(&"slot_name"));
        assert!(names.contains(&"publication_name"));
    }
}