Skip to main content

systemprompt_database/services/postgres/
connection.rs

//! Initial-connect retry policy for `PostgresProvider`.
//!
//! Wraps the first `PgPool` connect in a bounded exponential backoff so
//! transient startup races (Postgres still booting, SSL handshake racing
//! the TCP listener) recover without surfacing as user-visible failures.
//! The retry loop intentionally targets a narrow set of error shapes so
//! permanent failures (auth, missing database, bad URL) fail fast. The
//! backoff itself runs on [`crate::resilience::retry::retry_async`].
9
10use std::future::Future;
11use std::time::Duration;
12
13use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
14
15use crate::error::DatabaseResult;
16use crate::resilience::classify::Outcome;
17use crate::resilience::config::RetryConfig;
18use crate::resilience::retry::retry_async;
19
/// Attempt budget that `connect_with_retry` passes to
/// `connect_with_retry_using` as `max_attempts`.
const MAX_ATTEMPTS: u32 = 5;

/// Delay schedule, in milliseconds, that `connect_with_retry` passes to
/// `connect_with_retry_using`. Only the first entry and the largest entry are
/// read there (as the base and maximum backoff delay respectively).
const RETRY_DELAYS_MS: &[u64] = &[100, 200, 400, 800, 1600];
22
23#[must_use]
24pub fn build_pool_options() -> PgPoolOptions {
25    PgPoolOptions::new()
26        .max_connections(50)
27        .min_connections(0)
28        .max_lifetime(Duration::from_secs(1800))
29        .acquire_timeout(Duration::from_secs(30))
30        .idle_timeout(Duration::from_secs(300))
31}
32
33pub async fn connect_with_retry(
34    options: PgPoolOptions,
35    connect_options: PgConnectOptions,
36) -> DatabaseResult<PgPool> {
37    let connector = |opts: PgConnectOptions| {
38        let options = options.clone();
39        async move { options.connect_with(opts).await }
40    };
41    connect_with_retry_using(connect_options, MAX_ATTEMPTS, RETRY_DELAYS_MS, connector).await
42}
43
44pub async fn connect_with_retry_using<T, F, Fut>(
45    connect_options: PgConnectOptions,
46    max_attempts: u32,
47    delays_ms: &[u64],
48    connector: F,
49) -> DatabaseResult<T>
50where
51    T: Send,
52    F: Fn(PgConnectOptions) -> Fut + Send + Sync,
53    Fut: Future<Output = Result<T, sqlx::Error>> + Send,
54{
55    let cfg = RetryConfig {
56        max_attempts,
57        base_delay: Duration::from_millis(delays_ms.first().copied().unwrap_or(100)),
58        max_delay: Duration::from_millis(delays_ms.iter().copied().max().unwrap_or(1600)),
59        jitter: false,
60    };
61    let classify = |err: &sqlx::Error| {
62        if is_retryable(err) {
63            Outcome::Transient { retry_after: None }
64        } else {
65            Outcome::Permanent
66        }
67    };
68    retry_async(&cfg, "postgres-connect", classify, || {
69        connector(connect_options.clone())
70    })
71    .await
72    .map_err(Into::into)
73}
74
75fn is_retryable(err: &sqlx::Error) -> bool {
76    if let sqlx::Error::Io(io_err) = err {
77        if io_err.kind() == std::io::ErrorKind::ConnectionRefused {
78            return true;
79        }
80    }
81    let msg = err.to_string();
82    msg.contains("unexpected response from SSLRequest") || msg.contains("starting up")
83}