Skip to main content

fraiseql_db/postgres/adapter/
mod.rs

1//! PostgreSQL database adapter implementation.
2
3mod database;
4mod relay;
5
6#[cfg(test)]
7mod tests;
8
9#[cfg(all(test, feature = "test-postgres"))]
10mod integration_tests;
11
12use std::{fmt::Write, time::Duration};
13
14use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
15use fraiseql_error::{FraiseQLError, Result};
16use tokio_postgres::{NoTls, Row};
17
18use super::where_generator::PostgresWhereGenerator;
19use crate::{
20    dialect::PostgresDialect,
21    identifier::quote_postgres_identifier,
22    traits::DatabaseAdapter,
23    types::{JsonbValue, QueryParam, sql_hints::SqlProjectionHint},
24    where_clause::WhereClause,
25};
26
/// Upper bound on pooled PostgreSQL connections used when the caller does
/// not supply one.
///
/// Raised from 10 to 25 so that concurrent nested query load does not
/// exhaust the pool (fixes Issue #41).
const DEFAULT_POOL_SIZE: usize = 25;

/// Number of connection acquisition attempts made before giving up.
const MAX_CONNECTION_RETRIES: u32 = 3;

/// Base delay, in milliseconds, between connection acquisition attempts.
///
/// The retry loop scales this value up on each successive attempt.
const CONNECTION_RETRY_DELAY_MS: u64 = 50;
37
/// Settings that govern how the connection pool is built and pre-warmed.
///
/// Three knobs are exposed: how many connections to open eagerly at startup
/// (`min_size`), the hard ceiling on pooled connections (`max_size`), and an
/// optional acquisition/creation timeout (`timeout_secs`).
///
/// # Example
///
/// ```rust
/// use fraiseql_db::postgres::PoolPrewarmConfig;
///
/// let cfg = PoolPrewarmConfig {
///     min_size:     5,
///     max_size:     20,
///     timeout_secs: Some(30),
/// };
/// ```
#[derive(Clone, Debug)]
pub struct PoolPrewarmConfig {
    /// How many connections should be open once pool construction finishes.
    ///
    /// These connections are established eagerly so that the first requests
    /// do not pay the connection-setup cost. A value of `0` disables
    /// pre-warming; the pool is then populated lazily, starting with the single
    /// connection opened by the startup health check.
    pub min_size: usize,

    /// Hard ceiling on the number of connections held by the pool.
    pub max_size: usize,

