use std::time::{Duration, SystemTime, UNIX_EPOCH};
use faucet_core::FaucetError;
use pgwire_replication::{Lsn, ReplicationClient, ReplicationConfig, TlsConfig};
use sqlx::postgres::PgConnectOptions;
pub use pgwire_replication::ReplicationEvent;
use sqlx::{Executor, PgConnection};
use tracing::debug;
/// Microseconds from the Unix epoch (1970-01-01T00:00:00Z) to the PostgreSQL
/// epoch (2000-01-01T00:00:00Z). PostgreSQL clock values used in this module
/// count microseconds from the latter, so this offset converts between the two.
pub const POSTGRES_EPOCH_MICROS: i64 = 946_684_800_000_000;
/// Handle returned by [`connect`].
///
/// It holds no connection state: `connect` only validates the connection URL
/// before constructing it. The private field prevents construction outside
/// this module.
pub struct Client {
    _private: (),
}
/// Live logical-replication stream opened by [`start_replication`].
///
/// Wraps the `pgwire_replication` client; read events with [`recv`] and report
/// progress with [`send_status_update`].
pub struct Duplex {
    inner: ReplicationClient,
}
#[derive(Clone, Debug)]
pub struct ReplicationParams<'a> {
pub connection_url: &'a str,
pub slot_name: &'a str,
pub publication_name: &'a str,
pub proto_version: u32,
pub create_slot_if_missing: bool,
pub start_lsn: Option<u64>,
pub status_update_interval: Duration,
pub tcp_keepalive: Duration,
pub slot_type: crate::config::SlotType,
pub tls: &'a crate::config::CdcTls,
}
/// Connection coordinates extracted from a `postgres://` URL by [`parse_url`].
///
/// `Debug` is derived only under `cfg(test)` — presumably to keep the password
/// out of non-test debug output; confirm before widening it.
#[cfg_attr(test, derive(Debug))]
struct PgCoords {
    host: String,
    port: u16,
    user: String,
    password: String,
    dbname: String,
}
/// Splits a `postgres://user[:password]@host[:port][/dbname]` URL into the
/// discrete coordinates the replication client needs.
///
/// The user, password and database name are percent-decoded. sqlx decodes the
/// same URL for the slot-management connections, so both sides now see
/// identical credentials (e.g. `p%40ss` means `p@ss` on both). The port
/// defaults to 5432 and an empty database name to `postgres`.
///
/// # Errors
///
/// Returns [`FaucetError::Config`] when the URL does not parse, lacks a host or
/// a user, or has a component that is not valid UTF-8 after decoding.
fn parse_url(url: &str) -> Result<PgCoords, FaucetError> {
    let parsed = url::Url::parse(url)
        .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
    let host = parsed
        .host_str()
        .filter(|h| !h.is_empty())
        .ok_or_else(|| {
            FaucetError::Config(
                "postgres-cdc: connection URL is missing a host (expected \
                 postgres://user@host[:port]/dbname)"
                    .to_owned(),
            )
        })?
        .to_owned();
    let port = parsed.port().unwrap_or(5432);
    // `Url` returns userinfo and path still percent-encoded; decode them.
    let user = decode_url_component(parsed.username(), "user")?;
    if user.is_empty() {
        return Err(FaucetError::Config(
            "postgres-cdc: connection URL is missing a user (expected \
             postgres://user@host[:port]/dbname)"
                .to_owned(),
        ));
    }
    let password = decode_url_component(parsed.password().unwrap_or(""), "password")?;
    let dbname = decode_url_component(parsed.path().trim_start_matches('/'), "database name")?;
    // NOTE(review): libpq and the PostgreSQL server default the database to the
    // user name when none is given (which is what the sqlx connections get),
    // whereas this falls back to `postgres`. Logical slots are per-database, so
    // confirm this divergence is intended.
    let dbname = if dbname.is_empty() {
        "postgres".to_owned()
    } else {
        dbname
    };
    Ok(PgCoords {
        host,
        port,
        user,
        password,
        dbname,
    })
}

