Skip to main content

fraiseql_db/postgres/adapter/
mod.rs

1//! PostgreSQL database adapter implementation.
2
3mod database;
4mod query_stats;
5mod relay;
6
7#[cfg(test)]
8mod tests;
9
10#[cfg(all(test, feature = "test-postgres"))]
11mod integration_tests;
12
13use std::{fmt::Write, time::Duration};
14
15use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
16use fraiseql_error::{FraiseQLError, Result};
17use tokio_postgres::{NoTls, Row};
18
19use super::where_generator::PostgresWhereGenerator;
20use crate::{
21    dialect::PostgresDialect,
22    identifier::quote_postgres_identifier,
23    order_by::append_order_by,
24    traits::DatabaseAdapter,
25    types::{
26        DatabaseType, JsonbValue, QueryParam,
27        sql_hints::{OrderByClause, SqlProjectionHint},
28    },
29    where_clause::WhereClause,
30};
31
/// Pool ceiling used when the caller does not specify one.
///
/// Raised from 10 to 25 so that concurrent nested query load does not
/// exhaust the pool (fixes Issue #41).
const DEFAULT_POOL_SIZE: usize = 25;

/// Upper bound on the number of connection acquisition attempts made before
/// giving up; a growing back-off delay is applied between attempts.
const MAX_CONNECTION_RETRIES: u32 = 3;

/// Base back-off delay, in milliseconds, between connection acquisition attempts.
const CONNECTION_RETRY_DELAY_MS: u64 = 50;
42
/// Settings that govern how the connection pool is built and pre-warmed.
///
/// Three knobs are exposed: how many connections are opened eagerly at
/// startup, how large the pool may grow, and how long acquisition/creation
/// of a connection may take.
///
/// # Example
///
/// ```rust
/// use fraiseql_db::postgres::PoolPrewarmConfig;
///
/// let cfg = PoolPrewarmConfig {
///     min_size:     5,
///     max_size:     20,
///     timeout_secs: Some(30),
/// };
/// ```
#[derive(Clone, Debug)]
pub struct PoolPrewarmConfig {
    /// How many connections to open eagerly when the pool is created.
    ///
    /// These connections are established right after pool construction so the
    /// first request does not pay the connection cost. `0` disables pre-warming
    /// (lazy init — only the startup health check opens a connection).
    pub min_size: usize,

    /// Upper limit on the number of connections held by the pool.
    pub max_size: usize,

