faucet_source_postgres_cdc/
replication.rs1use std::time::{Duration, SystemTime, UNIX_EPOCH};
34
35use faucet_core::FaucetError;
36use pgwire_replication::{Lsn, ReplicationClient, ReplicationConfig, TlsConfig};
37use sqlx::postgres::PgConnectOptions;
38
39pub use pgwire_replication::ReplicationEvent;
42use sqlx::{Executor, PgConnection};
43use tracing::debug;
44
/// Microseconds between the Unix epoch (1970-01-01T00:00:00Z) and the PostgreSQL
/// epoch (2000-01-01T00:00:00Z), i.e. 946 684 800 seconds.
///
/// PostgreSQL replication timestamps count microseconds from its own epoch; add this
/// offset to convert them to Unix microseconds (see `postgres_clock_to_unix_ms`).
pub const POSTGRES_EPOCH_MICROS: i64 = 946_684_800 * 1_000_000;
48
/// Opaque token returned by `connect` once the connection URL has been validated.
///
/// It holds no connection or state; it only proves `connect` succeeded before
/// `ensure_slot` / `start_replication` are called.
pub struct Client {
    // Private unit field: prevents construction outside this module.
    _private: (),
}
60
61pub struct Duplex {
64 inner: ReplicationClient,
65}
66
67#[derive(Clone, Debug)]
73pub struct ReplicationParams<'a> {
74 pub connection_url: &'a str,
76 pub slot_name: &'a str,
78 pub publication_name: &'a str,
80 pub proto_version: u32,
82 pub create_slot_if_missing: bool,
84 pub start_lsn: Option<u64>,
87 pub status_update_interval: Duration,
90 pub tcp_keepalive: Duration,
93 pub slot_type: crate::config::SlotType,
95 pub tls: &'a crate::config::CdcTls,
97}
98
/// Discrete connection coordinates extracted from a connection URL by `parse_url`.
///
/// `Debug` is only derived for tests so credentials are not printable in normal builds.
#[cfg_attr(test, derive(Debug))]
struct PgCoords {
    /// Server host name or address.
    host: String,
    /// Server port (`5432` when the URL omits it).
    port: u16,
    /// Login role; never empty.
    user: String,
    /// Password; the empty string when the URL has none.
    password: String,
    /// Database name (`postgres` when the URL path is empty).
    dbname: String,
}
109
110fn parse_url(url: &str) -> Result<PgCoords, FaucetError> {
111 let parsed = url::Url::parse(url)
114 .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
115
116 let host = parsed
122 .host_str()
123 .filter(|h| !h.is_empty())
124 .ok_or_else(|| {
125 FaucetError::Config(
126 "postgres-cdc: connection URL is missing a host (expected \
127 postgres://user@host[:port]/dbname)"
128 .to_owned(),
129 )
130 })?
131 .to_owned();
132 let port = parsed.port().unwrap_or(5432);
133 let user = parsed.username().to_owned();
134 if user.is_empty() {
135 return Err(FaucetError::Config(
136 "postgres-cdc: connection URL is missing a user (expected \
137 postgres://user@host[:port]/dbname)"
138 .to_owned(),
139 ));
140 }
141 let password = parsed.password().unwrap_or("").to_owned();
142 let dbname = parsed.path().trim_start_matches('/').to_owned();
143 let dbname = if dbname.is_empty() {
144 "postgres".to_owned()
145 } else {
146 dbname
147 };
148
149 Ok(PgCoords {
150 host,
151 port,
152 user,
153 password,
154 dbname,
155 })
156}
157
158pub async fn connect(params: &ReplicationParams<'_>) -> Result<Client, FaucetError> {
166 let _ = parse_url(params.connection_url)?;
169 Ok(Client { _private: () })
170}
171
172pub async fn ensure_slot(
179 _client: &Client,
180 connection_url: &str,
181 slot_name: &str,
182 create_if_missing: bool,
183 slot_type: crate::config::SlotType,
184) -> Result<(), FaucetError> {
185 use crate::config::SlotType;
186 let opts: PgConnectOptions = connection_url
188 .parse()
189 .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
190
191 use sqlx::ConnectOptions as _;
192 let mut conn: PgConnection = opts
193 .connect()
194 .await
195 .map_err(|e| FaucetError::Source(format!("postgres-cdc ensure_slot connect: {e}")))?;
196
197 let row: Option<(String,)> =
199 sqlx::query_as("SELECT slot_name::text FROM pg_replication_slots WHERE slot_name = $1")
200 .bind(slot_name)
201 .fetch_optional(&mut conn)
202 .await
203 .map_err(|e| FaucetError::Source(format!("postgres-cdc slot lookup: {e}")))?;
204
205 if row.is_some() {
206 debug!("postgres-cdc: replication slot '{slot_name}' already exists");
207 return Ok(());
208 }
209
210 if !create_if_missing {
211 return Err(FaucetError::Source(format!(
212 "postgres-cdc: replication slot '{slot_name}' does not exist \
213 and create_slot_if_missing = false"
214 )));
215 }
216
217 let temporary = matches!(slot_type, SlotType::Temporary);
224 let sql = format!(
225 "SELECT pg_create_logical_replication_slot({}, 'pgoutput', {})",
226 quote_literal(slot_name),
227 temporary
228 );
229 conn.execute(sql.as_str())
230 .await
231 .map_err(|e| FaucetError::Source(format!("postgres-cdc create slot: {e}")))?;
232
233 if temporary {
234 debug!("postgres-cdc: created temporary replication slot '{slot_name}'");
235 } else {
236 tracing::warn!(
239 "postgres-cdc: created PERMANENT replication slot '{slot_name}' — it will retain \
240 WAL on the server until consumed or explicitly dropped (drop_slot). Use \
241 slot_type=temporary for ephemeral runs."
242 );
243 }
244 Ok(())
245}
246
247pub async fn drop_slot(connection_url: &str, slot_name: &str) -> Result<(), FaucetError> {
251 let opts: PgConnectOptions = connection_url
252 .parse()
253 .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
254 use sqlx::ConnectOptions as _;
255 let mut conn: PgConnection = opts
256 .connect()
257 .await
258 .map_err(|e| FaucetError::Source(format!("postgres-cdc drop_slot connect: {e}")))?;
259
260 let exists: Option<(String,)> =
261 sqlx::query_as("SELECT slot_name::text FROM pg_replication_slots WHERE slot_name = $1")
262 .bind(slot_name)
263 .fetch_optional(&mut conn)
264 .await
265 .map_err(|e| FaucetError::Source(format!("postgres-cdc slot lookup: {e}")))?;
266 if exists.is_none() {
267 debug!("postgres-cdc: replication slot '{slot_name}' already absent; drop is a no-op");
268 return Ok(());
269 }
270
271 sqlx::query("SELECT pg_drop_replication_slot($1)")
272 .bind(slot_name)
273 .execute(&mut conn)
274 .await
275 .map_err(|e| FaucetError::Source(format!("postgres-cdc drop slot: {e}")))?;
276 debug!("postgres-cdc: dropped replication slot '{slot_name}'");
277 Ok(())
278}
279
280fn tls_config(tls: &crate::config::CdcTls) -> TlsConfig {
283 use crate::config::CdcTls;
284 use std::path::PathBuf;
285 match tls {
286 CdcTls::Disable => TlsConfig::disabled(),
287 CdcTls::Require => TlsConfig::require(),
288 CdcTls::VerifyCa { ca_path } => TlsConfig::verify_ca(ca_path.clone().map(PathBuf::from)),
289 CdcTls::VerifyFull { ca_path } => {
290 TlsConfig::verify_full(ca_path.clone().map(PathBuf::from))
291 }
292 }
293}
294
295pub async fn advance_slot(
311 connection_url: &str,
312 slot_name: &str,
313 lsn: u64,
314) -> Result<(), FaucetError> {
315 if lsn == 0 {
316 return Ok(());
317 }
318 let opts: PgConnectOptions = connection_url
319 .parse()
320 .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
321
322 use sqlx::ConnectOptions as _;
323 let mut conn: PgConnection = opts
324 .connect()
325 .await
326 .map_err(|e| FaucetError::Source(format!("postgres-cdc advance_slot connect: {e}")))?;
327
328 sqlx::query("SELECT pg_replication_slot_advance($1, $2::pg_lsn)")
332 .bind(slot_name)
333 .bind(crate::state::format_lsn(lsn))
334 .execute(&mut conn)
335 .await
336 .map_err(|e| FaucetError::Source(format!("postgres-cdc advance_slot: {e}")))?;
337
338 debug!("postgres-cdc: advanced slot '{slot_name}' confirmed_flush_lsn to {lsn:#x}");
339 Ok(())
340}
341
342pub async fn start_replication(
348 _client: &Client,
349 params: &ReplicationParams<'_>,
350) -> Result<Duplex, FaucetError> {
351 if params.proto_version != 1 {
352 return Err(FaucetError::Config(format!(
353 "postgres-cdc: pgwire-replication 0.3.2 supports proto_version = 1 only; \
354 got {}",
355 params.proto_version
356 )));
357 }
358
359 let coords = parse_url(params.connection_url)?;
360
361 let start_lsn = Lsn::from_u64(params.start_lsn.unwrap_or(0));
362
363 let cfg = ReplicationConfig {
364 host: coords.host,
365 port: coords.port,
366 user: coords.user,
367 password: coords.password,
368 database: coords.dbname,
369 tls: tls_config(params.tls),
370 slot: params.slot_name.to_owned(),
371 publication: params.publication_name.to_owned(),
372 start_lsn,
373 stop_at_lsn: None,
374 status_interval: params.status_update_interval,
377 idle_wakeup_interval: params.status_update_interval,
379 buffer_events: 8192,
380 };
381
382 let inner = ReplicationClient::connect(cfg)
383 .await
384 .map_err(|e| FaucetError::Source(format!("postgres-cdc start_replication: {e}")))?;
385
386 Ok(Duplex { inner })
387}
388
389pub async fn send_status_update(
399 duplex: &mut Duplex,
400 confirmed_lsn: u64,
401 _reply_requested: bool,
402) -> Result<(), FaucetError> {
403 duplex
404 .inner
405 .update_applied_lsn(Lsn::from_u64(confirmed_lsn));
406 Ok(())
407}
408
409pub async fn recv(duplex: &mut Duplex) -> Result<Option<ReplicationEvent>, FaucetError> {
432 loop {
433 match duplex
434 .inner
435 .recv()
436 .await
437 .map_err(|e| FaucetError::Source(format!("postgres-cdc recv: {e}")))?
438 {
439 None => return Ok(None),
440
441 Some(ReplicationEvent::StoppedAt { .. }) => {
442 return Ok(None);
443 }
444
445 Some(ReplicationEvent::KeepAlive { .. }) => {
446 }
449
450 Some(ev) => {
451 return Ok(Some(ev));
456 }
457 }
458 }
459}
460
461pub fn postgres_clock_now() -> i64 {
467 let now = SystemTime::now()
468 .duration_since(UNIX_EPOCH)
469 .unwrap_or_default();
470 let unix_micros = (now.as_secs() as i64) * 1_000_000 + (now.subsec_micros() as i64);
471 unix_micros - POSTGRES_EPOCH_MICROS
472}
473
474pub fn postgres_clock_to_unix_ms(ts: i64) -> i64 {
477 (POSTGRES_EPOCH_MICROS.saturating_add(ts)) / 1_000
478}
479
// Not referenced outside the unit tests in this module; kept for identifier quoting.
#[allow(dead_code)]
/// Quote `s` as a SQL identifier: wrap it in double quotes and double any embedded `"`.
fn quote_slot(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        if ch == '"' {
            quoted.push_str("\"\"");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('"');
    quoted
}
489
/// Escape `s` for use inside a single-quoted SQL string literal by doubling every `'`.
///
/// Backslashes are not touched, so this relies on `standard_conforming_strings = on`.
fn escape_simple(s: &str) -> String {
    s.split('\'').collect::<Vec<&str>>().join("''")
}
495
496fn quote_literal(s: &str) -> String {
498 format!("'{}'", escape_simple(s))
499}
500
501pub fn is_slot_active_error(err: &FaucetError) -> bool {
512 let msg = err.to_string().to_ascii_lowercase();
513 msg.contains("is active") || msg.contains("55006")
514}
515
/// Exponential backoff for retry `attempt` (0-based): 250 ms doubled per attempt,
/// capped at 4 s.
///
/// Shift overflow (attempt >= 64) and multiplication overflow both saturate, so any
/// attempt count is safe.
fn slot_acquire_backoff(attempt: u32) -> Duration {
    const BASE_MS: u64 = 250;
    const CEILING_MS: u64 = 4_000;

    let multiplier = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let delay_ms = BASE_MS.saturating_mul(multiplier);
    Duration::from_millis(delay_ms.min(CEILING_MS))
}
523
524pub async fn retry_on_slot_active<F, Fut, T>(max_retries: u32, op: F) -> Result<T, FaucetError>
529where
530 F: Fn() -> Fut,
531 Fut: std::future::Future<Output = Result<T, FaucetError>>,
532{
533 let mut attempt = 0u32;
534 loop {
535 match op().await {
536 Ok(value) => return Ok(value),
537 Err(e) if attempt < max_retries && is_slot_active_error(&e) => {
538 let backoff = slot_acquire_backoff(attempt);
539 tracing::warn!(
540 attempt = attempt + 1,
541 max_retries,
542 backoff_ms = backoff.as_millis() as u64,
543 error = %e,
544 "postgres-cdc: replication slot still active; retrying after backoff"
545 );
546 tokio::time::sleep(backoff).await;
547 attempt += 1;
548 }
549 Err(e) => return Err(e),
550 }
551 }
552}
553
#[cfg(test)]
mod tests {
    //! Unit tests for the pure helpers in this module (URL parsing, quoting, clock
    //! conversion, TLS mapping, slot-active classification and retry/backoff).

    use super::*;
    use crate::config::CdcTls;
    use chrono::{TimeZone, Utc};
    use pgwire_replication::SslMode;

    #[test]
    fn tls_config_maps_each_mode() {
        assert_eq!(tls_config(&CdcTls::Disable).mode, SslMode::Disable);
        assert_eq!(tls_config(&CdcTls::Require).mode, SslMode::Require);
        assert_eq!(
            tls_config(&CdcTls::VerifyCa { ca_path: None }).mode,
            SslMode::VerifyCa
        );
        assert_eq!(
            tls_config(&CdcTls::VerifyFull {
                ca_path: Some("/ca.pem".into())
            })
            .mode,
            SslMode::VerifyFull
        );
    }

    /// Test-only inverse of the PostgreSQL clock: PG-epoch micros -> UTC datetime.
    fn postgres_clock_to_datetime(ts: i64) -> chrono::DateTime<Utc> {
        Utc.timestamp_micros(POSTGRES_EPOCH_MICROS.saturating_add(ts))
            .single()
            .unwrap_or_else(Utc::now)
    }

    #[test]
    fn postgres_clock_round_trip() {
        let dt = Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap();
        let pg_ts = dt.timestamp_micros() - POSTGRES_EPOCH_MICROS;
        let back = postgres_clock_to_datetime(pg_ts);
        assert_eq!(back, dt);
    }

    #[test]
    fn unix_ms_conversion() {
        let dt = Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap();
        let pg_ts = dt.timestamp_micros() - POSTGRES_EPOCH_MICROS;
        assert_eq!(postgres_clock_to_unix_ms(pg_ts), 1_779_019_200_000);
    }

    #[test]
    fn postgres_clock_now_is_after_2024() {
        let floor = Utc
            .with_ymd_and_hms(2024, 1, 1, 0, 0, 0)
            .unwrap()
            .timestamp_micros()
            - POSTGRES_EPOCH_MICROS;
        assert!(postgres_clock_now() > floor);
    }

    #[test]
    fn quote_slot_simple() {
        assert_eq!(quote_slot("faucet_slot"), "\"faucet_slot\"");
    }

    #[test]
    fn quote_slot_doubles_embedded_double_quotes() {
        assert_eq!(quote_slot("we\"ird"), "\"we\"\"ird\"");
    }

    #[test]
    fn escape_simple_doubles_quotes() {
        assert_eq!(escape_simple("foo'bar"), "foo''bar");
    }

    #[test]
    fn quote_literal_wraps_and_escapes() {
        assert_eq!(quote_literal("faucet_slot"), "'faucet_slot'");
        assert_eq!(quote_literal("o'brien"), "'o''brien'");
    }

    #[test]
    fn parse_url_extracts_all_components() {
        let c = parse_url("postgres://alice:secret@db.example.com:5544/analytics").unwrap();
        assert_eq!(c.host, "db.example.com");
        assert_eq!(c.port, 5544);
        assert_eq!(c.user, "alice");
        assert_eq!(c.password, "secret");
        assert_eq!(c.dbname, "analytics");
    }

    #[test]
    fn parse_url_defaults_port_and_dbname() {
        let c = parse_url("postgres://alice@db.example.com").unwrap();
        assert_eq!(c.port, 5432);
        assert_eq!(c.dbname, "postgres");
        assert_eq!(c.password, "");
    }

    #[test]
    fn parse_url_rejects_missing_host() {
        let err = parse_url("postgres:///analytics").unwrap_err();
        assert!(format!("{err}").contains("missing a host"), "{err}");
    }

    #[test]
    fn parse_url_rejects_missing_user() {
        let err = parse_url("postgres://db.example.com/analytics").unwrap_err();
        assert!(format!("{err}").contains("missing a user"), "{err}");
    }

    #[test]
    fn is_slot_active_error_classifies_the_postgres_message() {
        assert!(is_slot_active_error(&FaucetError::Source(
            "postgres-cdc start_replication: db error: ERROR: replication slot \"s\" \
             is active for PID 4242"
                .into()
        )));
        assert!(is_slot_active_error(&FaucetError::Source(
            "55006: replication slot is in use".into()
        )));
        assert!(!is_slot_active_error(&FaucetError::Source(
            "connection refused".into()
        )));
        assert!(!is_slot_active_error(&FaucetError::Config("bad url".into())));
    }

    #[test]
    fn slot_acquire_backoff_grows_and_is_capped() {
        assert_eq!(slot_acquire_backoff(0), Duration::from_millis(250));
        assert_eq!(slot_acquire_backoff(1), Duration::from_millis(500));
        assert_eq!(slot_acquire_backoff(2), Duration::from_millis(1000));
        assert_eq!(slot_acquire_backoff(20), Duration::from_millis(4000));
        assert_eq!(slot_acquire_backoff(64), Duration::from_millis(4000));
    }

    #[test]
    fn slot_acquire_backoff_is_monotonic() {
        let mut prev = Duration::ZERO;
        for attempt in 0..70 {
            let next = slot_acquire_backoff(attempt);
            assert!(next >= prev, "backoff shrank at attempt {attempt}");
            prev = next;
        }
    }

    #[tokio::test]
    async fn retry_on_slot_active_retries_then_succeeds() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result = retry_on_slot_active(5, || {
            let n = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                if n < 2 {
                    Err(FaucetError::Source(
                        "replication slot \"s\" is active for PID 1".into(),
                    ))
                } else {
                    Ok::<u32, FaucetError>(42)
                }
            }
        })
        .await;
        assert_eq!(result.unwrap(), 42);
        assert_eq!(calls.load(Ordering::SeqCst), 3, "2 failures + 1 success");
    }

    #[tokio::test]
    async fn retry_on_slot_active_gives_up_after_max_retries() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result: Result<(), _> = retry_on_slot_active(2, || {
            calls.fetch_add(1, Ordering::SeqCst);
            async { Err(FaucetError::Source("slot is active".into())) }
        })
        .await;
        assert!(result.is_err());
        assert_eq!(
            calls.load(Ordering::SeqCst),
            3,
            "initial attempt + 2 retries"
        );
    }

    #[tokio::test]
    async fn retry_on_slot_active_does_not_retry_unrelated_errors() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result: Result<(), _> = retry_on_slot_active(5, || {
            calls.fetch_add(1, Ordering::SeqCst);
            async { Err(FaucetError::Source("connection refused".into())) }
        })
        .await;
        assert!(result.is_err());
        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "a non-slot-active error must not be retried"
        );
    }
}