/// Decodes `%XX` escapes in one URL component.
///
/// A `%` that is not followed by two hex digits is kept literally rather than
/// rejected. `what` names the component in the error message.
///
/// # Errors
///
/// Returns [`FaucetError::Config`] if the decoded bytes are not valid UTF-8.
fn decode_url_component(component: &str, what: &str) -> Result<String, FaucetError> {
    let bytes = component.as_bytes();
    let hex = |b: u8| char::from(b).to_digit(16);
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        let escaped = match (bytes[i], bytes.get(i + 1), bytes.get(i + 2)) {
            (b'%', Some(&hi), Some(&lo)) => hex(hi).zip(hex(lo)),
            _ => None,
        };
        match escaped {
            Some((hi, lo)) => {
                // Both digits are < 16, so the combined value always fits a u8.
                decoded.push((hi * 16 + lo) as u8);
                i += 3;
            }
            None => {
                decoded.push(bytes[i]);
                i += 1;
            }
        }
    }
    String::from_utf8(decoded).map_err(|_| {
        FaucetError::Config(format!(
            "postgres-cdc: connection URL {what} is not valid UTF-8 after percent-decoding"
        ))
    })
}
/// Validates the connection settings and returns a [`Client`] handle.
///
/// Nothing is dialled here; the URL is only parsed so malformed configuration
/// fails early. Each later operation opens its own connection.
///
/// # Errors
///
/// Propagates the [`FaucetError::Config`] produced by URL validation.
pub async fn connect(params: &ReplicationParams<'_>) -> Result<Client, FaucetError> {
    parse_url(params.connection_url).map(|_coords| Client { _private: () })
}
/// Ensures the logical replication slot `slot_name` exists, creating it with
/// the `pgoutput` plugin when it is missing and `create_if_missing` is set.
///
/// Works over a short-lived SQL connection. Once that connection is open, it is
/// closed gracefully before returning, whatever the outcome.
///
/// # Errors
///
/// - [`FaucetError::Config`] if `connection_url` cannot be parsed.
/// - [`FaucetError::Source`] if connecting, looking up or creating the slot
///   fails, or if the slot is missing and `create_if_missing` is false.
pub async fn ensure_slot(
    _client: &Client,
    connection_url: &str,
    slot_name: &str,
    create_if_missing: bool,
    slot_type: crate::config::SlotType,
) -> Result<(), FaucetError> {
    use sqlx::ConnectOptions as _;
    use sqlx::Connection as _;
    let opts: PgConnectOptions = connection_url
        .parse()
        .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
    let mut conn: PgConnection = opts
        .connect()
        .await
        .map_err(|e| FaucetError::Source(format!("postgres-cdc ensure_slot connect: {e}")))?;
    let outcome = ensure_slot_on(&mut conn, slot_name, create_if_missing, slot_type).await;
    // Dropping a sqlx connection cannot notify the server; `close` sends the
    // protocol Terminate so the backend ends cleanly. Best-effort: a close
    // failure must not mask the result of the slot check itself.
    if let Err(e) = conn.close().await {
        debug!("postgres-cdc: ensure_slot connection close failed: {e}");
    }
    outcome
}

/// Looks up `slot_name` on `conn` and creates it when allowed; see [`ensure_slot`].
async fn ensure_slot_on(
    conn: &mut PgConnection,
    slot_name: &str,
    create_if_missing: bool,
    slot_type: crate::config::SlotType,
) -> Result<(), FaucetError> {
    use crate::config::SlotType;
    let row: Option<(String,)> =
        sqlx::query_as("SELECT slot_name::text FROM pg_replication_slots WHERE slot_name = $1")
            .bind(slot_name)
            .fetch_optional(&mut *conn)
            .await
            .map_err(|e| FaucetError::Source(format!("postgres-cdc slot lookup: {e}")))?;
    if row.is_some() {
        debug!("postgres-cdc: replication slot '{slot_name}' already exists");
        return Ok(());
    }
    if !create_if_missing {
        return Err(FaucetError::Source(format!(
            "postgres-cdc: replication slot '{slot_name}' does not exist \
             and create_slot_if_missing = false"
        )));
    }
    let temporary = matches!(slot_type, SlotType::Temporary);
    // NOTE(review): PostgreSQL releases a temporary slot when the session that
    // created it ends, and this session is closed as soon as `ensure_slot`
    // returns. A temporary slot created here therefore should not survive for
    // the separate replication connection — confirm whether temporary slots
    // must be created on the replication connection instead.
    let sql = format!(
        "SELECT pg_create_logical_replication_slot({}, 'pgoutput', {})",
        quote_literal(slot_name),
        temporary
    );
    conn.execute(sql.as_str())
        .await
        .map_err(|e| FaucetError::Source(format!("postgres-cdc create slot: {e}")))?;
    if temporary {
        debug!("postgres-cdc: created temporary replication slot '{slot_name}'");
    } else {
        tracing::warn!(
            "postgres-cdc: created PERMANENT replication slot '{slot_name}' — it will retain \
             WAL on the server until consumed or explicitly dropped (drop_slot). Use \
             slot_type=temporary for ephemeral runs."
        );
    }
    Ok(())
}
/// Drops the replication slot `slot_name` if it exists. An absent slot is
/// treated as already dropped.
///
/// Works over a short-lived SQL connection. Once that connection is open, it is
/// closed gracefully before returning, whatever the outcome.
///
/// # Errors
///
/// - [`FaucetError::Config`] if `connection_url` cannot be parsed.
/// - [`FaucetError::Source`] if connecting, looking up or dropping the slot
///   fails (for example, while another session is still using it).
pub async fn drop_slot(connection_url: &str, slot_name: &str) -> Result<(), FaucetError> {
    use sqlx::ConnectOptions as _;
    use sqlx::Connection as _;
    let opts: PgConnectOptions = connection_url
        .parse()
        .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
    let mut conn: PgConnection = opts
        .connect()
        .await
        .map_err(|e| FaucetError::Source(format!("postgres-cdc drop_slot connect: {e}")))?;
    let outcome = drop_slot_on(&mut conn, slot_name).await;
    // Best-effort graceful shutdown; never masks the drop result.
    if let Err(e) = conn.close().await {
        debug!("postgres-cdc: drop_slot connection close failed: {e}");
    }
    outcome
}

