fraiseql_db/postgres/adapter/mod.rs
1//! PostgreSQL database adapter implementation.
2
3mod database;
4mod relay;
5
6#[cfg(test)]
7mod tests;
8
9#[cfg(all(test, feature = "test-postgres"))]
10mod integration_tests;
11
12use std::{fmt::Write, time::Duration};
13
14use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
15use fraiseql_error::{FraiseQLError, Result};
16use tokio_postgres::{NoTls, Row};
17
18use super::where_generator::PostgresWhereGenerator;
19use crate::{
20 dialect::PostgresDialect,
21 identifier::quote_postgres_identifier,
22 order_by::append_order_by,
23 traits::DatabaseAdapter,
24 types::{
25 DatabaseType, JsonbValue, QueryParam,
26 sql_hints::{OrderByClause, SqlProjectionHint},
27 },
28 where_clause::WhereClause,
29};
30
/// Pool ceiling used when the caller does not choose a maximum size.
///
/// Raised from 10 to 25 so that concurrent nested query load no longer
/// exhausts the pool (fixes Issue #41).
const DEFAULT_POOL_SIZE: usize = 25;

/// Upper bound on connection-acquisition attempts before an error is returned.
const MAX_CONNECTION_RETRIES: u32 = 3;

/// Base delay, in milliseconds, from which the retry backoff between
/// acquisition attempts is derived.
const CONNECTION_RETRY_DELAY_MS: u64 = 50;
41
/// Sizing and timeout settings used when building and pre-warming the pool.
///
/// Three knobs are exposed: how many connections should exist right after
/// startup (`min_size`), how many the pool may ever hold (`max_size`), and an
/// optional deadline for acquiring or creating a connection (`timeout_secs`).
///
/// # Example
///
/// ```rust
/// use fraiseql_db::postgres::PoolPrewarmConfig;
///
/// let cfg = PoolPrewarmConfig {
///     min_size: 5,
///     max_size: 20,
///     timeout_secs: Some(30),
/// };
/// ```
#[derive(Clone, Debug)]
pub struct PoolPrewarmConfig {
    /// Target number of connections opened eagerly at pool creation time.
    ///
    /// The startup health check always opens one connection; pre-warming
    /// tops that up towards `min_size` (capped by `max_size`) so connections
    /// are ready before the first request. Use `0` to disable pre-warming and
    /// rely on lazy initialisation.
    pub min_size: usize,

    /// Hard ceiling on the number of connections held by the pool.
    pub max_size: usize,

