fraiseql_db/postgres/adapter/mod.rs
1//! PostgreSQL database adapter implementation.
2
3mod database;
4mod relay;
5
6#[cfg(test)]
7mod tests;
8
9#[cfg(all(test, feature = "test-postgres"))]
10mod integration_tests;
11
12use std::fmt::Write;
13
14use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
15use fraiseql_error::{FraiseQLError, Result};
16use tokio_postgres::{NoTls, Row};
17
18use super::where_generator::PostgresWhereGenerator;
19use crate::{
20 dialect::PostgresDialect,
21 identifier::quote_postgres_identifier,
22 traits::DatabaseAdapter,
23 types::{JsonbValue, QueryParam, sql_hints::SqlProjectionHint},
24 where_clause::WhereClause,
25};
26
/// Default maximum pool size for PostgreSQL connections.
/// Increased from 10 to 25 to prevent pool exhaustion under concurrent
/// nested query load (fixes Issue #41).
const DEFAULT_POOL_SIZE: usize = 25;

/// Maximum number of connection acquisition attempts. The initial attempt
/// counts as one, and a backoff delay is inserted between consecutive attempts.
const MAX_CONNECTION_RETRIES: u32 = 3;

/// Base delay in milliseconds for connection retry backoff; the delay before
/// each further attempt is derived from this value.
const CONNECTION_RETRY_DELAY_MS: u64 = 50;
37
38/// Escape a JSONB key for use in a PostgreSQL string literal (`data->>'key'`).
39///
40/// PostgreSQL string literals use single-quote doubling for escaping (`'` → `''`).
41/// This function is defense-in-depth: `OrderByClause` already rejects field names
42/// that are not valid GraphQL identifiers (which cannot contain `'`), but this
43/// escaping ensures correctness for any future caller that bypasses that validation.
44pub(super) fn escape_jsonb_key(key: &str) -> String {
45 key.replace('\'', "''")
46}
47
48/// PostgreSQL database adapter with connection pooling.
49///
50/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for async queries.
51///
52/// # Example
53///
54/// ```rust,no_run
55/// use fraiseql_db::postgres::PostgresAdapter;
56/// use fraiseql_db::{DatabaseAdapter, WhereClause, WhereOperator};
57/// use serde_json::json;
58///
59/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
60/// // Create adapter with connection string
61/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
62///
63/// // Execute query
64/// let where_clause = WhereClause::Field {
65/// path: vec!["email".to_string()],
66/// operator: WhereOperator::Icontains,
67/// value: json!("example.com"),
68/// };
69///
70/// let results = adapter
71/// .execute_where_query("v_user", Some(&where_clause), Some(10), None, None)
72/// .await?;
73///
74/// println!("Found {} users", results.len());
75/// # Ok(())
76/// # }
77/// ```
#[derive(Clone)]
pub struct PostgresAdapter {
    /// Pooled PostgreSQL connections used to run this adapter's queries.
    pub(super) pool: Pool,
    /// Whether mutation timing injection is enabled.
    mutation_timing_enabled: bool,
    /// The PostgreSQL session variable name for timing
    /// (initialized to `fraiseql.started_at` by the constructors).
    timing_variable_name: String,
}
86
87impl std::fmt::Debug for PostgresAdapter {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_struct("PostgresAdapter")
90 .field("mutation_timing_enabled", &self.mutation_timing_enabled)
91 .field("timing_variable_name", &self.timing_variable_name)
92 .field("pool", &"<Pool>")
93 .finish()
94 }
95}
96
97impl PostgresAdapter {
/// Create new PostgreSQL adapter with default pool configuration.
///
/// Delegates to `with_pool_size`, passing `DEFAULT_POOL_SIZE` as the maximum
/// number of pooled connections.
///
/// # Arguments
///
/// * `connection_string` - PostgreSQL connection string (e.g., "postgresql://localhost/mydb")
///
/// # Errors
///
/// Returns `FraiseQLError::ConnectionPool` if the pool cannot be created or
/// the initial connection cannot be acquired, and `FraiseQLError::Database`
/// if the connectivity check query fails.
///
/// # Example
///
/// ```rust,no_run
/// # use fraiseql_db::postgres::PostgresAdapter;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
/// # Ok(())
/// # }
/// ```
pub async fn new(connection_string: &str) -> Result<Self> {
    Self::with_pool_size(connection_string, DEFAULT_POOL_SIZE).await
}
120
/// Create new PostgreSQL adapter with custom pool configuration.
///
/// Delegates to `with_pool_size` with `max_size`; the minimum size is ignored.
///
/// # Arguments
///
/// * `connection_string` - PostgreSQL connection string
/// * `_min_size` - Minimum size hint (unused; not enforced by deadpool-postgres)
/// * `max_size` - Maximum number of connections in pool
///
/// # Errors
///
/// Returns `FraiseQLError::ConnectionPool` if the pool cannot be created or
/// the initial connection cannot be acquired, and `FraiseQLError::Database`
/// if the connectivity check query fails.
///
/// # Note
///
/// `_min_size` is accepted for API compatibility but deadpool-postgres uses
/// lazy initialization with dynamic pool sizing up to `max_size`.
pub async fn with_pool_config(
    connection_string: &str,
    _min_size: usize,
    max_size: usize,
) -> Result<Self> {
    Self::with_pool_size(connection_string, max_size).await
}
144
145 /// Create new PostgreSQL adapter with custom pool size.
146 ///
147 /// # Arguments
148 ///
149 /// * `connection_string` - PostgreSQL connection string
150 /// * `max_size` - Maximum number of connections in pool
151 ///
152 /// # Errors
153 ///
154 /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
155 pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
156 let mut cfg = Config::new();
157 cfg.url = Some(connection_string.to_string());
158 cfg.manager = Some(ManagerConfig {
159 recycling_method: RecyclingMethod::Fast,
160 });
161 cfg.pool = Some(deadpool_postgres::PoolConfig::new(max_size));
162
163 let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).map_err(|e| {
164 FraiseQLError::ConnectionPool {
165 message: format!("Failed to create connection pool: {e}"),
166 }
167 })?;
168
169 // Test connection
170 let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
171 message: format!("Failed to acquire connection: {e}"),
172 })?;
173
174 client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
175 message: format!("Failed to connect to database: {e}"),
176 sql_state: e.code().map(|c| c.code().to_string()),
177 })?;
178
179 Ok(Self {
180 pool,
181 mutation_timing_enabled: false,
182 timing_variable_name: "fraiseql.started_at".to_string(),
183 })
184 }
185
/// Get a reference to the internal connection pool.
///
/// This allows sharing the pool with other components like `PostgresIntrospector`.
///
/// # Returns
///
/// A shared borrow of the pool owned by this adapter; no connection is acquired.
#[must_use]
pub const fn pool(&self) -> &Pool {
    &self.pool
}
193
194 /// Enable mutation timing injection.
195 ///
196 /// When enabled, `execute_function_call` wraps each mutation in a transaction
197 /// and sets a session variable to `clock_timestamp()::text` before execution,
198 /// allowing SQL functions to compute their own duration.
199 ///
200 /// # Arguments
201 ///
202 /// * `variable_name` - The PostgreSQL session variable name (e.g., `"fraiseql.started_at"`)
203 #[must_use]
204 pub fn with_mutation_timing(mut self, variable_name: &str) -> Self {
205 self.mutation_timing_enabled = true;
206 self.timing_variable_name = variable_name.to_string();
207 self
208 }
209
/// Returns whether mutation timing injection is enabled.
///
/// This is `false` for a freshly constructed adapter and becomes `true` once
/// `with_mutation_timing` has been applied.
#[must_use]
pub const fn mutation_timing_enabled(&self) -> bool {
    self.mutation_timing_enabled
}
215
216 /// Execute raw SQL query and return JSONB rows.
217 ///
218 /// # Errors
219 ///
220 /// Returns `FraiseQLError::Database` on query execution failure.
221 pub(super) async fn execute_raw(
222 &self,
223 sql: &str,
224 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
225 ) -> Result<Vec<JsonbValue>> {
226 let client = self.acquire_connection_with_retry().await?;
227
228 let rows: Vec<Row> =
229 client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
230 message: format!("Query execution failed: {e}"),
231 sql_state: e.code().map(|c| c.code().to_string()),
232 })?;
233
234 let results = rows
235 .into_iter()
236 .map(|row| {
237 let data: serde_json::Value = row.get(0);
238 JsonbValue::new(data)
239 })
240 .collect();
241
242 Ok(results)
243 }
244
245 /// Acquire a connection from the pool with retry logic.
246 ///
247 /// Implements exponential backoff retry when the pool is exhausted.
248 /// This prevents transient pool exhaustion from causing query failures
249 /// under concurrent load (fixes Issue #41).
250 ///
251 /// # Errors
252 ///
253 /// Returns `FraiseQLError::ConnectionPool` if all retries are exhausted.
254 pub(super) async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
255 let mut last_error = None;
256
257 for attempt in 0..MAX_CONNECTION_RETRIES {
258 match self.pool.get().await {
259 Ok(client) => {
260 if attempt > 0 {
261 tracing::info!(
262 "Successfully acquired connection after {} retries",
263 attempt
264 );
265 }
266 return Ok(client);
267 },
268 Err(e) => {
269 last_error = Some(e);
270 if attempt < MAX_CONNECTION_RETRIES - 1 {
271 let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
272 tracing::warn!(
273 "Connection pool exhausted (attempt {}/{}), retrying in {}ms...",
274 attempt + 1,
275 MAX_CONNECTION_RETRIES,
276 delay
277 );
278 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
279 }
280 },
281 }
282 }
283
284 // All retries exhausted - log detailed pool state
285 let pool_metrics = self.pool_metrics();
286 tracing::error!(
287 "Failed to acquire connection after {} retries. Pool state: available={}, active={}, total={}",
288 MAX_CONNECTION_RETRIES,
289 pool_metrics.idle_connections,
290 pool_metrics.active_connections,
291 pool_metrics.total_connections
292 );
293
294 Err(FraiseQLError::ConnectionPool {
295 message: format!(
296 "Failed to acquire connection after {} retries: {}. Pool exhausted (available={}/{}). Consider increasing pool size or reducing concurrent load.",
297 MAX_CONNECTION_RETRIES,
298 last_error.expect("last_error is set on every retry iteration"),
299 pool_metrics.idle_connections,
300 pool_metrics.total_connections
301 ),
302 })
303 }
304
305 /// Execute query with SQL field projection optimization.
306 ///
307 /// Uses the provided `SqlProjectionHint` to generate optimized SQL that projects
308 /// only the requested fields from the JSONB column, reducing network payload and
309 /// JSON deserialization overhead.
310 ///
311 /// # Arguments
312 ///
313 /// * `view` - View/table name to query
314 /// * `projection` - Optional SQL projection hint with field list
315 /// * `where_clause` - Optional WHERE clause for filtering
316 /// * `limit` - Optional row limit
317 ///
318 /// # Returns
319 ///
320 /// Vector of projected JSONB rows with only the requested fields
321 ///
322 /// # Errors
323 ///
324 /// Returns `FraiseQLError::Database` on query execution failure.
325 ///
326 /// # Panics
327 ///
328 /// Cannot panic in practice: the inner `expect` is guarded by an `is_none()` check
329 /// immediately above it.
330 ///
331 /// # Example
332 ///
333 /// ```no_run
334 /// // Requires: running PostgreSQL database.
335 /// use fraiseql_db::postgres::PostgresAdapter;
336 /// use fraiseql_db::types::SqlProjectionHint;
337 /// use fraiseql_db::DatabaseType;
338 ///
339 /// # async fn example(adapter: &PostgresAdapter) -> Result<(), Box<dyn std::error::Error>> {
340 /// let projection = SqlProjectionHint {
341 /// database: DatabaseType::PostgreSQL,
342 /// projection_template: "jsonb_build_object('id', data->>'id')".to_string(),
343 /// estimated_reduction_percent: 75,
344 /// };
345 ///
346 /// let results = adapter
347 /// .execute_with_projection("v_user", Some(&projection), None, Some(10), None)
348 /// .await?;
349 /// # Ok(())
350 /// # }
351 /// ```
352 pub async fn execute_with_projection(
353 &self,
354 view: &str,
355 projection: Option<&SqlProjectionHint>,
356 where_clause: Option<&WhereClause>,
357 limit: Option<u32>,
358 offset: Option<u32>,
359 ) -> Result<Vec<JsonbValue>> {
360 // If no projection, fall back to standard query
361 if projection.is_none() {
362 return self.execute_where_query(view, where_clause, limit, offset, None).await;
363 }
364
365 let projection = projection.expect("projection is Some; None was returned above");
366
367 // Build SQL with projection
368 // The projection_template is expected to be the SELECT clause with projection SQL
369 // e.g., "jsonb_build_object('id', data->>'id', 'email', data->>'email')"
370 let mut sql = format!(
371 "SELECT {} FROM {}",
372 projection.projection_template,
373 quote_postgres_identifier(view)
374 );
375
376 // Add WHERE clause if present
377 if let Some(clause) = where_clause {
378 let generator = PostgresWhereGenerator::new(PostgresDialect);
379 let (where_sql, where_params) = generator.generate(clause)?;
380 sql.push_str(" WHERE ");
381 sql.push_str(&where_sql);
382
383 // Convert WHERE params to typed params first
384 let mut typed_params: Vec<QueryParam> =
385 where_params.into_iter().map(QueryParam::from).collect();
386 let mut param_count = typed_params.len();
387
388 // Append LIMIT/OFFSET as BigInt (PostgreSQL requires integer type).
389 // Reason (expect below): fmt::Write for String is infallible.
390 if let Some(lim) = limit {
391 param_count += 1;
392 write!(sql, " LIMIT ${param_count}").expect("write to String");
393 typed_params.push(QueryParam::BigInt(i64::from(lim)));
394 }
395
396 if let Some(off) = offset {
397 param_count += 1;
398 write!(sql, " OFFSET ${param_count}").expect("write to String");
399 typed_params.push(QueryParam::BigInt(i64::from(off)));
400 }
401
402 tracing::debug!("SQL with projection = {}", sql);
403 tracing::debug!("typed_params = {:?}", typed_params);
404
405 // Create references to QueryParam for ToSql
406 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
407 .iter()
408 .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
409 .collect();
410
411 self.execute_raw(&sql, ¶m_refs).await
412 } else {
413 // No WHERE clause — only LIMIT/OFFSET params.
414 // Reason (expect below): fmt::Write for String is infallible.
415 let mut typed_params: Vec<QueryParam> = Vec::new();
416 let mut param_count = 0;
417
418 if let Some(lim) = limit {
419 param_count += 1;
420 write!(sql, " LIMIT ${param_count}").expect("write to String");
421 typed_params.push(QueryParam::BigInt(i64::from(lim)));
422 }
423
424 if let Some(off) = offset {
425 param_count += 1;
426 write!(sql, " OFFSET ${param_count}").expect("write to String");
427 typed_params.push(QueryParam::BigInt(i64::from(off)));
428 }
429
430 // Create references to QueryParam for ToSql
431 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
432 .iter()
433 .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
434 .collect();
435
436 self.execute_raw(&sql, ¶m_refs).await
437 }
438 }
439}
440
441/// Build a parameterized `SELECT data FROM {view}` SQL string.
442///
443/// Shared by [`PostgresAdapter::execute_where_query`] and
444/// [`PostgresAdapter::explain_where_query`] so that SQL construction
445/// logic is never duplicated.
446///
447/// # Returns
448///
449/// `(sql, typed_params)` — the SQL string and the bound parameter values.
450///
451/// # Errors
452///
453/// Returns `FraiseQLError` if WHERE clause generation fails.
454pub(super) fn build_where_select_sql(
455 view: &str,
456 where_clause: Option<&WhereClause>,
457 limit: Option<u32>,
458 offset: Option<u32>,
459) -> Result<(String, Vec<QueryParam>)> {
460 // Build base query
461 let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
462
463 // Collect WHERE clause params (if any)
464 let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
465 let generator = PostgresWhereGenerator::new(PostgresDialect);
466 let (where_sql, where_params) = generator.generate(clause)?;
467 sql.push_str(" WHERE ");
468 sql.push_str(&where_sql);
469
470 // Convert WHERE clause JSON values to QueryParam
471 where_params.into_iter().map(QueryParam::from).collect()
472 } else {
473 Vec::new()
474 };
475 let mut param_count = typed_params.len();
476
477 // Add LIMIT as BigInt (PostgreSQL requires integer type for LIMIT).
478 // Reason (expect below): fmt::Write for String is infallible.
479 if let Some(lim) = limit {
480 param_count += 1;
481 write!(sql, " LIMIT ${param_count}").expect("write to String");
482 typed_params.push(QueryParam::BigInt(i64::from(lim)));
483 }
484
485 // Add OFFSET as BigInt (PostgreSQL requires integer type for OFFSET)
486 if let Some(off) = offset {
487 param_count += 1;
488 write!(sql, " OFFSET ${param_count}").expect("write to String");
489 typed_params.push(QueryParam::BigInt(i64::from(off)));
490 }
491
492 Ok((sql, typed_params))
493}