/// Checks for `slot_name` on `conn` and drops it when present; see [`drop_slot`].
async fn drop_slot_on(conn: &mut PgConnection, slot_name: &str) -> Result<(), FaucetError> {
    let exists: Option<(String,)> =
        sqlx::query_as("SELECT slot_name::text FROM pg_replication_slots WHERE slot_name = $1")
            .bind(slot_name)
            .fetch_optional(&mut *conn)
            .await
            .map_err(|e| FaucetError::Source(format!("postgres-cdc slot lookup: {e}")))?;
    if exists.is_none() {
        debug!("postgres-cdc: replication slot '{slot_name}' already absent; drop is a no-op");
        return Ok(());
    }
    sqlx::query("SELECT pg_drop_replication_slot($1)")
        .bind(slot_name)
        .execute(&mut *conn)
        .await
        .map_err(|e| FaucetError::Source(format!("postgres-cdc drop slot: {e}")))?;
    debug!("postgres-cdc: dropped replication slot '{slot_name}'");
    Ok(())
}
/// Translates the configured [`crate::config::CdcTls`] policy into the
/// `pgwire_replication` TLS settings. Any CA path is converted to a `PathBuf`.
fn tls_config(tls: &crate::config::CdcTls) -> TlsConfig {
    use crate::config::CdcTls as Mode;
    use std::path::PathBuf;
    match tls {
        Mode::VerifyFull { ca_path } => {
            TlsConfig::verify_full(ca_path.clone().map(PathBuf::from))
        }
        Mode::VerifyCa { ca_path } => TlsConfig::verify_ca(ca_path.clone().map(PathBuf::from)),
        Mode::Require => TlsConfig::require(),
        Mode::Disable => TlsConfig::disabled(),
    }
}
/// Moves the slot's confirmed position forward to `lsn` via
/// `pg_replication_slot_advance`. An `lsn` of 0 is a no-op and opens no
/// connection.
///
/// The short-lived SQL connection is closed gracefully before returning,
/// whatever the outcome.
///
/// # Errors
///
/// - [`FaucetError::Config`] if `connection_url` cannot be parsed.
/// - [`FaucetError::Source`] if connecting or advancing the slot fails.
pub async fn advance_slot(
    connection_url: &str,
    slot_name: &str,
    lsn: u64,
) -> Result<(), FaucetError> {
    use sqlx::ConnectOptions as _;
    use sqlx::Connection as _;
    if lsn == 0 {
        return Ok(());
    }
    let opts: PgConnectOptions = connection_url
        .parse()
        .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
    let mut conn: PgConnection = opts
        .connect()
        .await
        .map_err(|e| FaucetError::Source(format!("postgres-cdc advance_slot connect: {e}")))?;
    let outcome = sqlx::query("SELECT pg_replication_slot_advance($1, $2::pg_lsn)")
        .bind(slot_name)
        .bind(crate::state::format_lsn(lsn))
        .execute(&mut conn)
        .await;
    // Best-effort graceful shutdown; never masks the advance result.
    if let Err(e) = conn.close().await {
        debug!("postgres-cdc: advance_slot connection close failed: {e}");
    }
    outcome.map_err(|e| FaucetError::Source(format!("postgres-cdc advance_slot: {e}")))?;
    debug!("postgres-cdc: advanced slot '{slot_name}' confirmed_flush_lsn to {lsn:#x}");
    Ok(())
}
pub async fn start_replication(
_client: &Client,
params: &ReplicationParams<'_>,
) -> Result<Duplex, FaucetError> {
if params.proto_version != 1 {
return Err(FaucetError::Config(format!(
"postgres-cdc: pgwire-replication 0.3.2 supports proto_version = 1 only; \
got {}",
params.proto_version
)));
}
let coords = parse_url(params.connection_url)?;
let start_lsn = Lsn::from_u64(params.start_lsn.unwrap_or(0));
let cfg = ReplicationConfig {
host: coords.host,
port: coords.port,
user: coords.user,
password: coords.password,
database: coords.dbname,
tls: tls_config(params.tls),
slot: params.slot_name.to_owned(),
publication: params.publication_name.to_owned(),
start_lsn,
stop_at_lsn: None,
status_interval: params.status_update_interval,
idle_wakeup_interval: params.status_update_interval,
buffer_events: 8192,
};
let inner = ReplicationClient::connect(cfg)
.await
.map_err(|e| FaucetError::Source(format!("postgres-cdc start_replication: {e}")))?;
Ok(Duplex { inner })
}
/// Records `confirmed_lsn` as applied on the underlying replication client.
///
/// `_reply_requested` is accepted for interface compatibility and ignored.
/// This always returns `Ok(())`.
pub async fn send_status_update(
    duplex: &mut Duplex,
    confirmed_lsn: u64,
    _reply_requested: bool,
) -> Result<(), FaucetError> {
    let applied = Lsn::from_u64(confirmed_lsn);
    duplex.inner.update_applied_lsn(applied);
    Ok(())
}
/// Waits for the next meaningful replication event.
///
/// Keepalives are skipped. `Ok(None)` means the stream is finished: either the
/// client returned no event or it reported `StoppedAt`.
///
/// # Errors
///
/// Wraps any client receive error in [`FaucetError::Source`].
pub async fn recv(duplex: &mut Duplex) -> Result<Option<ReplicationEvent>, FaucetError> {
    loop {
        let next = duplex
            .inner
            .recv()
            .await
            .map_err(|e| FaucetError::Source(format!("postgres-cdc recv: {e}")))?;
        match next {
            // Nothing for the caller here; keep waiting.
            Some(ReplicationEvent::KeepAlive { .. }) => continue,
            None | Some(ReplicationEvent::StoppedAt { .. }) => return Ok(None),
            Some(event) => return Ok(Some(event)),
        }
    }
}
/// Current time on the PostgreSQL clock: microseconds since 2000-01-01T00:00:00Z.
///
/// If the system clock reads earlier than the Unix epoch, the elapsed duration
/// is taken as zero.
pub fn postgres_clock_now() -> i64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let whole_seconds = elapsed.as_secs() as i64;
    let fraction_micros = i64::from(elapsed.subsec_micros());
    whole_seconds * 1_000_000 + fraction_micros - POSTGRES_EPOCH_MICROS
}
/// Converts a PostgreSQL clock value (microseconds since 2000-01-01) into Unix
/// milliseconds. The microsecond sum saturates, and division truncates toward zero.
pub fn postgres_clock_to_unix_ms(ts: i64) -> i64 {
    let unix_micros = ts.saturating_add(POSTGRES_EPOCH_MICROS);
    unix_micros / 1_000
}
/// Quotes `s` as a SQL identifier by wrapping it in double quotes and doubling
/// any embedded double quote.
#[allow(dead_code)] // currently exercised only by the unit tests
fn quote_slot(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Doubles every single quote in `s`, the escaping a standard-conforming SQL
/// string literal needs. The surrounding quotes are not added.
fn escape_simple(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\'' {
            escaped.push('\'');
        }
        escaped.push(ch);
    }
    escaped
}
/// Quotes `s` as a SQL string literal, mirroring PostgreSQL's `quote_literal()`.
///
/// Single quotes are doubled. If `s` contains a backslash, the result uses the
/// `E'...'` escape-string form with backslashes doubled. The output is then
/// interpreted the same way whether or not `standard_conforming_strings` is
/// enabled. Without this, a backslash could escape the closing quote and inject
/// SQL into statements built with `format!`.
fn quote_literal(s: &str) -> String {
    let body = escape_simple(s);
    if s.contains('\\') {
        format!("E'{}'", body.replace('\\', "\\\\"))
    } else {
        format!("'{body}'")
    }
}
/// Heuristically reports whether `err` means the replication slot is held by
/// another session.
///
/// It matches the rendered message (case-insensitively) against "is active"
/// or the SQLSTATE `55006`.
pub fn is_slot_active_error(err: &FaucetError) -> bool {
    const MARKERS: [&str; 2] = ["is active", "55006"];
    let lowered = err.to_string().to_ascii_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(marker))
}
/// Exponential backoff for re-acquiring a busy slot.
///
/// Starts at 250 ms, doubles with each `attempt` (0-based) and is capped at 4 s.
/// Exponents too large to represent go straight to the cap.
fn slot_acquire_backoff(attempt: u32) -> Duration {
    const BASE_MS: u64 = 250;
    const CAP_MS: u64 = 4_000;
    let ms = 2u64
        .checked_pow(attempt)
        .map_or(CAP_MS, |factor| BASE_MS.saturating_mul(factor).min(CAP_MS));
    Duration::from_millis(ms)
}
/// Runs `op`, retrying it up to `max_retries` extra times while it fails with a
/// "slot is active" error (see [`is_slot_active_error`]).
///
/// Between attempts it sleeps for [`slot_acquire_backoff`] and logs a warning.
/// Any other error, or exhausting the retry budget, returns the last error.
pub async fn retry_on_slot_active<F, Fut, T>(max_retries: u32, op: F) -> Result<T, FaucetError>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, FaucetError>>,
{
    let mut retries_used: u32 = 0;
    loop {
        let err = match op().await {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };
        let retryable = retries_used < max_retries && is_slot_active_error(&err);
        if !retryable {
            return Err(err);
        }
        let backoff = slot_acquire_backoff(retries_used);
        tracing::warn!(
            attempt = retries_used + 1,
            max_retries,
            backoff_ms = backoff.as_millis() as u64,
            error = %err,
            "postgres-cdc: replication slot still active; retrying after backoff"
        );
        tokio::time::sleep(backoff).await;
        retries_used += 1;
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for this module's pure helpers: TLS mode mapping, clock
    // conversions, quoting, URL parsing, slot-active classification and the
    // retry/backoff policy. None of these tests open a database connection.
    use super::*;
    use crate::config::CdcTls;
    use chrono::{TimeZone, Utc};
    use pgwire_replication::SslMode;

    /// Each `CdcTls` variant must select the matching `SslMode`.
    #[test]
    fn tls_config_maps_each_mode() {
        assert_eq!(tls_config(&CdcTls::Disable).mode, SslMode::Disable);
        assert_eq!(tls_config(&CdcTls::Require).mode, SslMode::Require);
        assert_eq!(
            tls_config(&CdcTls::VerifyCa { ca_path: None }).mode,
            SslMode::VerifyCa
        );
        assert_eq!(
            tls_config(&CdcTls::VerifyFull {
                ca_path: Some("/ca.pem".into())
            })
            .mode,
            SslMode::VerifyFull
        );
    }

    /// Test-only inverse of the PostgreSQL clock: converts microseconds since
    /// 2000-01-01 to a UTC `DateTime`. It falls back to `Utc::now()` when chrono
    /// cannot produce a single instant for the value.
    fn postgres_clock_to_datetime(ts: i64) -> chrono::DateTime<Utc> {
        Utc.timestamp_micros(POSTGRES_EPOCH_MICROS.saturating_add(ts))
            .single()
            .unwrap_or_else(Utc::now)
    }

    /// Shifting a timestamp onto the PostgreSQL clock and back is lossless.
    #[test]
    fn postgres_clock_round_trip() {
        let dt = Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap();
        let pg_ts = dt.timestamp_micros() - POSTGRES_EPOCH_MICROS;
        let back = postgres_clock_to_datetime(pg_ts);
        assert_eq!(back, dt);
    }

    /// 2026-05-17T12:00:00Z expressed on the PostgreSQL clock maps to the
    /// expected Unix milliseconds.
    #[test]
    fn unix_ms_conversion() {
        let dt = Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap();
        let pg_ts = dt.timestamp_micros() - POSTGRES_EPOCH_MICROS;
        assert_eq!(postgres_clock_to_unix_ms(pg_ts), 1_779_019_200_000);
    }

    #[test]
    fn quote_slot_simple() {
        assert_eq!(quote_slot("faucet_slot"), "\"faucet_slot\"");
    }

    #[test]
    fn escape_simple_doubles_quotes() {
        assert_eq!(escape_simple("foo'bar"), "foo''bar");
    }

    #[test]
    fn parse_url_extracts_all_components() {
        let c = parse_url("postgres://alice:secret@db.example.com:5544/analytics").unwrap();
        assert_eq!(c.host, "db.example.com");
        assert_eq!(c.port, 5544);
        assert_eq!(c.user, "alice");
        assert_eq!(c.password, "secret");
        assert_eq!(c.dbname, "analytics");
    }

    /// Pins the current defaults: port 5432, database `postgres`, empty password.
    #[test]
    fn parse_url_defaults_port_and_dbname() {
        let c = parse_url("postgres://alice@db.example.com").unwrap();
        assert_eq!(c.port, 5432);
        assert_eq!(c.dbname, "postgres");
        assert_eq!(c.password, "");
    }

    #[test]
    fn parse_url_rejects_missing_host() {
        let err = parse_url("postgres:///analytics").unwrap_err();
        assert!(format!("{err}").contains("missing a host"), "{err}");
    }

    #[test]
    fn parse_url_rejects_missing_user() {
        let err = parse_url("postgres://db.example.com/analytics").unwrap_err();
        assert!(format!("{err}").contains("missing a user"), "{err}");
    }

    /// Both the "is active" wording and SQLSTATE 55006 classify as slot-active.
    /// Unrelated errors do not.
    #[test]
    fn is_slot_active_error_classifies_the_postgres_message() {
        assert!(is_slot_active_error(&FaucetError::Source(
            "postgres-cdc start_replication: db error: ERROR: replication slot \"s\" \
             is active for PID 4242"
                .into()
        )));
        assert!(is_slot_active_error(&FaucetError::Source(
            "55006: replication slot is in use".into()
        )));
        assert!(!is_slot_active_error(&FaucetError::Source(
            "connection refused".into()
        )));
        assert!(!is_slot_active_error(&FaucetError::Config(
            "bad url".into()
        )));
    }

    /// Backoff doubles from 250 ms and is capped at 4 s. The shift overflow at
    /// attempt 64 must not panic.
    #[test]
    fn slot_acquire_backoff_grows_and_is_capped() {
        assert_eq!(slot_acquire_backoff(0), Duration::from_millis(250));
        assert_eq!(slot_acquire_backoff(1), Duration::from_millis(500));
        assert_eq!(slot_acquire_backoff(2), Duration::from_millis(1000));
        assert_eq!(slot_acquire_backoff(20), Duration::from_millis(4000));
        assert_eq!(slot_acquire_backoff(64), Duration::from_millis(4000));
    }

    // The async tests below run the real backoff sleeps (250 ms, 500 ms, ...),
    // so each one takes on the order of a second.

    #[tokio::test]
    async fn retry_on_slot_active_retries_then_succeeds() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result = retry_on_slot_active(5, || {
            let n = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                if n < 2 {
                    Err(FaucetError::Source(
                        "replication slot \"s\" is active for PID 1".into(),
                    ))
                } else {
                    Ok::<u32, FaucetError>(42)
                }
            }
        })
        .await;
        assert_eq!(result.unwrap(), 42);
        assert_eq!(calls.load(Ordering::SeqCst), 3, "2 failures + 1 success");
    }

    #[tokio::test]
    async fn retry_on_slot_active_gives_up_after_max_retries() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result: Result<(), _> = retry_on_slot_active(2, || {
            calls.fetch_add(1, Ordering::SeqCst);
            async { Err(FaucetError::Source("slot is active".into())) }
        })
        .await;
        assert!(result.is_err());
        assert_eq!(
            calls.load(Ordering::SeqCst),
            3,
            "initial attempt + 2 retries"
        );
    }

    #[tokio::test]
    async fn retry_on_slot_active_does_not_retry_unrelated_errors() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result: Result<(), _> = retry_on_slot_active(5, || {
            calls.fetch_add(1, Ordering::SeqCst);
            async { Err(FaucetError::Source("connection refused".into())) }
        })
        .await;
        assert!(result.is_err());
        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "a non-slot-active error must not be retried"
        );
    }
}