Skip to main content

fraiseql_db/postgres/adapter/
mod.rs

1//! PostgreSQL database adapter implementation.
2
3mod database;
4mod relay;
5
6#[cfg(test)]
7mod tests;
8
9#[cfg(all(test, feature = "test-postgres"))]
10mod integration_tests;
11
12use std::{fmt::Write, time::Duration};
13
14use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
15use fraiseql_error::{FraiseQLError, Result};
16use tokio_postgres::{NoTls, Row};
17
18use super::where_generator::PostgresWhereGenerator;
19use crate::{
20    dialect::PostgresDialect,
21    identifier::quote_postgres_identifier,
22    order_by::append_order_by,
23    traits::DatabaseAdapter,
24    types::{
25        DatabaseType, JsonbValue, QueryParam,
26        sql_hints::{OrderByClause, SqlProjectionHint},
27    },
28    where_clause::WhereClause,
29};
30
31/// Default maximum pool size for PostgreSQL connections.
32/// Increased from 10 to 25 to prevent pool exhaustion under concurrent
33/// nested query load (fixes Issue #41).
34const DEFAULT_POOL_SIZE: usize = 25;
35
36/// Maximum retries for connection acquisition with exponential backoff.
37const MAX_CONNECTION_RETRIES: u32 = 3;
38
39/// Base delay in milliseconds for connection retry backoff.
40const CONNECTION_RETRY_DELAY_MS: u64 = 50;
41
42/// Configuration for connection pool construction and pre-warming.
43///
44/// Controls the minimum guaranteed connections (pre-warmed at startup),
45/// the maximum pool ceiling, and the wait/create timeout for connection
46/// acquisition.
47///
48/// # Example
49///
50/// ```rust
51/// use fraiseql_db::postgres::PoolPrewarmConfig;
52///
53/// let cfg = PoolPrewarmConfig {
54///     min_size:     5,
55///     max_size:     20,
56///     timeout_secs: Some(30),
57/// };
58/// ```
59#[derive(Debug, Clone)]
60pub struct PoolPrewarmConfig {
61    /// Number of connections to establish at pool creation time.
62    ///
63    /// After the pool is created, `min_size` connections are opened eagerly
64    /// so they are ready when the first request arrives. Set to `0` to disable
65    /// pre-warming (lazy init — one connection from the startup health check).
66    pub min_size: usize,
67
68    /// Maximum number of connections the pool may hold.
69    pub max_size: usize,
70
71    /// Optional timeout (in seconds) for connection acquisition and creation.
72    ///
73    /// Applied to both the `wait` (blocked waiting for an idle connection) and
74    /// `create` (time to open a new TCP connection to PostgreSQL) deadpool slots.
75    /// When `None`, acquisition can block indefinitely on pool exhaustion.
76    pub timeout_secs: Option<u64>,
77}
78
79/// Build a `deadpool-postgres` pool with an optional wait/create timeout.
80///
81/// # Errors
82///
83/// Returns `FraiseQLError::ConnectionPool` if pool creation fails (e.g., unparseable URL).
84fn build_pool(connection_string: &str, max_size: usize, timeout_secs: Option<u64>) -> Result<Pool> {
85    let mut cfg = Config::new();
86    cfg.url = Some(connection_string.to_string());
87    cfg.manager = Some(ManagerConfig {
88        recycling_method: RecyclingMethod::Fast,
89    });
90
91    let mut pool_cfg = deadpool_postgres::PoolConfig::new(max_size);
92    if let Some(secs) = timeout_secs {
93        let t = Duration::from_secs(secs);
94        pool_cfg.timeouts.wait = Some(t);
95        pool_cfg.timeouts.create = Some(t);
96        // `recycle` intentionally stays None — fast recycle, not user-configurable.
97    }
98    cfg.pool = Some(pool_cfg);
99
100    cfg.create_pool(Some(Runtime::Tokio1), NoTls)
101        .map_err(|e| FraiseQLError::ConnectionPool {
102            message: format!("Failed to create connection pool: {e}"),
103        })
104}
105
106/// Escape a JSONB key for use in a PostgreSQL string literal (`data->>'key'`).
107///
108/// PostgreSQL string literals use single-quote doubling for escaping (`'` → `''`).
109/// This function is defense-in-depth: `OrderByClause` already rejects field names
110/// that are not valid GraphQL identifiers (which cannot contain `'`), but this
111/// escaping ensures correctness for any future caller that bypasses that validation.
112pub(super) fn escape_jsonb_key(key: &str) -> String {
113    key.replace('\'', "''")
114}
115
116/// PostgreSQL database adapter with connection pooling.
117///
118/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for async queries.
119///
120/// # Example
121///
122/// ```rust,no_run
123/// use fraiseql_db::postgres::PostgresAdapter;
124/// use fraiseql_db::{DatabaseAdapter, WhereClause, WhereOperator};
125/// use serde_json::json;
126///
127/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
128/// // Create adapter with connection string
129/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
130///
131/// // Execute query
132/// let where_clause = WhereClause::Field {
133///     path: vec!["email".to_string()],
134///     operator: WhereOperator::Icontains,
135///     value: json!("example.com"),
136/// };
137///
138/// let results = adapter
139///     .execute_where_query("v_user", Some(&where_clause), Some(10), None, None)
140///     .await?;
141///
142/// println!("Found {} users", results.len());
143/// # Ok(())
144/// # }
145/// ```
146#[derive(Clone)]
147pub struct PostgresAdapter {
148    pub(super) pool:         Pool,
149    /// Whether mutation timing injection is enabled.
150    mutation_timing_enabled: bool,
151    /// The PostgreSQL session variable name for timing.
152    timing_variable_name:    String,
153}
154
155impl std::fmt::Debug for PostgresAdapter {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        f.debug_struct("PostgresAdapter")
158            .field("mutation_timing_enabled", &self.mutation_timing_enabled)
159            .field("timing_variable_name", &self.timing_variable_name)
160            .field("pool", &"<Pool>")
161            .finish()
162    }
163}
164
165impl PostgresAdapter {
166    /// Create new PostgreSQL adapter with default pool configuration.
167    ///
168    /// # Arguments
169    ///
170    /// * `connection_string` - PostgreSQL connection string (e.g., "postgresql://localhost/mydb")
171    ///
172    /// # Errors
173    ///
174    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
175    ///
176    /// # Example
177    ///
178    /// ```rust,no_run
179    /// # use fraiseql_db::postgres::PostgresAdapter;
180    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
181    /// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
182    /// # Ok(())
183    /// # }
184    /// ```
185    pub async fn new(connection_string: &str) -> Result<Self> {
186        Self::with_pool_config(
187            connection_string,
188            PoolPrewarmConfig {
189                min_size:     0,
190                max_size:     DEFAULT_POOL_SIZE,
191                timeout_secs: None,
192            },
193        )
194        .await
195    }
196
197    /// Create new PostgreSQL adapter with pre-warming and timeout configuration.
198    ///
199    /// Constructs the pool, runs a startup health check, then eagerly opens
200    /// `cfg.min_size` connections so they are ready when the first request arrives.
201    ///
202    /// # Arguments
203    ///
204    /// * `connection_string` - PostgreSQL connection string
205    /// * `cfg` - Pool pre-warming and timeout configuration
206    ///
207    /// # Errors
208    ///
209    /// Returns `FraiseQLError::ConnectionPool` if pool creation or the startup
210    /// health check fails.
211    pub async fn with_pool_config(connection_string: &str, cfg: PoolPrewarmConfig) -> Result<Self> {
212        let pool = build_pool(connection_string, cfg.max_size, cfg.timeout_secs)?;
213
214        // Startup health check — establishes the first connection.
215        let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
216            message: format!("Failed to acquire connection: {e}"),
217        })?;
218
219        client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
220            message:   format!("Failed to connect to database: {e}"),
221            sql_state: e.code().map(|c| c.code().to_string()),
222        })?;
223
224        // Drop client back to the pool before pre-warming so that the health-check
225        // connection counts as idle slot #1.
226        drop(client);
227
228        let adapter = Self {
229            pool,
230            mutation_timing_enabled: false,
231            timing_variable_name: "fraiseql.started_at".to_string(),
232        };
233
234        // Pre-warm: open `min_size - 1` additional connections (one already exists).
235        let warm_target = cfg.min_size.min(cfg.max_size).saturating_sub(1);
236        if warm_target > 0 {
237            adapter.prewarm(warm_target).await;
238        }
239
240        Ok(adapter)
241    }
242
243    /// Create new PostgreSQL adapter with custom pool size.
244    ///
245    /// # Arguments
246    ///
247    /// * `connection_string` - PostgreSQL connection string
248    /// * `max_size` - Maximum number of connections in pool
249    ///
250    /// # Errors
251    ///
252    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
253    pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
254        Self::with_pool_config(
255            connection_string,
256            PoolPrewarmConfig {
257                min_size: 0,
258                max_size,
259                timeout_secs: None,
260            },
261        )
262        .await
263    }
264
265    /// Pre-warm the pool by opening `count` additional connections.
266    ///
267    /// Pre-warming is best-effort: failures from individual connections are logged
268    /// but do not prevent startup. A 10-second outer timeout ensures the server
269    /// never blocks indefinitely on a slow or unreachable PostgreSQL instance.
270    async fn prewarm(&self, count: usize) {
271        use futures::future::join_all;
272        use tokio::time::timeout;
273
274        let handles: Vec<_> = (0..count)
275            .map(|_| {
276                let pool = self.pool.clone();
277                tokio::spawn(async move { pool.get().await })
278            })
279            .collect();
280
281        let result = timeout(Duration::from_secs(10), join_all(handles)).await;
282
283        let (succeeded, failed) = match result {
284            Ok(outcomes) => {
285                let s = outcomes
286                    .iter()
287                    .filter(|r| r.as_ref().map(|inner| inner.is_ok()).unwrap_or(false))
288                    .count();
289                (s, count - s)
290            },
291            Err(_elapsed) => {
292                tracing::warn!(
293                    target_connections = count,
294                    "Pool pre-warm timed out after 10s; server will continue with partial pre-warm"
295                );
296                (0, count)
297            },
298        };
299
300        if failed > 0 {
301            tracing::warn!(
302                succeeded,
303                failed,
304                "Pool pre-warm: some connections could not be established"
305            );
306        } else {
307            tracing::info!(
308                idle_connections = succeeded + 1,
309                "PostgreSQL pool pre-warmed successfully"
310            );
311        }
312    }
313
314    /// Get a reference to the internal connection pool.
315    ///
316    /// This allows sharing the pool with other components like `PostgresIntrospector`.
317    #[must_use]
318    pub const fn pool(&self) -> &Pool {
319        &self.pool
320    }
321
322    /// Enable mutation timing injection.
323    ///
324    /// When enabled, `execute_function_call` wraps each mutation in a transaction
325    /// and sets a session variable to `clock_timestamp()::text` before execution,
326    /// allowing SQL functions to compute their own duration.
327    ///
328    /// # Arguments
329    ///
330    /// * `variable_name` - The PostgreSQL session variable name (e.g., `"fraiseql.started_at"`)
331    #[must_use]
332    pub fn with_mutation_timing(mut self, variable_name: &str) -> Self {
333        self.mutation_timing_enabled = true;
334        self.timing_variable_name = variable_name.to_string();
335        self
336    }
337
338    /// Returns whether mutation timing injection is enabled.
339    #[must_use]
340    pub const fn mutation_timing_enabled(&self) -> bool {
341        self.mutation_timing_enabled
342    }
343
344    /// Execute raw SQL query and return JSONB rows.
345    ///
346    /// # Errors
347    ///
348    /// Returns `FraiseQLError::Database` on query execution failure.
349    pub(super) async fn execute_raw(
350        &self,
351        sql: &str,
352        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
353    ) -> Result<Vec<JsonbValue>> {
354        let client = self.acquire_connection_with_retry().await?;
355
356        let rows: Vec<Row> =
357            client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
358                message:   format!("Query execution failed: {e}"),
359                sql_state: e.code().map(|c| c.code().to_string()),
360            })?;
361
362        let results = rows
363            .into_iter()
364            .map(|row| {
365                let data: serde_json::Value = row.get(0);
366                JsonbValue::new(data)
367            })
368            .collect();
369
370        Ok(results)
371    }
372
373    /// Acquire a connection from the pool with retry logic.
374    ///
375    /// - `PoolError::Timeout`: the pool was exhausted for the full configured wait period. This is
376    ///   not transient — retrying would only multiply the wait. Fails immediately.
377    /// - `PoolError::Backend` / create errors: potentially transient. Retries with exponential
378    ///   backoff (up to `MAX_CONNECTION_RETRIES` attempts).
379    ///
380    /// # Errors
381    ///
382    /// Returns `FraiseQLError::ConnectionPool` on timeout or when all retries are exhausted.
383    pub(super) async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
384        use deadpool_postgres::PoolError;
385
386        let mut last_error = None;
387
388        for attempt in 0..MAX_CONNECTION_RETRIES {
389            match self.pool.get().await {
390                Ok(client) => {
391                    if attempt > 0 {
392                        tracing::info!(attempt, "Successfully acquired connection after retries");
393                    }
394                    return Ok(client);
395                },
396                // Pool exhausted for the full wait period — not transient, fail immediately.
397                Err(PoolError::Timeout(_)) => {
398                    let metrics = self.pool_metrics();
399                    tracing::error!(
400                        available = metrics.idle_connections,
401                        active = metrics.active_connections,
402                        max = metrics.total_connections,
403                        "Connection pool timeout: all connections busy"
404                    );
405                    return Err(FraiseQLError::ConnectionPool {
406                        message: format!(
407                            "Connection pool timeout: {}/{} connections busy. \
408                             Increase pool_max_size or reduce concurrent load.",
409                            metrics.active_connections, metrics.total_connections,
410                        ),
411                    });
412                },
413                // Backend/create errors are potentially transient — retry with backoff.
414                Err(e) => {
415                    last_error = Some(e);
416                    if attempt < MAX_CONNECTION_RETRIES - 1 {
417                        let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
418                        tracing::warn!(
419                            attempt = attempt + 1,
420                            total = MAX_CONNECTION_RETRIES,
421                            delay_ms = delay,
422                            "Transient connection error, retrying"
423                        );
424                        tokio::time::sleep(Duration::from_millis(delay)).await;
425                    }
426                },
427            }
428        }
429
430        // All retries for transient errors exhausted.
431        let pool_metrics = self.pool_metrics();
432        tracing::error!(
433            retries = MAX_CONNECTION_RETRIES,
434            available = pool_metrics.idle_connections,
435            active = pool_metrics.active_connections,
436            max = pool_metrics.total_connections,
437            "Failed to acquire connection after all retries"
438        );
439
440        Err(FraiseQLError::ConnectionPool {
441            message: format!(
442                "Failed to acquire connection after {} retries: {}. \
443                 Pool state: idle={}, active={}, max={}",
444                MAX_CONNECTION_RETRIES,
445                last_error.expect("last_error is set on every retry iteration"),
446                pool_metrics.idle_connections,
447                pool_metrics.active_connections,
448                pool_metrics.total_connections,
449            ),
450        })
451    }
452
453    /// Execute query with SQL field projection optimization.
454    ///
455    /// Uses the provided `SqlProjectionHint` to generate optimized SQL that projects
456    /// only the requested fields from the JSONB column, reducing network payload and
457    /// JSON deserialization overhead.
458    ///
459    /// # Arguments
460    ///
461    /// * `view` - View/table name to query
462    /// * `projection` - Optional SQL projection hint with field list
463    /// * `where_clause` - Optional WHERE clause for filtering
464    /// * `limit` - Optional row limit
465    ///
466    /// # Returns
467    ///
468    /// Vector of projected JSONB rows with only the requested fields
469    ///
470    /// # Errors
471    ///
472    /// Returns `FraiseQLError::Database` on query execution failure.
473    ///
474    /// # Panics
475    ///
476    /// Cannot panic in practice: the inner `expect` is guarded by an `is_none()` check
477    /// immediately above it.
478    ///
479    /// # Example
480    ///
481    /// ```no_run
482    /// // Requires: running PostgreSQL database.
483    /// use fraiseql_db::postgres::PostgresAdapter;
484    /// use fraiseql_db::types::SqlProjectionHint;
485    /// use fraiseql_db::DatabaseType;
486    ///
487    /// # async fn example(adapter: &PostgresAdapter) -> Result<(), Box<dyn std::error::Error>> {
488    /// let projection = SqlProjectionHint {
489    ///     database: DatabaseType::PostgreSQL,
490    ///     projection_template: "jsonb_build_object('id', data->>'id')".to_string(),
491    ///     estimated_reduction_percent: 75,
492    /// };
493    ///
494    /// let results = adapter
495    ///     .execute_with_projection("v_user", Some(&projection), None, Some(10), None)
496    ///     .await?;
497    /// # Ok(())
498    /// # }
499    /// ```
500    /// Implementation of `execute_with_projection` with ORDER BY support.
501    ///
502    /// Called by both the inherent convenience method and the `DatabaseAdapter`
503    /// trait implementation.
504    pub(super) async fn execute_with_projection_impl(
505        &self,
506        view: &str,
507        projection: Option<&SqlProjectionHint>,
508        where_clause: Option<&WhereClause>,
509        limit: Option<u32>,
510        offset: Option<u32>,
511        order_by: Option<&[OrderByClause]>,
512    ) -> Result<Vec<JsonbValue>> {
513        // If no projection, fall back to standard query
514        if projection.is_none() {
515            return self.execute_where_query(view, where_clause, limit, offset, order_by).await;
516        }
517
518        let projection = projection.expect("projection is Some; None was returned above");
519
520        // Build SQL with projection
521        // The projection_template is expected to be the SELECT clause with projection SQL
522        // e.g., "jsonb_build_object('id', data->>'id', 'email', data->>'email')"
523        let mut sql = format!(
524            "SELECT {} FROM {}",
525            projection.projection_template,
526            quote_postgres_identifier(view)
527        );
528
529        // Add WHERE clause if present
530        let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
531            let generator = PostgresWhereGenerator::new(PostgresDialect);
532            let (where_sql, where_params) = generator.generate(clause)?;
533            sql.push_str(" WHERE ");
534            sql.push_str(&where_sql);
535            where_params.into_iter().map(QueryParam::from).collect()
536        } else {
537            Vec::new()
538        };
539        let mut param_count = typed_params.len();
540
541        // ORDER BY must come before LIMIT/OFFSET in SQL.
542        append_order_by(&mut sql, order_by, DatabaseType::PostgreSQL)?;
543
544        // Append LIMIT/OFFSET as BigInt (PostgreSQL requires integer type).
545        // Reason (expect below): fmt::Write for String is infallible.
546        if let Some(lim) = limit {
547            param_count += 1;
548            write!(sql, " LIMIT ${param_count}").expect("write to String");
549            typed_params.push(QueryParam::BigInt(i64::from(lim)));
550        }
551
552        if let Some(off) = offset {
553            param_count += 1;
554            write!(sql, " OFFSET ${param_count}").expect("write to String");
555            typed_params.push(QueryParam::BigInt(i64::from(off)));
556        }
557
558        tracing::debug!("SQL with projection = {}", sql);
559        tracing::debug!("typed_params = {:?}", typed_params);
560
561        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
562            .iter()
563            .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
564            .collect();
565
566        self.execute_raw(&sql, &param_refs).await
567    }
568
569    /// Execute query with SQL field projection optimization.
570    ///
571    /// Convenience wrapper for callers that don't need ORDER BY.
572    /// See [`execute_with_projection_impl`](Self::execute_with_projection_impl) for details.
573    ///
574    /// # Errors
575    ///
576    /// Returns `FraiseQLError::Database` on query execution failure.
577    pub async fn execute_with_projection(
578        &self,
579        view: &str,
580        projection: Option<&SqlProjectionHint>,
581        where_clause: Option<&WhereClause>,
582        limit: Option<u32>,
583        offset: Option<u32>,
584    ) -> Result<Vec<JsonbValue>> {
585        self.execute_with_projection_impl(view, projection, where_clause, limit, offset, None)
586            .await
587    }
588}
589
590/// Build a parameterized `SELECT data FROM {view}` SQL string.
591///
592/// Shared by [`PostgresAdapter::execute_where_query`] and
593/// [`PostgresAdapter::explain_where_query`] so that SQL construction
594/// logic is never duplicated.
595///
596/// # Returns
597///
598/// `(sql, typed_params)` — the SQL string and the bound parameter values.
599///
600/// # Errors
601///
602/// Returns `FraiseQLError` if WHERE clause generation fails.
603pub(super) fn build_where_select_sql(
604    view: &str,
605    where_clause: Option<&WhereClause>,
606    limit: Option<u32>,
607    offset: Option<u32>,
608) -> Result<(String, Vec<QueryParam>)> {
609    build_where_select_sql_ordered(view, where_clause, limit, offset, None)
610}
611
612/// Build a parameterized `SELECT data FROM {view}` SQL string with optional ORDER BY.
613///
614/// ORDER BY is inserted between the WHERE clause and LIMIT/OFFSET as required by SQL.
615///
616/// # Returns
617///
618/// `(sql, typed_params)` — the SQL string and the bound parameter values.
619///
620/// # Errors
621///
622/// Returns `FraiseQLError` if WHERE clause generation or field name validation fails.
623pub(super) fn build_where_select_sql_ordered(
624    view: &str,
625    where_clause: Option<&WhereClause>,
626    limit: Option<u32>,
627    offset: Option<u32>,
628    order_by: Option<&[OrderByClause]>,
629) -> Result<(String, Vec<QueryParam>)> {
630    // Build base query
631    let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
632
633    // Collect WHERE clause params (if any)
634    let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
635        let generator = PostgresWhereGenerator::new(PostgresDialect);
636        let (where_sql, where_params) = generator.generate(clause)?;
637        sql.push_str(" WHERE ");
638        sql.push_str(&where_sql);
639
640        // Convert WHERE clause JSON values to QueryParam
641        where_params.into_iter().map(QueryParam::from).collect()
642    } else {
643        Vec::new()
644    };
645    let mut param_count = typed_params.len();
646
647    // ORDER BY must come before LIMIT/OFFSET in SQL.
648    append_order_by(&mut sql, order_by, DatabaseType::PostgreSQL)?;
649
650    // Add LIMIT as BigInt (PostgreSQL requires integer type for LIMIT).
651    // Reason (expect below): fmt::Write for String is infallible.
652    if let Some(lim) = limit {
653        param_count += 1;
654        write!(sql, " LIMIT ${param_count}").expect("write to String");
655        typed_params.push(QueryParam::BigInt(i64::from(lim)));
656    }
657
658    // Add OFFSET as BigInt (PostgreSQL requires integer type for OFFSET)
659    if let Some(off) = offset {
660        param_count += 1;
661        write!(sql, " OFFSET ${param_count}").expect("write to String");
662        typed_params.push(QueryParam::BigInt(i64::from(off)));
663    }
664
665    Ok((sql, typed_params))
666}