pgwire_replication/
config.rs

1//! Configuration types for PostgreSQL replication connections.
2//!
3//! This module provides configuration structures for establishing replication
4//! connections to PostgreSQL, including TLS settings and replication parameters.
5
6use std::path::PathBuf;
7use std::time::Duration;
8
9use crate::lsn::Lsn;
10
/// Connection security level, mirroring PostgreSQL's `sslmode` parameter.
///
/// Variants are declared from least to most strict. The upstream semantics
/// are described in
/// [PostgreSQL SSL Support](https://www.postgresql.org/docs/current/libpq-ssl.html).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SslMode {
    /// Plain-text only: TLS is never used, so the connection fails if the
    /// server insists on TLS. This is the default mode.
    #[default]
    Disable,

    /// Attempt TLS first and fall back to an unencrypted connection when the
    /// server does not support it.
    ///
    /// **Warning**: susceptible to downgrade attacks; not recommended for
    /// production.
    Prefer,

    /// TLS is mandatory, but the server certificate is not verified.
    ///
    /// Guards against passive eavesdropping, not against an active MITM.
    Require,

    /// TLS is mandatory and the server certificate chain must validate
    /// against the trusted CAs.
    ///
    /// The certificate hostname is NOT compared with the connection target.
    VerifyCa,

    /// TLS is mandatory, the certificate chain must validate, AND the
    /// certificate hostname must match.
    ///
    /// **Recommended for production**: offers full protection against MITM
    /// attacks.
    VerifyFull,
}

impl SslMode {
    /// Whether TLS is mandatory in this mode, i.e. there is no fallback to a
    /// plain-text connection.
    ///
    /// `true` for `Require`, `VerifyCa` and `VerifyFull`.
    #[inline]
    pub fn requires_tls(&self) -> bool {
        match *self {
            SslMode::Disable | SslMode::Prefer => false,
            SslMode::Require | SslMode::VerifyCa | SslMode::VerifyFull => true,
        }
    }

    /// Whether the server certificate chain is verified in this mode.
    ///
    /// `true` for `VerifyCa` and `VerifyFull`.
    #[inline]
    pub fn verifies_certificate(&self) -> bool {
        match *self {
            SslMode::VerifyCa | SslMode::VerifyFull => true,
            SslMode::Disable | SslMode::Prefer | SslMode::Require => false,
        }
    }