    /// Optional limit, in seconds, on connection acquisition and creation.
    ///
    /// The same duration is used for the deadpool `wait` slot (waiting for an
    /// idle connection) and the `create` slot (opening a new connection to
    /// PostgreSQL). With `None`, acquisition may block indefinitely when the
    /// pool is exhausted.
    pub timeout_secs: Option<u64>,
}
79
80/// Build a `deadpool-postgres` pool with an optional wait/create timeout.
81///
82/// # Errors
83///
84/// Returns `FraiseQLError::ConnectionPool` if pool creation fails (e.g., unparseable URL).
85fn build_pool(connection_string: &str, max_size: usize, timeout_secs: Option<u64>) -> Result<Pool> {
86    let mut cfg = Config::new();
87    cfg.url = Some(connection_string.to_string());
88    cfg.manager = Some(ManagerConfig {
89        recycling_method: RecyclingMethod::Fast,
90    });
91
92    let mut pool_cfg = deadpool_postgres::PoolConfig::new(max_size);
93    if let Some(secs) = timeout_secs {
94        let t = Duration::from_secs(secs);
95        pool_cfg.timeouts.wait = Some(t);
96        pool_cfg.timeouts.create = Some(t);
97        // `recycle` intentionally stays None — fast recycle, not user-configurable.
98    }
99    cfg.pool = Some(pool_cfg);
100
101    cfg.create_pool(Some(Runtime::Tokio1), NoTls)
102        .map_err(|e| FraiseQLError::ConnectionPool {
103            message: format!("Failed to create connection pool: {e}"),
104        })
105}
106
107/// Escape a JSONB key for use in a PostgreSQL string literal (`data->>'key'`).
108///
109/// PostgreSQL string literals use single-quote doubling for escaping (`'` → `''`).
110/// This function is defense-in-depth: `OrderByClause` already rejects field names
111/// that are not valid GraphQL identifiers (which cannot contain `'`), but this
112/// escaping ensures correctness for any future caller that bypasses that validation.
113pub(super) fn escape_jsonb_key(key: &str) -> String {
114    key.replace('\'', "''")
115}
116
117/// PostgreSQL database adapter with connection pooling.
118///
119/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for async queries.
120///
121/// # Example
122///
123/// ```rust,no_run
124/// use fraiseql_db::postgres::PostgresAdapter;
125/// use fraiseql_db::{DatabaseAdapter, WhereClause, WhereOperator};
126/// use serde_json::json;
127///
128/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
129/// // Create adapter with connection string
130/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
131///
132/// // Execute query
133/// let where_clause = WhereClause::Field {
134///     path: vec!["email".to_string()],
135///     operator: WhereOperator::Icontains,
136///     value: json!("example.com"),
137/// };
138///
139/// let results = adapter
140///     .execute_where_query("v_user", Some(&where_clause), Some(10), None, None)
141///     .await?;
142///
143/// println!("Found {} users", results.len());
144/// # Ok(())
145/// # }
146/// ```
147#[derive(Clone)]
148pub struct PostgresAdapter {
149    pub(super) pool:         Pool,
150    /// Whether mutation timing injection is enabled.
151    mutation_timing_enabled: bool,
152    /// The PostgreSQL session variable name for timing.
153    timing_variable_name:    String,
154}
155
156impl std::fmt::Debug for PostgresAdapter {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("PostgresAdapter")
159            .field("mutation_timing_enabled", &self.mutation_timing_enabled)
160            .field("timing_variable_name", &self.timing_variable_name)
161            .field("pool", &"<Pool>")
162            .finish()
163    }
164}
165
166impl PostgresAdapter {
167    /// Create new PostgreSQL adapter with default pool configuration.
168    ///
169    /// # Arguments
170    ///
171    /// * `connection_string` - PostgreSQL connection string (e.g., "postgresql://localhost/mydb")
172    ///
173    /// # Errors
174    ///
175    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
176    ///
177    /// # Example
178    ///
179    /// ```rust,no_run
180    /// # use fraiseql_db::postgres::PostgresAdapter;
181    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
182    /// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
183    /// # Ok(())
184    /// # }
185    /// ```
186    pub async fn new(connection_string: &str) -> Result<Self> {
187        Self::with_pool_config(
188            connection_string,
189            PoolPrewarmConfig {
190                min_size:     0,
191                max_size:     DEFAULT_POOL_SIZE,
192                timeout_secs: None,
193            },
194        )
195        .await
196    }
197
198    /// Create new PostgreSQL adapter with pre-warming and timeout configuration.
199    ///
200    /// Constructs the pool, runs a startup health check, then eagerly opens
201    /// `cfg.min_size` connections so they are ready when the first request arrives.
202    ///
203    /// # Arguments
204    ///
205    /// * `connection_string` - PostgreSQL connection string
206    /// * `cfg` - Pool pre-warming and timeout configuration
207    ///
208    /// # Errors
209    ///
210    /// Returns `FraiseQLError::ConnectionPool` if pool creation or the startup
211    /// health check fails.
212    pub async fn with_pool_config(connection_string: &str, cfg: PoolPrewarmConfig) -> Result<Self> {
213        let pool = build_pool(connection_string, cfg.max_size, cfg.timeout_secs)?;
214
215        // Startup health check — establishes the first connection.
216        let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
217            message: format!("Failed to acquire connection: {e}"),
218        })?;
219
220        client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
221            message:   format!("Failed to connect to database: {e}"),
222            sql_state: e.code().map(|c| c.code().to_string()),
223        })?;
224
225        // Drop client back to the pool before pre-warming so that the health-check
226        // connection counts as idle slot #1.
227        drop(client);
228
229        let adapter = Self {
230            pool,
231            mutation_timing_enabled: false,
232            timing_variable_name: "fraiseql.started_at".to_string(),
233        };
234
235        // Pre-warm: open `min_size - 1` additional connections (one already exists).
236        let warm_target = cfg.min_size.min(cfg.max_size).saturating_sub(1);
237        if warm_target > 0 {
238            adapter.prewarm(warm_target).await;
239        }
240
241        Ok(adapter)
242    }
243
244    /// Create new PostgreSQL adapter with custom pool size.
245    ///
246    /// # Arguments
247    ///
248    /// * `connection_string` - PostgreSQL connection string
249    /// * `max_size` - Maximum number of connections in pool
250    ///
251    /// # Errors
252    ///
253    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
254    pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
255        Self::with_pool_config(
256            connection_string,
257            PoolPrewarmConfig {
258                min_size: 0,
259                max_size,
260                timeout_secs: None,
261            },
262        )
263        .await
264    }
265
266    /// Pre-warm the pool by opening `count` additional connections.
267    ///
268    /// Pre-warming is best-effort: failures from individual connections are logged
269    /// but do not prevent startup. A 10-second outer timeout ensures the server
270    /// never blocks indefinitely on a slow or unreachable PostgreSQL instance.
271    async fn prewarm(&self, count: usize) {
272        use futures::future::join_all;
273        use tokio::time::timeout;
274
275        let handles: Vec<_> = (0..count)
276            .map(|_| {
277                let pool = self.pool.clone();
278                tokio::spawn(async move { pool.get().await })
279            })
280            .collect();
281
282        let result = timeout(Duration::from_secs(10), join_all(handles)).await;
283
284        let (succeeded, failed) = match result {
285            Ok(outcomes) => {
286                let s = outcomes
287                    .iter()
288                    .filter(|r| r.as_ref().map(|inner| inner.is_ok()).unwrap_or(false))
289                    .count();
290                (s, count - s)
291            },
292            Err(_elapsed) => {
293                tracing::warn!(
294                    target_connections = count,
295                    "Pool pre-warm timed out after 10s; server will continue with partial pre-warm"
296                );
297                (0, count)
298            },
299        };
300
301        if failed > 0 {
302            tracing::warn!(
303                succeeded,
304                failed,
305                "Pool pre-warm: some connections could not be established"
306            );
307        } else {
308            tracing::info!(
309                idle_connections = succeeded + 1,
310                "PostgreSQL pool pre-warmed successfully"
311            );
312        }
313    }
314
315    /// Get a reference to the internal connection pool.
316    ///
317    /// This allows sharing the pool with other components like `PostgresIntrospector`.
318    #[must_use]
319    pub const fn pool(&self) -> &Pool {
320        &self.pool
321    }
322
323    /// Enable mutation timing injection.
324    ///
325    /// When enabled, `execute_function_call` wraps each mutation in a transaction
326    /// and sets a session variable to `clock_timestamp()::text` before execution,
327    /// allowing SQL functions to compute their own duration.
328    ///
329    /// # Arguments
330    ///
331    /// * `variable_name` - The PostgreSQL session variable name (e.g., `"fraiseql.started_at"`)
332    #[must_use]
333    pub fn with_mutation_timing(mut self, variable_name: &str) -> Self {
334        self.mutation_timing_enabled = true;
335        self.timing_variable_name = variable_name.to_string();
336        self
337    }
338
339    /// Returns whether mutation timing injection is enabled.
340    #[must_use]
341    pub const fn mutation_timing_enabled(&self) -> bool {
342        self.mutation_timing_enabled
343    }
344
345    /// Execute raw SQL query and return JSONB rows.
346    ///
347    /// # Errors
348    ///
349    /// Returns `FraiseQLError::Database` on query execution failure.
350    pub(super) async fn execute_raw(
351        &self,
352        sql: &str,
353        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
354    ) -> Result<Vec<JsonbValue>> {
355        let client = self.acquire_connection_with_retry().await?;
356
357        let rows: Vec<Row> =
358            client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
359                message:   format!("Query execution failed: {e}"),
360                sql_state: e.code().map(|c| c.code().to_string()),
361            })?;
362
363        let results = rows
364            .into_iter()
365            .map(|row| {
366                let data: serde_json::Value = row.get(0);
367                JsonbValue::new(data)
368            })
369            .collect();
370
371        Ok(results)
372    }
373
374    /// Acquire a connection from the pool with retry logic.
375    ///
376    /// - `PoolError::Timeout`: the pool was exhausted for the full configured wait period. This is
377    ///   not transient — retrying would only multiply the wait. Fails immediately.
378    /// - `PoolError::Backend` / create errors: potentially transient. Retries with exponential
379    ///   backoff (up to `MAX_CONNECTION_RETRIES` attempts).
380    ///
381    /// # Errors
382    ///
383    /// Returns `FraiseQLError::ConnectionPool` on timeout or when all retries are exhausted.
384    pub(super) async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
385        use deadpool_postgres::PoolError;
386
387        let mut last_error = None;
388
389        for attempt in 0..MAX_CONNECTION_RETRIES {
390            match self.pool.get().await {
391                Ok(client) => {
392                    if attempt > 0 {
393                        tracing::info!(attempt, "Successfully acquired connection after retries");
394                    }
395                    return Ok(client);
396                },
397                // Pool exhausted for the full wait period — not transient, fail immediately.
398                Err(PoolError::Timeout(_)) => {
399                    let metrics = self.pool_metrics();
400                    tracing::error!(
401                        available = metrics.idle_connections,
402                        active = metrics.active_connections,
403                        max = metrics.total_connections,
404                        "Connection pool timeout: all connections busy"
405                    );
406                    return Err(FraiseQLError::ConnectionPool {
407                        message: format!(
408                            "Connection pool timeout: {}/{} connections busy. \
409                             Increase pool_max_size or reduce concurrent load.",
410                            metrics.active_connections, metrics.total_connections,
411                        ),
412                    });
413                },
414                // Backend/create errors are potentially transient — retry with backoff.
415                Err(e) => {
416                    last_error = Some(e);
417                    if attempt < MAX_CONNECTION_RETRIES - 1 {
418                        let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
419                        tracing::warn!(
420                            attempt = attempt + 1,
421                            total = MAX_CONNECTION_RETRIES,
422                            delay_ms = delay,
423                            "Transient connection error, retrying"
424                        );
425                        tokio::time::sleep(Duration::from_millis(delay)).await;
426                    }
427                },
428            }
429        }
430
431        // All retries for transient errors exhausted.
432        let pool_metrics = self.pool_metrics();
433        tracing::error!(
434            retries = MAX_CONNECTION_RETRIES,
435            available = pool_metrics.idle_connections,
436            active = pool_metrics.active_connections,
437            max = pool_metrics.total_connections,
438            "Failed to acquire connection after all retries"
439        );
440
441        Err(FraiseQLError::ConnectionPool {
442            message: format!(
443                "Failed to acquire connection after {} retries: {}. \
444                 Pool state: idle={}, active={}, max={}",
445                MAX_CONNECTION_RETRIES,
446                last_error.expect("last_error is set on every retry iteration"),
447                pool_metrics.idle_connections,
448                pool_metrics.active_connections,
449                pool_metrics.total_connections,
450            ),
451        })
452    }
453
454    /// Execute query with SQL field projection optimization.
455    ///
456    /// Uses the provided `SqlProjectionHint` to generate optimized SQL that projects
457    /// only the requested fields from the JSONB column, reducing network payload and
458    /// JSON deserialization overhead.
459    ///
460    /// # Arguments
461    ///
462    /// * `view` - View/table name to query
463    /// * `projection` - Optional SQL projection hint with field list
464    /// * `where_clause` - Optional WHERE clause for filtering
465    /// * `limit` - Optional row limit
466    ///
467    /// # Returns
468    ///
469    /// Vector of projected JSONB rows with only the requested fields
470    ///
471    /// # Errors
472    ///
473    /// Returns `FraiseQLError::Database` on query execution failure.
474    ///
475    /// # Panics
476    ///
477    /// Cannot panic in practice: the inner `expect` is guarded by an `is_none()` check
478    /// immediately above it.
479    ///
480    /// # Example
481    ///
482    /// ```no_run
483    /// // Requires: running PostgreSQL database.
484    /// use fraiseql_db::postgres::PostgresAdapter;
485    /// use fraiseql_db::types::SqlProjectionHint;
486    /// use fraiseql_db::DatabaseType;
487    ///
488    /// # async fn example(adapter: &PostgresAdapter) -> Result<(), Box<dyn std::error::Error>> {
489    /// let projection = SqlProjectionHint::new(
490    ///     DatabaseType::PostgreSQL,
491    ///     "jsonb_build_object('id', data->>'id')".to_string(),
492    ///     75,
493    /// );
494    ///
495    /// let results = adapter
496    ///     .execute_with_projection("v_user", Some(&projection), None, Some(10), None)
497    ///     .await?;
498    /// # Ok(())
499    /// # }
500    /// ```
501    /// Implementation of `execute_with_projection` with ORDER BY support.
502    ///
503    /// Called by both the inherent convenience method and the `DatabaseAdapter`
504    /// trait implementation.
505    pub(super) async fn execute_with_projection_impl(
506        &self,
507        view: &str,
508        projection: Option<&SqlProjectionHint>,
509        where_clause: Option<&WhereClause>,
510        limit: Option<u32>,
511        offset: Option<u32>,
512        order_by: Option<&[OrderByClause]>,
513    ) -> Result<Vec<JsonbValue>> {
514        // If no projection, fall back to standard query
515        if projection.is_none() {
516            return self.execute_where_query(view, where_clause, limit, offset, order_by).await;
517        }
518
519        let projection = projection.expect("projection is Some; None was returned above");
520
521        // Build SQL with projection
522        // The projection_template is expected to be the SELECT clause with projection SQL
523        // e.g., "jsonb_build_object('id', data->>'id', 'email', data->>'email')"
524        let mut sql = format!(
525            "SELECT {} FROM {}",
526            projection.projection_template,
527            quote_postgres_identifier(view)
528        );
529
530        // Add WHERE clause if present
531        let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
532            let generator = PostgresWhereGenerator::new(PostgresDialect);
533            let (where_sql, where_params) = generator.generate(clause)?;
534            sql.push_str(" WHERE ");
535            sql.push_str(&where_sql);
536            where_params.into_iter().map(QueryParam::from).collect()
537        } else {
538            Vec::new()
539        };
540        let mut param_count = typed_params.len();
541
542        // ORDER BY must come before LIMIT/OFFSET in SQL.
543        append_order_by(&mut sql, order_by, DatabaseType::PostgreSQL)?;
544
545        // Append LIMIT/OFFSET as BigInt (PostgreSQL requires integer type).
546        // Reason (expect below): fmt::Write for String is infallible.
547        if let Some(lim) = limit {
548            param_count += 1;
549            write!(sql, " LIMIT ${param_count}").expect("write to String");
550            typed_params.push(QueryParam::BigInt(i64::from(lim)));
551        }
552
553        if let Some(off) = offset {
554            param_count += 1;
555            write!(sql, " OFFSET ${param_count}").expect("write to String");
556            typed_params.push(QueryParam::BigInt(i64::from(off)));
557        }
558
559        tracing::debug!("SQL with projection = {}", sql);
560        tracing::debug!("typed_params = {:?}", typed_params);
561
562        let param_refs = crate::types::as_sql_param_refs(&typed_params);
563
564        self.execute_raw(&sql, &param_refs).await
565    }
566
567    /// Execute query with SQL field projection optimization.
568    ///
569    /// Convenience wrapper for callers that don't need ORDER BY.
570    /// See `execute_with_projection_impl` for details.
571    ///
572    /// # Errors
573    ///
574    /// Returns `FraiseQLError::Database` on query execution failure.
575    pub async fn execute_with_projection(
576        &self,
577        view: &str,
578        projection: Option<&SqlProjectionHint>,
579        where_clause: Option<&WhereClause>,
580        limit: Option<u32>,
581        offset: Option<u32>,
582    ) -> Result<Vec<JsonbValue>> {
583        self.execute_with_projection_impl(view, projection, where_clause, limit, offset, None)
584            .await
585    }
586}
587
588/// Build a parameterized `SELECT data FROM {view}` SQL string.
589///
590/// Shared by [`PostgresAdapter::execute_where_query`] and
591/// [`PostgresAdapter::explain_where_query`] so that SQL construction
592/// logic is never duplicated.
593///
594/// # Returns
595///
596/// `(sql, typed_params)` — the SQL string and the bound parameter values.
597///
598/// # Errors
599///
600/// Returns `FraiseQLError` if WHERE clause generation fails.
601pub(super) fn build_where_select_sql(
602    view: &str,
603    where_clause: Option<&WhereClause>,
604    limit: Option<u32>,
605    offset: Option<u32>,
606) -> Result<(String, Vec<QueryParam>)> {
607    build_where_select_sql_ordered(view, where_clause, limit, offset, None)
608}
609
610/// Build a parameterized `SELECT data FROM {view}` SQL string with optional ORDER BY.
611///
612/// ORDER BY is inserted between the WHERE clause and LIMIT/OFFSET as required by SQL.
613///
614/// # Returns
615///
616/// `(sql, typed_params)` — the SQL string and the bound parameter values.
617///
618/// # Errors
619///
620/// Returns `FraiseQLError` if WHERE clause generation or field name validation fails.
621pub(super) fn build_where_select_sql_ordered(
622    view: &str,
623    where_clause: Option<&WhereClause>,
624    limit: Option<u32>,
625    offset: Option<u32>,
626    order_by: Option<&[OrderByClause]>,
627) -> Result<(String, Vec<QueryParam>)> {
628    // Build base query
629    let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
630
631    // Collect WHERE clause params (if any)
632    let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
633        let generator = PostgresWhereGenerator::new(PostgresDialect);
634        let (where_sql, where_params) = generator.generate(clause)?;
635        sql.push_str(" WHERE ");
636        sql.push_str(&where_sql);
637
638        // Convert WHERE clause JSON values to QueryParam
639        where_params.into_iter().map(QueryParam::from).collect()
640    } else {
641        Vec::new()
642    };
643    let mut param_count = typed_params.len();
644
645    // ORDER BY must come before LIMIT/OFFSET in SQL.
646    append_order_by(&mut sql, order_by, DatabaseType::PostgreSQL)?;
647
648    // Add LIMIT as BigInt (PostgreSQL requires integer type for LIMIT).
649    // Reason (expect below): fmt::Write for String is infallible.
650    if let Some(lim) = limit {
651        param_count += 1;
652        write!(sql, " LIMIT ${param_count}").expect("write to String");
653        typed_params.push(QueryParam::BigInt(i64::from(lim)));
654    }
655
656    // Add OFFSET as BigInt (PostgreSQL requires integer type for OFFSET)
657    if let Some(off) = offset {
658        param_count += 1;
659        write!(sql, " OFFSET ${param_count}").expect("write to String");
660        typed_params.push(QueryParam::BigInt(i64::from(off)));
661    }
662
663    Ok((sql, typed_params))
664}