Skip to main content

fraiseql_db/postgres/adapter/
mod.rs

1//! PostgreSQL database adapter implementation.
2
3mod database;
4mod relay;
5
6#[cfg(test)]
7mod tests;
8
9#[cfg(all(test, feature = "test-postgres"))]
10mod integration_tests;
11
12use std::fmt::Write;
13
14use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
15use fraiseql_error::{FraiseQLError, Result};
16use tokio_postgres::{NoTls, Row};
17
18use super::where_generator::PostgresWhereGenerator;
19use crate::{
20    dialect::PostgresDialect,
21    identifier::quote_postgres_identifier,
22    traits::DatabaseAdapter,
23    types::{JsonbValue, QueryParam, sql_hints::SqlProjectionHint},
24    where_clause::WhereClause,
25};
26
27/// Default maximum pool size for PostgreSQL connections.
28/// Increased from 10 to 25 to prevent pool exhaustion under concurrent
29/// nested query load (fixes Issue #41).
30const DEFAULT_POOL_SIZE: usize = 25;
31
32/// Maximum retries for connection acquisition with exponential backoff.
33const MAX_CONNECTION_RETRIES: u32 = 3;
34
35/// Base delay in milliseconds for connection retry backoff.
36const CONNECTION_RETRY_DELAY_MS: u64 = 50;
37
38/// Escape a JSONB key for use in a PostgreSQL string literal (`data->>'key'`).
39///
40/// PostgreSQL string literals use single-quote doubling for escaping (`'` → `''`).
41/// This function is defense-in-depth: `OrderByClause` already rejects field names
42/// that are not valid GraphQL identifiers (which cannot contain `'`), but this
43/// escaping ensures correctness for any future caller that bypasses that validation.
44pub(super) fn escape_jsonb_key(key: &str) -> String {
45    key.replace('\'', "''")
46}
47
48/// PostgreSQL database adapter with connection pooling.
49///
50/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for async queries.
51///
52/// # Example
53///
54/// ```rust,no_run
55/// use fraiseql_db::postgres::PostgresAdapter;
56/// use fraiseql_db::{DatabaseAdapter, WhereClause, WhereOperator};
57/// use serde_json::json;
58///
59/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
60/// // Create adapter with connection string
61/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
62///
63/// // Execute query
64/// let where_clause = WhereClause::Field {
65///     path: vec!["email".to_string()],
66///     operator: WhereOperator::Icontains,
67///     value: json!("example.com"),
68/// };
69///
70/// let results = adapter
71///     .execute_where_query("v_user", Some(&where_clause), Some(10), None, None)
72///     .await?;
73///
74/// println!("Found {} users", results.len());
75/// # Ok(())
76/// # }
77/// ```
78#[derive(Clone)]
79pub struct PostgresAdapter {
80    pub(super) pool:         Pool,
81    /// Whether mutation timing injection is enabled.
82    mutation_timing_enabled: bool,
83    /// The PostgreSQL session variable name for timing.
84    timing_variable_name:    String,
85}
86
87impl std::fmt::Debug for PostgresAdapter {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_struct("PostgresAdapter")
90            .field("mutation_timing_enabled", &self.mutation_timing_enabled)
91            .field("timing_variable_name", &self.timing_variable_name)
92            .field("pool", &"<Pool>")
93            .finish()
94    }
95}
96
97impl PostgresAdapter {
98    /// Create new PostgreSQL adapter with default pool configuration.
99    ///
100    /// # Arguments
101    ///
102    /// * `connection_string` - PostgreSQL connection string (e.g., "postgresql://localhost/mydb")
103    ///
104    /// # Errors
105    ///
106    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
107    ///
108    /// # Example
109    ///
110    /// ```rust,no_run
111    /// # use fraiseql_db::postgres::PostgresAdapter;
112    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
113    /// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
114    /// # Ok(())
115    /// # }
116    /// ```
117    pub async fn new(connection_string: &str) -> Result<Self> {
118        Self::with_pool_size(connection_string, DEFAULT_POOL_SIZE).await
119    }
120
121    /// Create new PostgreSQL adapter with custom pool configuration.
122    ///
123    /// # Arguments
124    ///
125    /// * `connection_string` - PostgreSQL connection string
126    /// * `min_size` - Minimum size hint (not enforced by deadpool-postgres)
127    /// * `max_size` - Maximum number of connections in pool
128    ///
129    /// # Errors
130    ///
131    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
132    ///
133    /// # Note
134    ///
135    /// `min_size` is accepted for API compatibility but deadpool-postgres uses
136    /// lazy initialization with dynamic pool sizing up to `max_size`.
137    pub async fn with_pool_config(
138        connection_string: &str,
139        _min_size: usize,
140        max_size: usize,
141    ) -> Result<Self> {
142        Self::with_pool_size(connection_string, max_size).await
143    }
144
145    /// Create new PostgreSQL adapter with custom pool size.
146    ///
147    /// # Arguments
148    ///
149    /// * `connection_string` - PostgreSQL connection string
150    /// * `max_size` - Maximum number of connections in pool
151    ///
152    /// # Errors
153    ///
154    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
155    pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
156        let mut cfg = Config::new();
157        cfg.url = Some(connection_string.to_string());
158        cfg.manager = Some(ManagerConfig {
159            recycling_method: RecyclingMethod::Fast,
160        });
161        cfg.pool = Some(deadpool_postgres::PoolConfig::new(max_size));
162
163        let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).map_err(|e| {
164            FraiseQLError::ConnectionPool {
165                message: format!("Failed to create connection pool: {e}"),
166            }
167        })?;
168
169        // Test connection
170        let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
171            message: format!("Failed to acquire connection: {e}"),
172        })?;
173
174        client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
175            message:   format!("Failed to connect to database: {e}"),
176            sql_state: e.code().map(|c| c.code().to_string()),
177        })?;
178
179        Ok(Self {
180            pool,
181            mutation_timing_enabled: false,
182            timing_variable_name: "fraiseql.started_at".to_string(),
183        })
184    }
185
186    /// Get a reference to the internal connection pool.
187    ///
188    /// This allows sharing the pool with other components like `PostgresIntrospector`.
189    #[must_use]
190    pub const fn pool(&self) -> &Pool {
191        &self.pool
192    }
193
194    /// Enable mutation timing injection.
195    ///
196    /// When enabled, `execute_function_call` wraps each mutation in a transaction
197    /// and sets a session variable to `clock_timestamp()::text` before execution,
198    /// allowing SQL functions to compute their own duration.
199    ///
200    /// # Arguments
201    ///
202    /// * `variable_name` - The PostgreSQL session variable name (e.g., `"fraiseql.started_at"`)
203    #[must_use]
204    pub fn with_mutation_timing(mut self, variable_name: &str) -> Self {
205        self.mutation_timing_enabled = true;
206        self.timing_variable_name = variable_name.to_string();
207        self
208    }
209
210    /// Returns whether mutation timing injection is enabled.
211    #[must_use]
212    pub const fn mutation_timing_enabled(&self) -> bool {
213        self.mutation_timing_enabled
214    }
215
216    /// Execute raw SQL query and return JSONB rows.
217    ///
218    /// # Errors
219    ///
220    /// Returns `FraiseQLError::Database` on query execution failure.
221    pub(super) async fn execute_raw(
222        &self,
223        sql: &str,
224        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
225    ) -> Result<Vec<JsonbValue>> {
226        let client = self.acquire_connection_with_retry().await?;
227
228        let rows: Vec<Row> =
229            client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
230                message:   format!("Query execution failed: {e}"),
231                sql_state: e.code().map(|c| c.code().to_string()),
232            })?;
233
234        let results = rows
235            .into_iter()
236            .map(|row| {
237                let data: serde_json::Value = row.get(0);
238                JsonbValue::new(data)
239            })
240            .collect();
241
242        Ok(results)
243    }
244
245    /// Acquire a connection from the pool with retry logic.
246    ///
247    /// Implements exponential backoff retry when the pool is exhausted.
248    /// This prevents transient pool exhaustion from causing query failures
249    /// under concurrent load (fixes Issue #41).
250    ///
251    /// # Errors
252    ///
253    /// Returns `FraiseQLError::ConnectionPool` if all retries are exhausted.
254    pub(super) async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
255        let mut last_error = None;
256
257        for attempt in 0..MAX_CONNECTION_RETRIES {
258            match self.pool.get().await {
259                Ok(client) => {
260                    if attempt > 0 {
261                        tracing::info!(
262                            "Successfully acquired connection after {} retries",
263                            attempt
264                        );
265                    }
266                    return Ok(client);
267                },
268                Err(e) => {
269                    last_error = Some(e);
270                    if attempt < MAX_CONNECTION_RETRIES - 1 {
271                        let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
272                        tracing::warn!(
273                            "Connection pool exhausted (attempt {}/{}), retrying in {}ms...",
274                            attempt + 1,
275                            MAX_CONNECTION_RETRIES,
276                            delay
277                        );
278                        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
279                    }
280                },
281            }
282        }
283
284        // All retries exhausted - log detailed pool state
285        let pool_metrics = self.pool_metrics();
286        tracing::error!(
287            "Failed to acquire connection after {} retries. Pool state: available={}, active={}, total={}",
288            MAX_CONNECTION_RETRIES,
289            pool_metrics.idle_connections,
290            pool_metrics.active_connections,
291            pool_metrics.total_connections
292        );
293
294        Err(FraiseQLError::ConnectionPool {
295            message: format!(
296                "Failed to acquire connection after {} retries: {}. Pool exhausted (available={}/{}). Consider increasing pool size or reducing concurrent load.",
297                MAX_CONNECTION_RETRIES,
298                last_error.expect("last_error is set on every retry iteration"),
299                pool_metrics.idle_connections,
300                pool_metrics.total_connections
301            ),
302        })
303    }
304
305    /// Execute query with SQL field projection optimization.
306    ///
307    /// Uses the provided `SqlProjectionHint` to generate optimized SQL that projects
308    /// only the requested fields from the JSONB column, reducing network payload and
309    /// JSON deserialization overhead.
310    ///
311    /// # Arguments
312    ///
313    /// * `view` - View/table name to query
314    /// * `projection` - Optional SQL projection hint with field list
315    /// * `where_clause` - Optional WHERE clause for filtering
316    /// * `limit` - Optional row limit
317    ///
318    /// # Returns
319    ///
320    /// Vector of projected JSONB rows with only the requested fields
321    ///
322    /// # Errors
323    ///
324    /// Returns `FraiseQLError::Database` on query execution failure.
325    ///
326    /// # Panics
327    ///
328    /// Cannot panic in practice: the inner `expect` is guarded by an `is_none()` check
329    /// immediately above it.
330    ///
331    /// # Example
332    ///
333    /// ```no_run
334    /// // Requires: running PostgreSQL database.
335    /// use fraiseql_db::postgres::PostgresAdapter;
336    /// use fraiseql_db::types::SqlProjectionHint;
337    /// use fraiseql_db::DatabaseType;
338    ///
339    /// # async fn example(adapter: &PostgresAdapter) -> Result<(), Box<dyn std::error::Error>> {
340    /// let projection = SqlProjectionHint {
341    ///     database: DatabaseType::PostgreSQL,
342    ///     projection_template: "jsonb_build_object('id', data->>'id')".to_string(),
343    ///     estimated_reduction_percent: 75,
344    /// };
345    ///
346    /// let results = adapter
347    ///     .execute_with_projection("v_user", Some(&projection), None, Some(10), None)
348    ///     .await?;
349    /// # Ok(())
350    /// # }
351    /// ```
352    pub async fn execute_with_projection(
353        &self,
354        view: &str,
355        projection: Option<&SqlProjectionHint>,
356        where_clause: Option<&WhereClause>,
357        limit: Option<u32>,
358        offset: Option<u32>,
359    ) -> Result<Vec<JsonbValue>> {
360        // If no projection, fall back to standard query
361        if projection.is_none() {
362            return self.execute_where_query(view, where_clause, limit, offset, None).await;
363        }
364
365        let projection = projection.expect("projection is Some; None was returned above");
366
367        // Build SQL with projection
368        // The projection_template is expected to be the SELECT clause with projection SQL
369        // e.g., "jsonb_build_object('id', data->>'id', 'email', data->>'email')"
370        let mut sql = format!(
371            "SELECT {} FROM {}",
372            projection.projection_template,
373            quote_postgres_identifier(view)
374        );
375
376        // Add WHERE clause if present
377        if let Some(clause) = where_clause {
378            let generator = PostgresWhereGenerator::new(PostgresDialect);
379            let (where_sql, where_params) = generator.generate(clause)?;
380            sql.push_str(" WHERE ");
381            sql.push_str(&where_sql);
382
383            // Convert WHERE params to typed params first
384            let mut typed_params: Vec<QueryParam> =
385                where_params.into_iter().map(QueryParam::from).collect();
386            let mut param_count = typed_params.len();
387
388            // Append LIMIT/OFFSET as BigInt (PostgreSQL requires integer type).
389            // Reason (expect below): fmt::Write for String is infallible.
390            if let Some(lim) = limit {
391                param_count += 1;
392                write!(sql, " LIMIT ${param_count}").expect("write to String");
393                typed_params.push(QueryParam::BigInt(i64::from(lim)));
394            }
395
396            if let Some(off) = offset {
397                param_count += 1;
398                write!(sql, " OFFSET ${param_count}").expect("write to String");
399                typed_params.push(QueryParam::BigInt(i64::from(off)));
400            }
401
402            tracing::debug!("SQL with projection = {}", sql);
403            tracing::debug!("typed_params = {:?}", typed_params);
404
405            // Create references to QueryParam for ToSql
406            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
407                .iter()
408                .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
409                .collect();
410
411            self.execute_raw(&sql, &param_refs).await
412        } else {
413            // No WHERE clause — only LIMIT/OFFSET params.
414            // Reason (expect below): fmt::Write for String is infallible.
415            let mut typed_params: Vec<QueryParam> = Vec::new();
416            let mut param_count = 0;
417
418            if let Some(lim) = limit {
419                param_count += 1;
420                write!(sql, " LIMIT ${param_count}").expect("write to String");
421                typed_params.push(QueryParam::BigInt(i64::from(lim)));
422            }
423
424            if let Some(off) = offset {
425                param_count += 1;
426                write!(sql, " OFFSET ${param_count}").expect("write to String");
427                typed_params.push(QueryParam::BigInt(i64::from(off)));
428            }
429
430            // Create references to QueryParam for ToSql
431            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
432                .iter()
433                .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
434                .collect();
435
436            self.execute_raw(&sql, &param_refs).await
437        }
438    }
439}
440
441/// Build a parameterized `SELECT data FROM {view}` SQL string.
442///
443/// Shared by [`PostgresAdapter::execute_where_query`] and
444/// [`PostgresAdapter::explain_where_query`] so that SQL construction
445/// logic is never duplicated.
446///
447/// # Returns
448///
449/// `(sql, typed_params)` — the SQL string and the bound parameter values.
450///
451/// # Errors
452///
453/// Returns `FraiseQLError` if WHERE clause generation fails.
454pub(super) fn build_where_select_sql(
455    view: &str,
456    where_clause: Option<&WhereClause>,
457    limit: Option<u32>,
458    offset: Option<u32>,
459) -> Result<(String, Vec<QueryParam>)> {
460    // Build base query
461    let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
462
463    // Collect WHERE clause params (if any)
464    let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
465        let generator = PostgresWhereGenerator::new(PostgresDialect);
466        let (where_sql, where_params) = generator.generate(clause)?;
467        sql.push_str(" WHERE ");
468        sql.push_str(&where_sql);
469
470        // Convert WHERE clause JSON values to QueryParam
471        where_params.into_iter().map(QueryParam::from).collect()
472    } else {
473        Vec::new()
474    };
475    let mut param_count = typed_params.len();
476
477    // Add LIMIT as BigInt (PostgreSQL requires integer type for LIMIT).
478    // Reason (expect below): fmt::Write for String is infallible.
479    if let Some(lim) = limit {
480        param_count += 1;
481        write!(sql, " LIMIT ${param_count}").expect("write to String");
482        typed_params.push(QueryParam::BigInt(i64::from(lim)));
483    }
484
485    // Add OFFSET as BigInt (PostgreSQL requires integer type for OFFSET)
486    if let Some(off) = offset {
487        param_count += 1;
488        write!(sql, " OFFSET ${param_count}").expect("write to String");
489        typed_params.push(QueryParam::BigInt(i64::from(off)));
490    }
491
492    Ok((sql, typed_params))
493}