Skip to main content

faucet_source_postgres_cdc/
replication.rs

1//! Low-level replication-connection wrapper.
2//!
3//! This module wraps [`pgwire_replication`] to provide the slot lifecycle
4//! (`ensure_slot`) and streaming helpers (`start_replication`, `recv`,
5//! `send_status_update`) used by the rest of the CDC source.
6//!
7//! # Design
8//!
9//! `pgwire_replication` handles everything from TCP connect through auth,
10//! `START_REPLICATION`, keepalive replies, and `StandbyStatusUpdate` — all
11//! internally.  The library delivers events as a typed enum; [`recv`] surfaces
12//! the full [`ReplicationEvent`] to callers (absorbing only [`KeepAlive`] and
13//! [`StoppedAt`] internally) so Tasks 9+ can observe transaction boundaries.
14//!
//! Slot creation is a control-plane operation: `ensure_slot` opens an
//! ordinary (non-replication) SQL connection with [`sqlx`] and calls the
//! `pg_create_logical_replication_slot` function for that single query.
18//!
19//! # Type aliases
20//!
21//! The plan requires stable names `Client` and `Duplex` so that Tasks 9+ can
22//! refer to concrete types.  We define:
23//!
//! - [`Client`] — a lightweight marker handle returned by [`connect`] once
//!   the connection URL has been validated; it holds no connection and no
//!   copy of the connection string.
27//! - [`Duplex`] — the live replication stream; a thin wrapper around
28//!   [`pgwire_replication::ReplicationClient`].
29//!
30//! [`KeepAlive`]: ReplicationEvent::KeepAlive
31//! [`StoppedAt`]: ReplicationEvent::StoppedAt
32
33use std::time::{Duration, SystemTime, UNIX_EPOCH};
34
35use faucet_core::FaucetError;
36use pgwire_replication::{Lsn, ReplicationClient, ReplicationConfig, TlsConfig};
37use sqlx::postgres::PgConnectOptions;
38
39/// Re-export so downstream modules (`stream.rs`, Task 9) can import the event
40/// type without depending on `pgwire_replication` directly.
41pub use pgwire_replication::ReplicationEvent;
42use sqlx::{Executor, PgConnection};
43use tracing::debug;
44
45/// Microseconds between the Unix epoch (1970-01-01) and the Postgres epoch
46/// (2000-01-01).  Used for converting between Postgres timestamps and Unix time.
47pub const POSTGRES_EPOCH_MICROS: i64 = 946_684_800_000_000;
48
49// ── Public type aliases ────────────────────────────────────────────────────
50
/// Pre-stream handle returned by [`connect`] once the connection URL has been
/// validated. The actual connection is opened by [`ensure_slot`] /
/// [`start_replication`] from the borrowed [`ReplicationParams`], so this is a
/// lightweight marker — it deliberately holds no owned copy of the connection
/// string, which previously sat in leaked (`Box::leak`) heap for the process
/// lifetime, including the password (#78/#13).
pub struct Client {
    // Zero-sized private field: prevents code outside this module from
    // constructing a `Client` without going through `connect`.
    _private: (),
}
60
/// Live replication stream.  Wraps [`pgwire_replication::ReplicationClient`].
/// Obtained from [`start_replication`]; consumed through [`recv`] and
/// [`send_status_update`].
pub struct Duplex {
    // The underlying library client; all streaming calls are forwarded to it.
    inner: ReplicationClient,
}
66
67// ── Parameters ────────────────────────────────────────────────────────────
68
/// All parameters required to establish a logical replication connection.
///
/// Borrowed by [`connect`] and [`start_replication`]. The slot-management
/// helpers ([`ensure_slot`], [`drop_slot`], [`advance_slot`]) take the
/// individual values they need as separate arguments instead.
#[derive(Clone, Debug)]
pub struct ReplicationParams<'a> {
    /// `postgres://user:pass@host:port/db` style URL.
    pub connection_url: &'a str,
    /// Name of the replication slot (must already exist, or `create_slot_if_missing = true`).
    pub slot_name: &'a str,
    /// Publication name — must already exist on the server.
    pub publication_name: &'a str,
    /// pgoutput protocol version. Only `1` is currently supported;
    /// [`start_replication`] rejects any other value with a config error.
    pub proto_version: u32,
    /// Create the slot if it does not already exist.
    pub create_slot_if_missing: bool,
    /// Optional LSN to resume from.  `None` means "start from the slot's
    /// `confirmed_flush_lsn`" ([`start_replication`] passes LSN `0` in that
    /// case).
    pub start_lsn: Option<u64>,
    /// Protocol-level Standby Status Update cadence — must be shorter than
    /// the server's `wal_sender_timeout`.
    pub status_update_interval: Duration,
    /// TCP-level keepalive interval. Larger than `status_update_interval`
    /// in normal operation.
    // NOTE(review): no function in this module reads this field — confirm it
    // is consumed elsewhere or wire it into the replication config.
    pub tcp_keepalive: Duration,
    /// Whether a newly-created slot is permanent or temporary.
    pub slot_type: crate::config::SlotType,
    /// TLS settings for the replication connection.
    pub tls: &'a crate::config::CdcTls,
}
98
99// ── Helper: parse a postgres URL into (host, port, user, password, dbname) ─
100
/// Connection coordinates extracted from a `postgres://` URL by `parse_url`.
///
/// `Debug` is derived only in test builds: the unit tests need it for
/// `unwrap_err()`, and non-test builds therefore cannot format the password
/// via `{:?}`.
#[cfg_attr(test, derive(Debug))]
struct PgCoords {
    /// Server host name or address (never empty).
    host: String,
    /// Server TCP port.
    port: u16,
    /// Login role (never empty).
    user: String,
    /// Login password; empty when the URL carries none.
    password: String,
    /// Database to connect to.
    dbname: String,
}
109
110fn parse_url(url: &str) -> Result<PgCoords, FaucetError> {
111    // Parse via the standard `url` crate so we can extract all components
112    // without relying on sqlx accessor methods that may not be public.
113    let parsed = url::Url::parse(url)
114        .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
115
116    // Fail fast on a missing host or user rather than silently defaulting to
117    // `localhost` / empty — a malformed URL otherwise connects to an
118    // unintended local instance or produces a confusing late auth failure
119    // (#78/#47). Port (5432) and dbname (the user's DB) keep their standard
120    // libpq-style defaults since omitting them is conventional.
121    let host = parsed
122        .host_str()
123        .filter(|h| !h.is_empty())
124        .ok_or_else(|| {
125            FaucetError::Config(
126                "postgres-cdc: connection URL is missing a host (expected \
127                 postgres://user@host[:port]/dbname)"
128                    .to_owned(),
129            )
130        })?
131        .to_owned();
132    let port = parsed.port().unwrap_or(5432);
133    let user = parsed.username().to_owned();
134    if user.is_empty() {
135        return Err(FaucetError::Config(
136            "postgres-cdc: connection URL is missing a user (expected \
137             postgres://user@host[:port]/dbname)"
138                .to_owned(),
139        ));
140    }
141    let password = parsed.password().unwrap_or("").to_owned();
142    let dbname = parsed.path().trim_start_matches('/').to_owned();
143    let dbname = if dbname.is_empty() {
144        "postgres".to_owned()
145    } else {
146        dbname
147    };
148
149    Ok(PgCoords {
150        host,
151        port,
152        user,
153        password,
154        dbname,
155    })
156}
157
158// ── Public API ─────────────────────────────────────────────────────────────
159
160/// Validate connectivity and return a [`Client`] handle.
161///
162/// This function parses the connection URL and records the parameters.
163/// It does **not** open a TCP connection; actual connectivity is verified
164/// lazily when [`ensure_slot`] or [`start_replication`] is called.
165pub async fn connect(params: &ReplicationParams<'_>) -> Result<Client, FaucetError> {
166    // Eagerly validate the URL so bad configs fail fast. No part of the
167    // connection string is retained past this call.
168    let _ = parse_url(params.connection_url)?;
169    Ok(Client { _private: () })
170}
171
172/// Ensure the replication slot exists.
173///
174/// If the slot already exists this is a no-op.  If it does not exist and
175/// `create_if_missing` is `true`, the slot is created via
176/// `pg_create_logical_replication_slot`.  If `create_if_missing` is `false`
177/// and the slot is absent, an error is returned.
178pub async fn ensure_slot(
179    _client: &Client,
180    connection_url: &str,
181    slot_name: &str,
182    create_if_missing: bool,
183    slot_type: crate::config::SlotType,
184) -> Result<(), FaucetError> {
185    use crate::config::SlotType;
186    // Use sqlx for the control-plane query (not a replication connection).
187    let opts: PgConnectOptions = connection_url
188        .parse()
189        .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
190
191    use sqlx::ConnectOptions as _;
192    let mut conn: PgConnection = opts
193        .connect()
194        .await
195        .map_err(|e| FaucetError::Source(format!("postgres-cdc ensure_slot connect: {e}")))?;
196
197    // Check whether the slot already exists.
198    let row: Option<(String,)> =
199        sqlx::query_as("SELECT slot_name::text FROM pg_replication_slots WHERE slot_name = $1")
200            .bind(slot_name)
201            .fetch_optional(&mut conn)
202            .await
203            .map_err(|e| FaucetError::Source(format!("postgres-cdc slot lookup: {e}")))?;
204
205    if row.is_some() {
206        debug!("postgres-cdc: replication slot '{slot_name}' already exists");
207        return Ok(());
208    }
209
210    if !create_if_missing {
211        return Err(FaucetError::Source(format!(
212            "postgres-cdc: replication slot '{slot_name}' does not exist \
213             and create_slot_if_missing = false"
214        )));
215    }
216
217    // Create the slot using the pgoutput plugin. The third arg to
218    // pg_create_logical_replication_slot is `temporary`: a temporary slot is
219    // dropped when this session disconnects; a permanent slot persists (and
220    // pins WAL) until explicitly dropped.
221    // `escape_simple` prevents injection via the slot name (already validated
222    // to [a-z0-9_] by config, but defence-in-depth doesn't hurt).
223    let temporary = matches!(slot_type, SlotType::Temporary);
224    let sql = format!(
225        "SELECT pg_create_logical_replication_slot({}, 'pgoutput', {})",
226        quote_literal(slot_name),
227        temporary
228    );
229    conn.execute(sql.as_str())
230        .await
231        .map_err(|e| FaucetError::Source(format!("postgres-cdc create slot: {e}")))?;
232
233    if temporary {
234        debug!("postgres-cdc: created temporary replication slot '{slot_name}'");
235    } else {
236        // A permanent slot retains WAL until consumed or dropped — surface it
237        // loudly so an abandoned slot doesn't silently fill pg_wal (#78/#12).
238        tracing::warn!(
239            "postgres-cdc: created PERMANENT replication slot '{slot_name}' — it will retain \
240             WAL on the server until consumed or explicitly dropped (drop_slot). Use \
241             slot_type=temporary for ephemeral runs."
242        );
243    }
244    Ok(())
245}
246
247/// Drop a logical replication slot via a control-plane SQL call
248/// (`pg_drop_replication_slot`). A missing slot is treated as success (no-op);
249/// an active slot (currently in use by another connection) surfaces an error.
250pub async fn drop_slot(connection_url: &str, slot_name: &str) -> Result<(), FaucetError> {
251    let opts: PgConnectOptions = connection_url
252        .parse()
253        .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
254    use sqlx::ConnectOptions as _;
255    let mut conn: PgConnection = opts
256        .connect()
257        .await
258        .map_err(|e| FaucetError::Source(format!("postgres-cdc drop_slot connect: {e}")))?;
259
260    let exists: Option<(String,)> =
261        sqlx::query_as("SELECT slot_name::text FROM pg_replication_slots WHERE slot_name = $1")
262            .bind(slot_name)
263            .fetch_optional(&mut conn)
264            .await
265            .map_err(|e| FaucetError::Source(format!("postgres-cdc slot lookup: {e}")))?;
266    if exists.is_none() {
267        debug!("postgres-cdc: replication slot '{slot_name}' already absent; drop is a no-op");
268        return Ok(());
269    }
270
271    sqlx::query("SELECT pg_drop_replication_slot($1)")
272        .bind(slot_name)
273        .execute(&mut conn)
274        .await
275        .map_err(|e| FaucetError::Source(format!("postgres-cdc drop slot: {e}")))?;
276    debug!("postgres-cdc: dropped replication slot '{slot_name}'");
277    Ok(())
278}
279
280/// Map the user-facing [`CdcTls`](crate::config::CdcTls) config onto
281/// pgwire-replication's [`TlsConfig`].
282fn tls_config(tls: &crate::config::CdcTls) -> TlsConfig {
283    use crate::config::CdcTls;
284    use std::path::PathBuf;
285    match tls {
286        CdcTls::Disable => TlsConfig::disabled(),
287        CdcTls::Require => TlsConfig::require(),
288        CdcTls::VerifyCa { ca_path } => TlsConfig::verify_ca(ca_path.clone().map(PathBuf::from)),
289        CdcTls::VerifyFull { ca_path } => {
290            TlsConfig::verify_full(ca_path.clone().map(PathBuf::from))
291        }
292    }
293}
294
295/// Advance the slot's `confirmed_flush_lsn` to `lsn` via a control-plane SQL
296/// call (`pg_replication_slot_advance`), **before** the replication stream is
297/// opened.
298///
299/// This is how the connector resumes past already-consumed, durably-persisted
300/// changes without ever advancing the slot ahead of durability (#78/#1). For
301/// a logical slot, `START_REPLICATION` resumes decoding from the slot's
302/// `confirmed_flush_lsn` — the client-supplied start LSN does not filter
303/// transactions that committed below it — so the only way to skip consumed
304/// changes is to move `confirmed_flush_lsn` forward here, while the slot is
305/// inactive. `pg_replication_slot_advance` never moves a slot backwards or
306/// past the server's insert pointer, so a stale or zero `lsn` is a safe no-op.
307///
308/// The slot must be inactive, which it is between [`ensure_slot`] and
309/// [`start_replication`].
310pub async fn advance_slot(
311    connection_url: &str,
312    slot_name: &str,
313    lsn: u64,
314) -> Result<(), FaucetError> {
315    if lsn == 0 {
316        return Ok(());
317    }
318    let opts: PgConnectOptions = connection_url
319        .parse()
320        .map_err(|e| FaucetError::Config(format!("postgres-cdc: invalid connection URL: {e}")))?;
321
322    use sqlx::ConnectOptions as _;
323    let mut conn: PgConnection = opts
324        .connect()
325        .await
326        .map_err(|e| FaucetError::Source(format!("postgres-cdc advance_slot connect: {e}")))?;
327
328    // Bind the slot name and the LSN (as pg_lsn text) as parameters — no string
329    // interpolation into the SQL. `format_lsn` emits Postgres' canonical
330    // `X/X` text form.
331    sqlx::query("SELECT pg_replication_slot_advance($1, $2::pg_lsn)")
332        .bind(slot_name)
333        .bind(crate::state::format_lsn(lsn))
334        .execute(&mut conn)
335        .await
336        .map_err(|e| FaucetError::Source(format!("postgres-cdc advance_slot: {e}")))?;
337
338    debug!("postgres-cdc: advanced slot '{slot_name}' confirmed_flush_lsn to {lsn:#x}");
339    Ok(())
340}
341
342/// Open a logical replication stream and return a [`Duplex`] handle.
343///
344/// Internally this calls `pgwire_replication::ReplicationClient::connect`
345/// which handles TCP, TLS negotiation, auth, and `START_REPLICATION` in one
346/// shot.
347pub async fn start_replication(
348    _client: &Client,
349    params: &ReplicationParams<'_>,
350) -> Result<Duplex, FaucetError> {
351    if params.proto_version != 1 {
352        return Err(FaucetError::Config(format!(
353            "postgres-cdc: pgwire-replication 0.3.2 supports proto_version = 1 only; \
354             got {}",
355            params.proto_version
356        )));
357    }
358
359    let coords = parse_url(params.connection_url)?;
360
361    let start_lsn = Lsn::from_u64(params.start_lsn.unwrap_or(0));
362
363    let cfg = ReplicationConfig {
364        host: coords.host,
365        port: coords.port,
366        user: coords.user,
367        password: coords.password,
368        database: coords.dbname,
369        tls: tls_config(params.tls),
370        slot: params.slot_name.to_owned(),
371        publication: params.publication_name.to_owned(),
372        start_lsn,
373        stop_at_lsn: None,
374        // Use the dedicated status-update interval (not tcp_keepalive) so that
375        // Standby Status Updates fire on their own cadence.
376        status_interval: params.status_update_interval,
377        // Wake up the worker at least as often as we send status updates.
378        idle_wakeup_interval: params.status_update_interval,
379        buffer_events: 8192,
380    };
381
382    let inner = ReplicationClient::connect(cfg)
383        .await
384        .map_err(|e| FaucetError::Source(format!("postgres-cdc start_replication: {e}")))?;
385
386    Ok(Duplex { inner })
387}
388
389/// Report progress to the server (Standby Status Update).
390///
391/// `confirmed_lsn` is the highest LSN whose changes have been durably
392/// written to the sink.  The underlying library sends this feedback on its
393/// own keepalive schedule; calling this function additionally marks the
394/// progress so the next automatic feedback includes the latest position.
395///
396/// `reply_requested` mirrors the flag from the server's KeepAlive message
397/// (no-op here since the library handles immediate replies internally).
398pub async fn send_status_update(
399    duplex: &mut Duplex,
400    confirmed_lsn: u64,
401    _reply_requested: bool,
402) -> Result<(), FaucetError> {
403    duplex
404        .inner
405        .update_applied_lsn(Lsn::from_u64(confirmed_lsn));
406    Ok(())
407}
408
409/// Receive the next meaningful replication event from the server.
410///
411/// Returns:
412/// - `Ok(Some(event))` — the next [`ReplicationEvent`] that the caller should
413///   handle.  This includes [`ReplicationEvent::XLogData`],
414///   [`ReplicationEvent::Begin`], [`ReplicationEvent::Commit`], and
415///   [`ReplicationEvent::Message`].  Callers (Task 9+) can match on the full
416///   event type to observe transaction boundaries.
417/// - `Ok(None)` — stream ended cleanly (slot stopped, stop LSN reached, or
418///   `Duplex` was shut down).
419/// - `Err(_)` — network / protocol error.
420///
421/// [`ReplicationEvent::KeepAlive`] events are absorbed here.  We deliberately
422/// do **not** advance the applied-LSN to the server's `wal_end` on a keepalive
423/// (the previous behaviour): that position is not yet durable downstream, and
424/// advertising it as `confirmed_flush_lsn` would authorise Postgres to recycle
425/// WAL for changes the consumer never persisted — a crash in that window loses
426/// data (#78/#1).  The applied-LSN is advanced only from the durable bookmark,
427/// via [`send_status_update`] at the start of each run; the library keeps
428/// sending its periodic Standby Status Updates (carrying that durable
429/// position) to hold the connection open.  [`ReplicationEvent::StoppedAt`] is
430/// converted to `Ok(None)`.
431pub async fn recv(duplex: &mut Duplex) -> Result<Option<ReplicationEvent>, FaucetError> {
432    loop {
433        match duplex
434            .inner
435            .recv()
436            .await
437            .map_err(|e| FaucetError::Source(format!("postgres-cdc recv: {e}")))?
438        {
439            None => return Ok(None),
440
441            Some(ReplicationEvent::StoppedAt { .. }) => {
442                return Ok(None);
443            }
444
445            Some(ReplicationEvent::KeepAlive { .. }) => {
446                // Absorb keepalives without touching the applied-LSN — see the
447                // function doc. Continue the loop; do not surface to the caller.
448            }
449
450            Some(ev) => {
451                // Surface Begin, Commit, XLogData, Message (and any future
452                // variants) to the caller. The commit_lsn carried by Commit is
453                // what becomes the durable bookmark once the pipeline persists
454                // it — that, not wal_end, is the only position fed back to PG.
455                return Ok(Some(ev));
456            }
457        }
458    }
459}
460
461// ── Clock helpers ──────────────────────────────────────────────────────────
462
463/// Current time as a Postgres-epoch timestamp (µs since 2000-01-01 UTC).
464///
465/// Used in Standby Status Update messages.
466pub fn postgres_clock_now() -> i64 {
467    let now = SystemTime::now()
468        .duration_since(UNIX_EPOCH)
469        .unwrap_or_default();
470    let unix_micros = (now.as_secs() as i64) * 1_000_000 + (now.subsec_micros() as i64);
471    unix_micros - POSTGRES_EPOCH_MICROS
472}
473
474/// Convert a Postgres-epoch timestamp (µs since 2000-01-01) to Unix
475/// milliseconds (ms since 1970-01-01).
476pub fn postgres_clock_to_unix_ms(ts: i64) -> i64 {
477    (POSTGRES_EPOCH_MICROS.saturating_add(ts)) / 1_000
478}
479
480// ── Private SQL helpers ────────────────────────────────────────────────────
481
/// Render `s` as a double-quoted Postgres identifier, doubling every embedded
/// double-quote (`"` → `""`).
///
/// Kept for DDL statements (e.g. `DROP REPLICATION SLOT`); currently only the
/// unit tests call it, hence the `dead_code` allowance.
#[allow(dead_code)]
fn quote_slot(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        if ch == '"' {
            // Escape by doubling.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
489
/// Escape `s` for embedding inside a single-quoted Postgres literal by
/// doubling every single-quote (`'` → `''`). No surrounding quotes are added.
fn escape_simple(s: &str) -> String {
    // Splitting on the quote and re-joining with a doubled quote is
    // equivalent to replacing each occurrence.
    s.split('\'').collect::<Vec<_>>().join("''")
}
495
496/// Produce a single-quoted Postgres string literal.
497fn quote_literal(s: &str) -> String {
498    format!("'{}'", escape_simple(s))
499}
500
// ── Slot-acquisition retry helpers ─────────────────────────────────────────
502
503/// Returns `true` if `err` is Postgres reporting that the replication slot is
504/// still **active** — held by a backend that has not yet released it. Postgres
505/// raises *"replication slot \"…\" is active for PID …"* (SQLSTATE `55006`).
506///
507/// This is transient on a rapid restart: a scheduler or `serve` re-running the
508/// pipeline before the previous connection's backend has fully exited finds the
509/// slot momentarily still in use. It clears within a short window, so it is
510/// safe to retry after a backoff (#146 M12).
511pub fn is_slot_active_error(err: &FaucetError) -> bool {
512    let msg = err.to_string().to_ascii_lowercase();
513    msg.contains("is active") || msg.contains("55006")
514}
515
/// Backoff delay before slot-acquisition retry number `attempt` (zero-based):
/// `250ms · 2^attempt`, never more than 4 s.
///
/// Shift or multiplication overflow for very large `attempt` values simply
/// yields the 4 s cap.
fn slot_acquire_backoff(attempt: u32) -> Duration {
    const BASE_MS: u64 = 250;
    const CAP_MS: u64 = 4000;

    let delay_ms = 1u64
        .checked_shl(attempt)
        .and_then(|factor| BASE_MS.checked_mul(factor))
        .map_or(CAP_MS, |uncapped| uncapped.min(CAP_MS));
    Duration::from_millis(delay_ms)
}
523
524/// Run `op`, retrying up to `max_retries` times with exponential backoff while
525/// it fails because the replication slot is still active (#146 M12). Any other
526/// error — and the final attempt's error after exhausting retries — is returned
527/// immediately. `max_retries = 0` preserves the previous fail-fast behaviour.
528pub async fn retry_on_slot_active<F, Fut, T>(max_retries: u32, op: F) -> Result<T, FaucetError>
529where
530    F: Fn() -> Fut,
531    Fut: std::future::Future<Output = Result<T, FaucetError>>,
532{
533    let mut attempt = 0u32;
534    loop {
535        match op().await {
536            Ok(value) => return Ok(value),
537            Err(e) if attempt < max_retries && is_slot_active_error(&e) => {
538                let backoff = slot_acquire_backoff(attempt);
539                tracing::warn!(
540                    attempt = attempt + 1,
541                    max_retries,
542                    backoff_ms = backoff.as_millis() as u64,
543                    error = %e,
544                    "postgres-cdc: replication slot still active; retrying after backoff"
545                );
546                tokio::time::sleep(backoff).await;
547                attempt += 1;
548            }
549            Err(e) => return Err(e),
550        }
551    }
552}
553
/// Unit tests for the pure helpers in this module: TLS mapping, clock
/// conversion, SQL quoting, URL parsing, and the slot-active retry logic.
/// None of them open a database connection.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::CdcTls;
    use chrono::{TimeZone, Utc};
    use pgwire_replication::SslMode;

    /// Every `CdcTls` variant maps to the matching `SslMode`.
    #[test]
    fn tls_config_maps_each_mode() {
        assert_eq!(tls_config(&CdcTls::Disable).mode, SslMode::Disable);
        assert_eq!(tls_config(&CdcTls::Require).mode, SslMode::Require);
        assert_eq!(
            tls_config(&CdcTls::VerifyCa { ca_path: None }).mode,
            SslMode::VerifyCa
        );
        assert_eq!(
            tls_config(&CdcTls::VerifyFull {
                ca_path: Some("/ca.pem".into())
            })
            .mode,
            SslMode::VerifyFull
        );
    }

    /// Convenience: turn `postgres_clock_to_unix_ms`-compatible math into a
    /// `DateTime<Utc>`.  Used by tests only.
    fn postgres_clock_to_datetime(ts: i64) -> chrono::DateTime<Utc> {
        Utc.timestamp_micros(POSTGRES_EPOCH_MICROS.saturating_add(ts))
            .single()
            .unwrap_or_else(Utc::now)
    }

    /// Shifting a known instant onto the Postgres epoch and back is lossless.
    #[test]
    fn postgres_clock_round_trip() {
        let dt = Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap();
        let pg_ts = dt.timestamp_micros() - POSTGRES_EPOCH_MICROS;
        let back = postgres_clock_to_datetime(pg_ts);
        assert_eq!(back, dt);
    }

    /// A Postgres-epoch timestamp converts to the expected Unix milliseconds.
    #[test]
    fn unix_ms_conversion() {
        let dt = Utc.with_ymd_and_hms(2026, 5, 17, 12, 0, 0).unwrap();
        let pg_ts = dt.timestamp_micros() - POSTGRES_EPOCH_MICROS;
        assert_eq!(postgres_clock_to_unix_ms(pg_ts), 1_779_019_200_000);
    }

    /// A plain identifier is wrapped in double quotes.
    #[test]
    fn quote_slot_simple() {
        assert_eq!(quote_slot("faucet_slot"), "\"faucet_slot\"");
    }

    /// Embedded single quotes are doubled.
    #[test]
    fn escape_simple_doubles_quotes() {
        assert_eq!(escape_simple("foo'bar"), "foo''bar");
    }

    /// A fully-specified URL yields every component.
    #[test]
    fn parse_url_extracts_all_components() {
        let c = parse_url("postgres://alice:secret@db.example.com:5544/analytics").unwrap();
        assert_eq!(c.host, "db.example.com");
        assert_eq!(c.port, 5544);
        assert_eq!(c.user, "alice");
        assert_eq!(c.password, "secret");
        assert_eq!(c.dbname, "analytics");
    }

    /// Omitted port, database, and password fall back to their defaults.
    #[test]
    fn parse_url_defaults_port_and_dbname() {
        let c = parse_url("postgres://alice@db.example.com").unwrap();
        assert_eq!(c.port, 5432);
        assert_eq!(c.dbname, "postgres");
        assert_eq!(c.password, "");
    }

    #[test]
    fn parse_url_rejects_missing_host() {
        // `postgres:///db` parses with an empty host — must fail fast (#78/#47).
        let err = parse_url("postgres:///analytics").unwrap_err();
        assert!(format!("{err}").contains("missing a host"), "{err}");
    }

    /// A URL without a user component is rejected.
    #[test]
    fn parse_url_rejects_missing_user() {
        let err = parse_url("postgres://db.example.com/analytics").unwrap_err();
        assert!(format!("{err}").contains("missing a user"), "{err}");
    }

    #[test]
    fn is_slot_active_error_classifies_the_postgres_message() {
        // The canonical Postgres message (SQLSTATE 55006).
        assert!(is_slot_active_error(&FaucetError::Source(
            "postgres-cdc start_replication: db error: ERROR: replication slot \"s\" \
             is active for PID 4242"
                .into()
        )));
        // SQLSTATE code present.
        assert!(is_slot_active_error(&FaucetError::Source(
            "55006: replication slot is in use".into()
        )));
        // Unrelated errors are NOT slot-active.
        assert!(!is_slot_active_error(&FaucetError::Source(
            "connection refused".into()
        )));
        assert!(!is_slot_active_error(&FaucetError::Config(
            "bad url".into()
        )));
    }

    #[test]
    fn slot_acquire_backoff_grows_and_is_capped() {
        assert_eq!(slot_acquire_backoff(0), Duration::from_millis(250));
        assert_eq!(slot_acquire_backoff(1), Duration::from_millis(500));
        assert_eq!(slot_acquire_backoff(2), Duration::from_millis(1000));
        // Capped at 4s no matter how large the attempt.
        assert_eq!(slot_acquire_backoff(20), Duration::from_millis(4000));
        assert_eq!(slot_acquire_backoff(64), Duration::from_millis(4000));
    }

    /// Two slot-active failures followed by a success: three calls in total.
    #[tokio::test]
    async fn retry_on_slot_active_retries_then_succeeds() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result = retry_on_slot_active(5, || {
            let n = calls.fetch_add(1, Ordering::SeqCst);
            async move {
                if n < 2 {
                    Err(FaucetError::Source(
                        "replication slot \"s\" is active for PID 1".into(),
                    ))
                } else {
                    Ok::<u32, FaucetError>(42)
                }
            }
        })
        .await;
        assert_eq!(result.unwrap(), 42);
        assert_eq!(calls.load(Ordering::SeqCst), 3, "2 failures + 1 success");
    }

    /// A persistent slot-active failure stops after the retry budget is spent.
    #[tokio::test]
    async fn retry_on_slot_active_gives_up_after_max_retries() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result: Result<(), _> = retry_on_slot_active(2, || {
            calls.fetch_add(1, Ordering::SeqCst);
            async { Err(FaucetError::Source("slot is active".into())) }
        })
        .await;
        assert!(result.is_err());
        assert_eq!(
            calls.load(Ordering::SeqCst),
            3,
            "initial attempt + 2 retries"
        );
    }

    /// Errors that are not slot-active are returned after a single call.
    #[tokio::test]
    async fn retry_on_slot_active_does_not_retry_unrelated_errors() {
        use std::sync::atomic::{AtomicU32, Ordering};
        let calls = AtomicU32::new(0);
        let result: Result<(), _> = retry_on_slot_active(5, || {
            calls.fetch_add(1, Ordering::SeqCst);
            async { Err(FaucetError::Source("connection refused".into())) }
        })
        .await;
        assert!(result.is_err());
        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "a non-slot-active error must not be retried"
        );
    }
}