    /// Timeout, in seconds, applied to connection acquisition and creation.
    ///
    /// The same duration is used for the pool's `wait` timeout (time spent
    /// blocked waiting for an idle connection) and its `create` timeout (time
    /// spent opening a new connection to PostgreSQL). With `None`, acquisition
    /// may block indefinitely when the pool is exhausted.
    pub timeout_secs: Option<u64>,
}
74
75/// Build a `deadpool-postgres` pool with an optional wait/create timeout.
76///
77/// # Errors
78///
79/// Returns `FraiseQLError::ConnectionPool` if pool creation fails (e.g., unparseable URL).
80fn build_pool(
81    connection_string: &str,
82    max_size: usize,
83    timeout_secs: Option<u64>,
84) -> Result<Pool> {
85    let mut cfg = Config::new();
86    cfg.url = Some(connection_string.to_string());
87    cfg.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Fast });
88
89    let mut pool_cfg = deadpool_postgres::PoolConfig::new(max_size);
90    if let Some(secs) = timeout_secs {
91        let t = Duration::from_secs(secs);
92        pool_cfg.timeouts.wait = Some(t);
93        pool_cfg.timeouts.create = Some(t);
94        // `recycle` intentionally stays None — fast recycle, not user-configurable.
95    }
96    cfg.pool = Some(pool_cfg);
97
98    cfg.create_pool(Some(Runtime::Tokio1), NoTls).map_err(|e| FraiseQLError::ConnectionPool {
99        message: format!("Failed to create connection pool: {e}"),
100    })
101}
102
103/// Escape a JSONB key for use in a PostgreSQL string literal (`data->>'key'`).
104///
105/// PostgreSQL string literals use single-quote doubling for escaping (`'` → `''`).
106/// This function is defense-in-depth: `OrderByClause` already rejects field names
107/// that are not valid GraphQL identifiers (which cannot contain `'`), but this
108/// escaping ensures correctness for any future caller that bypasses that validation.
109pub(super) fn escape_jsonb_key(key: &str) -> String {
110    key.replace('\'', "''")
111}
112
113/// PostgreSQL database adapter with connection pooling.
114///
115/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for async queries.
116///
117/// # Example
118///
119/// ```rust,no_run
120/// use fraiseql_db::postgres::PostgresAdapter;
121/// use fraiseql_db::{DatabaseAdapter, WhereClause, WhereOperator};
122/// use serde_json::json;
123///
124/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
125/// // Create adapter with connection string
126/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
127///
128/// // Execute query
129/// let where_clause = WhereClause::Field {
130///     path: vec!["email".to_string()],
131///     operator: WhereOperator::Icontains,
132///     value: json!("example.com"),
133/// };
134///
135/// let results = adapter
136///     .execute_where_query("v_user", Some(&where_clause), Some(10), None, None)
137///     .await?;
138///
139/// println!("Found {} users", results.len());
140/// # Ok(())
141/// # }
142/// ```
143#[derive(Clone)]
144pub struct PostgresAdapter {
145    pub(super) pool:         Pool,
146    /// Whether mutation timing injection is enabled.
147    mutation_timing_enabled: bool,
148    /// The PostgreSQL session variable name for timing.
149    timing_variable_name:    String,
150}
151
152impl std::fmt::Debug for PostgresAdapter {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct("PostgresAdapter")
155            .field("mutation_timing_enabled", &self.mutation_timing_enabled)
156            .field("timing_variable_name", &self.timing_variable_name)
157            .field("pool", &"<Pool>")
158            .finish()
159    }
160}
161
162impl PostgresAdapter {
163    /// Create new PostgreSQL adapter with default pool configuration.
164    ///
165    /// # Arguments
166    ///
167    /// * `connection_string` - PostgreSQL connection string (e.g., "postgresql://localhost/mydb")
168    ///
169    /// # Errors
170    ///
171    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
172    ///
173    /// # Example
174    ///
175    /// ```rust,no_run
176    /// # use fraiseql_db::postgres::PostgresAdapter;
177    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
178    /// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
179    /// # Ok(())
180    /// # }
181    /// ```
182    pub async fn new(connection_string: &str) -> Result<Self> {
183        Self::with_pool_config(
184            connection_string,
185            PoolPrewarmConfig { min_size: 0, max_size: DEFAULT_POOL_SIZE, timeout_secs: None },
186        )
187        .await
188    }
189
190    /// Create new PostgreSQL adapter with pre-warming and timeout configuration.
191    ///
192    /// Constructs the pool, runs a startup health check, then eagerly opens
193    /// `cfg.min_size` connections so they are ready when the first request arrives.
194    ///
195    /// # Arguments
196    ///
197    /// * `connection_string` - PostgreSQL connection string
198    /// * `cfg` - Pool pre-warming and timeout configuration
199    ///
200    /// # Errors
201    ///
202    /// Returns `FraiseQLError::ConnectionPool` if pool creation or the startup
203    /// health check fails.
204    pub async fn with_pool_config(
205        connection_string: &str,
206        cfg: PoolPrewarmConfig,
207    ) -> Result<Self> {
208        let pool = build_pool(connection_string, cfg.max_size, cfg.timeout_secs)?;
209
210        // Startup health check — establishes the first connection.
211        let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
212            message: format!("Failed to acquire connection: {e}"),
213        })?;
214
215        client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
216            message:   format!("Failed to connect to database: {e}"),
217            sql_state: e.code().map(|c| c.code().to_string()),
218        })?;
219
220        // Drop client back to the pool before pre-warming so that the health-check
221        // connection counts as idle slot #1.
222        drop(client);
223
224        let adapter = Self {
225            pool,
226            mutation_timing_enabled: false,
227            timing_variable_name: "fraiseql.started_at".to_string(),
228        };
229
230        // Pre-warm: open `min_size - 1` additional connections (one already exists).
231        let warm_target = cfg.min_size.min(cfg.max_size).saturating_sub(1);
232        if warm_target > 0 {
233            adapter.prewarm(warm_target).await;
234        }
235
236        Ok(adapter)
237    }
238
239    /// Create new PostgreSQL adapter with custom pool size.
240    ///
241    /// # Arguments
242    ///
243    /// * `connection_string` - PostgreSQL connection string
244    /// * `max_size` - Maximum number of connections in pool
245    ///
246    /// # Errors
247    ///
248    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
249    pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
250        Self::with_pool_config(
251            connection_string,
252            PoolPrewarmConfig { min_size: 0, max_size, timeout_secs: None },
253        )
254        .await
255    }
256
257    /// Pre-warm the pool by opening `count` additional connections.
258    ///
259    /// Pre-warming is best-effort: failures from individual connections are logged
260    /// but do not prevent startup. A 10-second outer timeout ensures the server
261    /// never blocks indefinitely on a slow or unreachable PostgreSQL instance.
262    async fn prewarm(&self, count: usize) {
263        use futures::future::join_all;
264        use tokio::time::timeout;
265
266        let handles: Vec<_> = (0..count)
267            .map(|_| {
268                let pool = self.pool.clone();
269                tokio::spawn(async move { pool.get().await })
270            })
271            .collect();
272
273        let result = timeout(Duration::from_secs(10), join_all(handles)).await;
274
275        let (succeeded, failed) = match result {
276            Ok(outcomes) => {
277                let s = outcomes
278                    .iter()
279                    .filter(|r| r.as_ref().map(|inner| inner.is_ok()).unwrap_or(false))
280                    .count();
281                (s, count - s)
282            },
283            Err(_elapsed) => {
284                tracing::warn!(
285                    target_connections = count,
286                    "Pool pre-warm timed out after 10s; server will continue with partial pre-warm"
287                );
288                (0, count)
289            },
290        };
291
292        if failed > 0 {
293            tracing::warn!(
294                succeeded,
295                failed,
296                "Pool pre-warm: some connections could not be established"
297            );
298        } else {
299            tracing::info!(
300                idle_connections = succeeded + 1,
301                "PostgreSQL pool pre-warmed successfully"
302            );
303        }
304    }
305
306    /// Get a reference to the internal connection pool.
307    ///
308    /// This allows sharing the pool with other components like `PostgresIntrospector`.
309    #[must_use]
310    pub const fn pool(&self) -> &Pool {
311        &self.pool
312    }
313
314    /// Enable mutation timing injection.
315    ///
316    /// When enabled, `execute_function_call` wraps each mutation in a transaction
317    /// and sets a session variable to `clock_timestamp()::text` before execution,
318    /// allowing SQL functions to compute their own duration.
319    ///
320    /// # Arguments
321    ///
322    /// * `variable_name` - The PostgreSQL session variable name (e.g., `"fraiseql.started_at"`)
323    #[must_use]
324    pub fn with_mutation_timing(mut self, variable_name: &str) -> Self {
325        self.mutation_timing_enabled = true;
326        self.timing_variable_name = variable_name.to_string();
327        self
328    }
329
330    /// Returns whether mutation timing injection is enabled.
331    #[must_use]
332    pub const fn mutation_timing_enabled(&self) -> bool {
333        self.mutation_timing_enabled
334    }
335
336    /// Execute raw SQL query and return JSONB rows.
337    ///
338    /// # Errors
339    ///
340    /// Returns `FraiseQLError::Database` on query execution failure.
341    pub(super) async fn execute_raw(
342        &self,
343        sql: &str,
344        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
345    ) -> Result<Vec<JsonbValue>> {
346        let client = self.acquire_connection_with_retry().await?;
347
348        let rows: Vec<Row> =
349            client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
350                message:   format!("Query execution failed: {e}"),
351                sql_state: e.code().map(|c| c.code().to_string()),
352            })?;
353
354        let results = rows
355            .into_iter()
356            .map(|row| {
357                let data: serde_json::Value = row.get(0);
358                JsonbValue::new(data)
359            })
360            .collect();
361
362        Ok(results)
363    }
364
365    /// Acquire a connection from the pool with retry logic.
366    ///
367    /// - `PoolError::Timeout`: the pool was exhausted for the full configured wait period.
368    ///   This is not transient — retrying would only multiply the wait. Fails immediately.
369    /// - `PoolError::Backend` / create errors: potentially transient. Retries with
370    ///   exponential backoff (up to `MAX_CONNECTION_RETRIES` attempts).
371    ///
372    /// # Errors
373    ///
374    /// Returns `FraiseQLError::ConnectionPool` on timeout or when all retries are exhausted.
375    pub(super) async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
376        use deadpool_postgres::PoolError;
377
378        let mut last_error = None;
379
380        for attempt in 0..MAX_CONNECTION_RETRIES {
381            match self.pool.get().await {
382                Ok(client) => {
383                    if attempt > 0 {
384                        tracing::info!(
385                            attempt,
386                            "Successfully acquired connection after retries"
387                        );
388                    }
389                    return Ok(client);
390                },
391                // Pool exhausted for the full wait period — not transient, fail immediately.
392                Err(PoolError::Timeout(_)) => {
393                    let metrics = self.pool_metrics();
394                    tracing::error!(
395                        available = metrics.idle_connections,
396                        active    = metrics.active_connections,
397                        max       = metrics.total_connections,
398                        "Connection pool timeout: all connections busy"
399                    );
400                    return Err(FraiseQLError::ConnectionPool {
401                        message: format!(
402                            "Connection pool timeout: {}/{} connections busy. \
403                             Increase pool_max_size or reduce concurrent load.",
404                            metrics.active_connections, metrics.total_connections,
405                        ),
406                    });
407                },
408                // Backend/create errors are potentially transient — retry with backoff.
409                Err(e) => {
410                    last_error = Some(e);
411                    if attempt < MAX_CONNECTION_RETRIES - 1 {
412                        let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
413                        tracing::warn!(
414                            attempt = attempt + 1,
415                            total   = MAX_CONNECTION_RETRIES,
416                            delay_ms = delay,
417                            "Transient connection error, retrying"
418                        );
419                        tokio::time::sleep(Duration::from_millis(delay)).await;
420                    }
421                },
422            }
423        }
424
425        // All retries for transient errors exhausted.
426        let pool_metrics = self.pool_metrics();
427        tracing::error!(
428            retries  = MAX_CONNECTION_RETRIES,
429            available = pool_metrics.idle_connections,
430            active    = pool_metrics.active_connections,
431            max       = pool_metrics.total_connections,
432            "Failed to acquire connection after all retries"
433        );
434
435        Err(FraiseQLError::ConnectionPool {
436            message: format!(
437                "Failed to acquire connection after {} retries: {}. \
438                 Pool state: idle={}, active={}, max={}",
439                MAX_CONNECTION_RETRIES,
440                last_error.expect("last_error is set on every retry iteration"),
441                pool_metrics.idle_connections,
442                pool_metrics.active_connections,
443                pool_metrics.total_connections,
444            ),
445        })
446    }
447
448    /// Execute query with SQL field projection optimization.
449    ///
450    /// Uses the provided `SqlProjectionHint` to generate optimized SQL that projects
451    /// only the requested fields from the JSONB column, reducing network payload and
452    /// JSON deserialization overhead.
453    ///
454    /// # Arguments
455    ///
456    /// * `view` - View/table name to query
457    /// * `projection` - Optional SQL projection hint with field list
458    /// * `where_clause` - Optional WHERE clause for filtering
459    /// * `limit` - Optional row limit
460    ///
461    /// # Returns
462    ///
463    /// Vector of projected JSONB rows with only the requested fields
464    ///
465    /// # Errors
466    ///
467    /// Returns `FraiseQLError::Database` on query execution failure.
468    ///
469    /// # Panics
470    ///
471    /// Cannot panic in practice: the inner `expect` is guarded by an `is_none()` check
472    /// immediately above it.
473    ///
474    /// # Example
475    ///
476    /// ```no_run
477    /// // Requires: running PostgreSQL database.
478    /// use fraiseql_db::postgres::PostgresAdapter;
479    /// use fraiseql_db::types::SqlProjectionHint;
480    /// use fraiseql_db::DatabaseType;
481    ///
482    /// # async fn example(adapter: &PostgresAdapter) -> Result<(), Box<dyn std::error::Error>> {
483    /// let projection = SqlProjectionHint {
484    ///     database: DatabaseType::PostgreSQL,
485    ///     projection_template: "jsonb_build_object('id', data->>'id')".to_string(),
486    ///     estimated_reduction_percent: 75,
487    /// };
488    ///
489    /// let results = adapter
490    ///     .execute_with_projection("v_user", Some(&projection), None, Some(10), None)
491    ///     .await?;
492    /// # Ok(())
493    /// # }
494    /// ```
495    pub async fn execute_with_projection(
496        &self,
497        view: &str,
498        projection: Option<&SqlProjectionHint>,
499        where_clause: Option<&WhereClause>,
500        limit: Option<u32>,
501        offset: Option<u32>,
502    ) -> Result<Vec<JsonbValue>> {
503        // If no projection, fall back to standard query
504        if projection.is_none() {
505            return self.execute_where_query(view, where_clause, limit, offset, None).await;
506        }
507
508        let projection = projection.expect("projection is Some; None was returned above");
509
510        // Build SQL with projection
511        // The projection_template is expected to be the SELECT clause with projection SQL
512        // e.g., "jsonb_build_object('id', data->>'id', 'email', data->>'email')"
513        let mut sql = format!(
514            "SELECT {} FROM {}",
515            projection.projection_template,
516            quote_postgres_identifier(view)
517        );
518
519        // Add WHERE clause if present
520        if let Some(clause) = where_clause {
521            let generator = PostgresWhereGenerator::new(PostgresDialect);
522            let (where_sql, where_params) = generator.generate(clause)?;
523            sql.push_str(" WHERE ");
524            sql.push_str(&where_sql);
525
526            // Convert WHERE params to typed params first
527            let mut typed_params: Vec<QueryParam> =
528                where_params.into_iter().map(QueryParam::from).collect();
529            let mut param_count = typed_params.len();
530
531            // Append LIMIT/OFFSET as BigInt (PostgreSQL requires integer type).
532            // Reason (expect below): fmt::Write for String is infallible.
533            if let Some(lim) = limit {
534                param_count += 1;
535                write!(sql, " LIMIT ${param_count}").expect("write to String");
536                typed_params.push(QueryParam::BigInt(i64::from(lim)));
537            }
538
539            if let Some(off) = offset {
540                param_count += 1;
541                write!(sql, " OFFSET ${param_count}").expect("write to String");
542                typed_params.push(QueryParam::BigInt(i64::from(off)));
543            }
544
545            tracing::debug!("SQL with projection = {}", sql);
546            tracing::debug!("typed_params = {:?}", typed_params);
547
548            // Create references to QueryParam for ToSql
549            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
550                .iter()
551                .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
552                .collect();
553
554            self.execute_raw(&sql, &param_refs).await
555        } else {
556            // No WHERE clause — only LIMIT/OFFSET params.
557            // Reason (expect below): fmt::Write for String is infallible.
558            let mut typed_params: Vec<QueryParam> = Vec::new();
559            let mut param_count = 0;
560
561            if let Some(lim) = limit {
562                param_count += 1;
563                write!(sql, " LIMIT ${param_count}").expect("write to String");
564                typed_params.push(QueryParam::BigInt(i64::from(lim)));
565            }
566
567            if let Some(off) = offset {
568                param_count += 1;
569                write!(sql, " OFFSET ${param_count}").expect("write to String");
570                typed_params.push(QueryParam::BigInt(i64::from(off)));
571            }
572
573            // Create references to QueryParam for ToSql
574            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
575                .iter()
576                .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
577                .collect();
578
579            self.execute_raw(&sql, &param_refs).await
580        }
581    }
582}
583
584/// Build a parameterized `SELECT data FROM {view}` SQL string.
585///
586/// Shared by [`PostgresAdapter::execute_where_query`] and
587/// [`PostgresAdapter::explain_where_query`] so that SQL construction
588/// logic is never duplicated.
589///
590/// # Returns
591///
592/// `(sql, typed_params)` — the SQL string and the bound parameter values.
593///
594/// # Errors
595///
596/// Returns `FraiseQLError` if WHERE clause generation fails.
597pub(super) fn build_where_select_sql(
598    view: &str,
599    where_clause: Option<&WhereClause>,
600    limit: Option<u32>,
601    offset: Option<u32>,
602) -> Result<(String, Vec<QueryParam>)> {
603    // Build base query
604    let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
605
606    // Collect WHERE clause params (if any)
607    let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
608        let generator = PostgresWhereGenerator::new(PostgresDialect);
609        let (where_sql, where_params) = generator.generate(clause)?;
610        sql.push_str(" WHERE ");
611        sql.push_str(&where_sql);
612
613        // Convert WHERE clause JSON values to QueryParam
614        where_params.into_iter().map(QueryParam::from).collect()
615    } else {
616        Vec::new()
617    };
618    let mut param_count = typed_params.len();
619
620    // Add LIMIT as BigInt (PostgreSQL requires integer type for LIMIT).
621    // Reason (expect below): fmt::Write for String is infallible.
622    if let Some(lim) = limit {
623        param_count += 1;
624        write!(sql, " LIMIT ${param_count}").expect("write to String");
625        typed_params.push(QueryParam::BigInt(i64::from(lim)));
626    }
627
628    // Add OFFSET as BigInt (PostgreSQL requires integer type for OFFSET)
629    if let Some(off) = offset {
630        param_count += 1;
631        write!(sql, " OFFSET ${param_count}").expect("write to String");
632        typed_params.push(QueryParam::BigInt(i64::from(off)));
633    }
634
635    Ok((sql, typed_params))
636}