pgwire_replication/config.rs
1//! Configuration types for PostgreSQL replication connections.
2//!
3//! This module provides configuration structures for establishing replication
4//! connections to PostgreSQL, including TLS settings and replication parameters.
5
6use std::path::PathBuf;
7use std::time::Duration;
8
9use crate::lsn::Lsn;
10
/// Transport-security policy for a PostgreSQL connection.
///
/// The variants mirror the values accepted by PostgreSQL's `sslmode`
/// connection parameter. See
/// [PostgreSQL SSL Support](https://www.postgresql.org/docs/current/libpq-ssl.html)
/// for the reference semantics of each mode.
///
/// The [`Default`] value is [`SslMode::Disable`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SslMode {
    /// Plain text only: TLS is never used, so connecting to a server that
    /// requires TLS fails.
    #[default]
    Disable,

    /// Opportunistic TLS: try TLS first and fall back to an unencrypted
    /// connection when the server does not support it.
    ///
    /// **Warning**: the fallback makes this mode vulnerable to downgrade
    /// attacks. Not recommended for production.
    Prefer,

    /// Mandatory TLS, but the server certificate is not verified.
    ///
    /// Protects against passive eavesdropping, not against an active MITM.
    Require,

    /// Mandatory TLS with the server certificate chain verified against the
    /// trusted CAs.
    ///
    /// The certificate hostname is NOT compared with the connection target.
    VerifyCa,

    /// Mandatory TLS with certificate chain verification AND hostname
    /// verification.
    ///
    /// **Recommended for production**. Provides full protection against MITM
    /// attacks.
    VerifyFull,
}
42
43impl SslMode {
44 /// Returns `true` if this mode requires TLS (won't fall back to plain).
45 #[inline]
46 pub fn requires_tls(&self) -> bool {
47 !matches!(self, SslMode::Disable | SslMode::Prefer)
48 }
49
50 /// Returns `true` if this mode verifies the certificate chain.
51 #[inline]
52 pub fn verifies_certificate(&self) -> bool {
53 matches!(self, SslMode::VerifyCa | SslMode::VerifyFull)
54 }
55
56 /// Returns `true` if this mode verifies the server hostname.
57 #[inline]
58 pub fn verifies_hostname(&self) -> bool {
59 matches!(self, SslMode::VerifyFull)
60 }
61}
62
63/// TLS/SSL configuration for PostgreSQL connections.
64#[derive(Debug, Clone, Default, PartialEq, Eq)]
65pub struct TlsConfig {
66 /// SSL mode controlling connection security level.
67 pub mode: SslMode,
68
69 /// Path to PEM file containing trusted CA certificates.
70 ///
71 /// If `None` and verification is enabled (`VerifyCa`/`VerifyFull`),
72 /// the Mozilla root certificates (webpki-roots) are used.
73 pub ca_pem_path: Option<PathBuf>,
74
75 /// Override SNI hostname sent during TLS handshake.
76 ///
77 /// Useful when:
78 /// - Connecting via IP address but certificate has a DNS name
79 /// - Using a load balancer with different internal/external names
80 ///
81 /// If `None`, the connection `host` is used for SNI.
82 pub sni_hostname: Option<String>,
83
84 /// Path to PEM file containing client certificate chain.
85 ///
86 /// Required for mutual TLS (mTLS) authentication.
87 /// Must be paired with `client_key_pem_path`.
88 pub client_cert_pem_path: Option<PathBuf>,
89
90 /// Path to PEM file containing client private key.
91 ///
92 /// Required for mutual TLS (mTLS) authentication.
93 /// Must be paired with `client_cert_pem_path`.
94 /// Supports PKCS#8, PKCS#1 (RSA), and SEC1 (EC) formats.
95 pub client_key_pem_path: Option<PathBuf>,
96}
97
98impl TlsConfig {
99 /// Create a configuration with TLS disabled.
100 ///
101 /// # Example
102 /// ```
103 /// use pgwire_replication::config::TlsConfig;
104 ///
105 /// let tls = TlsConfig::disabled();
106 /// assert!(!tls.mode.requires_tls());
107 /// ```
108 pub fn disabled() -> Self {
109 Self::default()
110 }
111
112 /// Create a configuration requiring TLS without certificate verification.
113 ///
114 /// **Warning**: This mode is vulnerable to MITM attacks.
115 /// Use `verify_ca()` or `verify_full()` for production.
116 ///
117 /// # Example
118 /// ```
119 /// use pgwire_replication::config::TlsConfig;
120 ///
121 /// let tls = TlsConfig::require();
122 /// assert!(tls.mode.requires_tls());
123 /// assert!(!tls.mode.verifies_certificate());
124 /// ```
125 pub fn require() -> Self {
126 Self {
127 mode: SslMode::Require,
128 ..Default::default()
129 }
130 }
131
132 /// Create a configuration with certificate chain verification.
133 ///
134 /// # Arguments
135 /// * `ca_path` - Path to CA certificate PEM file, or `None` for system roots
136 ///
137 /// # Example
138 /// ```
139 /// use pgwire_replication::config::TlsConfig;
140 ///
141 /// // Using system/Mozilla roots
142 /// let tls = TlsConfig::verify_ca(None);
143 ///
144 /// // Using custom CA
145 /// let tls = TlsConfig::verify_ca(Some("/path/to/ca.pem".into()));
146 /// ```
147 pub fn verify_ca(ca_path: Option<PathBuf>) -> Self {
148 Self {
149 mode: SslMode::VerifyCa,
150 ca_pem_path: ca_path,
151 ..Default::default()
152 }
153 }
154
155 /// Create a configuration with full verification (chain + hostname).
156 ///
157 /// **Recommended for production**.
158 ///
159 /// # Arguments
160 /// * `ca_path` - Path to CA certificate PEM file, or `None` for system roots
161 ///
162 /// # Example
163 /// ```
164 /// use pgwire_replication::config::TlsConfig;
165 ///
166 /// let tls = TlsConfig::verify_full(Some("/etc/ssl/certs/ca.pem".into()));
167 /// assert!(tls.mode.verifies_hostname());
168 /// ```
169 pub fn verify_full(ca_path: Option<PathBuf>) -> Self {
170 Self {
171 mode: SslMode::VerifyFull,
172 ca_pem_path: ca_path,
173 ..Default::default()
174 }
175 }
176
177 /// Set SNI hostname override.
178 ///
179 /// # Example
180 /// ```
181 /// use pgwire_replication::config::TlsConfig;
182 ///
183 /// let tls = TlsConfig::verify_full(None)
184 /// .with_sni_hostname("db.example.com");
185 /// ```
186 pub fn with_sni_hostname(mut self, hostname: impl Into<String>) -> Self {
187 self.sni_hostname = Some(hostname.into());
188 self
189 }
190
191 /// Configure client certificate for mutual TLS.
192 ///
193 /// # Example
194 /// ```
195 /// use pgwire_replication::config::TlsConfig;
196 ///
197 /// let tls = TlsConfig::verify_full(Some("/ca.pem".into()))
198 /// .with_client_cert("/client.pem", "/client.key");
199 /// ```
200 pub fn with_client_cert(
201 mut self,
202 cert_path: impl Into<PathBuf>,
203 key_path: impl Into<PathBuf>,
204 ) -> Self {
205 self.client_cert_pem_path = Some(cert_path.into());
206 self.client_key_pem_path = Some(key_path.into());
207 self
208 }
209
210 /// Returns `true` if mutual TLS (client certificate) is configured.
211 #[inline]
212 pub fn is_mtls(&self) -> bool {
213 self.client_cert_pem_path.is_some() && self.client_key_pem_path.is_some()
214 }
215}
216
217/// Configuration for PostgreSQL logical replication connections.
218///
219/// # Example
220///
221/// ```
222/// use pgwire_replication::config::{ReplicationConfig, TlsConfig, SslMode};
223/// use pgwire_replication::lsn::Lsn;
224/// use std::time::Duration;
225///
226/// let config = ReplicationConfig {
227/// host: "db.example.com".into(),
228/// port: 5432,
229/// user: "replicator".into(),
230/// password: "secret".into(),
231/// database: "mydb".into(),
232/// slot: "my_slot".into(),
233/// publication: "my_publication".into(),
234/// tls: TlsConfig::verify_full(Some("/path/to/ca.pem".into())),
235/// start_lsn: Lsn(0), // Start from slot's confirmed position
236/// ..Default::default()
237/// };
238/// ```
239#[derive(Debug, Clone, PartialEq, Eq)]
240pub struct ReplicationConfig {
241 /// PostgreSQL server hostname or IP address.
242 pub host: String,
243
244 /// PostgreSQL server port (default: 5432).
245 pub port: u16,
246
247 /// PostgreSQL username with replication privileges.
248 ///
249 /// The user must have the `REPLICATION` attribute or be a superuser.
250 pub user: String,
251
252 /// Password for authentication.
253 pub password: String,
254
255 /// Database name to connect to.
256 pub database: String,
257
258 /// TLS/SSL configuration.
259 pub tls: TlsConfig,
260
261 /// Name of the replication slot to use.
262 ///
263 /// The slot must already exist and be a logical replication slot
264 /// using the `pgoutput` plugin.
265 pub slot: String,
266
267 /// Name of the publication to subscribe to.
268 ///
269 /// The publication must exist and include the tables you want to replicate.
270 pub publication: String,
271
272 /// LSN position to start replication from.
273 ///
274 /// - `Lsn(0)`: Start from slot's `confirmed_flush_lsn`
275 /// - Specific LSN: Resume from that position (must be >= slot's restart_lsn)
276 pub start_lsn: Lsn,
277
278 /// Optional LSN to stop replication at.
279 ///
280 /// When set, replication will stop once a commit with `end_lsn >= stop_at_lsn`
281 /// is received. Useful for:
282 /// - Bounded replay (e.g., point-in-time recovery)
283 /// - Testing with known data ranges
284 ///
285 /// If `None`, replication continues indefinitely (normal CDC mode).
286 pub stop_at_lsn: Option<Lsn>,
287
288 /// Interval for sending standby status updates to the server.
289 ///
290 /// Status updates inform PostgreSQL of the client's replay position,
291 /// allowing the server to release WAL segments. Too infrequent updates
292 /// may cause WAL accumulation; too frequent updates add overhead.
293 ///
294 /// Default: 1 second (matches pg_recvlogical)
295 pub status_interval: Duration,
296
297 /// Maximum time to wait for server messages before waking up.
298 ///
299 /// Silence is normal during logical replication. When this interval elapses
300 /// with no incoming messages, the client will send a standby status update
301 /// (feedback) and continue waiting.
302 ///
303 /// This effectively bounds how long the worker can stay blocked in a read
304 /// while idle.
305 ///
306 /// Default: 10 seconds
307 pub idle_wakeup_interval: Duration,
308
309 /// Size of the bounded event buffer between replication worker and consumer.
310 ///
311 /// Larger buffers can smooth out processing latency spikes but use more memory.
312 /// Each event is typically 100-1000 bytes depending on row size.
313 ///
314 /// Default: 8192 events
315 pub buffer_events: usize,
316}
317
318impl Default for ReplicationConfig {
319 fn default() -> Self {
320 Self {
321 host: "127.0.0.1".into(),
322 port: 5432,
323 user: "postgres".into(),
324 password: "postgres".into(),
325 database: "postgres".into(),
326 tls: TlsConfig::default(),
327 slot: "slot".into(),
328 publication: "pub".into(),
329 start_lsn: Lsn(0),
330 stop_at_lsn: None,
331 status_interval: Duration::from_secs(10),
332 idle_wakeup_interval: Duration::from_secs(10),
333 buffer_events: 8192,
334 }
335 }
336}
337
338impl ReplicationConfig {
339 /// Create a new configuration with required fields.
340 ///
341 /// Other fields use defaults and can be customized with builder methods.
342 ///
343 /// # Example
344 /// ```
345 /// use pgwire_replication::config::ReplicationConfig;
346 ///
347 /// let config = ReplicationConfig::new(
348 /// "db.example.com",
349 /// "replicator",
350 /// "secret",
351 /// "mydb",
352 /// "my_slot",
353 /// "my_pub",
354 /// );
355 /// ```
356 pub fn new(
357 host: impl Into<String>,
358 user: impl Into<String>,
359 password: impl Into<String>,
360 database: impl Into<String>,
361 slot: impl Into<String>,
362 publication: impl Into<String>,
363 ) -> Self {
364 Self {
365 host: host.into(),
366 user: user.into(),
367 password: password.into(),
368 database: database.into(),
369 slot: slot.into(),
370 publication: publication.into(),
371 ..Default::default()
372 }
373 }
374
375 /// Set the server port.
376 pub fn with_port(mut self, port: u16) -> Self {
377 self.port = port;
378 self
379 }
380
381 /// Set TLS configuration.
382 pub fn with_tls(mut self, tls: TlsConfig) -> Self {
383 self.tls = tls;
384 self
385 }
386
387 /// Set the starting LSN.
388 pub fn with_start_lsn(mut self, lsn: Lsn) -> Self {
389 self.start_lsn = lsn;
390 self
391 }
392
393 /// Set an optional stop LSN for bounded replay.
394 pub fn with_stop_lsn(mut self, lsn: Lsn) -> Self {
395 self.stop_at_lsn = Some(lsn);
396 self
397 }
398
399 /// Set the status update interval.
400 pub fn with_status_interval(mut self, interval: Duration) -> Self {
401 self.status_interval = interval;
402 self
403 }
404
405 /// Set the idle wakeup interval.
406 pub fn with_wakeup_interval(mut self, timeout: Duration) -> Self {
407 self.idle_wakeup_interval = timeout;
408 self
409 }
410
411 /// Set the event buffer size.
412 pub fn with_buffer_size(mut self, size: usize) -> Self {
413 self.buffer_events = size;
414 self
415 }
416
417 /// Returns the connection string for display (password masked).
418 ///
419 /// Useful for logging without exposing credentials.
420 pub fn display_connection(&self) -> String {
421 format!(
422 "postgresql://{}:***@{}:{}/{}",
423 self.user, self.host, self.port, self.database
424 )
425 }
426}