fraiseql_db/postgres/adapter/mod.rs
1//! PostgreSQL database adapter implementation.
2
3mod database;
4mod relay;
5
6#[cfg(test)]
7mod tests;
8
9#[cfg(all(test, feature = "test-postgres"))]
10mod integration_tests;
11
12use std::{fmt::Write, time::Duration};
13
14use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
15use fraiseql_error::{FraiseQLError, Result};
16use tokio_postgres::{NoTls, Row};
17
18use super::where_generator::PostgresWhereGenerator;
19use crate::{
20 dialect::PostgresDialect,
21 identifier::quote_postgres_identifier,
22 traits::DatabaseAdapter,
23 types::{JsonbValue, QueryParam, sql_hints::SqlProjectionHint},
24 where_clause::WhereClause,
25};
26
27/// Default maximum pool size for PostgreSQL connections.
28/// Increased from 10 to 25 to prevent pool exhaustion under concurrent
29/// nested query load (fixes Issue #41).
30const DEFAULT_POOL_SIZE: usize = 25;
31
32/// Maximum retries for connection acquisition with exponential backoff.
33const MAX_CONNECTION_RETRIES: u32 = 3;
34
35/// Base delay in milliseconds for connection retry backoff.
36const CONNECTION_RETRY_DELAY_MS: u64 = 50;
37
38/// Configuration for connection pool construction and pre-warming.
39///
40/// Controls the minimum guaranteed connections (pre-warmed at startup),
41/// the maximum pool ceiling, and the wait/create timeout for connection
42/// acquisition.
43///
44/// # Example
45///
46/// ```rust
47/// use fraiseql_db::postgres::PoolPrewarmConfig;
48///
49/// let cfg = PoolPrewarmConfig {
50/// min_size: 5,
51/// max_size: 20,
52/// timeout_secs: Some(30),
53/// };
54/// ```
55#[derive(Debug, Clone)]
56pub struct PoolPrewarmConfig {
57 /// Number of connections to establish at pool creation time.
58 ///
59 /// After the pool is created, `min_size` connections are opened eagerly
60 /// so they are ready when the first request arrives. Set to `0` to disable
61 /// pre-warming (lazy init — one connection from the startup health check).
62 pub min_size: usize,
63
64 /// Maximum number of connections the pool may hold.
65 pub max_size: usize,
66
67 /// Optional timeout (in seconds) for connection acquisition and creation.
68 ///
69 /// Applied to both the `wait` (blocked waiting for an idle connection) and
70 /// `create` (time to open a new TCP connection to PostgreSQL) deadpool slots.
71 /// When `None`, acquisition can block indefinitely on pool exhaustion.
72 pub timeout_secs: Option<u64>,
73}
74
75/// Build a `deadpool-postgres` pool with an optional wait/create timeout.
76///
77/// # Errors
78///
79/// Returns `FraiseQLError::ConnectionPool` if pool creation fails (e.g., unparseable URL).
80fn build_pool(
81 connection_string: &str,
82 max_size: usize,
83 timeout_secs: Option<u64>,
84) -> Result<Pool> {
85 let mut cfg = Config::new();
86 cfg.url = Some(connection_string.to_string());
87 cfg.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Fast });
88
89 let mut pool_cfg = deadpool_postgres::PoolConfig::new(max_size);
90 if let Some(secs) = timeout_secs {
91 let t = Duration::from_secs(secs);
92 pool_cfg.timeouts.wait = Some(t);
93 pool_cfg.timeouts.create = Some(t);
94 // `recycle` intentionally stays None — fast recycle, not user-configurable.
95 }
96 cfg.pool = Some(pool_cfg);
97
98 cfg.create_pool(Some(Runtime::Tokio1), NoTls).map_err(|e| FraiseQLError::ConnectionPool {
99 message: format!("Failed to create connection pool: {e}"),
100 })
101}
102
103/// Escape a JSONB key for use in a PostgreSQL string literal (`data->>'key'`).
104///
105/// PostgreSQL string literals use single-quote doubling for escaping (`'` → `''`).
106/// This function is defense-in-depth: `OrderByClause` already rejects field names
107/// that are not valid GraphQL identifiers (which cannot contain `'`), but this
108/// escaping ensures correctness for any future caller that bypasses that validation.
109pub(super) fn escape_jsonb_key(key: &str) -> String {
110 key.replace('\'', "''")
111}
112
113/// PostgreSQL database adapter with connection pooling.
114///
115/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for async queries.
116///
117/// # Example
118///
119/// ```rust,no_run
120/// use fraiseql_db::postgres::PostgresAdapter;
121/// use fraiseql_db::{DatabaseAdapter, WhereClause, WhereOperator};
122/// use serde_json::json;
123///
124/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
125/// // Create adapter with connection string
126/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
127///
128/// // Execute query
129/// let where_clause = WhereClause::Field {
130/// path: vec!["email".to_string()],
131/// operator: WhereOperator::Icontains,
132/// value: json!("example.com"),
133/// };
134///
135/// let results = adapter
136/// .execute_where_query("v_user", Some(&where_clause), Some(10), None, None)
137/// .await?;
138///
139/// println!("Found {} users", results.len());
140/// # Ok(())
141/// # }
142/// ```
143#[derive(Clone)]
144pub struct PostgresAdapter {
145 pub(super) pool: Pool,
146 /// Whether mutation timing injection is enabled.
147 mutation_timing_enabled: bool,
148 /// The PostgreSQL session variable name for timing.
149 timing_variable_name: String,
150}
151
152impl std::fmt::Debug for PostgresAdapter {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct("PostgresAdapter")
155 .field("mutation_timing_enabled", &self.mutation_timing_enabled)
156 .field("timing_variable_name", &self.timing_variable_name)
157 .field("pool", &"<Pool>")
158 .finish()
159 }
160}
161
162impl PostgresAdapter {
163 /// Create new PostgreSQL adapter with default pool configuration.
164 ///
165 /// # Arguments
166 ///
167 /// * `connection_string` - PostgreSQL connection string (e.g., "postgresql://localhost/mydb")
168 ///
169 /// # Errors
170 ///
171 /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
172 ///
173 /// # Example
174 ///
175 /// ```rust,no_run
176 /// # use fraiseql_db::postgres::PostgresAdapter;
177 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
178 /// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
179 /// # Ok(())
180 /// # }
181 /// ```
182 pub async fn new(connection_string: &str) -> Result<Self> {
183 Self::with_pool_config(
184 connection_string,
185 PoolPrewarmConfig { min_size: 0, max_size: DEFAULT_POOL_SIZE, timeout_secs: None },
186 )
187 .await
188 }
189
190 /// Create new PostgreSQL adapter with pre-warming and timeout configuration.
191 ///
192 /// Constructs the pool, runs a startup health check, then eagerly opens
193 /// `cfg.min_size` connections so they are ready when the first request arrives.
194 ///
195 /// # Arguments
196 ///
197 /// * `connection_string` - PostgreSQL connection string
198 /// * `cfg` - Pool pre-warming and timeout configuration
199 ///
200 /// # Errors
201 ///
202 /// Returns `FraiseQLError::ConnectionPool` if pool creation or the startup
203 /// health check fails.
204 pub async fn with_pool_config(
205 connection_string: &str,
206 cfg: PoolPrewarmConfig,
207 ) -> Result<Self> {
208 let pool = build_pool(connection_string, cfg.max_size, cfg.timeout_secs)?;
209
210 // Startup health check — establishes the first connection.
211 let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
212 message: format!("Failed to acquire connection: {e}"),
213 })?;
214
215 client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
216 message: format!("Failed to connect to database: {e}"),
217 sql_state: e.code().map(|c| c.code().to_string()),
218 })?;
219
220 // Drop client back to the pool before pre-warming so that the health-check
221 // connection counts as idle slot #1.
222 drop(client);
223
224 let adapter = Self {
225 pool,
226 mutation_timing_enabled: false,
227 timing_variable_name: "fraiseql.started_at".to_string(),
228 };
229
230 // Pre-warm: open `min_size - 1` additional connections (one already exists).
231 let warm_target = cfg.min_size.min(cfg.max_size).saturating_sub(1);
232 if warm_target > 0 {
233 adapter.prewarm(warm_target).await;
234 }
235
236 Ok(adapter)
237 }
238
239 /// Create new PostgreSQL adapter with custom pool size.
240 ///
241 /// # Arguments
242 ///
243 /// * `connection_string` - PostgreSQL connection string
244 /// * `max_size` - Maximum number of connections in pool
245 ///
246 /// # Errors
247 ///
248 /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
249 pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
250 Self::with_pool_config(
251 connection_string,
252 PoolPrewarmConfig { min_size: 0, max_size, timeout_secs: None },
253 )
254 .await
255 }
256
257 /// Pre-warm the pool by opening `count` additional connections.
258 ///
259 /// Pre-warming is best-effort: failures from individual connections are logged
260 /// but do not prevent startup. A 10-second outer timeout ensures the server
261 /// never blocks indefinitely on a slow or unreachable PostgreSQL instance.
262 async fn prewarm(&self, count: usize) {
263 use futures::future::join_all;
264 use tokio::time::timeout;
265
266 let handles: Vec<_> = (0..count)
267 .map(|_| {
268 let pool = self.pool.clone();
269 tokio::spawn(async move { pool.get().await })
270 })
271 .collect();
272
273 let result = timeout(Duration::from_secs(10), join_all(handles)).await;
274
275 let (succeeded, failed) = match result {
276 Ok(outcomes) => {
277 let s = outcomes
278 .iter()
279 .filter(|r| r.as_ref().map(|inner| inner.is_ok()).unwrap_or(false))
280 .count();
281 (s, count - s)
282 },
283 Err(_elapsed) => {
284 tracing::warn!(
285 target_connections = count,
286 "Pool pre-warm timed out after 10s; server will continue with partial pre-warm"
287 );
288 (0, count)
289 },
290 };
291
292 if failed > 0 {
293 tracing::warn!(
294 succeeded,
295 failed,
296 "Pool pre-warm: some connections could not be established"
297 );
298 } else {
299 tracing::info!(
300 idle_connections = succeeded + 1,
301 "PostgreSQL pool pre-warmed successfully"
302 );
303 }
304 }
305
306 /// Get a reference to the internal connection pool.
307 ///
308 /// This allows sharing the pool with other components like `PostgresIntrospector`.
309 #[must_use]
310 pub const fn pool(&self) -> &Pool {
311 &self.pool
312 }
313
314 /// Enable mutation timing injection.
315 ///
316 /// When enabled, `execute_function_call` wraps each mutation in a transaction
317 /// and sets a session variable to `clock_timestamp()::text` before execution,
318 /// allowing SQL functions to compute their own duration.
319 ///
320 /// # Arguments
321 ///
322 /// * `variable_name` - The PostgreSQL session variable name (e.g., `"fraiseql.started_at"`)
323 #[must_use]
324 pub fn with_mutation_timing(mut self, variable_name: &str) -> Self {
325 self.mutation_timing_enabled = true;
326 self.timing_variable_name = variable_name.to_string();
327 self
328 }
329
330 /// Returns whether mutation timing injection is enabled.
331 #[must_use]
332 pub const fn mutation_timing_enabled(&self) -> bool {
333 self.mutation_timing_enabled
334 }
335
336 /// Execute raw SQL query and return JSONB rows.
337 ///
338 /// # Errors
339 ///
340 /// Returns `FraiseQLError::Database` on query execution failure.
341 pub(super) async fn execute_raw(
342 &self,
343 sql: &str,
344 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
345 ) -> Result<Vec<JsonbValue>> {
346 let client = self.acquire_connection_with_retry().await?;
347
348 let rows: Vec<Row> =
349 client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
350 message: format!("Query execution failed: {e}"),
351 sql_state: e.code().map(|c| c.code().to_string()),
352 })?;
353
354 let results = rows
355 .into_iter()
356 .map(|row| {
357 let data: serde_json::Value = row.get(0);
358 JsonbValue::new(data)
359 })
360 .collect();
361
362 Ok(results)
363 }
364
365 /// Acquire a connection from the pool with retry logic.
366 ///
367 /// - `PoolError::Timeout`: the pool was exhausted for the full configured wait period.
368 /// This is not transient — retrying would only multiply the wait. Fails immediately.
369 /// - `PoolError::Backend` / create errors: potentially transient. Retries with
370 /// exponential backoff (up to `MAX_CONNECTION_RETRIES` attempts).
371 ///
372 /// # Errors
373 ///
374 /// Returns `FraiseQLError::ConnectionPool` on timeout or when all retries are exhausted.
375 pub(super) async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
376 use deadpool_postgres::PoolError;
377
378 let mut last_error = None;
379
380 for attempt in 0..MAX_CONNECTION_RETRIES {
381 match self.pool.get().await {
382 Ok(client) => {
383 if attempt > 0 {
384 tracing::info!(
385 attempt,
386 "Successfully acquired connection after retries"
387 );
388 }
389 return Ok(client);
390 },
391 // Pool exhausted for the full wait period — not transient, fail immediately.
392 Err(PoolError::Timeout(_)) => {
393 let metrics = self.pool_metrics();
394 tracing::error!(
395 available = metrics.idle_connections,
396 active = metrics.active_connections,
397 max = metrics.total_connections,
398 "Connection pool timeout: all connections busy"
399 );
400 return Err(FraiseQLError::ConnectionPool {
401 message: format!(
402 "Connection pool timeout: {}/{} connections busy. \
403 Increase pool_max_size or reduce concurrent load.",
404 metrics.active_connections, metrics.total_connections,
405 ),
406 });
407 },
408 // Backend/create errors are potentially transient — retry with backoff.
409 Err(e) => {
410 last_error = Some(e);
411 if attempt < MAX_CONNECTION_RETRIES - 1 {
412 let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
413 tracing::warn!(
414 attempt = attempt + 1,
415 total = MAX_CONNECTION_RETRIES,
416 delay_ms = delay,
417 "Transient connection error, retrying"
418 );
419 tokio::time::sleep(Duration::from_millis(delay)).await;
420 }
421 },
422 }
423 }
424
425 // All retries for transient errors exhausted.
426 let pool_metrics = self.pool_metrics();
427 tracing::error!(
428 retries = MAX_CONNECTION_RETRIES,
429 available = pool_metrics.idle_connections,
430 active = pool_metrics.active_connections,
431 max = pool_metrics.total_connections,
432 "Failed to acquire connection after all retries"
433 );
434
435 Err(FraiseQLError::ConnectionPool {
436 message: format!(
437 "Failed to acquire connection after {} retries: {}. \
438 Pool state: idle={}, active={}, max={}",
439 MAX_CONNECTION_RETRIES,
440 last_error.expect("last_error is set on every retry iteration"),
441 pool_metrics.idle_connections,
442 pool_metrics.active_connections,
443 pool_metrics.total_connections,
444 ),
445 })
446 }
447
448 /// Execute query with SQL field projection optimization.
449 ///
450 /// Uses the provided `SqlProjectionHint` to generate optimized SQL that projects
451 /// only the requested fields from the JSONB column, reducing network payload and
452 /// JSON deserialization overhead.
453 ///
454 /// # Arguments
455 ///
456 /// * `view` - View/table name to query
457 /// * `projection` - Optional SQL projection hint with field list
458 /// * `where_clause` - Optional WHERE clause for filtering
459 /// * `limit` - Optional row limit
460 ///
461 /// # Returns
462 ///
463 /// Vector of projected JSONB rows with only the requested fields
464 ///
465 /// # Errors
466 ///
467 /// Returns `FraiseQLError::Database` on query execution failure.
468 ///
469 /// # Panics
470 ///
471 /// Cannot panic in practice: the inner `expect` is guarded by an `is_none()` check
472 /// immediately above it.
473 ///
474 /// # Example
475 ///
476 /// ```no_run
477 /// // Requires: running PostgreSQL database.
478 /// use fraiseql_db::postgres::PostgresAdapter;
479 /// use fraiseql_db::types::SqlProjectionHint;
480 /// use fraiseql_db::DatabaseType;
481 ///
482 /// # async fn example(adapter: &PostgresAdapter) -> Result<(), Box<dyn std::error::Error>> {
483 /// let projection = SqlProjectionHint {
484 /// database: DatabaseType::PostgreSQL,
485 /// projection_template: "jsonb_build_object('id', data->>'id')".to_string(),
486 /// estimated_reduction_percent: 75,
487 /// };
488 ///
489 /// let results = adapter
490 /// .execute_with_projection("v_user", Some(&projection), None, Some(10), None)
491 /// .await?;
492 /// # Ok(())
493 /// # }
494 /// ```
495 pub async fn execute_with_projection(
496 &self,
497 view: &str,
498 projection: Option<&SqlProjectionHint>,
499 where_clause: Option<&WhereClause>,
500 limit: Option<u32>,
501 offset: Option<u32>,
502 ) -> Result<Vec<JsonbValue>> {
503 // If no projection, fall back to standard query
504 if projection.is_none() {
505 return self.execute_where_query(view, where_clause, limit, offset, None).await;
506 }
507
508 let projection = projection.expect("projection is Some; None was returned above");
509
510 // Build SQL with projection
511 // The projection_template is expected to be the SELECT clause with projection SQL
512 // e.g., "jsonb_build_object('id', data->>'id', 'email', data->>'email')"
513 let mut sql = format!(
514 "SELECT {} FROM {}",
515 projection.projection_template,
516 quote_postgres_identifier(view)
517 );
518
519 // Add WHERE clause if present
520 if let Some(clause) = where_clause {
521 let generator = PostgresWhereGenerator::new(PostgresDialect);
522 let (where_sql, where_params) = generator.generate(clause)?;
523 sql.push_str(" WHERE ");
524 sql.push_str(&where_sql);
525
526 // Convert WHERE params to typed params first
527 let mut typed_params: Vec<QueryParam> =
528 where_params.into_iter().map(QueryParam::from).collect();
529 let mut param_count = typed_params.len();
530
531 // Append LIMIT/OFFSET as BigInt (PostgreSQL requires integer type).
532 // Reason (expect below): fmt::Write for String is infallible.
533 if let Some(lim) = limit {
534 param_count += 1;
535 write!(sql, " LIMIT ${param_count}").expect("write to String");
536 typed_params.push(QueryParam::BigInt(i64::from(lim)));
537 }
538
539 if let Some(off) = offset {
540 param_count += 1;
541 write!(sql, " OFFSET ${param_count}").expect("write to String");
542 typed_params.push(QueryParam::BigInt(i64::from(off)));
543 }
544
545 tracing::debug!("SQL with projection = {}", sql);
546 tracing::debug!("typed_params = {:?}", typed_params);
547
548 // Create references to QueryParam for ToSql
549 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
550 .iter()
551 .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
552 .collect();
553
554 self.execute_raw(&sql, ¶m_refs).await
555 } else {
556 // No WHERE clause — only LIMIT/OFFSET params.
557 // Reason (expect below): fmt::Write for String is infallible.
558 let mut typed_params: Vec<QueryParam> = Vec::new();
559 let mut param_count = 0;
560
561 if let Some(lim) = limit {
562 param_count += 1;
563 write!(sql, " LIMIT ${param_count}").expect("write to String");
564 typed_params.push(QueryParam::BigInt(i64::from(lim)));
565 }
566
567 if let Some(off) = offset {
568 param_count += 1;
569 write!(sql, " OFFSET ${param_count}").expect("write to String");
570 typed_params.push(QueryParam::BigInt(i64::from(off)));
571 }
572
573 // Create references to QueryParam for ToSql
574 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
575 .iter()
576 .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
577 .collect();
578
579 self.execute_raw(&sql, ¶m_refs).await
580 }
581 }
582}
583
584/// Build a parameterized `SELECT data FROM {view}` SQL string.
585///
586/// Shared by [`PostgresAdapter::execute_where_query`] and
587/// [`PostgresAdapter::explain_where_query`] so that SQL construction
588/// logic is never duplicated.
589///
590/// # Returns
591///
592/// `(sql, typed_params)` — the SQL string and the bound parameter values.
593///
594/// # Errors
595///
596/// Returns `FraiseQLError` if WHERE clause generation fails.
597pub(super) fn build_where_select_sql(
598 view: &str,
599 where_clause: Option<&WhereClause>,
600 limit: Option<u32>,
601 offset: Option<u32>,
602) -> Result<(String, Vec<QueryParam>)> {
603 // Build base query
604 let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
605
606 // Collect WHERE clause params (if any)
607 let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
608 let generator = PostgresWhereGenerator::new(PostgresDialect);
609 let (where_sql, where_params) = generator.generate(clause)?;
610 sql.push_str(" WHERE ");
611 sql.push_str(&where_sql);
612
613 // Convert WHERE clause JSON values to QueryParam
614 where_params.into_iter().map(QueryParam::from).collect()
615 } else {
616 Vec::new()
617 };
618 let mut param_count = typed_params.len();
619
620 // Add LIMIT as BigInt (PostgreSQL requires integer type for LIMIT).
621 // Reason (expect below): fmt::Write for String is infallible.
622 if let Some(lim) = limit {
623 param_count += 1;
624 write!(sql, " LIMIT ${param_count}").expect("write to String");
625 typed_params.push(QueryParam::BigInt(i64::from(lim)));
626 }
627
628 // Add OFFSET as BigInt (PostgreSQL requires integer type for OFFSET)
629 if let Some(off) = offset {
630 param_count += 1;
631 write!(sql, " OFFSET ${param_count}").expect("write to String");
632 typed_params.push(QueryParam::BigInt(i64::from(off)));
633 }
634
635 Ok((sql, typed_params))
636}