    /// Whether the server hostname is verified in this mode.
    ///
    /// `true` only for `VerifyFull`.
    #[inline]
    pub fn verifies_hostname(&self) -> bool {
        *self == SslMode::VerifyFull
    }
}
62
63/// TLS/SSL configuration for PostgreSQL connections.
64#[derive(Debug, Clone, Default, PartialEq, Eq)]
65pub struct TlsConfig {
66    /// SSL mode controlling connection security level.
67    pub mode: SslMode,
68
69    /// Path to PEM file containing trusted CA certificates.
70    ///
71    /// If `None` and verification is enabled (`VerifyCa`/`VerifyFull`),
72    /// the Mozilla root certificates (webpki-roots) are used.
73    pub ca_pem_path: Option<PathBuf>,
74
75    /// Override SNI hostname sent during TLS handshake.
76    ///
77    /// Useful when:
78    /// - Connecting via IP address but certificate has a DNS name
79    /// - Using a load balancer with different internal/external names
80    ///
81    /// If `None`, the connection `host` is used for SNI.
82    pub sni_hostname: Option<String>,
83
84    /// Path to PEM file containing client certificate chain.
85    ///
86    /// Required for mutual TLS (mTLS) authentication.
87    /// Must be paired with `client_key_pem_path`.
88    pub client_cert_pem_path: Option<PathBuf>,
89
90    /// Path to PEM file containing client private key.
91    ///
92    /// Required for mutual TLS (mTLS) authentication.
93    /// Must be paired with `client_cert_pem_path`.
94    /// Supports PKCS#8, PKCS#1 (RSA), and SEC1 (EC) formats.
95    pub client_key_pem_path: Option<PathBuf>,
96}
97
98impl TlsConfig {
99    /// Create a configuration with TLS disabled.
100    ///
101    /// # Example
102    /// ```
103    /// use pgwire_replication::config::TlsConfig;
104    ///
105    /// let tls = TlsConfig::disabled();
106    /// assert!(!tls.mode.requires_tls());
107    /// ```
108    pub fn disabled() -> Self {
109        Self::default()
110    }
111
112    /// Create a configuration requiring TLS without certificate verification.
113    ///
114    /// **Warning**: This mode is vulnerable to MITM attacks.
115    /// Use `verify_ca()` or `verify_full()` for production.
116    ///
117    /// # Example
118    /// ```
119    /// use pgwire_replication::config::TlsConfig;
120    ///
121    /// let tls = TlsConfig::require();
122    /// assert!(tls.mode.requires_tls());
123    /// assert!(!tls.mode.verifies_certificate());
124    /// ```
125    pub fn require() -> Self {
126        Self {
127            mode: SslMode::Require,
128            ..Default::default()
129        }
130    }
131
132    /// Create a configuration with certificate chain verification.
133    ///
134    /// # Arguments
135    /// * `ca_path` - Path to CA certificate PEM file, or `None` for system roots
136    ///
137    /// # Example
138    /// ```
139    /// use pgwire_replication::config::TlsConfig;
140    ///
141    /// // Using system/Mozilla roots
142    /// let tls = TlsConfig::verify_ca(None);
143    ///
144    /// // Using custom CA
145    /// let tls = TlsConfig::verify_ca(Some("/path/to/ca.pem".into()));
146    /// ```
147    pub fn verify_ca(ca_path: Option<PathBuf>) -> Self {
148        Self {
149            mode: SslMode::VerifyCa,
150            ca_pem_path: ca_path,
151            ..Default::default()
152        }
153    }
154
155    /// Create a configuration with full verification (chain + hostname).
156    ///
157    /// **Recommended for production**.
158    ///
159    /// # Arguments
160    /// * `ca_path` - Path to CA certificate PEM file, or `None` for system roots
161    ///
162    /// # Example
163    /// ```
164    /// use pgwire_replication::config::TlsConfig;
165    ///
166    /// let tls = TlsConfig::verify_full(Some("/etc/ssl/certs/ca.pem".into()));
167    /// assert!(tls.mode.verifies_hostname());
168    /// ```
169    pub fn verify_full(ca_path: Option<PathBuf>) -> Self {
170        Self {
171            mode: SslMode::VerifyFull,
172            ca_pem_path: ca_path,
173            ..Default::default()
174        }
175    }
176
177    /// Set SNI hostname override.
178    ///
179    /// # Example
180    /// ```
181    /// use pgwire_replication::config::TlsConfig;
182    ///
183    /// let tls = TlsConfig::verify_full(None)
184    ///     .with_sni_hostname("db.example.com");
185    /// ```
186    pub fn with_sni_hostname(mut self, hostname: impl Into<String>) -> Self {
187        self.sni_hostname = Some(hostname.into());
188        self
189    }
190
191    /// Configure client certificate for mutual TLS.
192    ///
193    /// # Example
194    /// ```
195    /// use pgwire_replication::config::TlsConfig;
196    ///
197    /// let tls = TlsConfig::verify_full(Some("/ca.pem".into()))
198    ///     .with_client_cert("/client.pem", "/client.key");
199    /// ```
200    pub fn with_client_cert(
201        mut self,
202        cert_path: impl Into<PathBuf>,
203        key_path: impl Into<PathBuf>,
204    ) -> Self {
205        self.client_cert_pem_path = Some(cert_path.into());
206        self.client_key_pem_path = Some(key_path.into());
207        self
208    }
209
210    /// Returns `true` if mutual TLS (client certificate) is configured.
211    #[inline]
212    pub fn is_mtls(&self) -> bool {
213        self.client_cert_pem_path.is_some() && self.client_key_pem_path.is_some()
214    }
215}
216
217/// Configuration for PostgreSQL logical replication connections.
218///
219/// # Example
220///
221/// ```
222/// use pgwire_replication::config::{ReplicationConfig, TlsConfig, SslMode};
223/// use pgwire_replication::lsn::Lsn;
224/// use std::time::Duration;
225///
226/// let config = ReplicationConfig {
227///     host: "db.example.com".into(),
228///     port: 5432,
229///     user: "replicator".into(),
230///     password: "secret".into(),
231///     database: "mydb".into(),
232///     slot: "my_slot".into(),
233///     publication: "my_publication".into(),
234///     tls: TlsConfig::verify_full(Some("/path/to/ca.pem".into())),
235///     start_lsn: Lsn(0),  // Start from slot's confirmed position
236///     ..Default::default()
237/// };
238/// ```
239#[derive(Debug, Clone, PartialEq, Eq)]
240pub struct ReplicationConfig {
241    /// PostgreSQL server hostname or IP address.
242    pub host: String,
243
244    /// PostgreSQL server port (default: 5432).
245    pub port: u16,
246
247    /// PostgreSQL username with replication privileges.
248    ///
249    /// The user must have the `REPLICATION` attribute or be a superuser.
250    pub user: String,
251
252    /// Password for authentication.
253    pub password: String,
254
255    /// Database name to connect to.
256    pub database: String,
257
258    /// TLS/SSL configuration.
259    pub tls: TlsConfig,
260
261    /// Name of the replication slot to use.
262    ///
263    /// The slot must already exist and be a logical replication slot
264    /// using the `pgoutput` plugin.
265    pub slot: String,
266
267    /// Name of the publication to subscribe to.
268    ///
269    /// The publication must exist and include the tables you want to replicate.
270    pub publication: String,
271
272    /// LSN position to start replication from.
273    ///
274    /// - `Lsn(0)`: Start from slot's `confirmed_flush_lsn`
275    /// - Specific LSN: Resume from that position (must be >= slot's restart_lsn)
276    pub start_lsn: Lsn,
277
278    /// Optional LSN to stop replication at.
279    ///
280    /// When set, replication will stop once a commit with `end_lsn >= stop_at_lsn`
281    /// is received. Useful for:
282    /// - Bounded replay (e.g., point-in-time recovery)
283    /// - Testing with known data ranges
284    ///
285    /// If `None`, replication continues indefinitely (normal CDC mode).
286    pub stop_at_lsn: Option<Lsn>,
287
288    /// Interval for sending standby status updates to the server.
289    ///
290    /// Status updates inform PostgreSQL of the client's replay position,
291    /// allowing the server to release WAL segments. Too infrequent updates
292    /// may cause WAL accumulation; too frequent updates add overhead.
293    ///
294    /// Default: 1 second (matches pg_recvlogical)
295    pub status_interval: Duration,
296
297    /// Maximum time to wait for server messages before waking up.
298    ///
299    /// Silence is normal during logical replication. When this interval elapses
300    /// with no incoming messages, the client will send a standby status update
301    /// (feedback) and continue waiting.
302    ///
303    /// This effectively bounds how long the worker can stay blocked in a read
304    /// while idle.
305    ///
306    /// Default: 10 seconds
307    pub idle_wakeup_interval: Duration,
308
309    /// Size of the bounded event buffer between replication worker and consumer.
310    ///
311    /// Larger buffers can smooth out processing latency spikes but use more memory.
312    /// Each event is typically 100-1000 bytes depending on row size.
313    ///
314    /// Default: 8192 events
315    pub buffer_events: usize,
316}
317
318impl Default for ReplicationConfig {
319    fn default() -> Self {
320        Self {
321            host: "127.0.0.1".into(),
322            port: 5432,
323            user: "postgres".into(),
324            password: "postgres".into(),
325            database: "postgres".into(),
326            tls: TlsConfig::default(),
327            slot: "slot".into(),
328            publication: "pub".into(),
329            start_lsn: Lsn(0),
330            stop_at_lsn: None,
331            status_interval: Duration::from_secs(10),
332            idle_wakeup_interval: Duration::from_secs(10),
333            buffer_events: 8192,
334        }
335    }
336}
337
338impl ReplicationConfig {
339    /// Create a new configuration with required fields.
340    ///
341    /// Other fields use defaults and can be customized with builder methods.
342    ///
343    /// # Example
344    /// ```
345    /// use pgwire_replication::config::ReplicationConfig;
346    ///
347    /// let config = ReplicationConfig::new(
348    ///     "db.example.com",
349    ///     "replicator",
350    ///     "secret",
351    ///     "mydb",
352    ///     "my_slot",
353    ///     "my_pub",
354    /// );
355    /// ```
356    pub fn new(
357        host: impl Into<String>,
358        user: impl Into<String>,
359        password: impl Into<String>,
360        database: impl Into<String>,
361        slot: impl Into<String>,
362        publication: impl Into<String>,
363    ) -> Self {
364        Self {
365            host: host.into(),
366            user: user.into(),
367            password: password.into(),
368            database: database.into(),
369            slot: slot.into(),
370            publication: publication.into(),
371            ..Default::default()
372        }
373    }
374
375    /// Set the server port.
376    pub fn with_port(mut self, port: u16) -> Self {
377        self.port = port;
378        self
379    }
380
381    /// Set TLS configuration.
382    pub fn with_tls(mut self, tls: TlsConfig) -> Self {
383        self.tls = tls;
384        self
385    }
386
387    /// Set the starting LSN.
388    pub fn with_start_lsn(mut self, lsn: Lsn) -> Self {
389        self.start_lsn = lsn;
390        self
391    }
392
393    /// Set an optional stop LSN for bounded replay.
394    pub fn with_stop_lsn(mut self, lsn: Lsn) -> Self {
395        self.stop_at_lsn = Some(lsn);
396        self
397    }
398
399    /// Set the status update interval.
400    pub fn with_status_interval(mut self, interval: Duration) -> Self {
401        self.status_interval = interval;
402        self
403    }
404
405    /// Set the idle wakeup interval.
406    pub fn with_wakeup_interval(mut self, timeout: Duration) -> Self {
407        self.idle_wakeup_interval = timeout;
408        self
409    }
410
411    /// Set the event buffer size.
412    pub fn with_buffer_size(mut self, size: usize) -> Self {
413        self.buffer_events = size;
414        self
415    }
416
417    /// Returns the connection string for display (password masked).
418    ///
419    /// Useful for logging without exposing credentials.
420    pub fn display_connection(&self) -> String {
421        format!(
422            "postgresql://{}:***@{}:{}/{}",
423            self.user, self.host, self.port, self.database
424        )
425    }
426}