    /// Optional deadline, in seconds, for acquiring or creating a connection.
    ///
    /// The same value is applied to deadpool's `wait` timeout (waiting for an
    /// idle connection) and `create` timeout (opening a new connection to
    /// PostgreSQL). With `None`, no timeout is configured, so acquisition can
    /// block for as long as the pool stays exhausted.
    pub timeout_secs: Option<u64>,
}
78
79/// Build a `deadpool-postgres` pool with an optional wait/create timeout.
80///
81/// # Errors
82///
83/// Returns `FraiseQLError::ConnectionPool` if pool creation fails (e.g., unparseable URL).
84fn build_pool(connection_string: &str, max_size: usize, timeout_secs: Option<u64>) -> Result<Pool> {
85 let mut cfg = Config::new();
86 cfg.url = Some(connection_string.to_string());
87 cfg.manager = Some(ManagerConfig {
88 recycling_method: RecyclingMethod::Fast,
89 });
90
91 let mut pool_cfg = deadpool_postgres::PoolConfig::new(max_size);
92 if let Some(secs) = timeout_secs {
93 let t = Duration::from_secs(secs);
94 pool_cfg.timeouts.wait = Some(t);
95 pool_cfg.timeouts.create = Some(t);
96 // `recycle` intentionally stays None — fast recycle, not user-configurable.
97 }
98 cfg.pool = Some(pool_cfg);
99
100 cfg.create_pool(Some(Runtime::Tokio1), NoTls)
101 .map_err(|e| FraiseQLError::ConnectionPool {
102 message: format!("Failed to create connection pool: {e}"),
103 })
104}
105
106/// Escape a JSONB key for use in a PostgreSQL string literal (`data->>'key'`).
107///
108/// PostgreSQL string literals use single-quote doubling for escaping (`'` → `''`).
109/// This function is defense-in-depth: `OrderByClause` already rejects field names
110/// that are not valid GraphQL identifiers (which cannot contain `'`), but this
111/// escaping ensures correctness for any future caller that bypasses that validation.
112pub(super) fn escape_jsonb_key(key: &str) -> String {
113 key.replace('\'', "''")
114}
115
116/// PostgreSQL database adapter with connection pooling.
117///
118/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for async queries.
119///
120/// # Example
121///
122/// ```rust,no_run
123/// use fraiseql_db::postgres::PostgresAdapter;
124/// use fraiseql_db::{DatabaseAdapter, WhereClause, WhereOperator};
125/// use serde_json::json;
126///
127/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
128/// // Create adapter with connection string
129/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
130///
131/// // Execute query
132/// let where_clause = WhereClause::Field {
133/// path: vec!["email".to_string()],
134/// operator: WhereOperator::Icontains,
135/// value: json!("example.com"),
136/// };
137///
138/// let results = adapter
139/// .execute_where_query("v_user", Some(&where_clause), Some(10), None, None, &[])
140/// .await?;
141///
142/// println!("Found {} users", results.len());
143/// # Ok(())
144/// # }
145/// ```
146#[derive(Clone)]
147pub struct PostgresAdapter {
148 pub(super) pool: Pool,
149 /// Whether mutation timing injection is enabled.
150 mutation_timing_enabled: bool,
151 /// The PostgreSQL session variable name for timing.
152 timing_variable_name: String,
153}
154
155impl std::fmt::Debug for PostgresAdapter {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("PostgresAdapter")
158 .field("mutation_timing_enabled", &self.mutation_timing_enabled)
159 .field("timing_variable_name", &self.timing_variable_name)
160 .field("pool", &"<Pool>")
161 .finish()
162 }
163}
164
165impl PostgresAdapter {
166 /// Create new PostgreSQL adapter with default pool configuration.
167 ///
168 /// # Arguments
169 ///
170 /// * `connection_string` - PostgreSQL connection string (e.g., "postgresql://localhost/mydb")
171 ///
172 /// # Errors
173 ///
174 /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
175 ///
176 /// # Example
177 ///
178 /// ```rust,no_run
179 /// # use fraiseql_db::postgres::PostgresAdapter;
180 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
181 /// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
182 /// # Ok(())
183 /// # }
184 /// ```
185 pub async fn new(connection_string: &str) -> Result<Self> {
186 Self::with_pool_config(
187 connection_string,
188 PoolPrewarmConfig {
189 min_size: 0,
190 max_size: DEFAULT_POOL_SIZE,
191 timeout_secs: None,
192 },
193 )
194 .await
195 }
196
197 /// Create new PostgreSQL adapter with pre-warming and timeout configuration.
198 ///
199 /// Constructs the pool, runs a startup health check, then eagerly opens
200 /// `cfg.min_size` connections so they are ready when the first request arrives.
201 ///
202 /// # Arguments
203 ///
204 /// * `connection_string` - PostgreSQL connection string
205 /// * `cfg` - Pool pre-warming and timeout configuration
206 ///
207 /// # Errors
208 ///
209 /// Returns `FraiseQLError::ConnectionPool` if pool creation or the startup
210 /// health check fails.
211 pub async fn with_pool_config(connection_string: &str, cfg: PoolPrewarmConfig) -> Result<Self> {
212 let pool = build_pool(connection_string, cfg.max_size, cfg.timeout_secs)?;
213
214 // Startup health check — establishes the first connection.
215 let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
216 message: format!("Failed to acquire connection: {e}"),
217 })?;
218
219 client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
220 message: format!("Failed to connect to database: {e}"),
221 sql_state: e.code().map(|c| c.code().to_string()),
222 })?;
223
224 // Drop client back to the pool before pre-warming so that the health-check
225 // connection counts as idle slot #1.
226 drop(client);
227
228 let adapter = Self {
229 pool,
230 mutation_timing_enabled: false,
231 timing_variable_name: "fraiseql.started_at".to_string(),
232 };
233
234 // Pre-warm: open `min_size - 1` additional connections (one already exists).
235 let warm_target = cfg.min_size.min(cfg.max_size).saturating_sub(1);
236 if warm_target > 0 {
237 adapter.prewarm(warm_target).await;
238 }
239
240 Ok(adapter)
241 }
242
243 /// Create new PostgreSQL adapter with custom pool size.
244 ///
245 /// # Arguments
246 ///
247 /// * `connection_string` - PostgreSQL connection string
248 /// * `max_size` - Maximum number of connections in pool
249 ///
250 /// # Errors
251 ///
252 /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
253 pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
254 Self::with_pool_config(
255 connection_string,
256 PoolPrewarmConfig {
257 min_size: 0,
258 max_size,
259 timeout_secs: None,
260 },
261 )
262 .await
263 }
264
265 /// Pre-warm the pool by opening `count` additional connections.
266 ///
267 /// Pre-warming is best-effort: failures from individual connections are logged
268 /// but do not prevent startup. A 10-second outer timeout ensures the server
269 /// never blocks indefinitely on a slow or unreachable PostgreSQL instance.
270 async fn prewarm(&self, count: usize) {
271 use futures::future::join_all;
272 use tokio::time::timeout;
273
274 let handles: Vec<_> = (0..count)
275 .map(|_| {
276 let pool = self.pool.clone();
277 tokio::spawn(async move { pool.get().await })
278 })
279 .collect();
280
281 let result = timeout(Duration::from_secs(10), join_all(handles)).await;
282
283 let (succeeded, failed) = match result {
284 Ok(outcomes) => {
285 let s = outcomes
286 .iter()
287 .filter(|r| r.as_ref().map(|inner| inner.is_ok()).unwrap_or(false))
288 .count();
289 (s, count - s)
290 },
291 Err(_elapsed) => {
292 tracing::warn!(
293 target_connections = count,
294 "Pool pre-warm timed out after 10s; server will continue with partial pre-warm"
295 );
296 (0, count)
297 },
298 };
299
300 if failed > 0 {
301 tracing::warn!(
302 succeeded,
303 failed,
304 "Pool pre-warm: some connections could not be established"
305 );
306 } else {
307 tracing::info!(
308 idle_connections = succeeded + 1,
309 "PostgreSQL pool pre-warmed successfully"
310 );
311 }
312 }
313
314 /// Get a reference to the internal connection pool.
315 ///
316 /// This allows sharing the pool with other components like `PostgresIntrospector`.
317 #[must_use]
318 pub const fn pool(&self) -> &Pool {
319 &self.pool
320 }
321
322 /// Enable mutation timing injection.
323 ///
324 /// When enabled, `execute_function_call` wraps each mutation in a transaction
325 /// and sets a session variable to `clock_timestamp()::text` before execution,
326 /// allowing SQL functions to compute their own duration.
327 ///
328 /// # Arguments
329 ///
330 /// * `variable_name` - The PostgreSQL session variable name (e.g., `"fraiseql.started_at"`)
331 #[must_use]
332 pub fn with_mutation_timing(mut self, variable_name: &str) -> Self {
333 self.mutation_timing_enabled = true;
334 self.timing_variable_name = variable_name.to_string();
335 self
336 }
337
338 /// Returns whether mutation timing injection is enabled.
339 #[must_use]
340 pub const fn mutation_timing_enabled(&self) -> bool {
341 self.mutation_timing_enabled
342 }
343
344 /// Execute raw SQL query and return JSONB rows.
345 ///
346 /// # Errors
347 ///
348 /// Returns `FraiseQLError::Database` on query execution failure.
349 #[allow(dead_code)]
350 pub(super) async fn execute_raw(
351 &self,
352 sql: &str,
353 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
354 ) -> Result<Vec<JsonbValue>> {
355 let client = self.acquire_connection_with_retry().await?;
356
357 let rows: Vec<Row> =
358 client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
359 message: format!("Query execution failed: {e}"),
360 sql_state: e.code().map(|c| c.code().to_string()),
361 })?;
362
363 let results = rows
364 .into_iter()
365 .map(|row| {
366 let data: serde_json::Value = row.get(0);
367 JsonbValue::new(data)
368 })
369 .collect();
370
371 Ok(results)
372 }
373
374 /// Acquire a connection from the pool with retry logic.
375 ///
376 /// - `PoolError::Timeout`: the pool was exhausted for the full configured wait period. This is
377 /// not transient — retrying would only multiply the wait. Fails immediately.
378 /// - `PoolError::Backend` / create errors: potentially transient. Retries with exponential
379 /// backoff (up to `MAX_CONNECTION_RETRIES` attempts).
380 ///
381 /// # Errors
382 ///
383 /// Returns `FraiseQLError::ConnectionPool` on timeout or when all retries are exhausted.
384 pub(super) async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
385 use deadpool_postgres::PoolError;
386
387 let mut last_error = None;
388
389 for attempt in 0..MAX_CONNECTION_RETRIES {
390 match self.pool.get().await {
391 Ok(client) => {
392 if attempt > 0 {
393 tracing::info!(attempt, "Successfully acquired connection after retries");
394 }
395 return Ok(client);
396 },
397 // Pool exhausted for the full wait period — not transient, fail immediately.
398 Err(PoolError::Timeout(_)) => {
399 let metrics = self.pool_metrics();
400 tracing::error!(
401 available = metrics.idle_connections,
402 active = metrics.active_connections,
403 max = metrics.total_connections,
404 "Connection pool timeout: all connections busy"
405 );
406 return Err(FraiseQLError::ConnectionPool {
407 message: format!(
408 "Connection pool timeout: {}/{} connections busy. \
409 Increase pool_max_size or reduce concurrent load.",
410 metrics.active_connections, metrics.total_connections,
411 ),
412 });
413 },
414 // Backend/create errors are potentially transient — retry with backoff.
415 Err(e) => {
416 last_error = Some(e);
417 if attempt < MAX_CONNECTION_RETRIES - 1 {
418 let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
419 tracing::warn!(
420 attempt = attempt + 1,
421 total = MAX_CONNECTION_RETRIES,
422 delay_ms = delay,
423 "Transient connection error, retrying"
424 );
425 tokio::time::sleep(Duration::from_millis(delay)).await;
426 }
427 },
428 }
429 }
430
431 // All retries for transient errors exhausted.
432 let pool_metrics = self.pool_metrics();
433 tracing::error!(
434 retries = MAX_CONNECTION_RETRIES,
435 available = pool_metrics.idle_connections,
436 active = pool_metrics.active_connections,
437 max = pool_metrics.total_connections,
438 "Failed to acquire connection after all retries"
439 );
440
441 Err(FraiseQLError::ConnectionPool {
442 message: format!(
443 "Failed to acquire connection after {} retries: {}. \
444 Pool state: idle={}, active={}, max={}",
445 MAX_CONNECTION_RETRIES,
446 last_error.expect("last_error is set on every retry iteration"),
447 pool_metrics.idle_connections,
448 pool_metrics.active_connections,
449 pool_metrics.total_connections,
450 ),
451 })
452 }
453
454 /// Execute query with SQL field projection optimization.
455 ///
456 /// Uses the provided `SqlProjectionHint` to generate optimized SQL that projects
457 /// only the requested fields from the JSONB column, reducing network payload and
458 /// JSON deserialization overhead.
459 ///
460 /// # Arguments
461 ///
462 /// * `view` - View/table name to query
463 /// * `projection` - Optional SQL projection hint with field list
464 /// * `where_clause` - Optional WHERE clause for filtering
465 /// * `limit` - Optional row limit
466 ///
467 /// # Returns
468 ///
469 /// Vector of projected JSONB rows with only the requested fields
470 ///
471 /// # Errors
472 ///
473 /// Returns `FraiseQLError::Database` on query execution failure.
474 ///
475 /// # Panics
476 ///
477 /// Cannot panic in practice: the inner `expect` is guarded by an `is_none()` check
478 /// immediately above it.
479 ///
480 /// # Example
481 ///
482 /// ```no_run
483 /// // Requires: running PostgreSQL database.
484 /// use fraiseql_db::postgres::PostgresAdapter;
485 /// use fraiseql_db::types::SqlProjectionHint;
486 /// use fraiseql_db::DatabaseType;
487 ///
488 /// # async fn example(adapter: &PostgresAdapter) -> Result<(), Box<dyn std::error::Error>> {
489 /// let projection = SqlProjectionHint {
490 /// database: DatabaseType::PostgreSQL,
491 /// projection_template: "jsonb_build_object('id', data->>'id')".to_string(),
492 /// estimated_reduction_percent: 75,
493 /// };
494 ///
495 /// let results = adapter
496 /// .execute_with_projection("v_user", Some(&projection), None, Some(10), None)
497 /// .await?;
498 /// # Ok(())
499 /// # }
500 /// ```
501 /// Implementation of `execute_with_projection` with ORDER BY support.
502 ///
503 /// Called by both the inherent convenience method and the `DatabaseAdapter`
504 /// trait implementation.
505 pub(super) async fn execute_with_projection_impl(
506 &self,
507 view: &str,
508 projection: Option<&SqlProjectionHint>,
509 where_clause: Option<&WhereClause>,
510 limit: Option<u32>,
511 offset: Option<u32>,
512 order_by: Option<&[OrderByClause]>,
513 session_vars: &[(&str, &str)],
514 ) -> Result<Vec<JsonbValue>> {
515 // If no projection, fall back to standard query
516 if projection.is_none() {
517 return self.execute_where_query(view, where_clause, limit, offset, order_by, session_vars).await;
518 }
519
520 let projection = projection.expect("projection is Some; None was returned above");
521
522 // Build SQL with projection
523 // The projection_template is expected to be the SELECT clause with projection SQL
524 // e.g., "jsonb_build_object('id', data->>'id', 'email', data->>'email')"
525 let mut sql = format!(
526 "SELECT {} FROM {}",
527 projection.projection_template,
528 quote_postgres_identifier(view)
529 );
530
531 // Add WHERE clause if present
532 let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
533 let generator = PostgresWhereGenerator::new(PostgresDialect);
534 let (where_sql, where_params) = generator.generate(clause)?;
535 sql.push_str(" WHERE ");
536 sql.push_str(&where_sql);
537 where_params.into_iter().map(QueryParam::from).collect()
538 } else {
539 Vec::new()
540 };
541 let mut param_count = typed_params.len();
542
543 // ORDER BY must come before LIMIT/OFFSET in SQL.
544 append_order_by(&mut sql, order_by, DatabaseType::PostgreSQL)?;
545
546 // Append LIMIT/OFFSET as BigInt (PostgreSQL requires integer type).
547 // Reason (expect below): fmt::Write for String is infallible.
548 if let Some(lim) = limit {
549 param_count += 1;
550 write!(sql, " LIMIT ${param_count}").expect("write to String");
551 typed_params.push(QueryParam::BigInt(i64::from(lim)));
552 }
553
554 if let Some(off) = offset {
555 param_count += 1;
556 write!(sql, " OFFSET ${param_count}").expect("write to String");
557 typed_params.push(QueryParam::BigInt(i64::from(off)));
558 }
559
560 tracing::debug!("SQL with projection = {}", sql);
561 tracing::debug!("typed_params = {:?}", typed_params);
562
563 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
564 .iter()
565 .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
566 .collect();
567
568 let mut client = self.acquire_connection_with_retry().await?;
569
570 if !session_vars.is_empty() {
571 let txn = client.build_transaction().start().await.map_err(|e| FraiseQLError::Database {
572 message: format!("Failed to start transaction: {e}"),
573 sql_state: e.code().map(|c| c.code().to_string()),
574 })?;
575
576 // Set all session variables
577 for (name, value) in session_vars {
578 txn.execute("SELECT set_config($1, $2, true)", &[name, value]).await.map_err(|e| FraiseQLError::Database {
579 message: format!("Failed to set session variable {name}: {e}"),
580 sql_state: e.code().map(|c| c.code().to_string()),
581 })?;
582 }
583
584 // Execute query in same transaction
585 let rows: Vec<tokio_postgres::Row> = txn.query(&sql, ¶m_refs).await.map_err(|e| FraiseQLError::Database {
586 message: format!("Query execution failed: {e}"),
587 sql_state: e.code().map(|c| c.code().to_string()),
588 })?;
589 txn.commit().await.map_err(|e| FraiseQLError::Database {
590 message: format!("Failed to commit transaction: {e}"),
591 sql_state: e.code().map(|c| c.code().to_string()),
592 })?;
593
594 Ok(rows
595 .into_iter()
596 .map(|row| {
597 let data: serde_json::Value = row.get(0);
598 JsonbValue::new(data)
599 })
600 .collect())
601 } else {
602 // Fast path: no session vars, direct execution
603 let rows: Vec<tokio_postgres::Row> = client.query(&sql, ¶m_refs).await.map_err(|e| FraiseQLError::Database {
604 message: format!("Query execution failed: {e}"),
605 sql_state: e.code().map(|c| c.code().to_string()),
606 })?;
607 Ok(rows
608 .into_iter()
609 .map(|row| {
610 let data: serde_json::Value = row.get(0);
611 JsonbValue::new(data)
612 })
613 .collect())
614 }
615 }
616
617 /// Execute query with SQL field projection optimization.
618 ///
619 /// Convenience wrapper for callers that don't need ORDER BY.
620 /// See [`execute_with_projection_impl`](Self::execute_with_projection_impl) for details.
621 ///
622 /// # Errors
623 ///
624 /// Returns `FraiseQLError::Database` on query execution failure.
625 pub async fn execute_with_projection(
626 &self,
627 view: &str,
628 projection: Option<&SqlProjectionHint>,
629 where_clause: Option<&WhereClause>,
630 limit: Option<u32>,
631 offset: Option<u32>,
632 ) -> Result<Vec<JsonbValue>> {
633 self.execute_with_projection_impl(view, projection, where_clause, limit, offset, None, &[])
634 .await
635 }
636}
637
638/// Build a parameterized `SELECT data FROM {view}` SQL string.
639///
640/// Shared by [`PostgresAdapter::execute_where_query`] and
641/// [`PostgresAdapter::explain_where_query`] so that SQL construction
642/// logic is never duplicated.
643///
644/// # Returns
645///
646/// `(sql, typed_params)` — the SQL string and the bound parameter values.
647///
648/// # Errors
649///
650/// Returns `FraiseQLError` if WHERE clause generation fails.
651pub(super) fn build_where_select_sql(
652 view: &str,
653 where_clause: Option<&WhereClause>,
654 limit: Option<u32>,
655 offset: Option<u32>,
656) -> Result<(String, Vec<QueryParam>)> {
657 build_where_select_sql_ordered(view, where_clause, limit, offset, None)
658}
659
660/// Build a parameterized `SELECT data FROM {view}` SQL string with optional ORDER BY.
661///
662/// ORDER BY is inserted between the WHERE clause and LIMIT/OFFSET as required by SQL.
663///
664/// # Returns
665///
666/// `(sql, typed_params)` — the SQL string and the bound parameter values.
667///
668/// # Errors
669///
670/// Returns `FraiseQLError` if WHERE clause generation or field name validation fails.
671pub(super) fn build_where_select_sql_ordered(
672 view: &str,
673 where_clause: Option<&WhereClause>,
674 limit: Option<u32>,
675 offset: Option<u32>,
676 order_by: Option<&[OrderByClause]>,
677) -> Result<(String, Vec<QueryParam>)> {
678 // Build base query
679 let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
680
681 // Collect WHERE clause params (if any)
682 let mut typed_params: Vec<QueryParam> = if let Some(clause) = where_clause {
683 let generator = PostgresWhereGenerator::new(PostgresDialect);
684 let (where_sql, where_params) = generator.generate(clause)?;
685 sql.push_str(" WHERE ");
686 sql.push_str(&where_sql);
687
688 // Convert WHERE clause JSON values to QueryParam
689 where_params.into_iter().map(QueryParam::from).collect()
690 } else {
691 Vec::new()
692 };
693 let mut param_count = typed_params.len();
694
695 // ORDER BY must come before LIMIT/OFFSET in SQL.
696 append_order_by(&mut sql, order_by, DatabaseType::PostgreSQL)?;
697
698 // Add LIMIT as BigInt (PostgreSQL requires integer type for LIMIT).
699 // Reason (expect below): fmt::Write for String is infallible.
700 if let Some(lim) = limit {
701 param_count += 1;
702 write!(sql, " LIMIT ${param_count}").expect("write to String");
703 typed_params.push(QueryParam::BigInt(i64::from(lim)));
704 }
705
706 // Add OFFSET as BigInt (PostgreSQL requires integer type for OFFSET)
707 if let Some(off) = offset {
708 param_count += 1;
709 write!(sql, " OFFSET ${param_count}").expect("write to String");
710 typed_params.push(QueryParam::BigInt(i64::from(off)));
711 }
712
713 Ok((sql, typed_params))
714}